16.3 Asynchronous Network I/O

Master network communication with AsynchronousSocketChannel and AsynchronousServerSocketChannel.

AsynchronousSocketChannel Basics

Client Connection:

import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.net.InetSocketAddress;
import java.util.concurrent.*;

/**
 * Async socket client operations
 */
public class AsyncSocketClient {

    /**
     * Simple connection and data exchange
     */
    public static void simpleClient(String host, int port) throws Exception {
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

        // Connect asynchronously
        Future<Void> connectFuture = client.connect(
            new InetSocketAddress(host, port));

        // Wait for connection
        connectFuture.get();
        System.out.println("Connected to " + host + ":" + port);

        // Send data
        String message = "Hello, Server!";
        ByteBuffer writeBuffer = ByteBuffer.wrap(
            message.getBytes(StandardCharsets.UTF_8));

        Future<Integer> writeFuture = client.write(writeBuffer);
        Integer bytesSent = writeFuture.get();
        System.out.println("Sent " + bytesSent + " bytes");

        // Receive response
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        Future<Integer> readFuture = client.read(readBuffer);
        Integer bytesRead = readFuture.get();

        readBuffer.flip();
        String response = StandardCharsets.UTF_8.decode(readBuffer).toString();
        System.out.println("Received: " + response);

        client.close();
    }

    /**
     * Connection with timeout
     */
    public static void clientWithTimeout(String host, int port, 
                                        long timeoutSeconds) {
        try {
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

            Future<Void> connectFuture = client.connect(
                new InetSocketAddress(host, port));

            try {
                connectFuture.get(timeoutSeconds, TimeUnit.SECONDS);
                System.out.println("Connected");

                // Communication

                client.close();
            } catch (TimeoutException e) {
                System.err.println("Connection timeout");
                connectFuture.cancel(true);
                client.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Client with completion handler
     */
    public static void clientWithHandler(String host, int port) 
            throws Exception {
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

        CountDownLatch latch = new CountDownLatch(1);

        // Connect with handler
        client.connect(new InetSocketAddress(host, port), null,
            new CompletionHandler<Void, Void>() {
                @Override
                public void completed(Void result, Void attachment) {
                    System.out.println("Connected successfully");

                    // Send message
                    ByteBuffer buffer = ByteBuffer.wrap(
                        "Hello!".getBytes(StandardCharsets.UTF_8));

                    client.write(buffer, null, 
                        new CompletionHandler<Integer, Void>() {
                            @Override
                            public void completed(Integer bytesSent, 
                                                Void attachment) {
                                System.out.println("Sent " + bytesSent + " bytes");

                                // Read response
                                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                                client.read(readBuffer, null,
                                    new CompletionHandler<Integer, Void>() {
                                        @Override
                                        public void completed(Integer bytesRead,
                                                            Void attachment) {
                                            readBuffer.flip();
                                            String response = StandardCharsets.UTF_8
                                                .decode(readBuffer).toString();
                                            System.out.println("Response: " + response);

                                            try {
                                                client.close();
                                            } catch (IOException e) {
                                                e.printStackTrace();
                                            }

                                            latch.countDown();
                                        }

                                        @Override
                                        public void failed(Throwable exc, 
                                                         Void attachment) {
                                            System.err.println("Read failed: " + 
                                                exc.getMessage());
                                            latch.countDown();
                                        }
                                    });
                            }

                            @Override
                            public void failed(Throwable exc, Void attachment) {
                                System.err.println("Write failed: " + 
                                    exc.getMessage());
                                latch.countDown();
                            }
                        });
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.err.println("Connection failed: " + exc.getMessage());
                    latch.countDown();
                }
            });

        latch.await();
    }
}

AsynchronousServerSocketChannel

Basic Server:

/**
 * Async socket server operations
 */
public class AsyncSocketServer {
    private AsynchronousServerSocketChannel server;
    private boolean running;

    /**
     * Start server
     */
    public void start(int port) throws IOException {
        server = AsynchronousServerSocketChannel.open();
        server.bind(new InetSocketAddress("localhost", port));

        System.out.println("Server started on port " + port);

        running = true;
        acceptConnections();
    }

    /**
     * Accept connections
     */
    private void acceptConnections() {
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                // Accept next connection
                if (running) {
                    acceptConnections();
                }

                // Handle this client
                handleClient(client);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                System.err.println("Accept failed: " + exc.getMessage());

                if (running) {
                    acceptConnections();
                }
            }
        });
    }

    /**
     * Handle client connection
     */
    private void handleClient(AsynchronousSocketChannel client) {
        try {
            System.out.println("Client connected: " + 
                client.getRemoteAddress());
        } catch (IOException e) {
            return;
        }

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Read from client
        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer attachment) {
                if (bytesRead == -1) {
                    // Client disconnected
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return;
                }

                attachment.flip();
                String message = StandardCharsets.UTF_8
                    .decode(attachment).toString();
                System.out.println("Received: " + message);

                // Echo back
                String response = "Echo: " + message;
                ByteBuffer writeBuffer = ByteBuffer.wrap(
                    response.getBytes(StandardCharsets.UTF_8));

                client.write(writeBuffer, null, 
                    new CompletionHandler<Integer, Void>() {
                        @Override
                        public void completed(Integer bytesSent, Void attachment) {
                            System.out.println("Sent " + bytesSent + " bytes");

                            // Continue reading
                            attachment.clear();
                            client.read(attachment, attachment,
                                CompletionHandler.this);
                        }

                        @Override
                        public void failed(Throwable exc, Void attachment) {
                            System.err.println("Write failed: " + 
                                exc.getMessage());
                        }
                    });
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("Read failed: " + exc.getMessage());

                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Stop server
     */
    public void stop() throws IOException {
        running = false;
        if (server != null && server.isOpen()) {
            server.close();
        }
    }

    /**
     * Run server
     */
    public static void main(String[] args) throws Exception {
        AsyncSocketServer server = new AsyncSocketServer();
        server.start(8080);

        // Keep running
        Thread.sleep(60000);

        server.stop();
    }
}

Echo Server Implementation

Complete Echo Server:

/**
 * Production-ready async echo server
 */
public class AsyncEchoServer {
    private final AsynchronousServerSocketChannel server;
    private final Set<AsynchronousSocketChannel> clients;
    private final AtomicBoolean running;

    public AsyncEchoServer(int port) throws IOException {
        this.server = AsynchronousServerSocketChannel.open();
        this.server.bind(new InetSocketAddress("localhost", port));
        this.clients = ConcurrentHashMap.newKeySet();
        this.running = new AtomicBoolean(false);

        System.out.println("Echo server bound to port " + port);
    }

    /**
     * Start accepting connections
     */
    public void start() {
        running.set(true);
        acceptClient();
        System.out.println("Echo server started");
    }

    private void acceptClient() {
        if (!running.get()) {
            return;
        }

        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                // Accept next client
                acceptClient();

                // Handle this client
                clients.add(client);

                try {
                    System.out.println("Client connected: " + 
                        client.getRemoteAddress());
                } catch (IOException e) {
                    return;
                }

                // Start reading
                readFromClient(client);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                System.err.println("Accept failed: " + exc.getMessage());

                if (running.get()) {
                    acceptClient();
                }
            }
        });
    }

    private void readFromClient(AsynchronousSocketChannel client) {
        ByteBuffer buffer = ByteBuffer.allocate(8192);

        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer attachment) {
                if (bytesRead == -1) {
                    disconnectClient(client);
                    return;
                }

                if (bytesRead > 0) {
                    attachment.flip();

                    // Echo back
                    ByteBuffer echoBuffer = ByteBuffer.allocate(attachment.remaining());
                    echoBuffer.put(attachment);
                    echoBuffer.flip();

                    writeToClient(client, echoBuffer);
                }

                // Continue reading
                attachment.clear();
                client.read(attachment, attachment, this);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("Read failed: " + exc.getMessage());
                disconnectClient(client);
            }
        });
    }

    private void writeToClient(AsynchronousSocketChannel client, ByteBuffer buffer) {
        client.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesWritten, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Continue writing
                    client.write(attachment, attachment, this);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("Write failed: " + exc.getMessage());
                disconnectClient(client);
            }
        });
    }

    private void disconnectClient(AsynchronousSocketChannel client) {
        clients.remove(client);

        try {
            client.close();
            System.out.println("Client disconnected");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Stop server
     */
    public void stop() throws IOException {
        running.set(false);

        // Close all clients
        for (AsynchronousSocketChannel client : clients) {
            try {
                client.close();
            } catch (IOException e) {
                // Ignore
            }
        }
        clients.clear();

        // Close server
        server.close();
        System.out.println("Echo server stopped");
    }

    /**
     * Get active connections
     */
    public int getActiveConnections() {
        return clients.size();
    }
}

HTTP-like Protocol Server

Simple Protocol Server:

/**
 * Async server with custom protocol
 */
public class ProtocolServer {

    /**
     * Handle request/response protocol
     */
    public static class RequestHandler {

        public void handleRequest(AsynchronousSocketChannel client) {
            ByteBuffer buffer = ByteBuffer.allocate(4096);

            // Read request
            client.read(buffer, buffer, 
                new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer bytesRead, ByteBuffer attachment) {
                        if (bytesRead == -1) {
                            closeClient(client);
                            return;
                        }

                        attachment.flip();
                        String request = StandardCharsets.UTF_8
                            .decode(attachment).toString();

                        // Process request
                        String response = processRequest(request);

                        // Send response
                        ByteBuffer responseBuffer = ByteBuffer.wrap(
                            response.getBytes(StandardCharsets.UTF_8));

                        client.write(responseBuffer, null,
                            new CompletionHandler<Integer, Void>() {
                                @Override
                                public void completed(Integer bytesSent, 
                                                    Void attachment) {
                                    System.out.println("Response sent: " + 
                                        bytesSent + " bytes");
                                    closeClient(client);
                                }

                                @Override
                                public void failed(Throwable exc, Void attachment) {
                                    System.err.println("Send failed: " + 
                                        exc.getMessage());
                                    closeClient(client);
                                }
                            });
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        System.err.println("Read failed: " + exc.getMessage());
                        closeClient(client);
                    }
                });
        }

        private String processRequest(String request) {
            // Parse and handle request
            if (request.startsWith("GET")) {
                return "200 OK\n\nHello, World!";
            } else if (request.startsWith("POST")) {
                return "201 Created\n\nResource created";
            } else {
                return "400 Bad Request\n\nInvalid request";
            }
        }

        private void closeClient(AsynchronousSocketChannel client) {
            try {
                client.close();
            } catch (IOException e) {
                // Ignore
            }
        }
    }
}

Connection Pool

Client Connection Pool:

/**
 * Pool of async socket connections
 */
public class AsyncConnectionPool {
    private final Queue<AsynchronousSocketChannel> availableConnections;
    private final Set<AsynchronousSocketChannel> allConnections;
    private final String host;
    private final int port;
    private final int maxConnections;
    private final AtomicInteger connectionCount;

    public AsyncConnectionPool(String host, int port, int maxConnections) {
        this.host = host;
        this.port = port;
        this.maxConnections = maxConnections;
        this.availableConnections = new ConcurrentLinkedQueue<>();
        this.allConnections = ConcurrentHashMap.newKeySet();
        this.connectionCount = new AtomicInteger(0);
    }

    /**
     * Acquire connection from pool
     */
    public CompletableFuture<AsynchronousSocketChannel> acquire() {
        // Try to reuse existing connection
        AsynchronousSocketChannel connection = availableConnections.poll();

        if (connection != null && connection.isOpen()) {
            return CompletableFuture.completedFuture(connection);
        }

        // Create new connection if under limit
        if (connectionCount.get() < maxConnections) {
            return createConnection();
        }

        // Wait for available connection
        return CompletableFuture.supplyAsync(() -> {
            while (true) {
                AsynchronousSocketChannel conn = availableConnections.poll();
                if (conn != null && conn.isOpen()) {
                    return conn;
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        });
    }

    private CompletableFuture<AsynchronousSocketChannel> createConnection() {
        CompletableFuture<AsynchronousSocketChannel> future = new CompletableFuture<>();

        try {
            AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();

            channel.connect(new InetSocketAddress(host, port), null,
                new CompletionHandler<Void, Void>() {
                    @Override
                    public void completed(Void result, Void attachment) {
                        allConnections.add(channel);
                        connectionCount.incrementAndGet();
                        future.complete(channel);
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        future.completeExceptionally(exc);
                    }
                });
        } catch (IOException e) {
            future.completeExceptionally(e);
        }

        return future;
    }

    /**
     * Release connection back to pool
     */
    public void release(AsynchronousSocketChannel connection) {
        if (connection != null && connection.isOpen()) {
            availableConnections.offer(connection);
        }
    }

    /**
     * Close all connections
     */
    public void close() {
        for (AsynchronousSocketChannel conn : allConnections) {
            try {
                conn.close();
            } catch (IOException e) {
                // Ignore
            }
        }
        allConnections.clear();
        availableConnections.clear();
        connectionCount.set(0);
    }

    /**
     * Get pool statistics
     */
    public PoolStats getStats() {
        return new PoolStats(
            connectionCount.get(),
            availableConnections.size(),
            maxConnections
        );
    }

    public static class PoolStats {
        public final int totalConnections;
        public final int availableConnections;
        public final int maxConnections;

        PoolStats(int total, int available, int max) {
            this.totalConnections = total;
            this.availableConnections = available;
            this.maxConnections = max;
        }

        @Override
        public String toString() {
            return String.format("Pool[total=%d, available=%d, max=%d]",
                totalConnections, availableConnections, maxConnections);
        }
    }
}

Backpressure Management

Flow Control:

/**
 * Manage backpressure in async I/O
 */
public class BackpressureManager {
    private final Semaphore permits;
    private final AtomicInteger pendingOps;

    public BackpressureManager(int maxConcurrentOps) {
        this.permits = new Semaphore(maxConcurrentOps);
        this.pendingOps = new AtomicInteger(0);
    }

    /**
     * Submit operation with backpressure
     */
    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation) {
        try {
            permits.acquire();
            pendingOps.incrementAndGet();

            return operation.get()
                .whenComplete((result, error) -> {
                    permits.release();
                    pendingOps.decrementAndGet();
                });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Get pending operations count
     */
    public int getPendingCount() {
        return pendingOps.get();
    }

    /**
     * Get available permits
     */
    public int getAvailablePermits() {
        return permits.availablePermits();
    }
}

Best Practices

1. Use Handlers for High Throughput:

// Avoid blocking with Future.get()
client.read(buffer, null, completionHandler);

2. Implement Connection Pooling:

AsyncConnectionPool pool = new AsyncConnectionPool(host, port, 10);
AsynchronousSocketChannel conn = pool.acquire().get();
// Use connection
pool.release(conn);

3. Handle Partial Reads/Writes:

@Override
public void completed(Integer bytesRead, ByteBuffer buffer) {
    if (buffer.hasRemaining()) {
        // Continue reading
        client.read(buffer, buffer, this);
    }
}

4. Manage Backpressure:

BackpressureManager manager = new BackpressureManager(100);
manager.submit(() -> performAsyncOperation());

5. Clean Up Resources:

try {
    client.close();
} catch (IOException e) {
    // Log but don't propagate
}

These async network patterns enable scalable, high-performance network applications handling thousands of concurrent connections.