16.4 Advanced Asynchronous Patterns

Master advanced patterns for production-grade async I/O applications.

AsynchronousChannelGroup

Channel Group Management:

import java.nio.channels.*;
import java.util.concurrent.*;

/**
 * Managing channel groups for resource control
 */
public class ChannelGroupManagement {

    /**
     * Create custom channel group
     */
    public static AsynchronousChannelGroup createChannelGroup(int threadPoolSize) 
            throws IOException {
        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize,
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("AsyncIO-" + counter.incrementAndGet());
                    thread.setDaemon(false);
                    return thread;
                }
            });

        return AsynchronousChannelGroup.withThreadPool(executor);
    }

    /**
     * Use channel group for file operations
     */
    public static void fileChannelWithGroup(Path path, 
                                           AsynchronousChannelGroup group) 
            throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
            Set.of(StandardOpenOption.READ),
            group);

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        Future<Integer> future = channel.read(buffer, 0);
        Integer bytesRead = future.get();

        System.out.println("Read " + bytesRead + " bytes using group");

        channel.close();
    }

    /**
     * Use channel group for network operations
     */
    public static void socketChannelWithGroup(AsynchronousChannelGroup group) 
            throws Exception {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);

        Future<Void> connectFuture = channel.connect(
            new InetSocketAddress("example.com", 80));

        connectFuture.get();
        System.out.println("Connected using group");

        channel.close();
    }

    /**
     * Shutdown channel group
     */
    public static void shutdownGroup(AsynchronousChannelGroup group) 
            throws Exception {
        // Initiate shutdown
        group.shutdown();

        // Wait for termination
        boolean terminated = group.awaitTermination(10, TimeUnit.SECONDS);

        if (!terminated) {
            System.err.println("Group did not terminate in time");
            group.shutdownNow();
        }

        System.out.println("Channel group shut down");
    }

    /**
     * Complete example with group lifecycle
     */
    public static void completeGroupExample() throws Exception {
        // Create group
        AsynchronousChannelGroup group = createChannelGroup(4);

        try {
            // Use group for multiple operations
            Path path1 = Path.of("file1.txt");
            Path path2 = Path.of("file2.txt");

            fileChannelWithGroup(path1, group);
            fileChannelWithGroup(path2, group);

            System.out.println("All operations completed");
        } finally {
            // Always shutdown
            shutdownGroup(group);
        }
    }
}

CompletableFuture Integration

Bridging Async I/O with CompletableFuture:

/**
 * Integrate async I/O with CompletableFuture
 */
public class CompletableFutureIntegration {

    /**
     * Convert Future to CompletableFuture
     */
    public static <T> CompletableFuture<T> toCompletableFuture(Future<T> future) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return future.get();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        });
    }

    /**
     * Read file returning CompletableFuture
     */
    public static CompletableFuture<String> readFileAsync(Path path) {
        CompletableFuture<String> result = new CompletableFuture<>();

        try {
            AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ);

            ByteBuffer buffer = ByteBuffer.allocate((int) Files.size(path));

            channel.read(buffer, 0, null, 
                new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer bytesRead, Void attachment) {
                        buffer.flip();
                        String content = StandardCharsets.UTF_8
                            .decode(buffer).toString();
                        result.complete(content);

                        try {
                            channel.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        result.completeExceptionally(exc);

                        try {
                            channel.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }
                });
        } catch (IOException e) {
            result.completeExceptionally(e);
        }

        return result;
    }

    /**
     * Chain async file operations
     */
    public static CompletableFuture<Void> chainedOperations(Path input, 
                                                            Path output) {
        return readFileAsync(input)
            .thenApply(content -> content.toUpperCase())
            .thenCompose(transformed -> writeFileAsync(output, transformed))
            .thenRun(() -> System.out.println("Processing complete"));
    }

    /**
     * Write file returning CompletableFuture
     */
    public static CompletableFuture<Integer> writeFileAsync(Path path, 
                                                            String content) {
        CompletableFuture<Integer> result = new CompletableFuture<>();

        try {
            AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE);

            ByteBuffer buffer = ByteBuffer.wrap(
                content.getBytes(StandardCharsets.UTF_8));

            channel.write(buffer, 0, null, 
                new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer bytesWritten, Void attachment) {
                        result.complete(bytesWritten);

                        try {
                            channel.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        result.completeExceptionally(exc);

                        try {
                            channel.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }
                });
        } catch (IOException e) {
            result.completeExceptionally(e);
        }

        return result;
    }

    /**
     * Parallel file processing
     */
    public static CompletableFuture<List<String>> processFilesParallel(
            List<Path> files) {
        List<CompletableFuture<String>> futures = files.stream()
            .map(CompletableFutureIntegration::readFileAsync)
            .toList();

        return CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .toList());
    }
}

Async Pipeline Pattern

Building Data Processing Pipelines:

/**
 * Async pipeline for data transformation
 */
public class AsyncPipeline<T> {
    private final Queue<T> inputQueue;
    private final Queue<T> outputQueue;
    private final Function<T, CompletableFuture<T>> processor;
    private final int concurrency;
    private final AtomicBoolean running;

    public AsyncPipeline(Function<T, CompletableFuture<T>> processor, 
                        int concurrency) {
        this.inputQueue = new ConcurrentLinkedQueue<>();
        this.outputQueue = new ConcurrentLinkedQueue<>();
        this.processor = processor;
        this.concurrency = concurrency;
        this.running = new AtomicBoolean(false);
    }

    /**
     * Start pipeline
     */
    public void start() {
        running.set(true);

        for (int i = 0; i < concurrency; i++) {
            CompletableFuture.runAsync(this::processLoop);
        }
    }

    private void processLoop() {
        while (running.get()) {
            T item = inputQueue.poll();

            if (item != null) {
                processor.apply(item)
                    .thenAccept(result -> outputQueue.offer(result))
                    .exceptionally(exc -> {
                        System.err.println("Processing failed: " + 
                            exc.getMessage());
                        return null;
                    });
            } else {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * Add item to pipeline
     */
    public void submit(T item) {
        inputQueue.offer(item);
    }

    /**
     * Get processed result
     */
    public T poll() {
        return outputQueue.poll();
    }

    /**
     * Stop pipeline
     */
    public void stop() {
        running.set(false);
    }

    /**
     * Example: File processing pipeline
     */
    public static void fileProcessingExample() throws Exception {
        AsyncPipeline<Path> pipeline = new AsyncPipeline<>(
            path -> CompletableFutureIntegration.readFileAsync(path)
                .thenApply(content -> {
                    // Process content
                    return path;
                }),
            4
        );

        pipeline.start();

        // Submit files
        List<Path> files = List.of(
            Path.of("file1.txt"),
            Path.of("file2.txt"),
            Path.of("file3.txt")
        );

        files.forEach(pipeline::submit);

        // Wait and collect results
        Thread.sleep(2000);

        Path result;
        while ((result = pipeline.poll()) != null) {
            System.out.println("Processed: " + result);
        }

        pipeline.stop();
    }
}

Virtual Threads Comparison

Async I/O vs Virtual Threads:

/**
 * Compare async I/O with virtual threads
 */
public class AsyncVsVirtualThreads {

    /**
     * Async I/O approach
     */
    public static void asyncIOApproach(List<Path> files) throws Exception {
        long start = System.nanoTime();

        List<CompletableFuture<String>> futures = files.stream()
            .map(path -> CompletableFutureIntegration.readFileAsync(path))
            .toList();

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();

        long duration = System.nanoTime() - start;
        System.out.println("Async I/O: " + (duration / 1_000_000) + "ms");
    }

    /**
     * Virtual threads approach
     */
    public static void virtualThreadsApproach(List<Path> files) throws Exception {
        long start = System.nanoTime();

        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = files.stream()
                .map(path -> executor.submit(() -> Files.readString(path)))
                .toList();

            for (Future<String> future : futures) {
                future.get();
            }
        }

        long duration = System.nanoTime() - start;
        System.out.println("Virtual threads: " + (duration / 1_000_000) + "ms");
    }

    /**
     * Benchmark comparison
     */
    public static void benchmark() throws Exception {
        // Create test files
        List<Path> files = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Path file = Path.of("test" + i + ".txt");
            Files.writeString(file, "Test data " + i);
            files.add(file);
        }

        // Warm up
        asyncIOApproach(files);
        virtualThreadsApproach(files);

        // Actual benchmark
        System.out.println("\n=== Benchmark ===");
        asyncIOApproach(files);
        virtualThreadsApproach(files);

        // Cleanup
        for (Path file : files) {
            Files.deleteIfExists(file);
        }
    }

    /**
     * When to use async I/O
     */
    public static String whenToUseAsyncIO() {
        return """
            Use Async I/O when:
            1. Building high-concurrency servers (1000+ connections)
            2. Fine-grained control over thread pools needed
            3. Working with existing async APIs
            4. Memory efficiency critical (less overhead per operation)
            5. Non-blocking behavior required at kernel level
            """;
    }

    /**
     * When to use virtual threads
     */
    public static String whenToUseVirtualThreads() {
        return """
            Use Virtual Threads when:
            1. Simpler programming model preferred
            2. Existing blocking code to migrate
            3. Moderate concurrency (100-1000 threads)
            4. Thread-local storage needed
            5. Debugging and profiling tools compatibility important
            """;
    }
}

Production Patterns

Async Request Handler:

/**
 * Production-ready async request handler
 */
public class AsyncRequestHandler {
    private final AsynchronousChannelGroup channelGroup;
    private final ExecutorService businessLogic;
    private final Semaphore concurrencyLimit;

    public AsyncRequestHandler(int ioThreads, int businessThreads, 
                              int maxConcurrent) throws IOException {
        this.channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
            ioThreads, Executors.defaultThreadFactory());

        this.businessLogic = Executors.newFixedThreadPool(businessThreads);
        this.concurrencyLimit = new Semaphore(maxConcurrent);
    }

    /**
     * Handle client connection
     */
    public void handleClient(AsynchronousSocketChannel client) {
        try {
            concurrencyLimit.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            closeQuietly(client);
            return;
        }

        ByteBuffer buffer = ByteBuffer.allocate(8192);

        client.read(buffer, buffer, 
            new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer bytesRead, ByteBuffer attachment) {
                    if (bytesRead == -1) {
                        concurrencyLimit.release();
                        closeQuietly(client);
                        return;
                    }

                    attachment.flip();

                    // Process in business logic thread
                    businessLogic.submit(() -> {
                        try {
                            byte[] data = new byte[attachment.remaining()];
                            attachment.get(data);

                            // Process request
                            byte[] response = processRequest(data);

                            // Send response
                            ByteBuffer responseBuffer = ByteBuffer.wrap(response);
                            client.write(responseBuffer, null,
                                new CompletionHandler<Integer, Void>() {
                                    @Override
                                    public void completed(Integer bytesSent, 
                                                        Void att) {
                                        concurrencyLimit.release();
                                        closeQuietly(client);
                                    }

                                    @Override
                                    public void failed(Throwable exc, Void att) {
                                        concurrencyLimit.release();
                                        closeQuietly(client);
                                    }
                                });
                        } catch (Exception e) {
                            concurrencyLimit.release();
                            closeQuietly(client);
                        }
                    });
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    concurrencyLimit.release();
                    closeQuietly(client);
                }
            });
    }

    private byte[] processRequest(byte[] request) {
        // Business logic here
        String requestStr = new String(request, StandardCharsets.UTF_8);
        String response = "Processed: " + requestStr;
        return response.getBytes(StandardCharsets.UTF_8);
    }

    private void closeQuietly(AsynchronousSocketChannel client) {
        try {
            if (client != null && client.isOpen()) {
                client.close();
            }
        } catch (IOException e) {
            // Ignore
        }
    }

    /**
     * Shutdown handler
     */
    public void shutdown() throws Exception {
        businessLogic.shutdown();
        businessLogic.awaitTermination(10, TimeUnit.SECONDS);

        channelGroup.shutdown();
        channelGroup.awaitTermination(10, TimeUnit.SECONDS);
    }
}

Async File Processor:

/**
 * Batch file processor with async I/O
 */
public class AsyncFileProcessor {
    private final AsynchronousChannelGroup group;
    private final int parallelism;

    public AsyncFileProcessor(int parallelism) throws IOException {
        this.group = AsynchronousChannelGroup.withFixedThreadPool(
            parallelism, Executors.defaultThreadFactory());
        this.parallelism = parallelism;
    }

    /**
     * Process multiple files in parallel
     */
    public CompletableFuture<List<Result>> processFiles(List<Path> files) {
        List<CompletableFuture<Result>> futures = files.stream()
            .map(this::processFile)
            .toList();

        return CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .toList());
    }

    private CompletableFuture<Result> processFile(Path file) {
        CompletableFuture<Result> result = new CompletableFuture<>();

        try {
            AsynchronousFileChannel channel = AsynchronousFileChannel.open(file,
                Set.of(StandardOpenOption.READ),
                group);

            long fileSize = Files.size(file);
            ByteBuffer buffer = ByteBuffer.allocate((int) fileSize);

            long startTime = System.nanoTime();

            channel.read(buffer, 0, null, 
                new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer bytesRead, Void attachment) {
                        buffer.flip();

                        // Process data
                        int wordCount = countWords(buffer);

                        long duration = System.nanoTime() - startTime;

                        result.complete(new Result(
                            file.getFileName().toString(),
                            bytesRead,
                            wordCount,
                            duration / 1_000_000
                        ));

                        try {
                            channel.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        result.completeExceptionally(exc);

                        try {
                            channel.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }
                });
        } catch (IOException e) {
            result.completeExceptionally(e);
        }

        return result;
    }

    private int countWords(ByteBuffer buffer) {
        String content = StandardCharsets.UTF_8.decode(buffer).toString();
        return content.split("\\s+").length;
    }

    public void close() throws Exception {
        group.shutdown();
        group.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static class Result {
        public final String filename;
        public final int bytesRead;
        public final int wordCount;
        public final long durationMs;

        Result(String filename, int bytesRead, int wordCount, long durationMs) {
            this.filename = filename;
            this.bytesRead = bytesRead;
            this.wordCount = wordCount;
            this.durationMs = durationMs;
        }

        @Override
        public String toString() {
            return String.format("%s: %d bytes, %d words, %dms",
                filename, bytesRead, wordCount, durationMs);
        }
    }

    /**
     * Usage example
     */
    public static void main(String[] args) throws Exception {
        List<Path> files = List.of(
            Path.of("file1.txt"),
            Path.of("file2.txt"),
            Path.of("file3.txt")
        );

        AsyncFileProcessor processor = new AsyncFileProcessor(4);

        try {
            List<Result> results = processor.processFiles(files).get();

            results.forEach(System.out::println);

            int totalBytes = results.stream()
                .mapToInt(r -> r.bytesRead)
                .sum();

            int totalWords = results.stream()
                .mapToInt(r -> r.wordCount)
                .sum();

            System.out.println("\nTotal: " + totalBytes + " bytes, " + 
                totalWords + " words");
        } finally {
            processor.close();
        }
    }
}

Best Practices

1. Use Channel Groups for Resource Control:

AsynchronousChannelGroup group = 
    AsynchronousChannelGroup.withFixedThreadPool(cpuCount, factory);

2. Integrate with CompletableFuture:

CompletableFuture<String> content = readFileAsync(path)
    .thenApply(String::toUpperCase)
    .thenCompose(data -> writeFileAsync(output, data));

3. Implement Backpressure:

Semaphore limit = new Semaphore(maxConcurrent);
limit.acquire();
performAsyncOperation().whenComplete((r, e) -> limit.release());

4. Consider Virtual Threads:

// For simpler blocking-style code with good concurrency
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> Files.readString(path));
}

5. Separate I/O and Business Logic:

// I/O threads for channel operations
AsynchronousChannelGroup ioGroup = ...;

// Business threads for processing
ExecutorService businessPool = ...;

These advanced patterns enable building robust, scalable async I/O applications with proper resource management and optimal performance characteristics.