18.2 WebSocket Client API

The Java WebSocket client API provides a builder-based approach to creating WebSocket connections, configuring client parameters, and handling bidirectional communication. This section covers the WebSocketBuilder configuration, connection establishment, and send/receive operations.

WebSocketBuilder Configuration

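HttpClient exposes a WebSocket.Builder through newWebSocketBuilder(). The builder collects the connection settings, and its buildAsync(uri, listener) method performs the opening handshake and returns a CompletableFuture<WebSocket>.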

Basic Connection:

/**
 * Minimal echo client: connects, sends one text message, prints whatever the
 * server sends back for two seconds, and then closes the connection.
 */
public class BasicWebSocketConnection {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        CompletableFuture<WebSocket> wsFuture = client.newWebSocketBuilder()
            .buildAsync(
                URI.create("wss://echo.websocket.org"),
                new WebSocket.Listener() {
                    @Override
                    public void onOpen(WebSocket webSocket) {
                        System.out.println("Connected");
                        webSocket.request(1); // Request first message
                    }

                    @Override
                    public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
                        // Prints each fragment as it arrives; "last" is not needed for a simple echo.
                        System.out.println("Received: " + data);
                        ws.request(1); // Request next message
                        return CompletableFuture.completedStage(null);
                    }

                    @Override
                    public void onError(WebSocket ws, Throwable error) {
                        error.printStackTrace();
                    }
                }
            );

        // Blocks until the opening handshake has finished (or throws if it failed).
        WebSocket ws = wsFuture.join();

        // join() so that a failed send surfaces here instead of being silently dropped.
        ws.sendText("Hello, WebSocket!", true).join();

        Thread.sleep(2000); // Give the echo time to arrive

        // Wait until the Close frame has actually been sent: main() returning
        // immediately after sendClose() could let the JVM exit before the frame
        // is written, leaving the server with an abruptly dropped connection.
        ws.sendClose(WebSocket.NORMAL_CLOSURE, "bye").join();
    }
}

Builder Configuration Options:

/**
 * Walks through the options offered by the WebSocket builder: handshake
 * timeout, subprotocols and additional upgrade-request headers.
 */
public class WebSocketBuilderConfiguration {
    /**
     * Configures a builder step by step, connects to the chat endpoint and
     * sends a greeting once the connection is available.
     */
    public void configureBuilder(HttpClient httpClient) throws Exception {
        WebSocket.Builder builder = httpClient.newWebSocketBuilder();

        // Upper bound for the upgrade handshake
        builder.connectTimeout(Duration.ofSeconds(10));

        // Application protocols offered to the server
        builder.subprotocols("chat", "superchat");

        // Extra headers carried by the upgrade request
        builder.header("X-Custom-Header", "value");
        builder.header("Authorization", "Bearer token123");

        URI endpoint = URI.create("wss://example.com/chat");
        CompletableFuture<WebSocket> pending =
            builder.buildAsync(endpoint, new SimpleWebSocketListener());

        // Block until the handshake has completed
        WebSocket connection = pending.join();
        connection.sendText("Connected!", true);
    }

    /** Prints every incoming text fragment and keeps asking for the next one. */
    private static class SimpleWebSocketListener implements WebSocket.Listener {
        @Override
        public void onOpen(WebSocket socket) {
            System.out.println("WebSocket opened");
            socket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket socket, CharSequence payload, boolean finalPart) {
            System.out.println("Text: " + payload);
            socket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onError(WebSocket socket, Throwable failure) {
            System.err.println("Error: " + failure);
        }
    }
}

WebSocket.Listener Interface

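The Listener interface defines callbacks for WebSocket lifecycle and message events. Every callback has a default implementation, so a listener overrides only the events it cares about. Text and binary messages may arrive in several fragments; the isLast flag marks the fragment that completes a message.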

Listener Methods:

// Simplified listing of the callbacks a WebSocket invokes on its listener.
// The examples in this section override only a subset of them, relying on
// the default behavior for the rest.
public interface WebSocket.Listener {
    // Invoked once, when the WebSocket upgrade completes
    void onOpen(WebSocket webSocket);

    // Text data received; isLast is false while further fragments of the same message follow
    CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean isLast);

    // Binary data received; isLast is false while further fragments of the same message follow
    CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean isLast);

    // Ping received (the implementation replies with a Pong automatically,
    // so an explicit sendPong() is rarely needed)
    CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message);

    // Pong received (ping response)
    CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message);

    // Close frame received, carrying the peer's status code and reason
    CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason);

    // Connection error; no further callbacks are delivered after this one
    void onError(WebSocket webSocket, Throwable error);
}

Complete Listener Implementation:

/**
 * Listener that overrides every callback: it reassembles fragmented text and
 * binary messages, logs control frames and hands complete messages to
 * asynchronous handlers.
 *
 * <p>Each callback asks for the next invocation with {@code request(1)} once
 * it is ready for more data; for complete messages that happens only after
 * the asynchronous handler has finished.
 */
public class FullFeaturedListener implements WebSocket.Listener {
    private static final Logger logger = LoggerFactory.getLogger(FullFeaturedListener.class);

    // Fragments of the text message that is currently being received.
    private final StringBuilder textMessageBuilder = new StringBuilder();
    // Private copies of the fragments of the binary message currently being received.
    private final List<ByteBuffer> binaryMessageFrames = new ArrayList<>();

    @Override
    public void onOpen(WebSocket webSocket) {
        logger.info("WebSocket connection established");
        webSocket.request(1); // Request first message
    }

    /**
     * Collects text fragments and dispatches the message once the last
     * fragment has arrived.
     */
    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean isLast) {
        logger.debug("Text frame received: {} chars, isLast: {}", data.length(), isLast);
        // append() copies the characters, so nothing refers to "data" after this method returns.
        textMessageBuilder.append(data);

        if (isLast) {
            String completeMessage = textMessageBuilder.toString();
            logger.info("Complete text message: {}", completeMessage);
            textMessageBuilder.setLength(0);
            return handleTextMessage(webSocket, completeMessage);
        }

        webSocket.request(1); // Request next frame
        return CompletableFuture.completedStage(null);
    }

    /**
     * Collects binary fragments and dispatches the message once the last
     * fragment has arrived.
     */
    @Override
    public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean isLast) {
        logger.debug("Binary frame received: {} bytes, isLast: {}", data.remaining(), isLast);

        // The WebSocket may reclaim "data" as soon as the returned stage completes,
        // so a fragment that has to outlive this call must be copied, not stored.
        ByteBuffer copy = ByteBuffer.allocate(data.remaining());
        copy.put(data);
        copy.flip();
        binaryMessageFrames.add(copy);

        if (isLast) {
            ByteBuffer completeMessage = assembleBinaryMessage();
            logger.info("Complete binary message: {} bytes", completeMessage.remaining());
            return handleBinaryMessage(webSocket, completeMessage);
        }

        webSocket.request(1);
        return CompletableFuture.completedStage(null);
    }

    @Override
    public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
        logger.debug("Ping received: {} bytes", message.remaining());
        // No explicit sendPong(): the WebSocket implementation answers pings itself.
        // A manual pong would be redundant, and if it failed the returned stage
        // would complete exceptionally and push the listener into onError.
        webSocket.request(1);
        return CompletableFuture.completedStage(null);
    }

    @Override
    public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
        logger.debug("Pong received: {} bytes", message.remaining());
        webSocket.request(1);
        return CompletableFuture.completedStage(null);
    }

    @Override
    public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
        logger.info("Close received: {} - {}", statusCode, reason);

        // Auto-respond to close (already done by implementation)
        // Perform cleanup if needed
        return cleanup();
    }

    @Override
    public void onError(WebSocket webSocket, Throwable error) {
        logger.error("WebSocket error", error);
    }

    /** Processes a complete text message off the callback thread, then requests the next message. */
    private CompletionStage<?> handleTextMessage(WebSocket ws, String message) {
        // Process message asynchronously
        return CompletableFuture.runAsync(() -> {
            // Message processing logic
            try {
                Thread.sleep(100); // Simulate processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).thenRun(() -> ws.request(1));
    }

    /** Processes a complete binary message off the callback thread, then requests the next message. */
    private CompletionStage<?> handleBinaryMessage(WebSocket ws, ByteBuffer data) {
        return CompletableFuture.runAsync(() -> {
            // Binary message processing logic
        }).thenRun(() -> ws.request(1));
    }

    /** Concatenates the collected fragments into one buffer and clears the fragment list. */
    private ByteBuffer assembleBinaryMessage() {
        int totalSize = binaryMessageFrames.stream()
            .mapToInt(ByteBuffer::remaining)
            .sum();

        ByteBuffer combined = ByteBuffer.allocate(totalSize);
        binaryMessageFrames.forEach(combined::put);
        combined.flip();

        binaryMessageFrames.clear();
        return combined;
    }

    /** Hook for releasing resources when the peer closes the connection. */
    private CompletionStage<?> cleanup() {
        return CompletableFuture.completedStage(null);
    }
}

Sending Messages

The WebSocket interface provides methods for sending different message types.

Send Methods:

// Simplified listing of the WebSocket interface. Every send method returns a
// CompletableFuture that completes with this WebSocket once the message has
// been sent, so calls can be chained or joined.
public interface WebSocket {
    // Send text data; isLast marks the final fragment of the message
    CompletableFuture<WebSocket> sendText(CharSequence data, boolean isLast);

    // Send binary data; isLast marks the final fragment of the message
    CompletableFuture<WebSocket> sendBinary(ByteBuffer data, boolean isLast);

    // Send ping (client-initiated)
    CompletableFuture<WebSocket> sendPing(ByteBuffer message);

    // Send pong (unsolicited; replies to received pings are sent automatically)
    CompletableFuture<WebSocket> sendPong(ByteBuffer message);

    // Send close frame and close the output side
    CompletableFuture<WebSocket> sendClose(int statusCode, String reason);

    // Control backpressure: allow n more listener invocations
    void request(long n);

    // Get subprotocol agreed with server (empty string if none)
    String getSubprotocol();

    // true once the output side is closed, i.e. no more messages can be sent
    boolean isOutputClosed();

    // true once the input side is closed, i.e. no more messages will be received
    boolean isInputClosed();

    // Close input and output abruptly, without a closing handshake
    void abort();
}

Practical Send Examples:

/**
 * Convenience wrappers around the send side of a {@link WebSocket}.
 *
 * <p>Request/response support works together with the connection's listener:
 * the listener must forward each complete incoming text message to
 * {@link #onResponse(String)}.
 */
public class MessageSending {
    /** Maximum number of UTF-16 code units per fragment of a large text message. */
    private static final int CHUNK_SIZE = 1024;
    /** How long {@link #sendRequestResponse(String)} waits for the reply. */
    private static final long RESPONSE_TIMEOUT_SECONDS = 5;

    private final WebSocket webSocket;

    // Future of the request currently waiting for its reply; null when idle.
    private volatile CompletableFuture<String> pendingResponse;

    /**
     * @param webSocket the open connection all messages are sent on
     * @throws NullPointerException if {@code webSocket} is null
     */
    public MessageSending(WebSocket webSocket) {
        if (webSocket == null) {
            throw new NullPointerException("webSocket");
        }
        this.webSocket = webSocket;
    }

    // Send single text message
    public void sendTextMessage(String message) {
        webSocket.sendText(message, true)
            .thenRun(() -> System.out.println("Text sent"))
            .exceptionally(e -> {
                System.err.println("Send failed: " + e);
                return null;
            });
    }

    /**
     * Sends a large text message as a sequence of fragments of at most
     * {@code CHUNK_SIZE} characters and blocks until the last one is sent.
     */
    public void sendLargeTextMessage(String message) throws Exception {
        sendMultipleFrames(splitIntoChunks(message, CHUNK_SIZE));
    }

    // Send binary message
    public void sendBinaryMessage(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        webSocket.sendBinary(buffer, true)
            .thenRun(() -> System.out.println("Binary sent"))
            .exceptionally(e -> {
                System.err.println("Binary send failed: " + e);
                return null;
            });
    }

    /**
     * Sends the given strings as consecutive fragments of one text message
     * and blocks until the last fragment has been sent.
     */
    public void sendMultipleFrames(List<String> frames) {
        // completedFuture (not completedStage) so the chain can be joined below.
        CompletableFuture<?> chain = CompletableFuture.completedFuture(null);

        for (int i = 0; i < frames.size(); i++) {
            boolean isLast = (i == frames.size() - 1);
            String frame = frames.get(i);

            // Each fragment is sent only after the previous send has completed.
            chain = chain.thenCompose(v -> webSocket.sendText(frame, isLast));
        }

        chain.join();
    }

    /**
     * Sends a request and waits for the next message passed to
     * {@link #onResponse(String)}. Only one request is in flight at a time;
     * concurrent callers are serialized.
     *
     * @return the reply text
     * @throws java.util.concurrent.TimeoutException if no reply arrives in time
     */
    public synchronized String sendRequestResponse(String request) throws Exception {
        CompletableFuture<String> responseFuture = new CompletableFuture<>();
        // Register before sending so that a very fast reply cannot be missed.
        pendingResponse = responseFuture;
        try {
            webSocket.sendText(request, true).join();

            // Wait for response (with timeout)
            return responseFuture.get(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } finally {
            pendingResponse = null;
        }
    }

    /**
     * Delivers an incoming message to the request currently waiting in
     * {@link #sendRequestResponse(String)}; ignored when no request is pending.
     * Intended to be called from the listener's {@code onText} callback.
     */
    public void onResponse(String response) {
        CompletableFuture<String> waiting = pendingResponse;
        if (waiting != null) {
            waiting.complete(response);
        }
    }

    /**
     * Splits text into pieces of at most {@code chunkSize} characters without
     * separating a surrogate pair. An empty message yields one empty chunk so
     * that it is still sent as a single final frame.
     */
    private static List<String> splitIntoChunks(String message, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        int start = 0;
        while (start < message.length()) {
            int end = Math.min(start + chunkSize, message.length());
            boolean cutsSurrogatePair = end < message.length()
                && end - start > 1
                && Character.isHighSurrogate(message.charAt(end - 1));
            if (cutsSurrogatePair) {
                end--;
            }
            chunks.add(message.substring(start, end));
            start = end;
        }
        if (chunks.isEmpty()) {
            chunks.add("");
        }
        return chunks;
    }
}

Backpressure Management

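WebSocket communication uses a request-based backpressure model. Each message callback on the listener (onText, onBinary, onPing, onPong, onClose) consumes one unit of demand, and the WebSocket delivers nothing further until the listener calls request(n) to signal that it is ready for n more invocations.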

Backpressure Pattern:

/**
 * Listener that opens a window of ten outstanding requests and tops it up by
 * one each time a message has been processed.
 */
public class BackpressureManager implements WebSocket.Listener {
    private static final long REQUEST_BATCH_SIZE = 10;

    /** Announces and issues the initial batch of demand. */
    @Override
    public void onOpen(WebSocket webSocket) {
        System.out.println("Requesting initial batch");
        webSocket.request(REQUEST_BATCH_SIZE);
    }

    /**
     * Processes the fragment on another thread; the returned stage completes
     * after the follow-up request for one more message has been made.
     */
    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean isLast) {
        Runnable handle = () -> processMessage(data.toString());
        Runnable replenish = () -> webSocket.request(1);
        return CompletableFuture.runAsync(handle).thenRun(replenish);
    }

    /** Placeholder for the real message handling. */
    private void processMessage(String data) {
        System.out.println("Processing: " + data);
    }
}

Subprotocol Negotiation

Subprotocols allow clients and servers to agree on application-level protocols.

Subprotocol Example:

/**
 * Offers several subprotocols during the handshake and greets the server
 * according to the one that was selected.
 */
public class SubprotocolNegotiation {
    /** Connects to the protocol endpoint, offering three subprotocols. */
    public void connectWithSubprotocol(HttpClient httpClient) throws Exception {
        WebSocket.Builder builder = httpClient.newWebSocketBuilder();

        // Listed in order of preference
        builder.subprotocols("chat", "superchat", "json-rpc");

        URI endpoint = URI.create("wss://example.com/protocol");
        builder.buildAsync(endpoint, new SubprotocolListener()).join();
    }

    /** Sends a protocol-specific greeting on open and prints incoming text. */
    private static class SubprotocolListener implements WebSocket.Listener {
        @Override
        public void onOpen(WebSocket webSocket) {
            String negotiated = webSocket.getSubprotocol();
            System.out.println("Negotiated subprotocol: " + negotiated);

            // Pick the greeting that matches the selected protocol, if any
            String greeting = null;
            if ("chat".equals(negotiated)) {
                greeting = "JOIN_CHAT";
            } else if ("json-rpc".equals(negotiated)) {
                greeting = "{\"jsonrpc\":\"2.0\",\"method\":\"connect\"}";
            }
            if (greeting != null) {
                webSocket.sendText(greeting, true);
            }

            webSocket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket socket, CharSequence payload, boolean finalPart) {
            System.out.println("Message: " + payload);
            socket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onError(WebSocket socket, Throwable failure) {
            failure.printStackTrace();
        }
    }
}

Extension Negotiation

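The WebSocket protocol supports extensions (like permessage-deflate for compression). The JDK client, however, does not negotiate any: headers defined by the WebSocket protocol, including Sec-WebSocket-Extensions, cannot be set through the builder, so messages are always exchanged uncompressed at the WebSocket layer.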

Extension Handling:

/**
 * Shows how the JDK client behaves with respect to WebSocket extensions:
 * it offers none, so the connection is always established without
 * compression.
 */
public class ExtensionNegotiation {
    /**
     * Connects without requesting any extension.
     *
     * <p>"Sec-WebSocket-Extensions" must NOT be passed to {@code header()}:
     * headers defined by the WebSocket protocol are illegal there and make
     * {@code buildAsync} fail with an IllegalArgumentException before any
     * connection attempt is made.
     */
    public void connectWithExtensions(HttpClient httpClient) throws Exception {
        httpClient.newWebSocketBuilder()
            .buildAsync(
                URI.create("wss://example.com/compressed"),
                new ExtensionAwareListener()
            )
            .join();
    }

    private static class ExtensionAwareListener implements WebSocket.Listener {
        // Always false: no extension is offered in the handshake, so none can be active.
        private final boolean compressionEnabled = false;

        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("Connection opened");
            System.out.println("Compression enabled: " + compressionEnabled);
            webSocket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
            // Data arrives exactly as the peer sent it; no decompression takes place.
            System.out.println("Received message: " + data);
            ws.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onError(WebSocket ws, Throwable error) {
            error.printStackTrace();
        }
    }
}

Custom Headers

Custom headers can be added to the WebSocket upgrade request for authentication or other purposes.

Custom Header Usage:

/**
 * Attaches authentication and application metadata headers to the upgrade
 * request of a WebSocket connection.
 */
public class CustomHeadersExample {
    /** Connects to the streaming endpoint with a fixed set of extra headers. */
    public void connectWithCustomHeaders(HttpClient httpClient) throws Exception {
        // Name/value pairs, added to the request in exactly this order
        String[][] upgradeHeaders = {
            // Authentication
            {"Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."},
            // Application headers
            {"X-Client-ID", "desktop-app-v2.1"},
            {"X-Session-Token", "session-123456"},
            {"X-API-Version", "2.0"},
            // Metadata
            {"X-User-Agent", "MyApp/1.0"},
            {"X-Device-Type", "desktop"},
        };

        WebSocket.Builder builder = httpClient.newWebSocketBuilder();
        for (String[] pair : upgradeHeaders) {
            builder.header(pair[0], pair[1]);
        }

        URI endpoint = URI.create("wss://api.example.com/stream");
        builder.buildAsync(endpoint, new AuthenticatedListener()).join();
    }

    /** Prints incoming text and requests one message at a time. */
    private static class AuthenticatedListener implements WebSocket.Listener {
        @Override
        public void onOpen(WebSocket socket) {
            // Reaching this point means the server accepted the upgrade request
            System.out.println("Authenticated connection established");
            socket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket socket, CharSequence payload, boolean finalPart) {
            System.out.println("Authenticated message: " + payload);
            socket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onError(WebSocket socket, Throwable failure) {
            System.err.println("Connection error: " + failure.getMessage());
        }
    }
}

Connection Management

Proper connection lifecycle management includes opening, maintaining, and closing connections.

Connection Manager:

/**
 * Owns a single WebSocket connection and tracks whether it is usable.
 *
 * <p>The connected flag is cleared by {@link #disconnect()} and by the
 * listener when the peer closes the connection or an error occurs.
 */
public class WebSocketConnectionManager {
    // Written by connect() and read by send()/disconnect(), possibly on other threads.
    private volatile WebSocket webSocket;
    private final URI uri;
    private final HttpClient httpClient;
    private volatile boolean isConnected = false;

    public WebSocketConnectionManager(URI uri, HttpClient httpClient) {
        this.uri = uri;
        this.httpClient = httpClient;
    }

    /**
     * Performs the opening handshake and waits up to 15 seconds for it.
     *
     * @throws IllegalStateException if a connection is already open
     */
    public void connect() throws Exception {
        if (isConnected) {
            throw new IllegalStateException("Already connected");
        }

        CompletableFuture<WebSocket> pending = httpClient.newWebSocketBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .buildAsync(uri, new ConnectionListener());

        try {
            webSocket = pending.get(15, TimeUnit.SECONDS);
        } catch (Exception e) {
            // We are giving up on this attempt. Should the handshake still succeed
            // later, tear that connection down instead of leaving it orphaned.
            pending.thenAccept(WebSocket::abort);
            throw e;
        }

        isConnected = true;
    }

    /**
     * Sends a complete text message and waits up to 5 seconds for the send to finish.
     *
     * @throws IllegalStateException if no connection is open
     */
    public void send(String message) throws Exception {
        if (!isConnected) {
            throw new IllegalStateException("Not connected");
        }

        webSocket.sendText(message, true)
            .get(5, TimeUnit.SECONDS);
    }

    /**
     * Sends a normal-closure Close frame and marks the manager as disconnected.
     * Does nothing when no connection is open. If the Close frame cannot be
     * sent within 5 seconds the connection is aborted and the failure rethrown.
     */
    public void disconnect() throws Exception {
        if (!isConnected) {
            return;
        }
        try {
            webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "bye")
                .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            // A graceful close was not possible; release the connection anyway.
            webSocket.abort();
            throw e;
        } finally {
            // Whatever happened above, this connection is no longer usable.
            isConnected = false;
        }
    }

    public boolean isConnected() {
        return isConnected;
    }

    /** Prints incoming text and clears the connected flag on close or error. */
    private class ConnectionListener implements WebSocket.Listener {
        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("Connection established");
            webSocket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
            System.out.println("Received: " + data);
            ws.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket ws, int statusCode, String reason) {
            isConnected = false;
            System.out.println("Connection closed: " + statusCode + " " + reason);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onError(WebSocket ws, Throwable error) {
            isConnected = false;
            System.err.println("Connection error: " + error);
        }
    }
}

Summary

The WebSocket Client API provides:

  • Builder Configuration: Timeouts, subprotocols, headers, extensions
  • Listener Pattern: Lifecycle and message event callbacks
  • Bidirectional Communication: Send and receive text/binary messages
  • Backpressure Management: Request-based flow control
  • Protocol Features: Subprotocol and extension negotiation
  • Connection Lifecycle: Proper open/send/close sequencing

Mastering these API elements enables building robust, responsive WebSocket applications in Java.