18.2 WebSocket Client API
The Java WebSocket client API provides a builder-based approach to creating WebSocket connections, configuring client parameters, and handling bidirectional communication. This section covers the WebSocketBuilder configuration, connection establishment, and send/receive operations.
WebSocketBuilder Configuration
The HttpClient provides a WebSocketBuilder accessible via newWebSocketBuilder() for configuring WebSocket connections.
Basic Connection:
public class BasicWebSocketConnection {
public static void main(String[] args) throws Exception {
HttpClient client = HttpClient.newHttpClient();
CompletableFuture<WebSocket> wsFuture = client.newWebSocketBuilder()
.buildAsync(
URI.create("wss://echo.websocket.org"),
new WebSocket.Listener() {
@Override
public void onOpen(WebSocket webSocket) {
System.out.println("Connected");
webSocket.request(1); // Request first message
}
@Override
public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
System.out.println("Received: " + data);
ws.request(1); // Request next message
return CompletableFuture.completedStage(null);
}
@Override
public void onError(WebSocket ws, Throwable error) {
error.printStackTrace();
}
}
);
WebSocket ws = wsFuture.join();
ws.sendText("Hello, WebSocket!", true);
Thread.sleep(2000);
ws.sendClose(WebSocket.NORMAL_CLOSURE, "bye");
}
}
Builder Configuration Options:
public class WebSocketBuilderConfiguration {
public void configureBuilder(HttpClient httpClient) throws Exception {
WebSocket ws = httpClient.newWebSocketBuilder()
// Connection timeout (upgrade handshake)
.connectTimeout(Duration.ofSeconds(10))
// Subprotocols: Advertise supported application protocols
.subprotocols("chat", "superchat")
// Headers: Custom headers for upgrade request
.header("X-Custom-Header", "value")
.header("Authorization", "Bearer token123")
// Build and connect asynchronously
.buildAsync(
URI.create("wss://example.com/chat"),
new SimpleWebSocketListener()
)
.join(); // Wait for connection
ws.sendText("Connected!", true);
}
private static class SimpleWebSocketListener implements WebSocket.Listener {
@Override
public void onOpen(WebSocket webSocket) {
System.out.println("WebSocket opened");
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
System.out.println("Text: " + data);
ws.request(1);
return CompletableFuture.completedStage(null);
}
@Override
public void onError(WebSocket ws, Throwable error) {
System.err.println("Error: " + error);
}
}
}
WebSocket.Listener Interface
The Listener interface defines callbacks for WebSocket lifecycle and message events.
Listener Methods:
public interface WebSocket.Listener {
// Invoked when WebSocket upgrade completes
void onOpen(WebSocket webSocket);
// Text message received
CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean isLast);
// Binary message received
CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean isLast);
// Ping received (must respond with pong)
CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message);
// Pong received (ping response)
CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message);
// Close frame received
CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason);
// Connection error
void onError(WebSocket webSocket, Throwable error);
}
Complete Listener Implementation:
public class FullFeaturedListener implements WebSocket.Listener {
private static final Logger logger = LoggerFactory.getLogger(FullFeaturedListener.class);
private StringBuilder textMessageBuilder = new StringBuilder();
private List<ByteBuffer> binaryMessageFrames = new ArrayList<>();
@Override
public void onOpen(WebSocket webSocket) {
logger.info("WebSocket connection established");
webSocket.request(1); // Request first message
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean isLast) {
logger.debug("Text frame received: {} chars, isLast: {}", data.length(), isLast);
textMessageBuilder.append(data);
if (isLast) {
String completeMessage = textMessageBuilder.toString();
logger.info("Complete text message: {}", completeMessage);
textMessageBuilder = new StringBuilder();
return handleTextMessage(webSocket, completeMessage);
}
webSocket.request(1); // Request next frame
return CompletableFuture.completedStage(null);
}
@Override
public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean isLast) {
logger.debug("Binary frame received: {} bytes, isLast: {}", data.remaining(), isLast);
binaryMessageFrames.add(data);
if (isLast) {
ByteBuffer completeMessage = assembleBinaryMessage();
logger.info("Complete binary message: {} bytes", completeMessage.remaining());
return handleBinaryMessage(webSocket, completeMessage);
}
webSocket.request(1);
return CompletableFuture.completedStage(null);
}
@Override
public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
logger.debug("Ping received: {} bytes", message.remaining());
// Automatically respond with pong
return webSocket.sendPong(message).thenRun(() -> {
logger.debug("Pong sent");
webSocket.request(1);
});
}
@Override
public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
logger.debug("Pong received: {} bytes", message.remaining());
webSocket.request(1);
return CompletableFuture.completedStage(null);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
logger.info("Close received: {} - {}", statusCode, reason);
// Auto-respond to close (already done by implementation)
// Perform cleanup if needed
return cleanup();
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
logger.error("WebSocket error", error);
}
private CompletionStage<?> handleTextMessage(WebSocket ws, String message) {
// Process message asynchronously
return CompletableFuture.runAsync(() -> {
// Message processing logic
try {
Thread.sleep(100); // Simulate processing
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).thenRun(() -> ws.request(1));
}
private CompletionStage<?> handleBinaryMessage(WebSocket ws, ByteBuffer data) {
return CompletableFuture.runAsync(() -> {
// Binary message processing logic
}).thenRun(() -> ws.request(1));
}
private ByteBuffer assembleBinaryMessage() {
int totalSize = binaryMessageFrames.stream()
.mapToInt(ByteBuffer::remaining)
.sum();
ByteBuffer combined = ByteBuffer.allocate(totalSize);
binaryMessageFrames.forEach(combined::put);
combined.flip();
binaryMessageFrames.clear();
return combined;
}
private CompletionStage<?> cleanup() {
return CompletableFuture.completedStage(null);
}
}
Sending Messages
The WebSocket interface provides methods for sending different message types.
Send Methods:
public interface WebSocket {
// Send text message
CompletionStage<?> sendText(CharSequence data, boolean isLast);
// Send binary message
CompletionStage<?> sendBinary(ByteBuffer data, boolean isLast);
// Send ping (client-initiated)
CompletionStage<?> sendPing(ByteBuffer message);
// Send pong (ping response)
CompletionStage<?> sendPong(ByteBuffer message);
// Send close frame
CompletionStage<?> sendClose(int statusCode, String reason);
// Control backpressure: Request n messages
void request(long n);
// Get subprotocol agreed with server
String getSubprotocol();
// Get whether connection is open
boolean isOutputClosed();
}
Practical Send Examples:
public class MessageSending {
private WebSocket webSocket;
// Send single text message
public void sendTextMessage(String message) {
webSocket.sendText(message, true)
.thenRun(() -> System.out.println("Text sent"))
.exceptionally(e -> {
System.err.println("Send failed: " + e);
return null;
});
}
// Send large text message in chunks
public void sendLargeTextMessage(String message) throws Exception {
int chunkSize = 1024;
String[] chunks = message.split("(?<=\\G.{" + chunkSize + "})");
CompletableFuture<?> chain = CompletableFuture.completedStage(null);
for (int i = 0; i < chunks.length; i++) {
boolean isLast = (i == chunks.length - 1);
String chunk = chunks[i];
chain = chain.thenCompose(v ->
webSocket.sendText(chunk, isLast)
);
}
chain.join();
}
// Send binary message
public void sendBinaryMessage(byte[] data) {
ByteBuffer buffer = ByteBuffer.wrap(data);
webSocket.sendBinary(buffer, true)
.thenRun(() -> System.out.println("Binary sent"))
.exceptionally(e -> {
System.err.println("Binary send failed: " + e);
return null;
});
}
// Send multiple text frames
public void sendMultipleFrames(List<String> frames) {
CompletableFuture<?> chain = CompletableFuture.completedStage(null);
for (int i = 0; i < frames.size(); i++) {
boolean isLast = (i == frames.size() - 1);
String frame = frames.get(i);
chain = chain.thenCompose(v ->
webSocket.sendText(frame, isLast)
);
}
chain.join();
}
// Send and wait for response
public String sendRequestResponse(String request) throws Exception {
final String[] response = new String[1];
final CompletableFuture<String> responseFuture = new CompletableFuture<>();
// Send message
webSocket.sendText(request, true)
.thenRun(() -> {
// Response will be handled by listener
})
.join();
// Wait for response (with timeout)
return responseFuture.get(5, TimeUnit.SECONDS);
}
}
Backpressure Management
WebSocket communication uses a request-based backpressure model. Clients must call request(n) to indicate readiness for messages.
Backpressure Pattern:
public class BackpressureManager implements WebSocket.Listener {
private static final long REQUEST_BATCH_SIZE = 10;
@Override
public void onOpen(WebSocket webSocket) {
System.out.println("Requesting initial batch");
webSocket.request(REQUEST_BATCH_SIZE);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean isLast) {
// Process message with controlled concurrency
return CompletableFuture.runAsync(() -> {
processMessage(data.toString());
}).thenRun(() -> {
// Request next message after processing
webSocket.request(1);
});
}
private void processMessage(String data) {
// Message processing logic
System.out.println("Processing: " + data);
}
}
Subprotocol Negotiation
Subprotocols allow clients and servers to agree on application-level protocols.
Subprotocol Example:
public class SubprotocolNegotiation {
public void connectWithSubprotocol(HttpClient httpClient) throws Exception {
WebSocket ws = httpClient.newWebSocketBuilder()
// Advertise multiple subprotocols in preference order
.subprotocols("chat", "superchat", "json-rpc")
.buildAsync(
URI.create("wss://example.com/protocol"),
new SubprotocolListener()
)
.join();
}
private static class SubprotocolListener implements WebSocket.Listener {
@Override
public void onOpen(WebSocket webSocket) {
// Get negotiated subprotocol
String subprotocol = webSocket.getSubprotocol();
System.out.println("Negotiated subprotocol: " + subprotocol);
if ("chat".equals(subprotocol)) {
webSocket.sendText("JOIN_CHAT", true);
} else if ("json-rpc".equals(subprotocol)) {
webSocket.sendText("{\"jsonrpc\":\"2.0\",\"method\":\"connect\"}", true);
}
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
System.out.println("Message: " + data);
ws.request(1);
return CompletableFuture.completedStage(null);
}
@Override
public void onError(WebSocket ws, Throwable error) {
error.printStackTrace();
}
}
}
Extension Negotiation
The WebSocket protocol supports extensions (like permessage-deflate for compression).
Extension Handling:
public class ExtensionNegotiation {
public void connectWithExtensions(HttpClient httpClient) throws Exception {
WebSocket ws = httpClient.newWebSocketBuilder()
// Request compression extension
.header("Sec-WebSocket-Extensions", "permessage-deflate")
.buildAsync(
URI.create("wss://example.com/compressed"),
new ExtensionAwareListener()
)
.join();
}
private static class ExtensionAwareListener implements WebSocket.Listener {
private boolean compressionEnabled = false;
@Override
public void onOpen(WebSocket webSocket) {
// Extensions would be negotiated during handshake
// Check server response for extension confirmation
System.out.println("Connection opened");
System.out.println("Compression enabled: " + compressionEnabled);
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
// Received data is automatically decompressed by implementation
System.out.println("Decompressed message: " + data);
ws.request(1);
return CompletableFuture.completedStage(null);
}
@Override
public void onError(WebSocket ws, Throwable error) {
error.printStackTrace();
}
}
}
Custom Headers
Custom headers can be added to the WebSocket upgrade request for authentication or other purposes.
Custom Header Usage:
public class CustomHeadersExample {
public void connectWithCustomHeaders(HttpClient httpClient) throws Exception {
WebSocket ws = httpClient.newWebSocketBuilder()
// Authentication
.header("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...")
// Custom application headers
.header("X-Client-ID", "desktop-app-v2.1")
.header("X-Session-Token", "session-123456")
.header("X-API-Version", "2.0")
// Custom metadata
.header("X-User-Agent", "MyApp/1.0")
.header("X-Device-Type", "desktop")
.buildAsync(
URI.create("wss://api.example.com/stream"),
new AuthenticatedListener()
)
.join();
}
private static class AuthenticatedListener implements WebSocket.Listener {
@Override
public void onOpen(WebSocket webSocket) {
System.out.println("Authenticated connection established");
// Server has validated our credentials from headers
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
System.out.println("Authenticated message: " + data);
ws.request(1);
return CompletableFuture.completedStage(null);
}
@Override
public void onError(WebSocket ws, Throwable error) {
System.err.println("Connection error: " + error.getMessage());
}
}
}
Connection Management
Proper connection lifecycle management includes opening, maintaining, and closing connections.
Connection Manager:
public class WebSocketConnectionManager {
private WebSocket webSocket;
private final URI uri;
private final HttpClient httpClient;
private volatile boolean isConnected = false;
public WebSocketConnectionManager(URI uri, HttpClient httpClient) {
this.uri = uri;
this.httpClient = httpClient;
}
public void connect() throws Exception {
if (isConnected) {
throw new IllegalStateException("Already connected");
}
webSocket = httpClient.newWebSocketBuilder()
.connectTimeout(Duration.ofSeconds(10))
.buildAsync(uri, new ConnectionListener())
.get(15, TimeUnit.SECONDS);
isConnected = true;
}
public void send(String message) throws Exception {
if (!isConnected) {
throw new IllegalStateException("Not connected");
}
webSocket.sendText(message, true)
.get(5, TimeUnit.SECONDS);
}
public void disconnect() throws Exception {
if (isConnected) {
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "bye")
.get(5, TimeUnit.SECONDS);
isConnected = false;
}
}
public boolean isConnected() {
return isConnected;
}
private class ConnectionListener implements WebSocket.Listener {
@Override
public void onOpen(WebSocket webSocket) {
System.out.println("Connection established");
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
System.out.println("Received: " + data);
ws.request(1);
return CompletableFuture.completedStage(null);
}
@Override
public CompletionStage<?> onClose(WebSocket ws, int statusCode, String reason) {
isConnected = false;
System.out.println("Connection closed: " + statusCode + " " + reason);
return CompletableFuture.completedStage(null);
}
@Override
public void onError(WebSocket ws, Throwable error) {
isConnected = false;
System.err.println("Connection error: " + error);
}
}
}
Summary
The WebSocket Client API provides:
- Builder Configuration: Timeouts, subprotocols, headers, extensions
- Listener Pattern: Lifecycle and message event callbacks
- Bidirectional Communication: Send and receive text/binary messages
- Backpressure Management: Request-based flow control
- Protocol Features: Subprotocol and extension negotiation
- Connection Lifecycle: Proper open/send/close sequencing
Mastering these API elements enables building robust, responsive WebSocket applications in Java.