18.4 Advanced WebSocket Patterns and Production Considerations
This section covers advanced WebSocket patterns, security mechanisms, performance optimization, and production deployment considerations for robust real-world applications.
Security and Authentication
WebSocket connections must be secured and authenticated just like any HTTP endpoint.
TLS/SSL Configuration:
/**
 * Opens WebSocket connections through an {@link HttpClient} that carries a custom
 * {@link SSLContext}: one variant presents a client certificate, the other trusts
 * only a single certificate read from disk.
 */
public class SecureWebSocketClient {

    /**
     * Connects using a client identity loaded from a JKS key store.
     *
     * @param uri              WebSocket endpoint to connect to
     * @param clientCertPath   path of the JKS key store file
     * @param keyStorePassword password used to load the store and to initialise the key managers
     * @return the connected WebSocket; the call blocks until the handshake finishes
     * @throws Exception if the key store cannot be read or the TLS context cannot be set up
     */
    public WebSocket connectSecure(String uri, String clientCertPath, String keyStorePassword)
            throws Exception {
        // Client identity: key store read from the given file.
        KeyStore clientStore = KeyStore.getInstance("JKS");
        try (FileInputStream storeStream = new FileInputStream(clientCertPath)) {
            clientStore.load(storeStream, keyStorePassword.toCharArray());
        }

        KeyManagerFactory keyManagers = KeyManagerFactory.getInstance("SunX509");
        keyManagers.init(clientStore, keyStorePassword.toCharArray());

        // No explicit trust managers are supplied here, only key managers.
        SSLContext tls = SSLContext.getInstance("TLSv1.3");
        tls.init(keyManagers.getKeyManagers(), null, new SecureRandom());

        HttpClient.Builder clientBuilder = HttpClient.newBuilder();
        clientBuilder.sslContext(tls);
        clientBuilder.version(HttpClient.Version.HTTP_2);
        return openSocket(clientBuilder.build(), uri);
    }

    /**
     * Connects while trusting only the certificate stored at {@code pinnedCertPath}.
     *
     * @param uri            WebSocket endpoint to connect to
     * @param pinnedCertPath path of the X.509 certificate file to pin
     * @return the connected WebSocket; the call blocks until the handshake finishes
     * @throws Exception if the certificate cannot be read or the TLS context cannot be set up
     */
    public WebSocket connectWithCertificatePinning(String uri, String pinnedCertPath)
            throws Exception {
        SSLContext pinnedTls = createPinnedSSLContext(pinnedCertPath);
        HttpClient pinnedClient = HttpClient.newBuilder().sslContext(pinnedTls).build();
        return openSocket(pinnedClient, uri);
    }

    /** Runs the WebSocket handshake on the given client and waits for it to complete. */
    private static WebSocket openSocket(HttpClient client, String uri) {
        CompletableFuture<WebSocket> handshake = client.newWebSocketBuilder()
                .buildAsync(URI.create(uri), new SecureWebSocketListener());
        return handshake.join();
    }

    /** Builds a TLSv1.3 context whose only trust entry is the certificate at {@code certPath}. */
    private SSLContext createPinnedSSLContext(String certPath) throws Exception {
        CertificateFactory x509 = CertificateFactory.getInstance("X.509");
        Certificate pinned;
        try (FileInputStream certStream = new FileInputStream(certPath)) {
            pinned = x509.generateCertificate(certStream);
        }

        // An empty in-memory store holding just the pinned certificate.
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        trustStore.load(null);
        trustStore.setCertificateEntry("pinned", pinned);

        TrustManagerFactory trustManagers = TrustManagerFactory.getInstance("SunX509");
        trustManagers.init(trustStore);

        SSLContext context = SSLContext.getInstance("TLSv1.3");
        context.init(null, trustManagers.getTrustManagers(), new SecureRandom());
        return context;
    }

    /** Logs lifecycle events and text messages, requesting one message at a time. */
    private static class SecureWebSocketListener implements WebSocket.Listener {

        @Override
        public void onOpen(WebSocket socket) {
            System.out.println("Secure WebSocket connected");
            socket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket socket, CharSequence payload, boolean isLast) {
            System.out.println("Secure message: " + payload);
            socket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onError(WebSocket socket, Throwable failure) {
            System.err.println("Secure connection error: " + failure);
        }
    }
}
Bearer Token Authentication:
/**
 * Opens WebSocket connections that authenticate with a bearer token sent in the
 * {@code Authorization} header of the opening handshake.
 */
public class BearerTokenAuthenticationWebSocket {

    /**
     * Connects with a fixed bearer token.
     *
     * @param uri   WebSocket endpoint to connect to
     * @param token bearer token placed in the {@code Authorization} header
     * @return the connected WebSocket; the call blocks until the handshake finishes
     * @throws Exception declared for callers; handshake failures surface from {@code join()}
     */
    public WebSocket connectWithBearerToken(String uri, String token) throws Exception {
        HttpClient httpClient = HttpClient.newHttpClient();
        return httpClient.newWebSocketBuilder()
                .header("Authorization", "Bearer " + token)
                // Client-side timestamp one hour (3,600,000 ms) from now.
                .header("X-Token-Expiry", String.valueOf(System.currentTimeMillis() + 3600000))
                .buildAsync(URI.create(uri), new TokenAuthenticatedListener(token))
                .join();
    }

    /**
     * Connects with an initial bearer token and periodically asks {@code tokenProvider}
     * for a replacement while the connection is open.
     *
     * @param uri           WebSocket endpoint to connect to
     * @param initialToken  token sent in the handshake and used as the first "expired" token
     * @param tokenProvider source of refreshed tokens
     * @return the connected WebSocket; the call blocks until the handshake finishes
     * @throws Exception declared for callers; handshake failures surface from {@code join()}
     */
    public WebSocket connectWithTokenRefresh(String uri, String initialToken,
                                             TokenProvider tokenProvider) throws Exception {
        HttpClient httpClient = HttpClient.newHttpClient();
        return httpClient.newWebSocketBuilder()
                .header("Authorization", "Bearer " + initialToken)
                .buildAsync(URI.create(uri), new TokenRefreshingListener(tokenProvider, initialToken))
                .join();
    }

    /** Supplies a new token in exchange for the one that is about to expire. */
    public interface TokenProvider {
        CompletableFuture<String> refreshToken(String expiredToken);
    }

    /** Logs lifecycle events and text messages for a token-authenticated connection. */
    private static class TokenAuthenticatedListener implements WebSocket.Listener {
        private final String token;

        public TokenAuthenticatedListener(String token) {
            this.token = token;
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("Authenticated WebSocket connected with token");
            webSocket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
            System.out.println("Authenticated message: " + data);
            ws.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onError(WebSocket ws, Throwable error) {
            System.err.println("Authentication error: " + error);
        }
    }

    /**
     * Keeps the most recent token in memory and refreshes it every 55 minutes while the
     * connection is open. The refreshed token is only stored; this listener does not
     * send it to the server.
     */
    private static class TokenRefreshingListener implements WebSocket.Listener {
        private final TokenProvider tokenProvider;
        private volatile String currentToken;
        // Created in onOpen, stopped in onClose/onError so the refresh thread does not outlive the socket.
        private volatile ScheduledExecutorService scheduler;

        public TokenRefreshingListener(TokenProvider tokenProvider, String initialToken) {
            this.tokenProvider = tokenProvider;
            // Seed with the handshake token so the first refresh does not pass null to the provider.
            this.currentToken = initialToken;
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("Token-refreshing WebSocket connected");
            // Daemon thread: a forgotten connection must not keep the JVM alive.
            ScheduledExecutorService refresher = Executors.newSingleThreadScheduledExecutor(task -> {
                Thread worker = new Thread(task, "ws-token-refresh");
                worker.setDaemon(true);
                return worker;
            });
            scheduler = refresher;
            refresher.scheduleAtFixedRate(
                    this::refreshTokenIfNeeded,
                    55, // Refresh after 55 minutes (assuming 1 hour expiry)
                    55,
                    TimeUnit.MINUTES);
            webSocket.request(1);
        }

        private void refreshTokenIfNeeded() {
            System.out.println("Refreshing authentication token...");
            try {
                tokenProvider.refreshToken(currentToken)
                        .thenAccept(newToken -> {
                            currentToken = newToken;
                            System.out.println("Token refreshed successfully");
                        })
                        .exceptionally(e -> {
                            System.err.println("Token refresh failed: " + e);
                            return null;
                        });
            } catch (RuntimeException e) {
                // An exception escaping a fixed-rate task suppresses every later run,
                // so a provider that throws synchronously is reported and tolerated.
                System.err.println("Token refresh failed: " + e);
            }
        }

        @Override
        public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
            ws.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket ws, int statusCode, String reason) {
            stopRefreshing();
            return null;
        }

        @Override
        public void onError(WebSocket ws, Throwable error) {
            System.err.println("Connection error: " + error);
            stopRefreshing();
        }

        /** Cancels the periodic refresh, if one was started. */
        private void stopRefreshing() {
            ScheduledExecutorService refresher = scheduler;
            if (refresher != null) {
                refresher.shutdownNow();
            }
        }
    }
}
Reconnection and Resilience Patterns
Robust reconnection logic lets a client recover gracefully from temporary network failures.
Exponential Backoff Reconnection:
/**
 * Maintains a WebSocket connection to one URI, re-establishing it with exponential
 * backoff (plus up to 10% jitter) after failures or abnormal closes.
 *
 * <p>Every attempt, including the first one triggered by {@link #connect()}, is scheduled
 * after the current backoff delay. Call {@link #disconnect()} to stop reconnecting and
 * release the scheduler.
 */
public class ReconnectionManager {
    private final String uri;
    private final HttpClient httpClient;
    private final WebSocket.Listener listener;
    private final long initialDelayMs;
    private final long maxDelayMs;
    private final int maxRetries;
    private volatile WebSocket webSocket;
    private volatile boolean shouldReconnect = true;
    // Touched by the scheduler thread and by listener callbacks, hence atomic.
    private final java.util.concurrent.atomic.AtomicInteger reconnectAttempt =
            new java.util.concurrent.atomic.AtomicInteger();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /**
     * @param uri            WebSocket endpoint to connect to
     * @param httpClient     client used to build each connection
     * @param listener       application listener that receives the forwarded events
     * @param initialDelayMs delay before the first attempt; doubled for each failed attempt
     * @param maxDelayMs     upper bound for any single delay
     * @param maxRetries     number of consecutive failed attempts after which reconnecting stops
     */
    public ReconnectionManager(String uri, HttpClient httpClient,
                               WebSocket.Listener listener,
                               long initialDelayMs, long maxDelayMs, int maxRetries) {
        this.uri = uri;
        this.httpClient = httpClient;
        this.listener = listener;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.maxRetries = maxRetries;
    }

    /** Schedules the first connection attempt. */
    public void connect() {
        reconnectWithBackoff();
    }

    /** Schedules the next attempt, or gives up once {@code maxRetries} has been reached. */
    private void reconnectWithBackoff() {
        int attempt = reconnectAttempt.get();
        if (attempt >= maxRetries) {
            System.out.println("Max reconnection attempts reached");
            shouldReconnect = false;
            // Nothing will be scheduled any more; let the worker thread go.
            scheduler.shutdown();
            return;
        }
        // Calculate exponential backoff with jitter
        long delayMs = calculateBackoffDelay(attempt);
        long jitterMs = (long) (Math.random() * delayMs * 0.1); // 10% jitter
        long totalDelayMs = Math.min(delayMs + jitterMs, maxDelayMs);
        System.out.println("Reconnecting in " + totalDelayMs + "ms (attempt " +
                (attempt + 1) + "/" + maxRetries + ")");
        try {
            scheduler.schedule(this::attemptConnection, totalDelayMs, TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // The scheduler was shut down concurrently (disconnect() or retries exhausted).
            System.err.println("Reconnect not scheduled: manager has been shut down");
        }
    }

    /**
     * Returns {@code initialDelayMs * 2^attemptNumber}, capped at {@code maxDelayMs}.
     * Doubling stops as soon as the cap is reached, so the product cannot overflow
     * however large {@code attemptNumber} is.
     */
    private long calculateBackoffDelay(int attemptNumber) {
        long delay = initialDelayMs;
        if (delay <= 0) {
            return Math.min(delay, maxDelayMs);
        }
        for (int i = 0; i < attemptNumber && delay < maxDelayMs; i++) {
            delay = (delay > Long.MAX_VALUE / 2) ? Long.MAX_VALUE : delay * 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    /** Runs on the scheduler thread: performs one handshake and reschedules on failure. */
    private void attemptConnection() {
        // A task queued before disconnect() may still fire afterwards; do not reconnect then.
        if (!shouldReconnect) {
            return;
        }
        java.util.concurrent.CompletableFuture<WebSocket> handshake = null;
        try {
            handshake = httpClient.newWebSocketBuilder()
                    .connectTimeout(java.time.Duration.ofSeconds(10))
                    .buildAsync(URI.create(uri), new ResilienceAwareListener(this, listener));
            webSocket = handshake.get(15, TimeUnit.SECONDS);
            // Connection successful, reset attempt counter
            reconnectAttempt.set(0);
            System.out.println("WebSocket reconnected successfully");
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            if (handshake != null) {
                // Ask the handshake to stop so a timed-out attempt is not left pending.
                handshake.cancel(true);
            }
            System.err.println("Reconnection attempt " + (reconnectAttempt.get() + 1) + " failed: " + e);
            reconnectAttempt.incrementAndGet();
            if (shouldReconnect) {
                reconnectWithBackoff();
            }
        }
    }

    /** Stops reconnecting, sends a normal close on the current socket and shuts the scheduler down. */
    public void disconnect() {
        shouldReconnect = false;
        if (webSocket != null) {
            webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "Closing");
        }
        scheduler.shutdown();
    }

    /**
     * Forwards every event to the application listener and asks the manager to reconnect
     * after an error or a close with a status other than {@code NORMAL_CLOSURE}.
     */
    private static class ResilienceAwareListener implements WebSocket.Listener {
        private final ReconnectionManager manager;
        private final WebSocket.Listener delegateListener;

        public ResilienceAwareListener(ReconnectionManager manager, WebSocket.Listener listener) {
            this.manager = manager;
            this.delegateListener = listener;
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            delegateListener.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
            return delegateListener.onText(ws, data, last);
        }

        // Binary, ping and pong frames are forwarded too; without these overrides the
        // interface defaults would consume them and the delegate would never see them.
        @Override
        public CompletionStage<?> onBinary(WebSocket ws, java.nio.ByteBuffer data, boolean last) {
            return delegateListener.onBinary(ws, data, last);
        }

        @Override
        public CompletionStage<?> onPing(WebSocket ws, java.nio.ByteBuffer message) {
            return delegateListener.onPing(ws, message);
        }

        @Override
        public CompletionStage<?> onPong(WebSocket ws, java.nio.ByteBuffer message) {
            return delegateListener.onPong(ws, message);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket ws, int statusCode, String reason) {
            System.out.println("WebSocket closed: " + statusCode + " " + reason);
            if (manager.shouldReconnect && statusCode != WebSocket.NORMAL_CLOSURE) {
                manager.reconnectWithBackoff();
            }
            return delegateListener.onClose(ws, statusCode, reason);
        }

        @Override
        public void onError(WebSocket ws, Throwable error) {
            System.err.println("WebSocket error: " + error.getMessage());
            delegateListener.onError(ws, error);
            if (manager.shouldReconnect) {
                manager.reconnectWithBackoff();
            }
        }
    }
}
Circuit Breaker Pattern:
/**
 * Circuit breaker around WebSocket connection attempts.
 *
 * <p>Failures are counted while the circuit is CLOSED; reaching {@code failureThreshold}
 * opens it. While OPEN, attempts are rejected until {@code openDurationMs} has elapsed
 * since the last failure, after which one probing attempt is allowed (HALF_OPEN).
 * A successful probe closes the circuit, a failed probe opens it again.
 *
 * <p>State changes are made under the instance lock; {@link #getState()} reads a volatile field.
 */
public class CircuitBreakerWebSocket {

    public enum State {
        CLOSED,   // Normal operation
        OPEN,     // Failing, reject new attempts
        HALF_OPEN // Testing if service recovered
    }

    private volatile State state = State.CLOSED;
    private volatile long lastFailureTime;
    // Guarded by "this"; a bare ++ on a volatile field would lose concurrent updates.
    private int failureCount = 0;
    private final int failureThreshold;
    private final long openDurationMs;
    // Stored from the constructor; not consulted by any logic in this class.
    private final int resetTimeoutMs;

    /**
     * @param failureThreshold number of recorded failures that opens the circuit
     * @param openDurationMs   how long the circuit stays OPEN before a probe is allowed
     * @param resetTimeoutMs   kept for API compatibility; currently unused
     */
    public CircuitBreakerWebSocket(int failureThreshold, long openDurationMs, int resetTimeoutMs) {
        this.failureThreshold = failureThreshold;
        this.openDurationMs = openDurationMs;
        this.resetTimeoutMs = resetTimeoutMs;
    }

    /**
     * Starts a connection attempt unless the circuit is OPEN.
     *
     * @param httpClient client used to build the connection
     * @param uri        WebSocket endpoint to connect to
     * @param listener   application listener that receives the forwarded events
     * @return a future for the WebSocket; it fails with a {@link RuntimeException}
     *         ("Circuit breaker is OPEN") when the attempt is rejected, or with the
     *         handshake failure when the attempt itself fails
     */
    public CompletableFuture<WebSocket> connectWithCircuitBreaker(
            HttpClient httpClient, String uri, WebSocket.Listener listener) {
        if (!allowAttempt()) {
            return CompletableFuture.failedFuture(
                    new RuntimeException("Circuit breaker is OPEN"));
        }
        return httpClient.newWebSocketBuilder()
                .connectTimeout(java.time.Duration.ofSeconds(10))
                .buildAsync(URI.create(uri), new CircuitBreakerListener(this, listener))
                // whenComplete propagates the original outcome without wrapping it a second time.
                .whenComplete((ws, failure) -> {
                    if (failure != null) {
                        recordFailure();
                    }
                });
    }

    /** Returns whether an attempt may proceed, moving OPEN to HALF_OPEN once the open period has elapsed. */
    private synchronized boolean allowAttempt() {
        if (state != State.OPEN) {
            return true;
        }
        if (System.currentTimeMillis() - lastFailureTime > openDurationMs) {
            transitionToHalfOpen();
            return true;
        }
        return false;
    }

    private synchronized void recordFailure() {
        lastFailureTime = System.currentTimeMillis();
        if (state == State.HALF_OPEN) {
            // The probe failed: the service has not recovered, so reject attempts again.
            transitionToOpen();
            return;
        }
        failureCount++;
        if (failureCount >= failureThreshold && state != State.OPEN) {
            transitionToOpen();
        }
    }

    private synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            transitionToClosed();
        }
    }

    private void transitionToOpen() {
        state = State.OPEN;
        System.out.println("Circuit breaker OPEN - rejecting requests");
    }

    private void transitionToHalfOpen() {
        state = State.HALF_OPEN;
        failureCount = 0;
        System.out.println("Circuit breaker HALF_OPEN - testing recovery");
    }

    private void transitionToClosed() {
        state = State.CLOSED;
        failureCount = 0;
        System.out.println("Circuit breaker CLOSED - normal operation");
    }

    /** Returns the current breaker state. */
    public State getState() {
        return state;
    }

    /**
     * Forwards every event to the application listener and reports opens, errors and
     * abnormal closes to the circuit breaker.
     */
    private static class CircuitBreakerListener implements WebSocket.Listener {
        private final CircuitBreakerWebSocket circuitBreaker;
        private final WebSocket.Listener delegateListener;

        public CircuitBreakerListener(CircuitBreakerWebSocket cb, WebSocket.Listener listener) {
            this.circuitBreaker = cb;
            this.delegateListener = listener;
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            circuitBreaker.recordSuccess();
            delegateListener.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
            return delegateListener.onText(ws, data, last);
        }

        // Binary, ping and pong frames are forwarded too; without these overrides the
        // interface defaults would consume them and the delegate would never see them.
        @Override
        public CompletionStage<?> onBinary(WebSocket ws, java.nio.ByteBuffer data, boolean last) {
            return delegateListener.onBinary(ws, data, last);
        }

        @Override
        public CompletionStage<?> onPing(WebSocket ws, java.nio.ByteBuffer message) {
            return delegateListener.onPing(ws, message);
        }

        @Override
        public CompletionStage<?> onPong(WebSocket ws, java.nio.ByteBuffer message) {
            return delegateListener.onPong(ws, message);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket ws, int statusCode, String reason) {
            if (statusCode != WebSocket.NORMAL_CLOSURE) {
                circuitBreaker.recordFailure();
            }
            return delegateListener.onClose(ws, statusCode, reason);
        }

        @Override
        public void onError(WebSocket ws, Throwable error) {
            circuitBreaker.recordFailure();
            delegateListener.onError(ws, error);
        }
    }
}
Compression and Performance Optimization
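Compression reduces bandwidth usage for high-volume messaging. Note that the JDK's built-in java.net.http WebSocket client does not negotiate extensions such as permessage-deflate, and its builder rejects handshake headers defined by the WebSocket protocol (including Sec-WebSocket-Extensions), so with this client compression has to come from a different client library or be applied at the application level.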
Permessage-Deflate Compression:
/**
 * Connects a WebSocket and tracks how many payload bytes arrive as text.
 *
 * <p>No {@code Sec-WebSocket-Extensions} header is set: the JDK WebSocket builder treats
 * headers defined by the WebSocket protocol as illegal, so requesting permessage-deflate
 * through {@code header(...)} makes the handshake fail before any I/O. Payloads on this
 * connection are therefore uncompressed.
 */
public class CompressionOptimization {

    /**
     * Connects to {@code uri} with a listener that counts received bytes.
     *
     * @param uri WebSocket endpoint to connect to
     * @return the connected WebSocket; the call blocks until the handshake finishes
     * @throws Exception declared for callers; handshake failures surface from {@code join()}
     */
    public WebSocket connectWithCompression(String uri) throws Exception {
        HttpClient httpClient = HttpClient.newHttpClient();
        return httpClient.newWebSocketBuilder()
                .buildAsync(URI.create(uri), new CompressionAwareListener())
                .join();
    }

    /** Counts the UTF-8 size of every text message (or fragment) delivered to it. */
    private static class CompressionAwareListener implements WebSocket.Listener {
        private long totalBytesReceived = 0;

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            String message = data.toString();
            // Size of the text as delivered to the listener, encoded as UTF-8.
            int payloadSize = message.getBytes(StandardCharsets.UTF_8).length;
            totalBytesReceived += payloadSize;
            System.out.println("Message received: " + payloadSize + " bytes");
            webSocket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("WebSocket opened (no compression negotiated)");
            webSocket.request(1);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            error.printStackTrace();
        }

        /** Prints the byte total; no ratio is reported because no compressed size is ever observed. */
        public void printCompressionStats() {
            System.out.println("Total bytes received: " + totalBytesReceived);
            System.out.println("Compression ratio: n/a (payloads are not compressed by this client)");
        }
    }
}
Connection Pooling and Reuse
Reusing HttpClient instances maintains connection pools efficiently.
Connection Pool Management:
/**
 * Keeps one {@link WebSocketPool} per URI, all built on a single shared {@link HttpClient}.
 */
public class ConnectionPoolManager {
    private final HttpClient httpClient;
    private final Map<String, WebSocketPool> pools = new ConcurrentHashMap<>();

    public ConnectionPoolManager() {
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(java.time.Duration.ofSeconds(10))
                .version(HttpClient.Version.HTTP_2)
                .build();
    }

    /**
     * Returns a connection for {@code uri}, creating the pool on first use.
     * Creating a pool opens its initial connections and therefore blocks.
     *
     * @throws Exception if no pooled connection is available and a new one cannot be opened
     */
    public WebSocket getOrCreateConnection(String uri) throws Exception {
        WebSocketPool pool = pools.computeIfAbsent(uri, u -> new WebSocketPool(uri, httpClient));
        return pool.getConnection();
    }

    /** Hands {@code ws} back to the pool for {@code uri}; does nothing if no such pool exists. */
    public void releaseConnection(String uri, WebSocket ws) {
        WebSocketPool pool = pools.get(uri);
        if (pool != null) {
            pool.releaseConnection(ws);
        }
    }

    /** Shuts down every pool and forgets them. */
    public void shutdown() {
        pools.values().forEach(WebSocketPool::shutdown);
        pools.clear();
    }

    /**
     * A bounded set of idle connections to one URI. Connections whose input or output
     * side is already closed are never handed out and never put back.
     */
    public static class WebSocketPool {
        private final String uri;
        private final HttpClient httpClient;
        private final BlockingQueue<WebSocket> availableConnections;
        private final Set<WebSocket> allConnections = ConcurrentHashMap.newKeySet();
        private final int poolSize = 5;

        public WebSocketPool(String uri, HttpClient httpClient) {
            this.uri = uri;
            this.httpClient = httpClient;
            this.availableConnections = new LinkedBlockingQueue<>(poolSize);
            initializePool();
        }

        /** Opens up to {@code poolSize} connections; failures are logged and skipped. */
        private void initializePool() {
            for (int i = 0; i < poolSize; i++) {
                try {
                    WebSocket ws = createConnection();
                    availableConnections.offer(ws);
                    allConnections.add(ws);
                } catch (Exception e) {
                    System.err.println("Failed to create pool connection: " + e);
                }
            }
        }

        /** Opens one connection, waiting at most 15 seconds for the handshake. */
        private WebSocket createConnection() throws Exception {
            return httpClient.newWebSocketBuilder()
                    .buildAsync(URI.create(uri), new PooledWebSocketListener())
                    .get(15, TimeUnit.SECONDS);
        }

        /** A connection is reusable only while both of its directions are still open. */
        private static boolean isUsable(WebSocket ws) {
            return !ws.isOutputClosed() && !ws.isInputClosed();
        }

        /**
         * Returns an idle connection that is still open, or opens a new one when none is left.
         *
         * @throws Exception if a new connection is needed and cannot be opened
         */
        public WebSocket getConnection() throws Exception {
            WebSocket ws;
            while ((ws = availableConnections.poll()) != null) {
                if (isUsable(ws)) {
                    return ws;
                }
                // Closed while idle: drop it instead of handing a dead socket to the caller.
                allConnections.remove(ws);
            }
            ws = createConnection();
            allConnections.add(ws);
            return ws;
        }

        /**
         * Returns {@code ws} to the pool. Closed connections are discarded; when the pool
         * is already full the connection is closed and no longer tracked.
         */
        public void releaseConnection(WebSocket ws) {
            if (ws == null) {
                return;
            }
            if (!isUsable(ws)) {
                allConnections.remove(ws);
                return;
            }
            if (!availableConnections.offer(ws)) {
                // Pool full, close connection
                allConnections.remove(ws);
                ws.sendClose(WebSocket.NORMAL_CLOSURE, "Pool full");
            }
        }

        /** Sends a normal close on every tracked connection and empties the pool. */
        public void shutdown() {
            allConnections.forEach(ws -> {
                try {
                    ws.sendClose(WebSocket.NORMAL_CLOSURE, "Shutdown");
                } catch (Exception e) {
                    System.err.println("Error closing pooled connection: " + e);
                }
            });
            allConnections.clear();
            availableConnections.clear();
        }

        /** Keeps the connection flowing: text messages are requested and then discarded. */
        private static class PooledWebSocketListener implements WebSocket.Listener {
            @Override
            public void onOpen(WebSocket webSocket) {
                System.out.println("Pooled connection opened");
                webSocket.request(1);
            }

            @Override
            public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
                ws.request(1);
                return CompletableFuture.completedStage(null);
            }

            @Override
            public void onError(WebSocket ws, Throwable error) {
                System.err.println("Pooled connection error: " + error);
            }
        }
    }
}
Monitoring and Observability
Production deployments require comprehensive monitoring.
Metrics Collection:
/**
 * Thread-safe counters for WebSocket traffic plus a listener that feeds them.
 */
public class WebSocketMetrics {
    private final AtomicLong messagesReceived = new AtomicLong();
    private final AtomicLong messagesSent = new AtomicLong();
    private final AtomicLong bytesReceived = new AtomicLong();
    private final AtomicLong bytesSent = new AtomicLong();
    private final AtomicLong connectionsOpened = new AtomicLong();
    private final AtomicLong connectionsClosed = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();
    // Wall-clock time of the most recent connection open; 0 until a connection has opened.
    private volatile long connectionStartTime;

    /** Returns a new listener that records its events into this metrics object. */
    public WebSocket.Listener metricsListener() {
        return new MetricsCollectingListener(this);
    }

    /** Counts one received message of {@code bytes} bytes. */
    public void recordMessageReceived(int bytes) {
        messagesReceived.incrementAndGet();
        bytesReceived.addAndGet(bytes);
    }

    /** Counts one sent message of {@code bytes} bytes. */
    public void recordMessageSent(int bytes) {
        messagesSent.incrementAndGet();
        bytesSent.addAndGet(bytes);
    }

    /** Counts an opened connection and restarts the uptime clock used for rates. */
    public void recordConnectionOpened() {
        connectionsOpened.incrementAndGet();
        connectionStartTime = System.currentTimeMillis();
    }

    public void recordConnectionClosed() {
        connectionsClosed.incrementAndGet();
    }

    public void recordError() {
        errors.incrementAndGet();
    }

    /**
     * Prints all counters and the receive rates since the most recent connection open.
     * Rates are reported as 0 when no connection has opened yet or no time has elapsed,
     * rather than dividing by zero.
     */
    public void printMetrics() {
        long startedAt = connectionStartTime;
        long uptime = (startedAt == 0) ? 0 : System.currentTimeMillis() - startedAt;
        double uptimeSeconds = uptime / 1000.0;
        double messagesPerSecond = uptimeSeconds > 0 ? messagesReceived.get() / uptimeSeconds : 0.0;
        double bytesPerSecond = uptimeSeconds > 0 ? bytesReceived.get() / uptimeSeconds : 0.0;
        System.out.println("=== WebSocket Metrics ===");
        System.out.println("Messages received: " + messagesReceived.get());
        System.out.println("Messages sent: " + messagesSent.get());
        System.out.println("Bytes received: " + bytesReceived.get());
        System.out.println("Bytes sent: " + bytesSent.get());
        System.out.println("Connections opened: " + connectionsOpened.get());
        System.out.println("Connections closed: " + connectionsClosed.get());
        System.out.println("Errors: " + errors.get());
        System.out.println("Messages/sec: " + String.format("%.2f", messagesPerSecond));
        System.out.println("Bytes/sec: " + String.format("%.2f", bytesPerSecond));
    }

    /** Records opens, closes, errors and received message sizes; message content is discarded. */
    private static class MetricsCollectingListener implements WebSocket.Listener {
        private final WebSocketMetrics metrics;

        public MetricsCollectingListener(WebSocketMetrics metrics) {
            this.metrics = metrics;
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            metrics.recordConnectionOpened();
            webSocket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            // Record the UTF-8 byte size; length() counts UTF-16 code units, which
            // undercounts any non-ASCII text.
            int utf8Bytes = data.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
            metrics.recordMessageReceived(utf8Bytes);
            webSocket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
            metrics.recordMessageReceived(data.remaining());
            webSocket.request(1);
            return CompletableFuture.completedStage(null);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            metrics.recordConnectionClosed();
            return CompletableFuture.completedStage(null);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            metrics.recordError();
            System.err.println("WebSocket error: " + error);
        }
    }
}
Summary
Production WebSocket deployments require:
- Security: TLS/SSL, certificate pinning, bearer token authentication
- Resilience: Exponential backoff reconnection, circuit breaker pattern
- Performance: Compression, connection pooling, efficient serialization
- Observability: Comprehensive metrics, error tracking, monitoring
- Reliability: Proper error handling, graceful degradation, timeout management
These advanced patterns enable building scalable, production-grade real-time applications with WebSocket.