16.4 Advanced Asynchronous Patterns
Master advanced patterns for production-grade async I/O applications.
AsynchronousChannelGroup
Channel Group Management:
import java.nio.channels.*;
import java.util.concurrent.*;
/**
* Managing channel groups for resource control
*/
public class ChannelGroupManagement {
/**
* Create custom channel group
*/
public static AsynchronousChannelGroup createChannelGroup(int threadPoolSize)
throws IOException {
ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize,
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("AsyncIO-" + counter.incrementAndGet());
thread.setDaemon(false);
return thread;
}
});
return AsynchronousChannelGroup.withThreadPool(executor);
}
/**
* Use channel group for file operations
*/
public static void fileChannelWithGroup(Path path,
AsynchronousChannelGroup group)
throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
Set.of(StandardOpenOption.READ),
group);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> future = channel.read(buffer, 0);
Integer bytesRead = future.get();
System.out.println("Read " + bytesRead + " bytes using group");
channel.close();
}
/**
* Use channel group for network operations
*/
public static void socketChannelWithGroup(AsynchronousChannelGroup group)
throws Exception {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
Future<Void> connectFuture = channel.connect(
new InetSocketAddress("example.com", 80));
connectFuture.get();
System.out.println("Connected using group");
channel.close();
}
/**
* Shutdown channel group
*/
public static void shutdownGroup(AsynchronousChannelGroup group)
throws Exception {
// Initiate shutdown
group.shutdown();
// Wait for termination
boolean terminated = group.awaitTermination(10, TimeUnit.SECONDS);
if (!terminated) {
System.err.println("Group did not terminate in time");
group.shutdownNow();
}
System.out.println("Channel group shut down");
}
/**
* Complete example with group lifecycle
*/
public static void completeGroupExample() throws Exception {
// Create group
AsynchronousChannelGroup group = createChannelGroup(4);
try {
// Use group for multiple operations
Path path1 = Path.of("file1.txt");
Path path2 = Path.of("file2.txt");
fileChannelWithGroup(path1, group);
fileChannelWithGroup(path2, group);
System.out.println("All operations completed");
} finally {
// Always shutdown
shutdownGroup(group);
}
}
}
CompletableFuture Integration
Bridging Async I/O with CompletableFuture:
/**
* Integrate async I/O with CompletableFuture
*/
public class CompletableFutureIntegration {
/**
* Convert Future to CompletableFuture
*/
public static <T> CompletableFuture<T> toCompletableFuture(Future<T> future) {
return CompletableFuture.supplyAsync(() -> {
try {
return future.get();
} catch (Exception e) {
throw new CompletionException(e);
}
});
}
/**
* Read file returning CompletableFuture
*/
public static CompletableFuture<String> readFileAsync(Path path) {
CompletableFuture<String> result = new CompletableFuture<>();
try {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate((int) Files.size(path));
channel.read(buffer, 0, null,
new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesRead, Void attachment) {
buffer.flip();
String content = StandardCharsets.UTF_8
.decode(buffer).toString();
result.complete(content);
try {
channel.close();
} catch (IOException e) {
// Ignore
}
}
@Override
public void failed(Throwable exc, Void attachment) {
result.completeExceptionally(exc);
try {
channel.close();
} catch (IOException e) {
// Ignore
}
}
});
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
/**
* Chain async file operations
*/
public static CompletableFuture<Void> chainedOperations(Path input,
Path output) {
return readFileAsync(input)
.thenApply(content -> content.toUpperCase())
.thenCompose(transformed -> writeFileAsync(output, transformed))
.thenRun(() -> System.out.println("Processing complete"));
}
/**
* Write file returning CompletableFuture
*/
public static CompletableFuture<Integer> writeFileAsync(Path path,
String content) {
CompletableFuture<Integer> result = new CompletableFuture<>();
try {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
ByteBuffer buffer = ByteBuffer.wrap(
content.getBytes(StandardCharsets.UTF_8));
channel.write(buffer, 0, null,
new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesWritten, Void attachment) {
result.complete(bytesWritten);
try {
channel.close();
} catch (IOException e) {
// Ignore
}
}
@Override
public void failed(Throwable exc, Void attachment) {
result.completeExceptionally(exc);
try {
channel.close();
} catch (IOException e) {
// Ignore
}
}
});
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
/**
* Parallel file processing
*/
public static CompletableFuture<List<String>> processFilesParallel(
List<Path> files) {
List<CompletableFuture<String>> futures = files.stream()
.map(CompletableFutureIntegration::readFileAsync)
.toList();
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
}
Async Pipeline Pattern
Building Data Processing Pipelines:
/**
* Async pipeline for data transformation
*/
public class AsyncPipeline<T> {
private final Queue<T> inputQueue;
private final Queue<T> outputQueue;
private final Function<T, CompletableFuture<T>> processor;
private final int concurrency;
private final AtomicBoolean running;
public AsyncPipeline(Function<T, CompletableFuture<T>> processor,
int concurrency) {
this.inputQueue = new ConcurrentLinkedQueue<>();
this.outputQueue = new ConcurrentLinkedQueue<>();
this.processor = processor;
this.concurrency = concurrency;
this.running = new AtomicBoolean(false);
}
/**
* Start pipeline
*/
public void start() {
running.set(true);
for (int i = 0; i < concurrency; i++) {
CompletableFuture.runAsync(this::processLoop);
}
}
private void processLoop() {
while (running.get()) {
T item = inputQueue.poll();
if (item != null) {
processor.apply(item)
.thenAccept(result -> outputQueue.offer(result))
.exceptionally(exc -> {
System.err.println("Processing failed: " +
exc.getMessage());
return null;
});
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
/**
* Add item to pipeline
*/
public void submit(T item) {
inputQueue.offer(item);
}
/**
* Get processed result
*/
public T poll() {
return outputQueue.poll();
}
/**
* Stop pipeline
*/
public void stop() {
running.set(false);
}
/**
* Example: File processing pipeline
*/
public static void fileProcessingExample() throws Exception {
AsyncPipeline<Path> pipeline = new AsyncPipeline<>(
path -> CompletableFutureIntegration.readFileAsync(path)
.thenApply(content -> {
// Process content
return path;
}),
4
);
pipeline.start();
// Submit files
List<Path> files = List.of(
Path.of("file1.txt"),
Path.of("file2.txt"),
Path.of("file3.txt")
);
files.forEach(pipeline::submit);
// Wait and collect results
Thread.sleep(2000);
Path result;
while ((result = pipeline.poll()) != null) {
System.out.println("Processed: " + result);
}
pipeline.stop();
}
}
Virtual Threads Comparison
Async I/O vs Virtual Threads:
/**
* Compare async I/O with virtual threads
*/
public class AsyncVsVirtualThreads {
/**
* Async I/O approach
*/
public static void asyncIOApproach(List<Path> files) throws Exception {
long start = System.nanoTime();
List<CompletableFuture<String>> futures = files.stream()
.map(path -> CompletableFutureIntegration.readFileAsync(path))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
long duration = System.nanoTime() - start;
System.out.println("Async I/O: " + (duration / 1_000_000) + "ms");
}
/**
* Virtual threads approach
*/
public static void virtualThreadsApproach(List<Path> files) throws Exception {
long start = System.nanoTime();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = files.stream()
.map(path -> executor.submit(() -> Files.readString(path)))
.toList();
for (Future<String> future : futures) {
future.get();
}
}
long duration = System.nanoTime() - start;
System.out.println("Virtual threads: " + (duration / 1_000_000) + "ms");
}
/**
* Benchmark comparison
*/
public static void benchmark() throws Exception {
// Create test files
List<Path> files = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Path file = Path.of("test" + i + ".txt");
Files.writeString(file, "Test data " + i);
files.add(file);
}
// Warm up
asyncIOApproach(files);
virtualThreadsApproach(files);
// Actual benchmark
System.out.println("\n=== Benchmark ===");
asyncIOApproach(files);
virtualThreadsApproach(files);
// Cleanup
for (Path file : files) {
Files.deleteIfExists(file);
}
}
/**
* When to use async I/O
*/
public static String whenToUseAsyncIO() {
return """
Use Async I/O when:
1. Building high-concurrency servers (1000+ connections)
2. Fine-grained control over thread pools needed
3. Working with existing async APIs
4. Memory efficiency critical (less overhead per operation)
5. Non-blocking behavior required at kernel level
""";
}
/**
* When to use virtual threads
*/
public static String whenToUseVirtualThreads() {
return """
Use Virtual Threads when:
1. Simpler programming model preferred
2. Existing blocking code to migrate
3. Moderate concurrency (100-1000 threads)
4. Thread-local storage needed
5. Debugging and profiling tools compatibility important
""";
}
}
Production Patterns
Async Request Handler:
/**
* Production-ready async request handler
*/
public class AsyncRequestHandler {
private final AsynchronousChannelGroup channelGroup;
private final ExecutorService businessLogic;
private final Semaphore concurrencyLimit;
public AsyncRequestHandler(int ioThreads, int businessThreads,
int maxConcurrent) throws IOException {
this.channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
ioThreads, Executors.defaultThreadFactory());
this.businessLogic = Executors.newFixedThreadPool(businessThreads);
this.concurrencyLimit = new Semaphore(maxConcurrent);
}
/**
* Handle client connection
*/
public void handleClient(AsynchronousSocketChannel client) {
try {
concurrencyLimit.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
closeQuietly(client);
return;
}
ByteBuffer buffer = ByteBuffer.allocate(8192);
client.read(buffer, buffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer bytesRead, ByteBuffer attachment) {
if (bytesRead == -1) {
concurrencyLimit.release();
closeQuietly(client);
return;
}
attachment.flip();
// Process in business logic thread
businessLogic.submit(() -> {
try {
byte[] data = new byte[attachment.remaining()];
attachment.get(data);
// Process request
byte[] response = processRequest(data);
// Send response
ByteBuffer responseBuffer = ByteBuffer.wrap(response);
client.write(responseBuffer, null,
new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesSent,
Void att) {
concurrencyLimit.release();
closeQuietly(client);
}
@Override
public void failed(Throwable exc, Void att) {
concurrencyLimit.release();
closeQuietly(client);
}
});
} catch (Exception e) {
concurrencyLimit.release();
closeQuietly(client);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
concurrencyLimit.release();
closeQuietly(client);
}
});
}
private byte[] processRequest(byte[] request) {
// Business logic here
String requestStr = new String(request, StandardCharsets.UTF_8);
String response = "Processed: " + requestStr;
return response.getBytes(StandardCharsets.UTF_8);
}
private void closeQuietly(AsynchronousSocketChannel client) {
try {
if (client != null && client.isOpen()) {
client.close();
}
} catch (IOException e) {
// Ignore
}
}
/**
* Shutdown handler
*/
public void shutdown() throws Exception {
businessLogic.shutdown();
businessLogic.awaitTermination(10, TimeUnit.SECONDS);
channelGroup.shutdown();
channelGroup.awaitTermination(10, TimeUnit.SECONDS);
}
}
Async File Processor:
/**
* Batch file processor with async I/O
*/
public class AsyncFileProcessor {
private final AsynchronousChannelGroup group;
private final int parallelism;
public AsyncFileProcessor(int parallelism) throws IOException {
this.group = AsynchronousChannelGroup.withFixedThreadPool(
parallelism, Executors.defaultThreadFactory());
this.parallelism = parallelism;
}
/**
* Process multiple files in parallel
*/
public CompletableFuture<List<Result>> processFiles(List<Path> files) {
List<CompletableFuture<Result>> futures = files.stream()
.map(this::processFile)
.toList();
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
private CompletableFuture<Result> processFile(Path file) {
CompletableFuture<Result> result = new CompletableFuture<>();
try {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(file,
Set.of(StandardOpenOption.READ),
group);
long fileSize = Files.size(file);
ByteBuffer buffer = ByteBuffer.allocate((int) fileSize);
long startTime = System.nanoTime();
channel.read(buffer, 0, null,
new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesRead, Void attachment) {
buffer.flip();
// Process data
int wordCount = countWords(buffer);
long duration = System.nanoTime() - startTime;
result.complete(new Result(
file.getFileName().toString(),
bytesRead,
wordCount,
duration / 1_000_000
));
try {
channel.close();
} catch (IOException e) {
// Ignore
}
}
@Override
public void failed(Throwable exc, Void attachment) {
result.completeExceptionally(exc);
try {
channel.close();
} catch (IOException e) {
// Ignore
}
}
});
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
private int countWords(ByteBuffer buffer) {
String content = StandardCharsets.UTF_8.decode(buffer).toString();
return content.split("\\s+").length;
}
public void close() throws Exception {
group.shutdown();
group.awaitTermination(10, TimeUnit.SECONDS);
}
public static class Result {
public final String filename;
public final int bytesRead;
public final int wordCount;
public final long durationMs;
Result(String filename, int bytesRead, int wordCount, long durationMs) {
this.filename = filename;
this.bytesRead = bytesRead;
this.wordCount = wordCount;
this.durationMs = durationMs;
}
@Override
public String toString() {
return String.format("%s: %d bytes, %d words, %dms",
filename, bytesRead, wordCount, durationMs);
}
}
/**
* Usage example
*/
public static void main(String[] args) throws Exception {
List<Path> files = List.of(
Path.of("file1.txt"),
Path.of("file2.txt"),
Path.of("file3.txt")
);
AsyncFileProcessor processor = new AsyncFileProcessor(4);
try {
List<Result> results = processor.processFiles(files).get();
results.forEach(System.out::println);
int totalBytes = results.stream()
.mapToInt(r -> r.bytesRead)
.sum();
int totalWords = results.stream()
.mapToInt(r -> r.wordCount)
.sum();
System.out.println("\nTotal: " + totalBytes + " bytes, " +
totalWords + " words");
} finally {
processor.close();
}
}
}
Best Practices
1. Use Channel Groups for Resource Control:
AsynchronousChannelGroup group =
AsynchronousChannelGroup.withFixedThreadPool(cpuCount, factory);
2. Integrate with CompletableFuture:
CompletableFuture<String> content = readFileAsync(path)
.thenApply(String::toUpperCase)
.thenCompose(data -> writeFileAsync(output, data));
3. Implement Backpressure:
Semaphore limit = new Semaphore(maxConcurrent);
limit.acquire();
performAsyncOperation().whenComplete((r, e) -> limit.release());
4. Consider Virtual Threads:
// For simpler blocking-style code with good concurrency
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> Files.readString(path));
}
5. Separate I/O and Business Logic:
// I/O threads for channel operations
AsynchronousChannelGroup ioGroup = ...;
// Business threads for processing
ExecutorService businessPool = ...;
These advanced patterns enable building robust, scalable async I/O applications with proper resource management and optimal performance characteristics.