18.3 WebSocket Communication Patterns
This section covers common patterns for implementing bidirectional communication over WebSocket connections, including request-response patterns, broadcast messaging, topic-based subscriptions, and message serialization strategies.
Request-Response Pattern
Many applications require request-response semantics over WebSocket, combining the efficiency of persistent connections with structured message flows.
Message Envelope with Correlation IDs:
/**
 * Request-response messaging over a WebSocket: each request carries a unique
 * correlation id, and the response carrying the same id completes the future
 * that was handed to the caller.
 */
public class RequestResponsePattern {

    /** Envelope exchanged in both directions; {@code id} ties a response to its request. */
    public static class Message {
        public String id;
        public String type;
        public Object data;

        /**
         * Creates an empty envelope. Together with the public fields this lets a
         * data-binding library instantiate and populate the object when an
         * incoming response is deserialized.
         */
        public Message() {
        }

        public Message(String id, String type, Object data) {
            this.id = id;
            this.type = type;
            this.data = data;
        }
    }

    /** Sends requests and matches incoming responses to them by correlation id. */
    public static class RequestResponseClient {
        /** How long a request may stay unanswered before its future fails. */
        private static final long RESPONSE_TIMEOUT_SECONDS = 30;

        private final WebSocket webSocket;
        private final ObjectMapper objectMapper = new ObjectMapper();
        private final Map<String, CompletableFuture<Message>> pendingRequests = new ConcurrentHashMap<>();
        private final AtomicLong requestIdCounter = new AtomicLong(0);

        public RequestResponseClient(WebSocket webSocket) {
            this.webSocket = webSocket;
        }

        /**
         * Sends a request and returns a future for the matching response.
         *
         * <p>The future completes normally when {@link #handleResponse(Message)}
         * receives a message with the same id, and exceptionally when the request
         * cannot be serialized, when the send fails, or with a
         * {@link TimeoutException} after {@value #RESPONSE_TIMEOUT_SECONDS} seconds.
         *
         * @param type application-defined message type
         * @param data request payload, serialized as part of the envelope
         * @return future completed with the response envelope
         */
        public CompletableFuture<Message> sendRequest(String type, Object data) {
            String requestId = "req-" + requestIdCounter.incrementAndGet();
            CompletableFuture<Message> responseFuture = new CompletableFuture<>();
            pendingRequests.put(requestId, responseFuture);

            // However the future ends (response, timeout, send failure, caller
            // cancellation) the bookkeeping entry is dropped, so failed requests
            // do not linger in the map.
            responseFuture.whenComplete(
                (response, error) -> pendingRequests.remove(requestId, responseFuture));

            // completeExceptionally is a no-op when a response already arrived.
            CompletableFuture.delayedExecutor(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .execute(() -> responseFuture.completeExceptionally(
                    new TimeoutException("Response timeout for request " + requestId)));

            try {
                String json = objectMapper.writeValueAsString(new Message(requestId, type, data));
                // Send errors are reported through the returned future, not thrown,
                // so they have to be forwarded explicitly.
                webSocket.sendText(json, true).whenComplete((socket, sendError) -> {
                    if (sendError != null) {
                        responseFuture.completeExceptionally(sendError);
                    }
                });
            } catch (JsonProcessingException e) {
                responseFuture.completeExceptionally(e);
            }
            return responseFuture;
        }

        /**
         * Completes the pending request whose id matches the given response.
         * Messages without an id, or with an id that is not (or no longer)
         * pending, are ignored.
         *
         * @param message decoded incoming envelope; may be {@code null}
         */
        public void handleResponse(Message message) {
            if (message == null || message.id == null) {
                return;
            }
            CompletableFuture<Message> future = pendingRequests.remove(message.id);
            if (future != null) {
                future.complete(message);
            }
        }
    }
}
RPC-Style Invocation:
/**
 * JSON-RPC 2.0 style method invocation over a WebSocket.
 */
public class RpcPattern {

    /** Minimal asynchronous RPC contract. */
    public interface RpcClient {
        /**
         * Invokes a remote method.
         *
         * @param method remote method name
         * @param args positional arguments, sent as the {@code params} member
         * @return future completed with the remote result, or exceptionally on error or timeout
         */
        CompletableFuture<Object> invoke(String method, Object... args);
    }

    /** {@link RpcClient} that encodes calls as JSON-RPC 2.0 request objects. */
    public static class JsonRpcClient implements RpcClient {
        /** How long a call may stay unanswered before its future fails. */
        private static final long CALL_TIMEOUT_SECONDS = 30;

        private final WebSocket webSocket;
        private final ObjectMapper objectMapper;
        private final Map<String, CompletableFuture<Object>> results = new ConcurrentHashMap<>();
        // Atomic so that concurrent invoke() calls can never share an id.
        private final AtomicLong messageId = new AtomicLong();

        public JsonRpcClient(WebSocket webSocket) {
            this.webSocket = webSocket;
            this.objectMapper = new ObjectMapper();
        }

        @Override
        public CompletableFuture<Object> invoke(String method, Object... args) {
            long id = messageId.incrementAndGet();
            String key = String.valueOf(id);
            CompletableFuture<Object> resultFuture = new CompletableFuture<>();
            results.put(key, resultFuture);
            // Drop the bookkeeping entry however the call ends (result, error,
            // timeout, send failure or caller cancellation).
            resultFuture.whenComplete((value, error) -> results.remove(key, resultFuture));

            // Timeout after 30 seconds
            scheduleTimeout(id, resultFuture);
            try {
                Map<String, Object> request = new LinkedHashMap<>();
                request.put("jsonrpc", "2.0");
                request.put("id", id);
                request.put("method", method);
                request.put("params", args);
                String json = objectMapper.writeValueAsString(request);
                // Send errors arrive through the returned future, so forward them.
                webSocket.sendText(json, true).whenComplete((socket, sendError) -> {
                    if (sendError != null) {
                        resultFuture.completeExceptionally(sendError);
                    }
                });
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
            return resultFuture;
        }

        /**
         * Completes the pending call that matches the response's {@code id}.
         *
         * <p>A {@code result} member completes the call normally, an
         * {@code error} member fails it with a {@link RuntimeException}, and a
         * response carrying neither fails it with an
         * {@link IllegalStateException} rather than leaving the caller waiting.
         * Responses with an unknown id are ignored.
         *
         * @param response decoded response object; may be {@code null}
         */
        public void handleResponse(Map<String, Object> response) {
            if (response == null) {
                return;
            }
            String id = String.valueOf(response.get("id"));
            CompletableFuture<Object> future = results.remove(id);
            if (future == null) {
                return;
            }
            if (response.containsKey("result")) {
                future.complete(response.get("result"));
            } else if (response.containsKey("error")) {
                // String.valueOf tolerates an explicit null error value.
                future.completeExceptionally(
                    new RuntimeException(String.valueOf(response.get("error"))));
            } else {
                future.completeExceptionally(new IllegalStateException(
                    "Malformed RPC response (no result or error) for id " + id));
            }
        }

        /**
         * Fails the call with a {@link TimeoutException} if it is still pending
         * after the timeout. Uses a delayed executor that hands the task to the
         * default executor after the delay, so no thread pool is created per call.
         */
        private void scheduleTimeout(long id, CompletableFuture<Object> future) {
            CompletableFuture.delayedExecutor(CALL_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .execute(() -> future.completeExceptionally(
                    new TimeoutException("RPC call timeout: " + id)));
        }
    }
}
Broadcast and Pub-Sub Patterns
WebSocket connections are effective for server-initiated messaging to multiple clients.
Topic Subscription:
/**
 * Topic-based publish/subscribe on top of a single WebSocket connection.
 */
public class PubSubPattern {

    /** Callback for messages and errors on a subscribed topic. */
    public interface Subscriber {
        void onMessage(String topic, String message);
        void onError(String topic, Throwable error);
    }

    /**
     * Keeps the local topic-to-subscribers registry and sends the matching
     * subscribe/unsubscribe control frames to the server.
     */
    public static class PubSubClient {
        private final WebSocket webSocket;
        private final ObjectMapper objectMapper = new ObjectMapper();
        private final Map<String, List<Subscriber>> subscriptions = new ConcurrentHashMap<>();

        public PubSubClient(WebSocket webSocket) {
            this.webSocket = webSocket;
        }

        /**
         * Registers a subscriber for a topic. The subscribe frame is sent only
         * when this is the first local subscriber for the topic, mirroring
         * {@link #unsubscribe}, which notifies the server only when the last one
         * leaves.
         *
         * @param topic topic name
         * @param subscriber callback to register; also receives send errors
         */
        public void subscribe(String topic, Subscriber subscriber) {
            boolean[] firstForTopic = {false};
            // compute() makes "create list if needed + add" atomic with respect to
            // the removal performed in unsubscribe().
            subscriptions.compute(topic, (key, current) -> {
                List<Subscriber> target = current;
                if (target == null) {
                    target = new CopyOnWriteArrayList<>();
                    firstForTopic[0] = true;
                }
                target.add(subscriber);
                return target;
            });
            if (firstForTopic[0]) {
                sendAction("subscribe", topic, subscriber);
            }
        }

        /**
         * Removes a subscriber from a topic. When the topic has no local
         * subscribers left, its entry is dropped and an unsubscribe frame is sent.
         *
         * @param topic topic name
         * @param subscriber callback to remove; also receives send errors
         */
        public void unsubscribe(String topic, Subscriber subscriber) {
            boolean[] topicEmptied = {false};
            subscriptions.computeIfPresent(topic, (key, current) -> {
                current.remove(subscriber);
                if (current.isEmpty()) {
                    topicEmptied[0] = true;
                    return null; // returning null removes the mapping
                }
                return current;
            });
            if (topicEmptied[0]) {
                sendAction("unsubscribe", topic, subscriber);
            }
        }

        /**
         * Delivers a published message to every subscriber of the topic. A
         * subscriber that throws is told through its own {@code onError} and
         * does not prevent delivery to the others.
         */
        public void handlePublishedMessage(String topic, String message) {
            List<Subscriber> subscribers = subscriptions.get(topic);
            if (subscribers == null) {
                return;
            }
            for (Subscriber sub : subscribers) {
                try {
                    sub.onMessage(topic, message);
                } catch (RuntimeException e) {
                    sub.onError(topic, e);
                }
            }
        }

        /** Sends a control frame; failures are reported to the given subscriber. */
        private void sendAction(String action, String topic, Subscriber subscriber) {
            try {
                Map<String, String> message = Map.of(
                    "action", action,
                    "topic", topic
                );
                String json = objectMapper.writeValueAsString(message);
                // Send errors arrive through the returned future, not as exceptions.
                webSocket.sendText(json, true).whenComplete((socket, sendError) -> {
                    if (sendError != null) {
                        subscriber.onError(topic, sendError);
                    }
                });
            } catch (Exception e) {
                subscriber.onError(topic, e);
            }
        }
    }

    // Usage
    public static void main(String[] args) {
        // Assume webSocket is already connected
        // PubSubClient client = new PubSubClient(webSocket);
        // client.subscribe("news", new Subscriber() {
        //     @Override
        //     public void onMessage(String topic, String message) {
        //         System.out.println("News: " + message);
        //     }
        //
        //     @Override
        //     public void onError(String topic, Throwable error) {
        //         System.err.println("Error in topic " + topic + ": " + error);
        //     }
        // });
    }
}
Message Serialization and Deserialization
Structured message handling requires consistent serialization strategies.
JSON Serialization:
/**
 * JSON message framing and a listener that decodes incoming text into frames.
 */
public class JsonSerializationPattern {

    /** Typed JSON frame with {@code type}, {@code timestamp} and {@code payload} members. */
    public static class MessageFrame {
        @JsonProperty("type")
        public String type;
        @JsonProperty("timestamp")
        public long timestamp;
        @JsonProperty("payload")
        public Map<String, Object> payload;

        /** Creates an empty frame stamped with the current time. */
        public MessageFrame() {
            this.timestamp = System.currentTimeMillis();
        }

        public MessageFrame(String type, Map<String, Object> payload) {
            this();
            this.type = type;
            this.payload = payload;
        }
    }

    /** Listener that reassembles text messages and hands decoded frames to a handler. */
    public static class SerializingListener implements WebSocket.Listener {
        private final ObjectMapper objectMapper;
        private final Consumer<MessageFrame> messageHandler;
        // Text of a message whose final fragment has not arrived yet.
        private final StringBuilder partialText = new StringBuilder();

        public SerializingListener(Consumer<MessageFrame> messageHandler) {
            this.objectMapper = new ObjectMapper();
            this.messageHandler = messageHandler;
        }

        /**
         * Buffers fragments until {@code last} is true, then deserializes the
         * complete text and passes the frame to the handler. A message may be
         * delivered in several parts, and parsing one part on its own would
         * fail or produce a wrong frame.
         */
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            partialText.append(data);
            if (last) {
                String json = partialText.toString();
                partialText.setLength(0);
                try {
                    MessageFrame frame = objectMapper.readValue(json, MessageFrame.class);
                    messageHandler.accept(frame);
                } catch (JsonProcessingException e) {
                    System.err.println("Failed to deserialize message: " + e);
                }
            }
            webSocket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            webSocket.request(1);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            error.printStackTrace();
        }
    }
}
Protobuf Serialization for Binary Efficiency:
/**
 * Binary message transport, e.g. for protobuf-encoded payloads.
 */
public class ProtobufSerializationPattern {

    /** Listener that reassembles binary messages and hands the complete bytes to a handler. */
    public static class BinaryMessageListener implements WebSocket.Listener {
        private final Consumer<byte[]> messageHandler;
        // Bytes of a message whose final fragment has not arrived yet.
        private final java.io.ByteArrayOutputStream pending = new java.io.ByteArrayOutputStream();

        public BinaryMessageListener(Consumer<byte[]> messageHandler) {
            this.messageHandler = messageHandler;
        }

        /**
         * Buffers fragments until {@code last} is true, then passes the whole
         * message to the handler. A binary message may arrive in several parts;
         * a decoder fed a single part would see truncated input.
         */
        @Override
        public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
            byte[] chunk = new byte[data.remaining()];
            data.get(chunk);
            pending.write(chunk, 0, chunk.length);
            if (last) {
                byte[] message = pending.toByteArray();
                pending.reset();
                // Parse protobuf message
                try {
                    messageHandler.accept(message);
                } catch (Exception e) {
                    System.err.println("Failed to process binary message: " + e);
                }
            }
            webSocket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            webSocket.request(1);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            error.printStackTrace();
        }
    }

    /**
     * Sends already-encoded bytes as one complete binary message. The send is
     * asynchronous; a failure is logged instead of being silently dropped.
     *
     * @param webSocket connection to send on
     * @param protoBytes encoded message bytes
     */
    public static void sendProtobufMessage(WebSocket webSocket, byte[] protoBytes) {
        ByteBuffer buffer = ByteBuffer.wrap(protoBytes);
        webSocket.sendBinary(buffer, true).exceptionally(sendError -> {
            System.err.println("Failed to send binary message: " + sendError);
            return null;
        });
    }
}
Stateful Message Routing
Complex applications require routing messages to appropriate handlers based on message type or state.
Message Router:
/**
 * Dispatches incoming JSON messages to handlers registered per message type.
 * Every routing problem is reported through the returned stage rather than
 * thrown, so callers have a single error path.
 */
public class MessageRouter {
    private final Map<String, MessageHandler> handlers = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Handles one decoded message and signals completion asynchronously. */
    public interface MessageHandler {
        CompletionStage<?> handle(Map<String, Object> message);
    }

    /**
     * Registers (or replaces) the handler for a message type.
     *
     * @param messageType value of the message's {@code type} field
     * @param handler handler invoked for messages of that type
     */
    public void registerHandler(String messageType, MessageHandler handler) {
        handlers.put(messageType, handler);
    }

    /**
     * Parses a JSON object and dispatches it by its {@code type} field.
     *
     * @param messageJson JSON text of a single message
     * @return the handler's stage, or a failed stage when the input is null,
     *         is not a JSON object, has a missing or non-string {@code type},
     *         names an unknown type, or when the handler itself throws
     */
    public CompletionStage<?> route(String messageJson) {
        if (messageJson == null) {
            return CompletableFuture.failedStage(
                new IllegalArgumentException("Message is null"));
        }
        final Map<String, Object> message;
        try {
            message = objectMapper.readValue(
                messageJson,
                new TypeReference<Map<String, Object>>() {}
            );
        } catch (JsonProcessingException e) {
            return CompletableFuture.failedStage(e);
        }
        // Guard: a null result here would otherwise fail with a NullPointerException below.
        if (message == null) {
            return CompletableFuture.failedStage(
                new IllegalArgumentException("Message is not a JSON object"));
        }

        Object rawType = message.get("type");
        if (rawType == null) {
            return CompletableFuture.failedStage(
                new IllegalArgumentException("Message missing 'type' field"));
        }
        // A numeric or structured "type" must not surface as a ClassCastException.
        if (!(rawType instanceof String)) {
            return CompletableFuture.failedStage(
                new IllegalArgumentException("Message 'type' field must be a string"));
        }
        String type = (String) rawType;

        MessageHandler handler = handlers.get(type);
        if (handler == null) {
            return CompletableFuture.failedStage(
                new IllegalArgumentException("Unknown message type: " + type));
        }
        try {
            CompletionStage<?> outcome = handler.handle(message);
            if (outcome == null) {
                // Treat a handler that returns null as completed.
                return CompletableFuture.completedStage(null);
            }
            return outcome;
        } catch (RuntimeException e) {
            return CompletableFuture.failedStage(e);
        }
    }

    // Usage
    public static class RoutingExample {
        public static void configureRouting(MessageRouter router) {
            router.registerHandler("user.login", message -> {
                System.out.println("Login: " + message.get("username"));
                return CompletableFuture.completedStage(null);
            });
            router.registerHandler("data.sync", message -> {
                System.out.println("Sync: " + message.get("timestamp"));
                return CompletableFuture.completedStage(null);
            });
            router.registerHandler("notification", message -> {
                System.out.println("Notification: " + message.get("text"));
                return CompletableFuture.completedStage(null);
            });
        }
    }
}
Stateful Connection Wrapper
A wrapper can simplify connection management and provide convenient send operations.
Connection Wrapper:
/**
 * Wraps a WebSocket with per-connection application state and convenience
 * methods for sending JSON-enveloped messages and requests.
 */
public class StatefulWebSocketConnection {
    /** How long a request may stay unanswered before its handler is discarded. */
    private static final long REQUEST_TIMEOUT_SECONDS = 30;

    private final WebSocket webSocket;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<String, Object> state = new ConcurrentHashMap<>();
    // Response handlers keyed by request id, kept apart from user-visible state.
    private final Map<String, Consumer<Map<String, Object>>> pendingHandlers = new ConcurrentHashMap<>();
    private volatile boolean connected = false;

    public StatefulWebSocketConnection(WebSocket webSocket) {
        this.webSocket = webSocket;
    }

    public void setState(String key, Object value) {
        state.put(key, value);
    }

    public Object getState(String key) {
        return state.get(key);
    }

    /**
     * Sends a one-way message wrapped in a {@code type}/{@code timestamp}/{@code payload} envelope.
     *
     * @throws IOException if the envelope cannot be serialized
     */
    public void sendMessage(String type, Object payload) throws IOException {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("type", type);
        message.put("timestamp", System.currentTimeMillis());
        message.put("payload", payload);
        String json = objectMapper.writeValueAsString(message);
        webSocket.sendText(json, true);
    }

    /**
     * Sends a request with a generated id and remembers the handler until a
     * response with that id is passed to {@link #handleResponse(Map)}, the send
     * fails, or the request times out.
     *
     * @param type application-defined message type
     * @param payload request payload
     * @param responseHandler invoked with the decoded response
     */
    public void sendRequest(String type, Object payload, Consumer<Map<String, Object>> responseHandler) {
        String requestId = UUID.randomUUID().toString();
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("id", requestId);
        message.put("type", type);
        message.put("timestamp", System.currentTimeMillis());
        message.put("payload", payload);

        // Serialize before registering the handler, so a request that cannot
        // be encoded leaves nothing behind.
        final String json;
        try {
            json = objectMapper.writeValueAsString(message);
        } catch (IOException e) {
            System.err.println("Send failed: " + e);
            return;
        }

        // Store handler for response matching
        pendingHandlers.put(requestId, responseHandler);
        webSocket.sendText(json, true).whenComplete((socket, sendError) -> {
            if (sendError != null && pendingHandlers.remove(requestId) != null) {
                System.err.println("Send failed: " + sendError);
            }
        });

        // Timeout after 30 seconds. The delayed executor hands the task to the
        // default executor after the delay, so no thread pool is created per request.
        CompletableFuture.delayedExecutor(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .execute(() -> {
                if (pendingHandlers.remove(requestId) != null) {
                    System.err.println("Request timeout: " + requestId);
                }
            });
    }

    /**
     * Passes a decoded response to the handler registered for its {@code id}.
     *
     * @param response decoded incoming message; may be {@code null}
     * @return {@code true} if a pending handler was found and invoked
     */
    public boolean handleResponse(Map<String, Object> response) {
        if (response == null) {
            return false;
        }
        Object id = response.get("id");
        if (id == null) {
            return false;
        }
        Consumer<Map<String, Object>> handler = pendingHandlers.remove(String.valueOf(id));
        if (handler == null) {
            return false;
        }
        handler.accept(response);
        return true;
    }

    public void markConnected() {
        connected = true;
    }

    public boolean isConnected() {
        return connected;
    }
}
Heartbeat and Keep-Alive
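Applications often implement heartbeat patterns to detect stale connections. The example below uses application-level heartbeat messages; the protocol's own ping/pong frames (sendPing and the listener's onPong callback) can serve the same purpose without defining a custom message type.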
Heartbeat Manager:
/**
 * Sends periodic application-level heartbeats and closes the connection when
 * no acknowledgement has been recorded within the timeout.
 */
public class HeartbeatManager {
    /**
     * Close code sent on heartbeat timeout. {@code java.net.http.WebSocket}
     * defines only {@code NORMAL_CLOSURE}, and 1006 ("abnormal closure") is not
     * a legal code to send, so a value from the 4000-4999 private-use range is
     * used instead.
     */
    private static final int HEARTBEAT_TIMEOUT_CLOSE_CODE = 4000;

    private final WebSocket webSocket;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final long heartbeatInterval;
    private final long heartbeatTimeout;
    // System.nanoTime() reading, so wall-clock adjustments cannot fake a timeout.
    private volatile long lastHeartbeatAckNanos = System.nanoTime();

    /**
     * @param webSocket connection to monitor
     * @param interval time between heartbeats, in milliseconds
     * @param timeout maximum time without an acknowledgement, in milliseconds
     */
    public HeartbeatManager(WebSocket webSocket, long interval, long timeout) {
        this.webSocket = webSocket;
        this.heartbeatInterval = interval;
        this.heartbeatTimeout = timeout;
    }

    /** Starts the periodic heartbeat and the periodic timeout check. */
    public void start() {
        // Count silence from the moment monitoring starts, not from construction.
        lastHeartbeatAckNanos = System.nanoTime();
        // Send heartbeats periodically
        scheduler.scheduleAtFixedRate(
            this::sendHeartbeat,
            heartbeatInterval,
            heartbeatInterval,
            TimeUnit.MILLISECONDS
        );
        // Monitor for heartbeat timeout
        scheduler.scheduleAtFixedRate(
            this::checkHeartbeatTimeout,
            heartbeatTimeout,
            heartbeatTimeout,
            TimeUnit.MILLISECONDS
        );
    }

    private void sendHeartbeat() {
        try {
            webSocket.sendText("{\"type\":\"heartbeat\"}", true);
        } catch (Exception e) {
            System.err.println("Heartbeat send failed: " + e);
        }
    }

    private void checkHeartbeatTimeout() {
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastHeartbeatAckNanos);
        if (elapsed > heartbeatTimeout) {
            System.out.println("Heartbeat timeout, connection stale");
            try {
                webSocket.sendClose(HEARTBEAT_TIMEOUT_CLOSE_CODE, "Heartbeat timeout");
            } catch (IllegalArgumentException e) {
                // The implementation rejected the close code; drop the connection instead.
                webSocket.abort();
            } finally {
                // Stop both periodic tasks so no further heartbeats or close
                // attempts are made on a connection already being torn down.
                stop();
            }
        }
    }

    /** Records that the peer acknowledged a heartbeat. */
    public void acknowledgeHeartbeat() {
        lastHeartbeatAckNanos = System.nanoTime();
    }

    /** Stops sending heartbeats and checking for timeouts. */
    public void stop() {
        scheduler.shutdown();
    }
}
Bidirectional Streaming
For continuous data streams, careful buffering and flow control are essential: the listener should request further messages only as fast as the consumer can drain the buffer.
Stream Handler:
/**
 * Continuous inbound streaming with a bounded buffer between the WebSocket
 * listener and the consumer.
 */
public class StreamingPattern {

    /** Bounded hand-off queue between the listener (producer) and the consumer. */
    public static class DataStream<T> {
        private final BlockingQueue<T> dataQueue;

        /** @param capacity maximum number of buffered items */
        public DataStream(int capacity) {
            this.dataQueue = new LinkedBlockingQueue<>(capacity);
        }

        /** Adds an item, waiting while the buffer is full. */
        public void put(T item) throws InterruptedException {
            dataQueue.put(item);
        }

        /** Removes the next item, waiting while the buffer is empty. */
        public T take() throws InterruptedException {
            return dataQueue.take();
        }

        public int remainingCapacity() {
            return dataQueue.remainingCapacity();
        }
    }

    /**
     * Listener that deserializes each complete text message into the stream
     * and asks for the next message only once the current one is buffered.
     */
    public static class StreamingListener<T> implements WebSocket.Listener {
        private final DataStream<T> stream;
        private final Function<String, T> deserializer;
        // Text of a message whose final fragment has not arrived yet.
        private final StringBuilder partialText = new StringBuilder();

        public StreamingListener(DataStream<T> stream, Function<String, T> deserializer) {
            this.stream = stream;
            this.deserializer = deserializer;
        }

        /**
         * Reassembles fragments, then deserializes and buffers the message on
         * another thread.
         *
         * <p>Back-pressure comes from the blocking {@code put}: the next
         * message is requested only after the current one is in the buffer, so
         * a full buffer delays the request until the consumer takes an item.
         * The request must always be issued eventually, because nothing else
         * ever asks the WebSocket for more messages; skipping it would stop the
         * stream permanently.
         */
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            partialText.append(data);
            if (!last) {
                // More fragments of this message are still to come.
                webSocket.request(1);
                return CompletableFuture.completedStage(null);
            }
            String text = partialText.toString();
            partialText.setLength(0);
            return CompletableFuture.runAsync(() -> {
                try {
                    stream.put(deserializer.apply(text));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // One undecodable message must not stall the whole stream.
                    System.err.println("Failed to process streamed message: " + e);
                }
                webSocket.request(1);
            });
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            webSocket.request(1);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            error.printStackTrace();
        }
    }
}
Summary
Common WebSocket communication patterns include:
- Request-Response: Correlation IDs and futures for RPC-style calls
- Pub-Sub: Topic subscriptions with multi-client broadcasting
- Serialization: JSON for interoperability, binary formats for efficiency
- Message Routing: Type-based handler dispatch for complex protocols
- Stateful Connections: Wrapping WebSocket with application state
- Keep-Alive: Heartbeats and ping/pong for connection health
- Streaming: Flow control and buffering for continuous data
These patterns enable building sophisticated real-time applications with WebSocket.