18.3 WebSocket Communication Patterns

This section covers common patterns for implementing bidirectional communication over WebSocket connections, including request-response patterns, broadcast messaging, topic-based subscriptions, and message serialization strategies.

Request-Response Pattern

Many applications require request-response semantics over WebSocket, combining the efficiency of persistent connections with structured message flows.

Message Envelope with Correlation IDs:

/**
 * Request-response messaging over a WebSocket using correlation IDs.
 *
 * <p>Every outgoing request carries a unique {@code id}; the peer is expected to echo
 * that id in its reply so the reply can be matched to the waiting future.
 */
public class RequestResponsePattern {
    /** Wire envelope shared by requests and responses. */
    public static class Message {
        /** Correlation id linking a response to its request. */
        public String id;
        /** Application-defined message type. */
        public String type;
        /** Application-defined payload; serialized by the {@code ObjectMapper}. */
        public Object data;

        /**
         * No-argument constructor, provided so data-binding libraries can
         * instantiate the envelope when reading a response.
         */
        public Message() {
        }

        public Message(String id, String type, Object data) {
            this.id = id;
            this.type = type;
            this.data = data;
        }
    }

    /** Sends requests and completes a future when the matching response arrives. */
    public static class RequestResponseClient {
        private static final long RESPONSE_TIMEOUT_SECONDS = 30;

        private final WebSocket webSocket;
        private final ObjectMapper objectMapper = new ObjectMapper();
        private final Map<String, CompletableFuture<Message>> pendingRequests = new ConcurrentHashMap<>();
        private final AtomicLong requestIdCounter = new AtomicLong(0);

        public RequestResponseClient(WebSocket webSocket) {
            this.webSocket = webSocket;
        }

        /**
         * Sends a request and returns a future for its response.
         *
         * @param type application-defined message type
         * @param data payload to serialize into the envelope
         * @return a future completed with the response, or completed exceptionally
         *         with the serialization error, the send error, or a
         *         {@link TimeoutException} after 30 seconds without a response
         */
        public CompletableFuture<Message> sendRequest(String type, Object data) {
            String requestId = "req-" + requestIdCounter.incrementAndGet();
            Message request = new Message(requestId, type, data);
            CompletableFuture<Message> responseFuture = new CompletableFuture<>();

            // Serialize before registering anything: a request that can never be
            // sent must not leave an entry in the pending map or arm a timer.
            final String json;
            try {
                json = objectMapper.writeValueAsString(request);
            } catch (JsonProcessingException e) {
                responseFuture.completeExceptionally(e);
                return responseFuture;
            }

            pendingRequests.put(requestId, responseFuture);
            // Whatever completes the future (response, timeout, send failure, or
            // cancellation by the caller), drop the bookkeeping entry.
            responseFuture.whenComplete(
                (response, failure) -> pendingRequests.remove(requestId, responseFuture));

            CompletableFuture.delayedExecutor(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .execute(() -> {
                    if (!responseFuture.isDone()) {
                        responseFuture.completeExceptionally(
                            new TimeoutException("Response timeout for request " + requestId)
                        );
                    }
                });

            // sendText reports I/O failures through its returned future rather
            // than by throwing, so surface them instead of waiting for the timeout.
            webSocket.sendText(json, true).whenComplete((ws, sendError) -> {
                if (sendError != null) {
                    responseFuture.completeExceptionally(sendError);
                }
            });

            return responseFuture;
        }

        /**
         * Completes the pending request whose id matches the given message.
         * Messages with no id, or with an id that is not pending (for example
         * because the request already timed out), are ignored.
         *
         * @param message a decoded response envelope
         */
        public void handleResponse(Message message) {
            // ConcurrentHashMap rejects null keys, so guard before the lookup.
            if (message == null || message.id == null) {
                return;
            }
            CompletableFuture<Message> future = pendingRequests.remove(message.id);
            if (future != null) {
                future.complete(message);
            }
        }
    }
}

RPC-Style Invocation:

/**
 * JSON-RPC 2.0 style invocation over a WebSocket.
 */
public class RpcPattern {
    /** Minimal asynchronous RPC contract. */
    public interface RpcClient {
        /**
         * Invokes a remote method.
         *
         * @param method remote method name
         * @param args   positional arguments, sent as the {@code params} array
         * @return a future holding the remote result or the failure
         */
        CompletableFuture<Object> invoke(String method, Object... args);
    }

    /** {@link RpcClient} that frames calls as JSON-RPC 2.0 request objects. */
    public static class JsonRpcClient implements RpcClient {
        private static final long TIMEOUT_SECONDS = 30;

        private final WebSocket webSocket;
        private final ObjectMapper objectMapper;
        private final Map<String, CompletableFuture<Object>> results = new ConcurrentHashMap<>();
        // Atomic so concurrent invoke() calls can never share an id.
        private final AtomicLong messageId = new AtomicLong(0);

        public JsonRpcClient(WebSocket webSocket) {
            this.webSocket = webSocket;
            this.objectMapper = new ObjectMapper();
        }

        /**
         * Sends a JSON-RPC request and returns a future for its outcome.
         *
         * <p>The future completes exceptionally if the request cannot be
         * serialized or sent, if the peer replies with an {@code error} member,
         * or if no reply arrives within 30 seconds.
         */
        @Override
        public CompletableFuture<Object> invoke(String method, Object... args) {
            long id = messageId.incrementAndGet();
            String key = String.valueOf(id);
            CompletableFuture<Object> resultFuture = new CompletableFuture<>();
            results.put(key, resultFuture);

            // Timeout after 30 seconds
            scheduleTimeout(id, resultFuture);

            try {
                Map<String, Object> request = new LinkedHashMap<>();
                request.put("jsonrpc", "2.0");
                request.put("id", id);
                request.put("method", method);
                request.put("params", args);

                String json = objectMapper.writeValueAsString(request);
                // Send failures arrive via the returned future, not as exceptions.
                webSocket.sendText(json, true).whenComplete((ws, sendError) -> {
                    if (sendError != null && results.remove(key, resultFuture)) {
                        resultFuture.completeExceptionally(sendError);
                    }
                });
            } catch (Exception e) {
                // Nothing was sent, so no response can arrive: drop the entry now.
                results.remove(key, resultFuture);
                resultFuture.completeExceptionally(e);
            }

            return resultFuture;
        }

        /**
         * Completes the pending call matching the response's {@code id}.
         *
         * @param response decoded JSON-RPC response object; responses for unknown
         *                 ids are ignored
         */
        public void handleResponse(Map<String, Object> response) {
            if (response == null) {
                return;
            }
            String id = String.valueOf(response.get("id"));
            CompletableFuture<Object> future = results.remove(id);
            if (future == null) {
                return;
            }

            if (response.containsKey("result")) {
                future.complete(response.get("result"));
            } else if (response.containsKey("error")) {
                future.completeExceptionally(
                    new RuntimeException(String.valueOf(response.get("error")))
                );
            } else {
                // The entry has already been removed, so the future must be
                // completed here or the caller would wait forever.
                future.completeExceptionally(
                    new IllegalStateException(
                        "Malformed RPC response for id " + id + ": no 'result' or 'error' member")
                );
            }
        }

        /**
         * Fails the call with a {@link TimeoutException} if it is still pending
         * after 30 seconds. Uses the shared delayed executor so no thread pool
         * is created per call.
         */
        private void scheduleTimeout(long id, CompletableFuture<Object> future) {
            CompletableFuture.delayedExecutor(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .execute(() -> {
                    if (results.remove(String.valueOf(id), future)) {
                        future.completeExceptionally(
                            new TimeoutException("RPC call timeout: " + id)
                        );
                    }
                });
        }
    }
}

Broadcast and Pub-Sub Patterns

WebSocket connections are effective for server-initiated messaging to multiple clients.

Topic Subscription:

/**
 * Topic-based publish/subscribe on top of a single WebSocket connection.
 */
public class PubSubPattern {
    /** Callback for messages and errors on a subscribed topic. */
    public interface Subscriber {
        void onMessage(String topic, String message);
        void onError(String topic, Throwable error);
    }

    /** Tracks local subscribers per topic and mirrors subscriptions to the server. */
    public static class PubSubClient {
        private final WebSocket webSocket;
        private final ObjectMapper objectMapper = new ObjectMapper();
        private final Map<String, List<Subscriber>> subscriptions = new ConcurrentHashMap<>();

        public PubSubClient(WebSocket webSocket) {
            this.webSocket = webSocket;
        }

        /**
         * Registers a subscriber for a topic and sends a {@code subscribe} action.
         *
         * @param topic      topic name
         * @param subscriber callback; also receives any error from sending the action
         */
        public void subscribe(String topic, Subscriber subscriber) {
            // Add inside compute() so the insertion cannot interleave with an
            // unsubscribe() that is concurrently discarding the emptied list.
            subscriptions.compute(topic, (key, current) -> {
                List<Subscriber> updated = (current != null) ? current : new CopyOnWriteArrayList<>();
                updated.add(subscriber);
                return updated;
            });

            sendAction("subscribe", topic, subscriber);
        }

        /**
         * Removes a subscriber; when the topic has no subscribers left, the topic
         * is dropped and an {@code unsubscribe} action is sent.
         *
         * @param topic      topic name
         * @param subscriber callback to remove; also receives any error from
         *                   sending the action
         */
        public void unsubscribe(String topic, Subscriber subscriber) {
            // Single-element array: a mutable flag the lambda is allowed to set.
            boolean[] topicEmptied = {false};
            subscriptions.computeIfPresent(topic, (key, current) -> {
                current.remove(subscriber);
                if (current.isEmpty()) {
                    topicEmptied[0] = true;
                    return null; // returning null removes the mapping atomically
                }
                return current;
            });

            if (topicEmptied[0]) {
                sendAction("unsubscribe", topic, subscriber);
            }
        }

        /**
         * Delivers a published message to every subscriber of the topic.
         * A subscriber that throws is notified through its own
         * {@code onError}; delivery to the remaining subscribers continues.
         */
        public void handlePublishedMessage(String topic, String message) {
            List<Subscriber> subscribers = subscriptions.get(topic);
            if (subscribers == null) {
                return;
            }
            for (Subscriber sub : subscribers) {
                try {
                    sub.onMessage(topic, message);
                } catch (RuntimeException e) {
                    sub.onError(topic, e);
                }
            }
        }

        /**
         * Serializes and sends {@code {"action": ..., "topic": ...}}, reporting
         * both synchronous and asynchronous failures to {@code errorTarget}.
         */
        private void sendAction(String action, String topic, Subscriber errorTarget) {
            try {
                Map<String, String> message = Map.of(
                    "action", action,
                    "topic", topic
                );
                String json = objectMapper.writeValueAsString(message);
                // Send failures are delivered through the returned future.
                webSocket.sendText(json, true).whenComplete((ws, sendError) -> {
                    if (sendError != null) {
                        errorTarget.onError(topic, sendError);
                    }
                });
            } catch (Exception e) {
                errorTarget.onError(topic, e);
            }
        }
    }

    // Usage
    public static void main(String[] args) {
        // Assume webSocket is already connected
        // PubSubClient client = new PubSubClient(webSocket);

        // client.subscribe("news", new Subscriber() {
        //     @Override
        //     public void onMessage(String topic, String message) {
        //         System.out.println("News: " + message);
        //     }
        //     
        //     @Override
        //     public void onError(String topic, Throwable error) {
        //         System.err.println("Error in topic " + topic + ": " + error);
        //     }
        // });
    }
}

Message Serialization and Deserialization

Structured message handling requires consistent serialization strategies.

JSON Serialization:

/**
 * JSON message framing and a listener that decodes incoming text into frames.
 */
public class JsonSerializationPattern {
    /** Typed envelope: message type, creation timestamp, and a free-form payload. */
    public static class MessageFrame {
        @JsonProperty("type")
        public String type;

        @JsonProperty("timestamp")
        public long timestamp;

        @JsonProperty("payload")
        public Map<String, Object> payload;

        /** Creates an empty frame stamped with the current time in milliseconds. */
        public MessageFrame() {
            this.timestamp = System.currentTimeMillis();
        }

        public MessageFrame(String type, Map<String, Object> payload) {
            this();
            this.type = type;
            this.payload = payload;
        }
    }

    /**
     * Listener that reassembles text messages, deserializes each complete one
     * into a {@link MessageFrame}, and passes it to the supplied handler.
     *
     * <p>An instance buffers partial data, so it must be attached to a single
     * WebSocket only.
     */
    public static class SerializingListener implements WebSocket.Listener {
        private final ObjectMapper objectMapper;
        private final Consumer<MessageFrame> messageHandler;
        // Holds the fragments of a message whose final part has not arrived yet.
        private final StringBuilder partialMessage = new StringBuilder();

        public SerializingListener(Consumer<MessageFrame> messageHandler) {
            this.objectMapper = new ObjectMapper();
            this.messageHandler = messageHandler;
        }

        /**
         * Buffers {@code data} and, once {@code last} is true, parses the whole
         * message. Deserialization failures are logged and the message dropped.
         */
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            // A message may be delivered in several parts; parsing a single
            // part on its own would fail on otherwise valid JSON.
            partialMessage.append(data);

            if (last) {
                String json = partialMessage.toString();
                partialMessage.setLength(0);
                try {
                    MessageFrame frame = objectMapper.readValue(json, MessageFrame.class);
                    messageHandler.accept(frame);
                } catch (JsonProcessingException e) {
                    System.err.println("Failed to deserialize message: " + e);
                }
            }

            webSocket.request(1);
            return CompletableFuture.completedStage(null);
        }

        /** Requests the first message as soon as the connection opens. */
        @Override
        public void onOpen(WebSocket webSocket) {
            webSocket.request(1);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            error.printStackTrace();
        }
    }
}

Protobuf Serialization for Binary Efficiency:

/**
 * Binary message handling, suitable for protobuf-encoded payloads.
 */
public class ProtobufSerializationPattern {
    /**
     * Listener that reassembles binary messages and hands each complete
     * message to the supplied handler as a byte array.
     *
     * <p>An instance buffers partial data, so it must be attached to a single
     * WebSocket only.
     */
    public static class BinaryMessageListener implements WebSocket.Listener {
        private final Consumer<byte[]> messageHandler;
        // Collects the fragments of a message until the final part arrives.
        private final java.io.ByteArrayOutputStream partialMessage = new java.io.ByteArrayOutputStream();

        public BinaryMessageListener(Consumer<byte[]> messageHandler) {
            this.messageHandler = messageHandler;
        }

        /**
         * Buffers {@code data} and, once {@code last} is true, delivers the whole
         * message. Exceptions thrown by the handler are logged, not propagated.
         */
        @Override
        public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
            byte[] chunk = new byte[data.remaining()];
            data.get(chunk);
            // A message may be delivered in several parts; a decoder given a
            // single part would see a truncated message.
            partialMessage.write(chunk, 0, chunk.length);

            if (last) {
                byte[] bytes = partialMessage.toByteArray();
                partialMessage.reset();

                // Parse protobuf message
                try {
                    messageHandler.accept(bytes);
                } catch (Exception e) {
                    System.err.println("Failed to process binary message: " + e);
                }
            }

            webSocket.request(1);
            return CompletableFuture.completedStage(null);
        }

        /** Requests the first message as soon as the connection opens. */
        @Override
        public void onOpen(WebSocket webSocket) {
            webSocket.request(1);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            error.printStackTrace();
        }
    }

    /**
     * Sends already-encoded bytes as one complete binary message.
     * A failed send is logged, because this method has no way to return it.
     *
     * @param webSocket  connection to send on
     * @param protoBytes encoded message
     */
    // Sending protobuf messages
    public static void sendProtobufMessage(WebSocket webSocket, byte[] protoBytes) {
        ByteBuffer buffer = ByteBuffer.wrap(protoBytes);
        // The send outcome is only available through the returned future.
        webSocket.sendBinary(buffer, true).whenComplete((ws, sendError) -> {
            if (sendError != null) {
                System.err.println("Failed to send binary message: " + sendError);
            }
        });
    }
}

Stateful Message Routing

Complex applications require routing messages to appropriate handlers based on message type or state.

Message Router:

/**
 * Dispatches incoming JSON messages to handlers keyed by the message's
 * {@code type} field.
 */
public class MessageRouter {
    private final Map<String, MessageHandler> handlers = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Processes one decoded message and reports completion asynchronously. */
    public interface MessageHandler {
        CompletionStage<?> handle(Map<String, Object> message);
    }

    /**
     * Registers (or replaces) the handler for a message type.
     *
     * @param messageType value of the {@code type} field to match
     * @param handler     handler invoked for matching messages
     */
    public void registerHandler(String messageType, MessageHandler handler) {
        handlers.put(messageType, handler);
    }

    /**
     * Parses a JSON message and forwards it to the handler registered for its type.
     *
     * @param messageJson raw JSON text of one message
     * @return the handler's stage, or a failed stage when the text is null or
     *         not valid JSON, is not a JSON object, lacks a string
     *         {@code type}, or names a type with no handler
     */
    public CompletionStage<?> route(String messageJson) {
        if (messageJson == null) {
            return CompletableFuture.failedStage(
                new IllegalArgumentException("Message must not be null")
            );
        }

        try {
            Map<String, Object> message = objectMapper.readValue(
                messageJson,
                new TypeReference<Map<String, Object>>() {}
            );

            // The JSON literal null decodes to a null map.
            if (message == null) {
                return CompletableFuture.failedStage(
                    new IllegalArgumentException("Message must be a JSON object")
                );
            }

            Object rawType = message.get("type");
            if (rawType == null) {
                return CompletableFuture.failedStage(
                    new IllegalArgumentException("Message missing 'type' field")
                );
            }
            // Check before casting: a numeric or nested "type" must yield a
            // failed stage like every other invalid message, not a
            // ClassCastException thrown at the caller.
            if (!(rawType instanceof String)) {
                return CompletableFuture.failedStage(
                    new IllegalArgumentException("Message 'type' field must be a string")
                );
            }
            String type = (String) rawType;

            MessageHandler handler = handlers.get(type);
            if (handler == null) {
                return CompletableFuture.failedStage(
                    new IllegalArgumentException("Unknown message type: " + type)
                );
            }

            return handler.handle(message);
        } catch (JsonProcessingException e) {
            return CompletableFuture.failedStage(e);
        }
    }

    // Usage
    /** Example wiring of three handlers that print a field of the message. */
    public static class RoutingExample {
        public static void configureRouting(MessageRouter router) {
            router.registerHandler("user.login", message -> {
                System.out.println("Login: " + message.get("username"));
                return CompletableFuture.completedStage(null);
            });

            router.registerHandler("data.sync", message -> {
                System.out.println("Sync: " + message.get("timestamp"));
                return CompletableFuture.completedStage(null);
            });

            router.registerHandler("notification", message -> {
                System.out.println("Notification: " + message.get("text"));
                return CompletableFuture.completedStage(null);
            });
        }
    }
}

Stateful Connection Wrapper

A wrapper can simplify connection management and provide convenient send operations.

Connection Wrapper:

/**
 * Wraps a WebSocket with per-connection key/value state and helpers for
 * sending typed, timestamped JSON messages.
 */
public class StatefulWebSocketConnection {
    private static final long REQUEST_TIMEOUT_SECONDS = 30;

    private final WebSocket webSocket;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<String, Object> state = new ConcurrentHashMap<>();
    // One shared timer for all request timeouts. The thread is a daemon so an
    // idle connection wrapper never keeps the JVM alive.
    private final ScheduledExecutorService timeoutScheduler =
        Executors.newScheduledThreadPool(1, task -> {
            Thread timer = new Thread(task, "websocket-request-timeout");
            timer.setDaemon(true);
            return timer;
        });
    private volatile boolean connected = false;

    public StatefulWebSocketConnection(WebSocket webSocket) {
        this.webSocket = webSocket;
    }

    /** Stores a value under the given key, replacing any previous value. */
    public void setState(String key, Object value) {
        state.put(key, value);
    }

    /** Returns the value stored under the key, or {@code null} if absent. */
    public Object getState(String key) {
        return state.get(key);
    }

    /**
     * Sends a {@code {type, timestamp, payload}} message.
     *
     * @param type    application-defined message type
     * @param payload payload to serialize
     * @throws IOException if the message cannot be serialized
     */
    public void sendMessage(String type, Object payload) throws IOException {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("type", type);
        message.put("timestamp", System.currentTimeMillis());
        message.put("payload", payload);

        String json = objectMapper.writeValueAsString(message);
        webSocket.sendText(json, true);
    }

    /**
     * Sends a request carrying a random id and remembers {@code responseHandler}
     * in the state map under {@code "handler-" + id}. The handler is discarded
     * if sending fails or after 30 seconds.
     *
     * @param type            application-defined message type
     * @param payload         payload to serialize
     * @param responseHandler callback stored for response matching
     */
    public void sendRequest(String type, Object payload, Consumer<Map<String, Object>> responseHandler) {
        String requestId = UUID.randomUUID().toString();
        String handlerKey = "handler-" + requestId;

        Map<String, Object> message = new LinkedHashMap<>();
        message.put("id", requestId);
        message.put("type", type);
        message.put("timestamp", System.currentTimeMillis());
        message.put("payload", payload);

        // Serialize first: if this fails, nothing has been registered yet and
        // no stale handler is left behind.
        final String json;
        try {
            json = objectMapper.writeValueAsString(message);
        } catch (IOException e) {
            System.err.println("Send failed: " + e);
            return;
        }

        // Store handler for response matching
        state.put(handlerKey, responseHandler);

        // Send failures are reported through the returned future.
        webSocket.sendText(json, true).whenComplete((ws, sendError) -> {
            if (sendError != null) {
                state.remove(handlerKey);
                System.err.println("Send failed: " + sendError);
            }
        });

        // Timeout after 30 seconds
        timeoutScheduler.schedule(() -> {
            Object handler = state.remove(handlerKey);
            if (handler != null) {
                System.err.println("Request timeout: " + requestId);
            }
        }, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    public void markConnected() {
        connected = true;
    }

    public boolean isConnected() {
        return connected;
    }
}

Heartbeat and Keep-Alive

Applications often implement heartbeat patterns to detect stale connections.

Heartbeat Manager:

/**
 * Sends application-level heartbeat messages at a fixed interval and closes
 * the connection when no acknowledgement has been recorded within the timeout.
 *
 * <p>The owner of the connection must call {@link #acknowledgeHeartbeat()}
 * whenever the peer's heartbeat reply is received.
 */
public class HeartbeatManager {
    /**
     * Close status sent on heartbeat timeout. Code 1006 ("abnormal closure")
     * must never be sent in a Close frame and {@code sendClose} rejects it, so
     * a code from the private-use range 4000-4999 is used instead.
     */
    private static final int HEARTBEAT_TIMEOUT_CLOSE_CODE = 4000;

    private final WebSocket webSocket;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final long heartbeatInterval;
    private final long heartbeatTimeout;
    private volatile long lastHeartbeatAck = System.currentTimeMillis();

    /**
     * @param webSocket connection to monitor
     * @param interval  milliseconds between heartbeats
     * @param timeout   milliseconds without an acknowledgement before the
     *                  connection is considered stale
     */
    public HeartbeatManager(WebSocket webSocket, long interval, long timeout) {
        this.webSocket = webSocket;
        this.heartbeatInterval = interval;
        this.heartbeatTimeout = timeout;
    }

    /** Starts the periodic heartbeat and the periodic timeout check. */
    public void start() {
        // Measure the timeout from the moment monitoring begins, not from
        // construction, so a delayed start() does not trigger an instant timeout.
        lastHeartbeatAck = System.currentTimeMillis();

        // Send heartbeats periodically
        scheduler.scheduleAtFixedRate(
            this::sendHeartbeat,
            heartbeatInterval,
            heartbeatInterval,
            TimeUnit.MILLISECONDS
        );

        // Monitor for heartbeat timeout
        scheduler.scheduleAtFixedRate(
            this::checkHeartbeatTimeout,
            heartbeatTimeout,
            heartbeatTimeout,
            TimeUnit.MILLISECONDS
        );
    }

    /** Sends one heartbeat message; failures are logged and do not stop the schedule. */
    private void sendHeartbeat() {
        try {
            // Send failures are reported through the returned future.
            webSocket.sendText("{\"type\":\"heartbeat\"}", true)
                .whenComplete((ws, sendError) -> {
                    if (sendError != null) {
                        System.err.println("Heartbeat send failed: " + sendError);
                    }
                });
        } catch (Exception e) {
            System.err.println("Heartbeat send failed: " + e);
        }
    }

    /**
     * Closes the connection and stops all scheduled work once the time since
     * the last acknowledgement exceeds the timeout.
     */
    private void checkHeartbeatTimeout() {
        long elapsed = System.currentTimeMillis() - lastHeartbeatAck;
        if (elapsed <= heartbeatTimeout) {
            return;
        }

        System.out.println("Heartbeat timeout, connection stale");
        // Stop first so no further heartbeats or repeated close attempts are made.
        stop();
        try {
            webSocket.sendClose(HEARTBEAT_TIMEOUT_CLOSE_CODE, "Heartbeat timeout")
                .whenComplete((ws, closeError) -> {
                    if (closeError != null) {
                        // The Close message could not be sent; drop the connection.
                        webSocket.abort();
                    }
                });
        } catch (RuntimeException e) {
            webSocket.abort();
        }
    }

    /** Records that the peer acknowledged a heartbeat just now. */
    public void acknowledgeHeartbeat() {
        lastHeartbeatAck = System.currentTimeMillis();
    }

    /** Stops sending heartbeats and checking for timeouts. Safe to call repeatedly. */
    public void stop() {
        scheduler.shutdown();
    }
}

Bidirectional Streaming

For continuous data streams, proper buffer and flow control management is essential.

Stream Handler:

/**
 * Continuous inbound streaming with back-pressure between the WebSocket and
 * a bounded in-memory queue.
 */
public class StreamingPattern {
    /**
     * Bounded blocking queue of decoded items.
     *
     * @param <T> item type
     */
    public static class DataStream<T> {
        private final BlockingQueue<T> dataQueue;
        private final int capacity;

        /**
         * @param capacity maximum number of buffered items
         */
        public DataStream(int capacity) {
            this.capacity = capacity;
            this.dataQueue = new LinkedBlockingQueue<>(capacity);
        }

        /** Adds an item, blocking while the queue is full. */
        public void put(T item) throws InterruptedException {
            dataQueue.put(item);
        }

        /** Removes and returns the oldest item, blocking while the queue is empty. */
        public T take() throws InterruptedException {
            return dataQueue.take();
        }

        /** Returns how many more items fit before {@link #put} would block. */
        public int remainingCapacity() {
            return dataQueue.remainingCapacity();
        }
    }

    /**
     * Listener that reassembles text messages, decodes each complete message,
     * and places it on a {@link DataStream}.
     *
     * <p>Flow control: the next message is requested only after the current
     * one has been queued. Because {@link DataStream#put} blocks while the
     * queue is full, a slow consumer pauses the WebSocket, and delivery
     * resumes by itself as soon as the consumer frees a slot.
     *
     * <p>An instance buffers partial data, so it must be attached to a single
     * WebSocket only.
     *
     * @param <T> decoded item type
     */
    public static class StreamingListener<T> implements WebSocket.Listener {
        private final DataStream<T> stream;
        private final Function<String, T> deserializer;
        // Holds the fragments of a message whose final part has not arrived yet.
        private final StringBuilder partialMessage = new StringBuilder();

        public StreamingListener(DataStream<T> stream, Function<String, T> deserializer) {
            this.stream = stream;
            this.deserializer = deserializer;
        }

        /**
         * Buffers {@code data}; when {@code last} is true, decodes and queues the
         * whole message asynchronously and then requests the next one.
         */
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            partialMessage.append(data);
            if (!last) {
                // The rest of this message is needed before anything can be queued.
                webSocket.request(1);
                return CompletableFuture.completedFuture(null);
            }

            String text = partialMessage.toString();
            partialMessage.setLength(0);

            return CompletableFuture.runAsync(() -> {
                try {
                    T item = deserializer.apply(text);
                    stream.put(item);

                    // Always re-request once the item is queued. Skipping the
                    // request when the queue is nearly full would stop delivery
                    // for good, since nothing else ever calls request().
                    webSocket.request(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        /** Requests the first message as soon as the connection opens. */
        @Override
        public void onOpen(WebSocket webSocket) {
            webSocket.request(1);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            error.printStackTrace();
        }
    }
}

Summary

Common WebSocket communication patterns include:

  • Request-Response: Correlation IDs and futures for RPC-style calls
  • Pub-Sub: Topic subscriptions with multi-client broadcasting
  • Serialization: JSON for interoperability, binary formats for efficiency
  • Message Routing: Type-based handler dispatch for complex protocols
  • Stateful Connections: Wrapping WebSocket with application state
  • Keep-Alive: Heartbeats and ping/pong for connection health
  • Streaming: Flow control and buffering for continuous data

These patterns enable building sophisticated real-time applications with WebSocket.