The following settings influence Tyrus behaviour and are NOT part of the WebSocket specification. If you use any of these configurable options, your application might not be easily portable to other WebSocket API implementations.
When accessing "wss" URLs, Tyrus client will pick up whatever keystore and truststore is actually set for current JVM instance, but that might not be always convenient. WebSocket API does not have this feature (yet, see WEBSOCKET_SPEC-210), so Tyrus exposed SSLEngineConfigurator class from Grizzly which can be used for specifying all SSL parameters to be used with current client instance. Additionally, WebSocket API does not have anything like a client, only WebSocketContainer and it does not have any properties, so you need to use Tyrus specific class - ClientManager.
final ClientManager client = ClientManager.createClient();

System.getProperties().put("javax.net.debug", "all");
System.getProperties().put(SSLContextConfigurator.KEY_STORE_FILE, "...");
System.getProperties().put(SSLContextConfigurator.TRUST_STORE_FILE, "...");
System.getProperties().put(SSLContextConfigurator.KEY_STORE_PASSWORD, "...");
System.getProperties().put(SSLContextConfigurator.TRUST_STORE_PASSWORD, "...");

final SSLContextConfigurator defaultConfig = new SSLContextConfigurator();
defaultConfig.retrieve(System.getProperties());
// or setup SSLContextConfigurator using its API.

SSLEngineConfigurator sslEngineConfigurator =
        new SSLEngineConfigurator(defaultConfig, true, false, false);
client.getProperties().put(GrizzlyEngine.SSL_ENGINE_CONFIGURATOR, sslEngineConfigurator);

client.connectToServer(..., ClientEndpointConfig.Builder.create().build(),
        new URI("wss://localhost:8181/sample-echo/echo"));
WebSocketContainer.connectToServer(...) methods are by definition blocking - the declared exceptions need to be thrown after the connection attempt is made, and the method returns a Session instance, which needs to be ready for sending messages and invoking other methods that require an already established connection.
The existing connectToServer methods are fine for lots of uses, but they might cause an issue when you are designing an application with a highly responsive user interface. Tyrus introduces an asynchronous variant of each connectToServer method (prefixed with "async"), which returns Future<Session>. These methods do only a simple check of the provided URL and the rest is executed in a separate thread. All exceptions thrown during this phase are reported as the cause of the ExecutionException thrown when calling Future<Session>.get().
The asynchronous connect methods are declared on the Tyrus implementation of WebSocketContainer, called ClientManager.
ClientManager client = ClientManager.createClient();
final Future<Session> future = client.asyncConnectToServer(ClientEndpoint.class, URI.create("..."));
try {
    future.get();
} catch (...) {
}
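A minimal sketch of handling a failed asynchronous connect, assuming the future variable from the snippet above; as mentioned earlier, the actual connect failure arrives as the cause of the ExecutionException:

try {
    Session session = future.get();
    // connection is established, session is ready to use.
} catch (ExecutionException e) {
    // the connect failure (for example a DeploymentException) is available as the cause.
    Throwable cause = e.getCause();
    cause.printStackTrace();
} catch (InterruptedException e) {
    // the waiting thread was interrupted before the connection attempt finished.
    Thread.currentThread().interrupt();
}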
ClientManager contains an async alternative to each connectToServer method.
One of the typical use cases we've seen so far for WebSocket server-side endpoints is broadcasting messages to all connected clients, something like:
@OnMessage
public void onMessage(Session session, String message) throws IOException {
    for (Session s : session.getOpenSessions()) {
        s.getBasicRemote().sendText(message);
    }
}
Executing this code might cause a serious load increase on your application server. Tyrus provides an optimized broadcast implementation, which takes advantage of the fact that we are sending exactly the same message to all clients, so the dataframe can be created and serialized only once. Furthermore, Tyrus can iterate over the set of open connections faster than Session.getOpenSessions().
@OnMessage
public void onMessage(Session session, String message) {
    ((TyrusSession) session).broadcast(message);
}
Unfortunately, the WebSocket API forbids anything other than Session as the session parameter of an @OnMessage annotated method, so you cannot declare TyrusSession there directly and you might need to perform an instanceof check.
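A minimal sketch of such a check, combining the two snippets above; the fallback branch iterating over open sessions is just an illustration for non-Tyrus runtimes:

@OnMessage
public void onMessage(Session session, String message) throws IOException {
    if (session instanceof TyrusSession) {
        // Tyrus runtime: use the optimized broadcast.
        ((TyrusSession) session).broadcast(message);
    } else {
        // other implementations: fall back to iterating over open sessions.
        for (Session s : session.getOpenSessions()) {
            s.getBasicRemote().sendText(message);
        }
    }
}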
The Servlet container buffers incoming WebSocket frames and there must be a size limit to prevent an OutOfMemoryError and potential DoS attacks.
The configuration property is named "org.glassfish.tyrus.servlet.incoming-buffer-size" and you can set it in web.xml (this particular snippet sets the buffer size to 17000000 bytes, roughly a 16M payload):
<web-app version="2.5"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">

    <context-param>
        <param-name>org.glassfish.tyrus.servlet.incoming-buffer-size</param-name>
        <param-value>17000000</param-value>
    </context-param>
</web-app>
The default value is 4194315 bytes, which corresponds to a 4M payload plus a few bytes for frame headers, so you should be able to receive messages up to 4M long without having to care about this property.
The same issue is present on the client side. There you can set this property via ClientManager:
ClientManager client = ClientManager.createClient(); client.getProperties().put("org.glassfish.tyrus.incomingBufferSize", 6000000); // sets the incoming buffer size to 6000000 bytes. client.connectToServer( ... )
By default, the WebSocket client implementation in Tyrus re-creates the client runtime whenever WebSocketContainer#connectToServer is invoked. This approach gives us some perks, like out-of-the-box isolation and a relatively low thread count (currently we have 1 selector thread and 2 worker threads). It also gives you the ability to stop the client runtime – one Session instance is tied to exactly one client runtime, so we can stop it when the Session is closed. This seems like a good solution for most WebSocket client use cases – you usually use the Java client from an application which uses it to communicate with the server side, and you typically don't need more than 10 instances (my personal estimate is that more than 90% of applications won't use more than 1 connection). There are several reasons for that – one of them is that it is just a client and it needs to preserve server resources – one WebSocket connection means one TCP connection, and we don't really want clients to consume more than needed. The previous statement may be invalidated by the WebSocket multiplexing extension, but for now, it is still valid.
On the other hand, WebSocket client implementations in some other containers took another (also correct) approach – they share the client runtime among all client connections. That means they might not have this strict one-session-one-runtime policy and they cannot really give the user a way to control system resources, but it surely has another advantage – it can handle many more open connections. Thread pools are shared among client sessions, which may or may not have some unforeseen consequences, but if implemented correctly, it should outperform the Tyrus solution mentioned in the previous paragraph in some use cases, like the one mentioned in TYRUS-275 - performance tests. The reporter created a simple program which used the WebSocket API to create clients and connect to a remote endpoint, and measured how many clients he could create (in other words: how many parallel client connections can be created; I guess the original intent was to measure the possible number of concurrent clients on the server side, but that does not really matter here). The Tyrus implementation lost compared to some others, and it was exactly because it did not have a shared client runtime capability.
How can you use this feature?
ClientManager client = ClientManager.createClient();
client.getProperties().put(GrizzlyClientContainer.SHARED_CONTAINER, true);
You might also want to specify the container idle timeout:
client.getProperties().put(GrizzlyClientContainer.SHARED_CONTAINER_IDLE_TIMEOUT, 5);
Last but not least, you might want to specify the thread pool sizes used by the shared container (please use this feature only when you know what you are doing; Grizzly by default does not limit the maximum number of threads, so if you do limit it, please make sure the thread pool size fits your purpose):
client.getProperties().put(GrizzlyClientSocket.SELECTOR_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(3));
client.getProperties().put(GrizzlyClientSocket.WORKER_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(10));
Please note that extensions support is considered experimental and any related API can change at any time. Also, you should ask yourself at least twice whether you can achieve your goal by other means - a WebSocket extension is very powerful and can easily break your application when not used with care or enough expertise.
WebSocket frame used in ExtendedExtension:
public class Frame {

    public boolean isFin() { .. }
    public boolean isRsv1() { .. }
    public boolean isRsv2() { .. }
    public boolean isRsv3() { .. }
    public boolean isMask() { .. }
    public byte getOpcode() { .. }
    public long getPayloadLength() { .. }
    public int getMaskingKey() { .. }
    public byte[] getPayloadData() { .. }
    public boolean isControlFrame() { .. }

    public static Builder builder() { .. }
    public static Builder builder(Frame frame) { .. }

    public final static class Builder {
        public Builder() { .. }
        public Builder(Frame frame) { .. }
        public Frame build() { .. }
        public Builder fin(boolean fin) { .. }
        public Builder rsv1(boolean rsv1) { .. }
        public Builder rsv2(boolean rsv2) { .. }
        public Builder rsv3(boolean rsv3) { .. }
        public Builder mask(boolean mask) { .. }
        public Builder opcode(byte opcode) { .. }
        public Builder payloadLength(long payloadLength) { .. }
        public Builder maskingKey(int maskingKey) { .. }
        public Builder payloadData(byte[] payloadData) { .. }
    }
}
Frame is immutable, so if you want to create a new one, you need to create a new builder, modify what you want, and build it:
Frame newFrame = Frame.builder(originalFrame).rsv1(true).build();
Note that there is only one convenience method: isControlFrame(). Other information about the frame type etc. needs to be evaluated directly from the opcode, simply because there might not be enough information to get the correct outcome, or the information itself would not be very useful. For example: opcode 0x00 means continuation frame, but you don't have any chance of getting the actual type (text or binary) without intercepting data from previous frames. Consider the Frame class to be as raw a representation as possible. isControlFrame() could also be derived from the opcode, but it is at least always deterministic and it will be used by most extension implementations. It is not usual to modify control frames, as it might end with half-closed connections or unanswered ping messages.
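A minimal sketch of such opcode-based checks, using the opcode values defined in RFC 6455; the describe() helper is hypothetical and only illustrates how a Frame passed to processIncoming/processOutgoing might be classified:

private static String describe(Frame frame) {
    if (frame.isControlFrame()) {
        // close (0x08), ping (0x09) and pong (0x0A) frames; usually left untouched.
        return "control";
    }
    switch (frame.getOpcode()) {
        case 0x00: return "continuation"; // text/binary must be remembered from the first frame of the message
        case 0x01: return "text";
        case 0x02: return "binary";
        default:   return "unknown";
    }
}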
The ExtendedExtension representation needs to be able to handle extension parameter negotiation and the actual processing of incoming and outgoing frames. It also should be compatible with the existing javax.websocket.Extension class, since we want to re-use the existing registration API and be able to return the new extension instance in the response of the List<Extension> Session.getNegotiatedExtensions() call. Consider the following:
public interface ExtendedExtension extends Extension {

    Frame processIncoming(ExtensionContext context, Frame frame);
    Frame processOutgoing(ExtensionContext context, Frame frame);

    List<Parameter> onExtensionNegotiation(ExtensionContext context, List<Parameter> requestedParameters);
    void onHandshakeResponse(ExtensionContext context, List<Parameter> responseParameters);

    void destroy(ExtensionContext context);

    interface ExtensionContext {
        Map<String, Object> getProperties();
    }
}
ExtendedExtension is capable of processing frames and influencing parameter values during the handshake. An extension is used on both the client and server side, and since the negotiation is the only place where this distinction applies, we needed to somehow differentiate the two sides. On the server side, only the onExtensionNegotiation(..) method is invoked, and on the client side onHandshakeResponse(..). The server-side method is a must; the client side could somehow be solved by implementing ClientEndpointConfig.Configurator#afterResponse(..) or calling Session.getNegotiatedExtensions(), but it would not be as easy to get this information back to the extension instance, and even if it were, it would not be very elegant. Also, you might suggest replacing the processIncoming and processOutgoing methods with just one process(Frame) method. That is also possible, but then you would have to deduce the current direction from the frame instance or somehow from the ExtensionContext, which is generally not a bad idea, but it resulted in slightly less readable code.
ExtensionContext and the related lifecycle methods are there because the original javax.websocket.Extension is a singleton and ExtendedExtension must obey this fact. But that does not meet some requirements we stated previously, like per-connection parameter negotiation, and of course the processing itself will most likely have some per-connection state. The lifecycle of ExtensionContext is defined as follows: an ExtensionContext instance is created right before onExtensionNegotiation (server side) or onHandshakeResponse (client side) and destroyed after the destroy method invocation. Obviously, processIncoming or processOutgoing cannot be called before the ExtensionContext is created or after it is destroyed. You can think of the handshake-related methods as @OnOpen and destroy as @OnClose.
For those more familiar with the WebSocket protocol: process*(ExtensionContext, Frame) is always invoked with an unmasked frame, so you don't need to care about masking. On the other hand, the payload is exactly as it was received from the wire, before any validation (the UTF-8 check for text messages). This fact is particularly important when you are modifying text message content: you need to make sure it is properly encoded in relation to other messages, because the encoding/decoding process is stateful – the remainder after UTF-8 coding is used as input to the coding process for the next message. If you just want to test this feature and save yourself some headaches, don't modify text message content, or try binary messages instead.
Let's say we want to create an extension which will encrypt and decrypt the first byte of every binary message. Assume we have a key (one byte) and our symmetric cipher will be XOR. (Just for simplicity: (a XOR key XOR key) = a, so the encrypt() and decrypt() functions are the same.)
public class CryptoExtension implements ExtendedExtension {

    @Override
    public Frame processIncoming(ExtensionContext context, Frame frame) {
        return lameCrypt(context, frame);
    }

    @Override
    public Frame processOutgoing(ExtensionContext context, Frame frame) {
        return lameCrypt(context, frame);
    }

    private Frame lameCrypt(ExtensionContext context, Frame frame) {
        if (!frame.isControlFrame() && (frame.getOpcode() == 0x02)) {
            final byte[] payloadData = frame.getPayloadData();
            payloadData[0] ^= (Byte) (context.getProperties().get("key"));
            return Frame.builder(frame).payloadData(payloadData).build();
        } else {
            return frame;
        }
    }

    @Override
    public List<Parameter> onExtensionNegotiation(ExtensionContext context, List<Parameter> requestedParameters) {
        init(context);
        // no params.
        return null;
    }

    @Override
    public void onHandshakeResponse(ExtensionContext context, List<Parameter> responseParameters) {
        init(context);
    }

    private void init(ExtensionContext context) {
        context.getProperties().put("key", (byte) 0x55);
    }

    @Override
    public void destroy(ExtensionContext context) {
        context.getProperties().clear();
    }

    @Override
    public String getName() {
        return "lame-crypto-extension";
    }

    @Override
    public List<Parameter> getParameters() {
        // no params.
        return null;
    }
}
You can see that ExtendedExtension is slightly more complicated than the original Extension, so the implementation is also not as straightforward; on the other hand, it actually does something. The sample code above shows the possible simplification mentioned earlier (one process method would be enough), but please take this as just a sample implementation. A real-world case is usually more complicated.
Now that we have our CryptoExtension implemented, we want to use it. There is nothing new here compared to the standard WebSocket Java API, so feel free to skip this part if you are already familiar with it. Only the programmatic version will be demonstrated. It is possible to do the same for the annotated version as well, but it is a little bit more complicated on the server side and I want to keep the code as compact as possible.
Client registration:
ArrayList<Extension> extensions = new ArrayList<Extension>();
extensions.add(new CryptoExtension());

final ClientEndpointConfig clientConfiguration = ClientEndpointConfig.Builder.create()
        .extensions(extensions).build();

WebSocketContainer client = ContainerProvider.getWebSocketContainer();

final Session session = client.connectToServer(new Endpoint() {
    @Override
    public void onOpen(Session session, EndpointConfig config) {
        // ...
    }
}, clientConfiguration, URI.create(/* ... */));
Server registration:
public class CryptoExtensionApplicationConfig implements ServerApplicationConfig {

    @Override
    public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> endpointClasses) {
        Set<ServerEndpointConfig> endpointConfigs = new HashSet<ServerEndpointConfig>();
        endpointConfigs.add(
                ServerEndpointConfig.Builder.create(EchoEndpoint.class, "/echo")
                        .extensions(Arrays.<Extension>asList(new CryptoExtension())).build()
        );
        return endpointConfigs;
    }

    @Override
    public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> scanned) {
        // all scanned endpoints will be used.
        return scanned;
    }
}

public class EchoEndpoint extends Endpoint {
    @Override
    public void onOpen(Session session, EndpointConfig config) {
        // ...
    }
}
CryptoExtensionApplicationConfig will be found by the servlet scanning mechanism and automatically used for application configuration; there is no need to add anything to (or even have) web.xml.
The original goal of the whole extension support was to implement the permessage compression extension as defined in draft-ietf-hybi-permessage-compression-15, and we were able to achieve that goal. Well, not completely - the current implementation ignores parameters. But it seems like that does not matter much: it was tested with Chrome and it works fine. It also passes the newest version of the Autobahn test suite, which includes tests for this extension.
See PerMessageDeflateExtension.java (compatible with draft-ietf-hybi-permessage-compression-15 and the Autobahn test suite) and XWebKitDeflateExtension.java (compatible with Chrome and Firefox – same as the previous, just a different extension name).
If you need a semi-persistent client connection, you can always implement some reconnect logic by yourself, but Tyrus Client offers a useful feature which should be much easier to use. See the short sample code:
ClientManager client = ClientManager.createClient();

ClientManager.ReconnectHandler reconnectHandler = new ClientManager.ReconnectHandler() {

    private int counter = 0;

    @Override
    public boolean onDisconnect(CloseReason closeReason) {
        counter++;
        if (counter <= 3) {
            System.out.println("### Reconnecting... (reconnect count: " + counter + ")");
            return true;
        } else {
            return false;
        }
    }

    @Override
    public boolean onConnectFailure(Exception exception) {
        counter++;
        if (counter <= 3) {
            System.out.println("### Reconnecting... (reconnect count: " + counter + ") " + exception.getMessage());
            // Thread.sleep(...) or some other "sleep-like" expression can be put here - you might want
            // to do it here to avoid a potential DDoS when you don't limit the number of reconnects.
            return true;
        } else {
            return false;
        }
    }
};

client.getProperties().put(ClientManager.RECONNECT_HANDLER, reconnectHandler);
client.connectToServer(...)
As you can see, ReconnectHandler contains two methods, onDisconnect and onConnectFailure. The first is executed whenever the @OnClose annotated method (or Endpoint.onClose(..)) is invoked on the client side - this should happen when an established connection is lost for any reason. You can find the close reason in the method's parameter. The other one, onConnectFailure, is invoked when the client fails to connect to the remote endpoint, for example due to a temporary network issue or high server load.