16.3 Asynchronous Network I/O
Master network communication with AsynchronousSocketChannel and AsynchronousServerSocketChannel.
AsynchronousSocketChannel Basics
Client Connection:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
/**
 * Client-side examples for {@link AsynchronousSocketChannel}: blocking on the
 * returned {@link Future}s, bounding the connect with a timeout, and chaining
 * {@link CompletionHandler} callbacks.
 */
public class AsyncSocketClient {

    /**
     * Connects, sends a greeting and prints one response, blocking on each step.
     *
     * @param host server host name or address
     * @param port server TCP port
     * @throws Exception if connecting, writing or reading fails, or the wait is interrupted
     */
    public static void simpleClient(String host, int port) throws Exception {
        // try-with-resources closes the channel even when one of the steps throws.
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            // Connect asynchronously, then wait for the connection
            Future<Void> connectFuture = client.connect(new InetSocketAddress(host, port));
            connectFuture.get();
            System.out.println("Connected to " + host + ":" + port);

            // Send data; a single write may transfer only part of the buffer
            String message = "Hello, Server!";
            ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            int bytesSent = 0;
            while (writeBuffer.hasRemaining()) {
                bytesSent += client.write(writeBuffer).get();
            }
            System.out.println("Sent " + bytesSent + " bytes");

            // Receive response
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int bytesRead = client.read(readBuffer).get();
            if (bytesRead < 0) {
                // -1 means the server closed the connection before answering
                System.out.println("Server closed the connection without responding");
                return;
            }
            readBuffer.flip();
            String response = StandardCharsets.UTF_8.decode(readBuffer).toString();
            System.out.println("Received: " + response);
        }
    }

    /**
     * Connects with an upper bound on how long the connect may take. Failures are
     * reported on stderr rather than thrown.
     *
     * @param host           server host name or address
     * @param port           server TCP port
     * @param timeoutSeconds maximum time to wait for the connection, in seconds
     */
    public static void clientWithTimeout(String host, int port,
                                         long timeoutSeconds) {
        // The channel is closed on every path: success, timeout and failure.
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            Future<Void> connectFuture = client.connect(new InetSocketAddress(host, port));
            try {
                connectFuture.get(timeoutSeconds, TimeUnit.SECONDS);
                System.out.println("Connected");
                // Communication
            } catch (TimeoutException e) {
                System.err.println("Connection timeout");
                connectFuture.cancel(true);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects, sends a greeting and prints one response using completion
     * handlers, then blocks the calling thread until the exchange has finished
     * or failed.
     *
     * @param host server host name or address
     * @param port server TCP port
     * @throws Exception if the channel cannot be opened, the connect cannot be
     *                   initiated, or the wait is interrupted
     */
    public static void clientWithHandler(String host, int port)
            throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        // Closing here (after the latch opens) covers the success path and every
        // failure path, so the individual handlers do not have to close the channel.
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            client.connect(new InetSocketAddress(host, port), null,
                    new CompletionHandler<Void, Void>() {
                        @Override
                        public void completed(Void result, Void attachment) {
                            System.out.println("Connected successfully");
                            sendGreeting(client, latch);
                        }

                        @Override
                        public void failed(Throwable exc, Void attachment) {
                            System.err.println("Connection failed: " + exc.getMessage());
                            latch.countDown();
                        }
                    });
            latch.await();
        }
    }

    /** Writes the greeting (re-issuing partial writes), then starts reading the response. */
    private static void sendGreeting(AsynchronousSocketChannel client, CountDownLatch latch) {
        ByteBuffer buffer = ByteBuffer.wrap("Hello!".getBytes(StandardCharsets.UTF_8));
        client.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesSent, ByteBuffer pending) {
                System.out.println("Sent " + bytesSent + " bytes");
                if (pending.hasRemaining()) {
                    // Partial write: send the rest before reading
                    client.write(pending, pending, this);
                    return;
                }
                readResponse(client, latch);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer pending) {
                System.err.println("Write failed: " + exc.getMessage());
                latch.countDown();
            }
        });
    }

    /** Reads one response chunk, prints it and opens the latch. */
    private static void readResponse(AsynchronousSocketChannel client, CountDownLatch latch) {
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer received) {
                if (bytesRead < 0) {
                    // -1 means the server closed the connection before answering
                    System.err.println("Read failed: connection closed by server");
                } else {
                    received.flip();
                    String response = StandardCharsets.UTF_8.decode(received).toString();
                    System.out.println("Response: " + response);
                }
                latch.countDown();
            }

            @Override
            public void failed(Throwable exc, ByteBuffer received) {
                System.err.println("Read failed: " + exc.getMessage());
                latch.countDown();
            }
        });
    }
}
AsynchronousServerSocketChannel
Basic Server:
/**
 * Callback-driven echo server built on {@link AsynchronousServerSocketChannel}.
 * Each message received from a client is sent back prefixed with "Echo: ".
 */
public class AsyncSocketServer {
    private AsynchronousServerSocketChannel server;
    // Written by the thread calling start()/stop(), read on completion-handler threads.
    private volatile boolean running;

    /**
     * Opens the listening channel, binds it to localhost and starts accepting.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel cannot be opened or bound
     */
    public void start(int port) throws IOException {
        server = AsynchronousServerSocketChannel.open();
        try {
            server.bind(new InetSocketAddress("localhost", port));
        } catch (IOException | RuntimeException e) {
            // Do not leave a half-initialised channel open when the bind fails
            try {
                server.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        System.out.println("Server started on port " + port);
        running = true;
        acceptConnections();
    }

    /**
     * Issues one asynchronous accept; the handler re-arms itself while the
     * server is running.
     */
    private void acceptConnections() {
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                // Accept next connection
                if (running) {
                    acceptConnections();
                }
                // Handle this client
                handleClient(client);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                System.err.println("Accept failed: " + exc.getMessage());
                // A closed channel can never accept again; retrying would only spin.
                if (running && server.isOpen()) {
                    acceptConnections();
                }
            }
        });
    }

    /**
     * Runs the read, echo, read loop for one client until it disconnects or
     * an I/O operation fails.
     *
     * @param client the accepted connection; closed by this method when the loop ends
     */
    private void handleClient(AsynchronousSocketChannel client) {
        try {
            System.out.println("Client connected: " +
                    client.getRemoteAddress());
        } catch (IOException e) {
            closeClient(client);
            return;
        }
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Read from client
        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer readBuffer) {
                if (bytesRead == -1) {
                    // Client disconnected
                    closeClient(client);
                    return;
                }
                readBuffer.flip();
                // NOTE: each chunk is decoded on its own, so a multi-byte character
                // split across two reads is not reassembled.
                String message = StandardCharsets.UTF_8
                        .decode(readBuffer).toString();
                System.out.println("Received: " + message);
                // Echo back
                String response = "Echo: " + message;
                ByteBuffer writeBuffer = ByteBuffer.wrap(
                        response.getBytes(StandardCharsets.UTF_8));
                // Captured so the write handler can re-arm this read handler;
                // inside the nested anonymous class "this" means the write handler.
                final CompletionHandler<Integer, ByteBuffer> readHandler = this;
                client.write(writeBuffer, writeBuffer,
                        new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer bytesSent, ByteBuffer pending) {
                                System.out.println("Sent " + bytesSent + " bytes");
                                if (pending.hasRemaining()) {
                                    // Partial write: send the rest first
                                    client.write(pending, pending, this);
                                    return;
                                }
                                // Continue reading
                                readBuffer.clear();
                                client.read(readBuffer, readBuffer, readHandler);
                            }

                            @Override
                            public void failed(Throwable exc, ByteBuffer pending) {
                                System.err.println("Write failed: " +
                                        exc.getMessage());
                                closeClient(client);
                            }
                        });
            }

            @Override
            public void failed(Throwable exc, ByteBuffer readBuffer) {
                System.err.println("Read failed: " + exc.getMessage());
                closeClient(client);
            }
        });
    }

    /** Closes a client channel, reporting (but not propagating) a close failure. */
    private void closeClient(AsynchronousSocketChannel client) {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops accepting and closes the listening channel. Safe to call when the
     * server was never started.
     *
     * @throws IOException if closing the listening channel fails
     */
    public void stop() throws IOException {
        running = false;
        if (server != null && server.isOpen()) {
            server.close();
        }
    }

    /**
     * Runs the server on port 8080 for 60 seconds.
     *
     * @param args unused
     * @throws Exception if the server cannot start or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        AsyncSocketServer server = new AsyncSocketServer();
        server.start(8080);
        try {
            // Keep running
            Thread.sleep(60000);
        } finally {
            server.stop();
        }
    }
}
Echo Server Implementation
Complete Echo Server:
/**
 * Asynchronous echo server that tracks its connected clients. Every chunk
 * received from a client is written back unchanged before the next read is
 * issued, so each connection has at most one I/O operation in flight.
 */
public class AsyncEchoServer {
    private final AsynchronousServerSocketChannel server;
    private final Set<AsynchronousSocketChannel> clients;
    private final AtomicBoolean running;

    /**
     * Opens the listening channel and binds it to localhost.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel cannot be opened or bound
     */
    public AsyncEchoServer(int port) throws IOException {
        AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
        try {
            channel.bind(new InetSocketAddress("localhost", port));
        } catch (IOException | RuntimeException e) {
            // Do not leak the channel when the bind fails
            try {
                channel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.server = channel;
        this.clients = ConcurrentHashMap.newKeySet();
        this.running = new AtomicBoolean(false);
        System.out.println("Echo server bound to port " + port);
    }

    /**
     * Start accepting connections
     */
    public void start() {
        running.set(true);
        acceptClient();
        System.out.println("Echo server started");
    }

    /** Issues one asynchronous accept; the handler re-arms itself while running. */
    private void acceptClient() {
        if (!running.get()) {
            return;
        }
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                // Accept next client
                acceptClient();
                // Handle this client
                clients.add(client);
                if (!running.get()) {
                    // stop() ran while this accept was completing and may already
                    // have swept the client set, so close this one here.
                    disconnectClient(client);
                    return;
                }
                try {
                    System.out.println("Client connected: " +
                            client.getRemoteAddress());
                } catch (IOException e) {
                    disconnectClient(client);
                    return;
                }
                // Start reading
                readFromClient(client);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                System.err.println("Accept failed: " + exc.getMessage());
                // A closed channel can never accept again; retrying would only spin.
                if (running.get() && server.isOpen()) {
                    acceptClient();
                }
            }
        });
    }

    /** Starts the read, echo, read loop for one client. */
    private void readFromClient(AsynchronousSocketChannel client) {
        ByteBuffer buffer = ByteBuffer.allocate(8192);
        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer attachment) {
                if (bytesRead == -1) {
                    disconnectClient(client);
                    return;
                }
                // Echo back. The next read is issued only once this data has been
                // written completely: a channel allows one outstanding write, and
                // starting another while one is pending throws WritePendingException.
                attachment.flip();
                writeToClient(client, attachment, this);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("Read failed: " + exc.getMessage());
                disconnectClient(client);
            }
        });
    }

    /**
     * Writes the whole buffer to the client, then clears it and re-arms the
     * read handler with the same buffer.
     */
    private void writeToClient(AsynchronousSocketChannel client, ByteBuffer buffer,
                               CompletionHandler<Integer, ByteBuffer> readHandler) {
        if (!buffer.hasRemaining()) {
            // Nothing to echo; go straight back to reading
            buffer.clear();
            client.read(buffer, buffer, readHandler);
            return;
        }
        client.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesWritten, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Continue writing
                    client.write(attachment, attachment, this);
                    return;
                }
                // Continue reading
                attachment.clear();
                client.read(attachment, attachment, readHandler);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("Write failed: " + exc.getMessage());
                disconnectClient(client);
            }
        });
    }

    /** Stops tracking the client and closes its channel. */
    private void disconnectClient(AsynchronousSocketChannel client) {
        clients.remove(client);
        try {
            client.close();
            System.out.println("Client disconnected");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops accepting, closes every tracked client and closes the listening channel.
     *
     * @throws IOException if closing the listening channel fails
     */
    public void stop() throws IOException {
        running.set(false);
        // Close all clients
        for (AsynchronousSocketChannel client : clients) {
            try {
                client.close();
            } catch (IOException ignored) {
                // Best-effort shutdown: keep closing the remaining clients
            }
        }
        clients.clear();
        // Close server
        server.close();
        System.out.println("Echo server stopped");
    }

    /**
     * Get active connections
     *
     * @return number of clients currently tracked by the server
     */
    public int getActiveConnections() {
        return clients.size();
    }
}
HTTP-like Protocol Server
Simple Protocol Server:
/**
 * Async server with custom protocol
 */
public class ProtocolServer {

    /**
     * Handles one request/response exchange per connection: reads a request,
     * writes the matching response in full, then closes the connection.
     */
    public static class RequestHandler {

        /**
         * Reads a single chunk from the client, treats it as the request and
         * answers it. The client channel is closed when the exchange ends,
         * whether it succeeded or failed.
         *
         * @param client the accepted connection to serve
         */
        public void handleRequest(AsynchronousSocketChannel client) {
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            // Read request
            client.read(buffer, buffer,
                    new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer bytesRead, ByteBuffer attachment) {
                            if (bytesRead == -1) {
                                closeClient(client);
                                return;
                            }
                            attachment.flip();
                            String request = StandardCharsets.UTF_8
                                    .decode(attachment).toString();
                            // Process request
                            String response = processRequest(request);
                            // Send response
                            sendResponse(client, response);
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            System.err.println("Read failed: " + exc.getMessage());
                            closeClient(client);
                        }
                    });
        }

        /**
         * Writes the response and closes the client once every byte has been
         * sent. Closing after the first completion could truncate the response,
         * because a single write may transfer only part of the buffer.
         */
        private void sendResponse(AsynchronousSocketChannel client, String response) {
            ByteBuffer responseBuffer = ByteBuffer.wrap(
                    response.getBytes(StandardCharsets.UTF_8));
            client.write(responseBuffer, responseBuffer,
                    new CompletionHandler<Integer, ByteBuffer>() {
                        // Running total across partial writes
                        private int totalSent;

                        @Override
                        public void completed(Integer bytesSent, ByteBuffer pending) {
                            totalSent += bytesSent;
                            if (pending.hasRemaining()) {
                                client.write(pending, pending, this);
                                return;
                            }
                            System.out.println("Response sent: " +
                                    totalSent + " bytes");
                            closeClient(client);
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer pending) {
                            System.err.println("Send failed: " +
                                    exc.getMessage());
                            closeClient(client);
                        }
                    });
        }

        /**
         * Maps a request to its response text based on the leading verb.
         *
         * @param request decoded request text
         * @return status line, a blank line, then the body
         */
        private String processRequest(String request) {
            // Parse and handle request
            if (request.startsWith("GET")) {
                return "200 OK\n\nHello, World!";
            } else if (request.startsWith("POST")) {
                return "201 Created\n\nResource created";
            } else {
                return "400 Bad Request\n\nInvalid request";
            }
        }

        /** Closes the client channel; a failure to close is deliberately ignored. */
        private void closeClient(AsynchronousSocketChannel client) {
            try {
                client.close();
            } catch (IOException ignored) {
                // Best-effort close: the exchange is over either way
            }
        }
    }
}
Connection Pool
Client Connection Pool:
/**
 * Pool of async socket connections to a single host and port.
 *
 * <p>At most {@code maxConnections} connections exist (or are being
 * established) at any time. When the pool is exhausted, {@link #acquire()}
 * returns a future that completes when another caller releases a connection;
 * no thread is blocked while waiting.
 *
 * <p>Callers must pass every acquired connection back to
 * {@link #release(AsynchronousSocketChannel)}, including connections they have
 * closed, so that the pool can free the slot.
 */
public class AsyncConnectionPool {
    private final Queue<AsynchronousSocketChannel> availableConnections;
    // Callers parked because the pool was exhausted, oldest first
    private final Queue<CompletableFuture<AsynchronousSocketChannel>> waiters;
    private final Set<AsynchronousSocketChannel> allConnections;
    private final String host;
    private final int port;
    private final int maxConnections;
    // Established connections plus connects currently in flight
    private final AtomicInteger connectionCount;
    // Guards every compound update of the four collections/counters above
    private final Object lock = new Object();

    /**
     * @param host           remote host name or address
     * @param port           remote TCP port
     * @param maxConnections upper bound on simultaneously open connections
     */
    public AsyncConnectionPool(String host, int port, int maxConnections) {
        this.host = host;
        this.port = port;
        this.maxConnections = maxConnections;
        this.availableConnections = new ConcurrentLinkedQueue<>();
        this.waiters = new ConcurrentLinkedQueue<>();
        this.allConnections = ConcurrentHashMap.newKeySet();
        this.connectionCount = new AtomicInteger(0);
    }

    /**
     * Acquire connection from pool
     *
     * @return a future that yields an idle connection immediately, a new
     *         connection once it is established, or, when the pool is
     *         exhausted, the next connection that gets released. It completes
     *         exceptionally if connecting fails or the pool is closed while
     *         the caller is waiting.
     */
    public CompletableFuture<AsynchronousSocketChannel> acquire() {
        synchronized (lock) {
            // Try to reuse existing connection
            AsynchronousSocketChannel idle;
            while ((idle = availableConnections.poll()) != null) {
                if (idle.isOpen()) {
                    return CompletableFuture.completedFuture(idle);
                }
                // Closed while idle: forget it so its slot can be reused
                if (allConnections.remove(idle)) {
                    connectionCount.decrementAndGet();
                }
            }
            if (connectionCount.get() >= maxConnections) {
                // Wait for available connection
                CompletableFuture<AsynchronousSocketChannel> waiter = new CompletableFuture<>();
                waiters.add(waiter);
                return waiter;
            }
            // Reserve the slot before connecting so that concurrent callers
            // cannot push the pool past maxConnections.
            connectionCount.incrementAndGet();
        }
        // Create new connection if under limit
        return createConnection();
    }

    /** Starts a connect for an already reserved slot. */
    private CompletableFuture<AsynchronousSocketChannel> createConnection() {
        CompletableFuture<AsynchronousSocketChannel> future = new CompletableFuture<>();
        connectInto(future);
        return future;
    }

    /**
     * Opens a channel and connects it, completing {@code target} with the
     * outcome. The caller must already hold a reserved slot; it is given back
     * if the connect fails.
     */
    private void connectInto(CompletableFuture<AsynchronousSocketChannel> target) {
        AsynchronousSocketChannel opened = null;
        try {
            opened = AsynchronousSocketChannel.open();
            final AsynchronousSocketChannel channel = opened;
            channel.connect(new InetSocketAddress(host, port), null,
                    new CompletionHandler<Void, Void>() {
                        @Override
                        public void completed(Void result, Void attachment) {
                            synchronized (lock) {
                                allConnections.add(channel);
                            }
                            if (!target.complete(channel)) {
                                // The caller cancelled in the meantime; hand the
                                // connection to someone else instead of leaking it.
                                release(channel);
                            }
                        }

                        @Override
                        public void failed(Throwable exc, Void attachment) {
                            abandonConnect(channel, target, exc);
                        }
                    });
        } catch (IOException | RuntimeException e) {
            // open() failed, or connect() rejected the address up front
            abandonConnect(opened, target, e);
        }
    }

    /** Cleans up after a failed connect: close the channel, fail the caller, free the slot. */
    private void abandonConnect(AsynchronousSocketChannel channel,
                                CompletableFuture<AsynchronousSocketChannel> target,
                                Throwable cause) {
        closeQuietly(channel);
        target.completeExceptionally(cause);
        freeSlot();
    }

    /**
     * Gives back one reserved slot. If a caller is waiting, the slot is
     * transferred to it and a new connection is started on its behalf.
     */
    private void freeSlot() {
        CompletableFuture<AsynchronousSocketChannel> next;
        synchronized (lock) {
            next = pollPendingWaiter();
            if (next == null) {
                connectionCount.decrementAndGet();
            }
        }
        if (next != null) {
            connectInto(next);
        }
    }

    /** Returns the oldest waiter that is still pending, or null. Call with the lock held. */
    private CompletableFuture<AsynchronousSocketChannel> pollPendingWaiter() {
        CompletableFuture<AsynchronousSocketChannel> waiter;
        while ((waiter = waiters.poll()) != null) {
            if (!waiter.isDone()) {
                return waiter;
            }
            // Cancelled by its caller while queued: skip it
        }
        return null;
    }

    /**
     * Release connection back to pool
     *
     * <p>An open connection goes to the longest-waiting caller, or back to the
     * idle queue when nobody is waiting. A closed connection is dropped and its
     * slot freed. {@code null} and connections this pool does not track are
     * ignored.
     *
     * @param connection a connection previously obtained from {@link #acquire()}
     */
    public void release(AsynchronousSocketChannel connection) {
        if (connection == null) {
            return;
        }
        for (;;) {
            CompletableFuture<AsynchronousSocketChannel> waiter;
            synchronized (lock) {
                if (!allConnections.contains(connection)) {
                    return;
                }
                if (!connection.isOpen()) {
                    allConnections.remove(connection);
                    availableConnections.remove(connection);
                    waiter = null;
                } else {
                    waiter = pollPendingWaiter();
                    if (waiter == null) {
                        // Guard against the same connection being queued twice
                        if (!availableConnections.contains(connection)) {
                            availableConnections.add(connection);
                        }
                        return;
                    }
                }
            }
            if (waiter == null) {
                // The connection was closed: free its slot
                freeSlot();
                return;
            }
            // Complete outside the lock so caller callbacks never run under it
            if (waiter.complete(connection)) {
                return;
            }
            // The waiter was cancelled between poll and complete; try the next one
        }
    }

    /**
     * Close all connections
     *
     * <p>Callers still waiting in {@link #acquire()} are failed with an
     * {@link IllegalStateException}. The pool remains usable afterwards.
     */
    public void close() {
        AsynchronousSocketChannel[] toClose;
        synchronized (lock) {
            toClose = allConnections.toArray(new AsynchronousSocketChannel[0]);
            allConnections.clear();
            availableConnections.clear();
            // Connects still in flight keep their reservation
            connectionCount.addAndGet(-toClose.length);
        }
        for (AsynchronousSocketChannel conn : toClose) {
            closeQuietly(conn);
        }
        CompletableFuture<AsynchronousSocketChannel> waiter;
        while ((waiter = waiters.poll()) != null) {
            waiter.completeExceptionally(
                    new IllegalStateException("Connection pool closed"));
        }
    }

    /** Closes a channel, ignoring null and close failures. */
    private static void closeQuietly(AsynchronousSocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ignored) {
            // Best-effort close: the channel is being discarded either way
        }
    }

    /**
     * Get pool statistics
     *
     * @return a snapshot of the total, idle and maximum connection counts
     */
    public PoolStats getStats() {
        return new PoolStats(
                connectionCount.get(),
                availableConnections.size(),
                maxConnections
        );
    }

    /** Immutable snapshot of the pool's counters. */
    public static class PoolStats {
        public final int totalConnections;
        public final int availableConnections;
        public final int maxConnections;

        PoolStats(int total, int available, int max) {
            this.totalConnections = total;
            this.availableConnections = available;
            this.maxConnections = max;
        }

        @Override
        public String toString() {
            return String.format("Pool[total=%d, available=%d, max=%d]",
                    totalConnections, availableConnections, maxConnections);
        }
    }
}
Backpressure Management
Flow Control:
/**
 * Manage backpressure in async I/O by capping the number of operations that
 * may be in flight at the same time.
 */
public class BackpressureManager {
    private final Semaphore permits;
    private final AtomicInteger pendingOps;

    /**
     * @param maxConcurrentOps maximum number of operations allowed in flight
     */
    public BackpressureManager(int maxConcurrentOps) {
        this.permits = new Semaphore(maxConcurrentOps);
        this.pendingOps = new AtomicInteger(0);
    }

    /**
     * Submit operation with backpressure
     *
     * <p>Blocks the calling thread until a permit is available, then starts the
     * operation. The permit is returned when the operation's future completes,
     * or immediately if the operation fails to start.
     *
     * @param operation supplies the future of the operation to run
     * @param <T>       result type of the operation
     * @return the operation's future, or a future failed with
     *         {@link InterruptedException} if the wait for a permit was interrupted
     * @throws RuntimeException whatever {@code operation} throws while starting;
     *         {@link NullPointerException} if it is or returns {@code null}
     */
    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation) {
        try {
            permits.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }
        pendingOps.incrementAndGet();
        CompletableFuture<T> started;
        try {
            started = operation.get();
            if (started == null) {
                throw new NullPointerException("operation returned a null future");
            }
        } catch (RuntimeException | Error e) {
            // The operation never started, so no completion callback will ever
            // run: give the permit back here or it is lost for good.
            finishOperation();
            throw e;
        }
        return started.whenComplete((result, error) -> finishOperation());
    }

    /** Returns the permit and un-counts the operation. */
    private void finishOperation() {
        permits.release();
        pendingOps.decrementAndGet();
    }

    /**
     * Get pending operations count
     *
     * @return number of operations started but not yet completed
     */
    public int getPendingCount() {
        return pendingOps.get();
    }

    /**
     * Get available permits
     *
     * @return number of operations that could be started right now without waiting
     */
    public int getAvailablePermits() {
        return permits.availablePermits();
    }
}
Best Practices
1. Use Handlers for High Throughput:
// Avoid blocking with Future.get(): pass a CompletionHandler instead, which is
// invoked when the read finishes
client.read(buffer, null, completionHandler);
2. Implement Connection Pooling:
AsyncConnectionPool pool = new AsyncConnectionPool(host, port, 10);
AsynchronousSocketChannel conn = pool.acquire().get();
try {
    // Use connection
} finally {
    // Always hand the connection back, even when using it throws;
    // otherwise its pool slot is never freed
    pool.release(conn);
}
3. Handle Partial Reads/Writes:
@Override
public void completed(Integer bytesRead, ByteBuffer buffer) {
    if (bytesRead == -1) {
        // End of stream: the peer closed the connection. Reading again would
        // complete with -1 every time and loop forever.
        return;
    }
    if (buffer.hasRemaining()) {
        // Continue reading
        client.read(buffer, buffer, this);
    }
}
4. Manage Backpressure:
// Allow at most 100 operations in flight at once
BackpressureManager manager = new BackpressureManager(100);
// submit() waits for a free permit before it starts the operation
manager.submit(() -> performAsyncOperation());
5. Clean Up Resources:
try {
    client.close();
} catch (IOException e) {
    // Closing is best-effort: log the failure here, but don't propagate it
}
These async network patterns enable scalable, high-performance network applications handling thousands of concurrent connections.