16.1 Asynchronous I/O Fundamentals

Master the foundation of asynchronous I/O for building scalable, non-blocking applications.

Core Concepts

Synchronous vs Asynchronous I/O:

import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.file.*;
import java.util.concurrent.*;

/**
 * Compare synchronous and asynchronous approaches
 */
public class SyncVsAsync {

    /**
     * Synchronous file read - blocks calling thread
     */
    public static String readFileSync(Path path) throws IOException {
        return Files.readString(path);
    }

    /**
     * Asynchronous file read - returns immediately
     */
    public static Future<String> readFileAsync(Path path) throws IOException {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());

        // Returns immediately with Future
        Future<Integer> result = channel.read(buffer, 0);

        return new Future<>() {
            @Override
            public String get() throws InterruptedException, ExecutionException {
                result.get(); // Wait for completion
                buffer.flip();
                return StandardCharsets.UTF_8.decode(buffer).toString();
            }

            @Override
            public String get(long timeout, TimeUnit unit) 
                    throws InterruptedException, ExecutionException, TimeoutException {
                result.get(timeout, unit);
                buffer.flip();
                return StandardCharsets.UTF_8.decode(buffer).toString();
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return result.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isCancelled() {
                return result.isCancelled();
            }

            @Override
            public boolean isDone() {
                return result.isDone();
            }
        };
    }

    /**
     * Demonstrate differences
     */
    public static void main(String[] args) throws Exception {
        Path path = Path.of("data.txt");
        Files.writeString(path, "Hello, Async I/O!");

        // Synchronous - blocks
        long start = System.nanoTime();
        String syncResult = readFileSync(path);
        long syncTime = System.nanoTime() - start;
        System.out.println("Sync result: " + syncResult);
        System.out.println("Sync time: " + (syncTime / 1_000_000) + "ms");

        // Asynchronous - returns immediately
        start = System.nanoTime();
        Future<String> asyncFuture = readFileAsync(path);
        long submitTime = System.nanoTime() - start;
        System.out.println("Async submit time: " + (submitTime / 1_000_000) + "ms");

        // Do other work while I/O happens
        System.out.println("Doing other work...");

        // Get result when needed
        String asyncResult = asyncFuture.get();
        System.out.println("Async result: " + asyncResult);
    }
}

Asynchronous Channel Types:

/**
 * Overview of async channel types
 */
public class AsyncChannelTypes {

    /**
     * AsynchronousFileChannel - file I/O
     */
    public static void fileChannelExample() throws Exception {
        Path path = Path.of("example.txt");

        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ,
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE);

        System.out.println("File channel opened: " + channel.isOpen());
        channel.close();
    }

    /**
     * AsynchronousSocketChannel - network client
     */
    public static void socketChannelExample() throws Exception {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();

        // Async connect
        Future<Void> connectFuture = channel.connect(
            new InetSocketAddress("example.com", 80));

        connectFuture.get(); // Wait for connection
        System.out.println("Connected: " + channel.isOpen());
        channel.close();
    }

    /**
     * AsynchronousServerSocketChannel - network server
     */
    public static void serverChannelExample() throws Exception {
        AsynchronousServerSocketChannel server = 
            AsynchronousServerSocketChannel.open();

        server.bind(new InetSocketAddress("localhost", 8080));

        System.out.println("Server listening on port 8080");

        // Async accept
        Future<AsynchronousSocketChannel> acceptFuture = server.accept();

        server.close();
    }
}

Future-Based Pattern

Basic Future Operations:

/**
 * Working with Future for async operations
 */
public class FutureBasedAsync {

    /**
     * Read file with Future
     */
    public static void readWithFuture(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Submit operation
        Future<Integer> future = channel.read(buffer, 0);

        // Check status
        System.out.println("Is done? " + future.isDone());

        // Wait for completion
        Integer bytesRead = future.get();
        System.out.println("Bytes read: " + bytesRead);

        buffer.flip();
        System.out.println("Content: " + 
            StandardCharsets.UTF_8.decode(buffer).toString());

        channel.close();
    }

    /**
     * Read with timeout
     */
    public static void readWithTimeout(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Future<Integer> future = channel.read(buffer, 0);

        try {
            // Wait max 5 seconds
            Integer bytesRead = future.get(5, TimeUnit.SECONDS);
            System.out.println("Read completed: " + bytesRead + " bytes");
        } catch (TimeoutException e) {
            System.err.println("Operation timed out");
            future.cancel(true);
        } finally {
            channel.close();
        }
    }

    /**
     * Multiple concurrent reads
     */
    public static void concurrentReads(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        long fileSize = Files.size(path);
        int chunkSize = 1024;

        List<Future<Integer>> futures = new ArrayList<>();
        List<ByteBuffer> buffers = new ArrayList<>();

        // Start multiple reads
        for (long position = 0; position < fileSize; position += chunkSize) {
            ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
            buffers.add(buffer);

            Future<Integer> future = channel.read(buffer, position);
            futures.add(future);
        }

        // Wait for all completions
        int totalRead = 0;
        for (Future<Integer> future : futures) {
            totalRead += future.get();
        }

        System.out.println("Total bytes read: " + totalRead);
        channel.close();
    }

    /**
     * Cancel operation
     */
    public static void cancelOperation(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); // Large read
        Future<Integer> future = channel.read(buffer, 0);

        // Cancel after 100ms
        Thread.sleep(100);

        if (!future.isDone()) {
            boolean cancelled = future.cancel(true);
            System.out.println("Cancelled: " + cancelled);
        }

        channel.close();
    }
}

CompletionHandler Pattern

Basic Handler Implementation:

/**
 * CompletionHandler for callback-based async I/O
 */
public class HandlerBasedAsync {

    /**
     * Read with completion handler
     */
    public static void readWithHandler(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Handler receives result
        CompletionHandler<Integer, ByteBuffer> handler = 
            new CompletionHandler<>() {

                @Override
                public void completed(Integer bytesRead, ByteBuffer attachment) {
                    System.out.println("Read completed: " + bytesRead + " bytes");

                    attachment.flip();
                    String content = StandardCharsets.UTF_8
                        .decode(attachment).toString();
                    System.out.println("Content: " + content);

                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.err.println("Read failed: " + exc.getMessage());

                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            };

        // Submit with handler
        channel.read(buffer, 0, buffer, handler);

        // Main thread continues immediately
        System.out.println("Read submitted, continuing...");

        // Keep program alive
        Thread.sleep(1000);
    }

    /**
     * Write with completion handler
     */
    public static void writeWithHandler(Path path, String content) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE);

        ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));

        CompletionHandler<Integer, String> handler = 
            new CompletionHandler<>() {

                @Override
                public void completed(Integer bytesWritten, String message) {
                    System.out.println("Write completed: " + bytesWritten + " bytes");
                    System.out.println("Message: " + message);

                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, String message) {
                    System.err.println("Write failed: " + exc.getMessage());

                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            };

        channel.write(buffer, 0, "Write operation", handler);

        Thread.sleep(1000);
    }

    /**
     * Chained operations with handlers
     */
    public static void chainedOperations(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // First read
        channel.read(buffer, 0, null, new CompletionHandler<>() {
            @Override
            public void completed(Integer result, Object attachment) {
                System.out.println("First read: " + result + " bytes");

                buffer.flip();
                processData(buffer);
                buffer.clear();

                // Second read at different position
                channel.read(buffer, 1024, null, new CompletionHandler<>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("Second read: " + result + " bytes");

                        try {
                            channel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.err.println("Second read failed: " + exc.getMessage());
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.err.println("First read failed: " + exc.getMessage());
            }
        });

        Thread.sleep(1000);
    }

    private static void processData(ByteBuffer buffer) {
        System.out.println("Processing " + buffer.remaining() + " bytes");
    }
}

Thread Pool Management

Custom Thread Pool:

/**
 * Managing thread pools for async I/O
 */
public class ThreadPoolManagement {

    /**
     * Use default thread pool
     */
    public static void defaultThreadPool(Path path) throws Exception {
        // Uses default system thread pool
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        System.out.println("Using default thread pool");

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer, 0).get();

        channel.close();
    }

    /**
     * Use custom thread pool
     */
    public static void customThreadPool(Path path) throws Exception {
        // Create dedicated thread pool
        ExecutorService executor = Executors.newFixedThreadPool(4,
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("AsyncIO-" + counter.incrementAndGet());
                    thread.setDaemon(false);
                    return thread;
                }
            });

        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            Set.of(StandardOpenOption.READ),
            executor);

        System.out.println("Using custom thread pool");

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer, 0).get();

        channel.close();
        executor.shutdown();
    }

    /**
     * Configure thread pool for workload
     */
    public static ExecutorService createOptimizedPool(int ioThreads) {
        return new ThreadPoolExecutor(
            ioThreads,                      // Core threads
            ioThreads * 2,                  // Max threads
            60L, TimeUnit.SECONDS,          // Keep alive
            new LinkedBlockingQueue<>(100), // Work queue
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("OptimizedIO-" + counter.incrementAndGet());
                    thread.setDaemon(false);
                    thread.setPriority(Thread.NORM_PRIORITY + 1);
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // Backpressure
        );
    }

    /**
     * Monitor thread pool usage
     */
    public static void monitorThreadPool() throws Exception {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) 
            Executors.newFixedThreadPool(4);

        Path path = Path.of("data.txt");

        for (int i = 0; i < 10; i++) {
            AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                Set.of(StandardOpenOption.READ),
                executor);

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer, 0);
        }

        System.out.println("Active threads: " + executor.getActiveCount());
        System.out.println("Pool size: " + executor.getPoolSize());
        System.out.println("Queue size: " + executor.getQueue().size());

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

Error Handling

Robust Error Handling:

/**
 * Comprehensive error handling patterns
 */
public class AsyncErrorHandling {

    /**
     * Handle errors with Future
     */
    public static void futureErrorHandling(Path path) {
        try {
            AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ);

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            Future<Integer> future = channel.read(buffer, 0);

            try {
                Integer bytesRead = future.get(5, TimeUnit.SECONDS);
                System.out.println("Success: " + bytesRead + " bytes");
            } catch (ExecutionException e) {
                // I/O error occurred
                System.err.println("I/O error: " + e.getCause().getMessage());
            } catch (TimeoutException e) {
                System.err.println("Operation timed out");
                future.cancel(true);
            } catch (InterruptedException e) {
                System.err.println("Operation interrupted");
                Thread.currentThread().interrupt();
            } finally {
                channel.close();
            }
        } catch (IOException e) {
            System.err.println("Failed to open channel: " + e.getMessage());
        }
    }

    /**
     * Handle errors with CompletionHandler
     */
    public static void handlerErrorHandling(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            StandardOpenOption.READ);

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        CompletionHandler<Integer, Void> handler = new CompletionHandler<>() {
            @Override
            public void completed(Integer result, Void attachment) {
                System.out.println("Success: " + result + " bytes");
                closeQuietly(channel);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                if (exc instanceof AsynchronousCloseException) {
                    System.err.println("Channel was closed");
                } else if (exc instanceof ClosedChannelException) {
                    System.err.println("Channel is closed");
                } else if (exc instanceof IOException) {
                    System.err.println("I/O error: " + exc.getMessage());
                } else {
                    System.err.println("Unexpected error: " + exc.getMessage());
                }
                closeQuietly(channel);
            }
        };

        channel.read(buffer, 0, null, handler);

        Thread.sleep(1000);
    }

    private static void closeQuietly(AsynchronousFileChannel channel) {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (IOException e) {
            // Ignore
        }
    }

    /**
     * Retry failed operations
     */
    public static class RetryHandler<V, A> implements CompletionHandler<V, A> {
        private final CompletionHandler<V, A> delegate;
        private final int maxRetries;
        private int retryCount = 0;
        private final Runnable retryOperation;

        public RetryHandler(CompletionHandler<V, A> delegate, 
                          int maxRetries,
                          Runnable retryOperation) {
            this.delegate = delegate;
            this.maxRetries = maxRetries;
            this.retryOperation = retryOperation;
        }

        @Override
        public void completed(V result, A attachment) {
            delegate.completed(result, attachment);
        }

        @Override
        public void failed(Throwable exc, A attachment) {
            if (retryCount < maxRetries && isRetryable(exc)) {
                retryCount++;
                System.out.println("Retry attempt " + retryCount);

                try {
                    Thread.sleep(100 * retryCount); // Exponential backoff
                    retryOperation.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    delegate.failed(exc, attachment);
                }
            } else {
                delegate.failed(exc, attachment);
            }
        }

        private boolean isRetryable(Throwable exc) {
            return exc instanceof IOException && 
                   !(exc instanceof ClosedChannelException);
        }
    }
}

Best Practices

1. Choose Appropriate Pattern:

// Future for simple operations
Future<Integer> future = channel.read(buffer, 0);
Integer result = future.get();

// CompletionHandler for high throughput
channel.read(buffer, 0, attachment, handler);

2. Manage Thread Pools:

// Use dedicated pool for I/O-heavy workloads
ExecutorService ioPool = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors());
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
    path, Set.of(StandardOpenOption.READ), ioPool);

3. Handle Errors Properly:

try {
    result = future.get(timeout, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
    // Handle specific errors
    future.cancel(true);
}

4. Close Resources:

try (AsynchronousFileChannel channel = ...) {
    // Automatic resource management
} // Closed automatically

5. Avoid Blocking in Handlers:

@Override
public void completed(Integer result, Object attachment) {
    // Process quickly or delegate to another thread
    executor.submit(() -> processResult(result));
}

These fundamentals provide the foundation for building scalable asynchronous I/O applications in Java.