16.1 Asynchronous I/O Fundamentals
Master the foundation of asynchronous I/O for building scalable, non-blocking applications.
Core Concepts
Synchronous vs Asynchronous I/O:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Compares a blocking file read with a non-blocking read built on
 * {@link AsynchronousFileChannel}.
 */
public class SyncVsAsync {

    /**
     * Synchronous file read - blocks the calling thread until the whole file is read.
     *
     * @param path file to read
     * @return the file content decoded as UTF-8
     * @throws IOException if the file cannot be read
     */
    public static String readFileSync(Path path) throws IOException {
        return Files.readString(path);
    }

    /**
     * Asynchronous file read - submits the read and returns immediately.
     *
     * <p>The channel opened here is closed as soon as the returned future observes
     * that the read has finished (normally, exceptionally or through cancellation),
     * so callers have nothing to clean up.
     *
     * @param path file to read
     * @return a future yielding the file content decoded as UTF-8
     * @throws IOException if the file cannot be opened, or is too large for one buffer
     */
    public static Future<String> readFileAsync(Path path) throws IOException {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ);
        final ByteBuffer buffer;
        final Future<Integer> result;
        try {
            long size = channel.size();
            if (size > Integer.MAX_VALUE) {
                // A plain (int) cast would silently wrap to a wrong or negative capacity.
                throw new IOException("File too large to read into one buffer: " + size + " bytes");
            }
            buffer = ByteBuffer.allocate((int) size);
            // Returns immediately with Future
            result = channel.read(buffer, 0);
        } catch (IOException | RuntimeException e) {
            // No future is handed out on this path, so the channel must be closed here.
            closeQuietly(channel);
            throw e;
        }
        return new Future<>() {
            @Override
            public String get() throws InterruptedException, ExecutionException {
                try {
                    result.get(); // Wait for completion
                } finally {
                    releaseIfDone();
                }
                return decode();
            }

            @Override
            public String get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                try {
                    result.get(timeout, unit);
                } finally {
                    // After a timeout the read is still pending: the channel stays open.
                    releaseIfDone();
                }
                return decode();
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    return result.cancel(mayInterruptIfRunning);
                } finally {
                    releaseIfDone();
                }
            }

            @Override
            public boolean isCancelled() {
                return result.isCancelled();
            }

            @Override
            public boolean isDone() {
                return result.isDone();
            }

            /** Closes the channel once the underlying read can no longer touch it. */
            private void releaseIfDone() {
                if (result.isDone()) {
                    closeQuietly(channel);
                }
            }

            /** Decodes the bytes read so far without disturbing the shared buffer. */
            private String decode() {
                // Work on a duplicate so repeated or concurrent get() calls each see
                // the same position/limit and produce the same string.
                ByteBuffer view = buffer.duplicate();
                view.flip();
                return StandardCharsets.UTF_8.decode(view).toString();
            }
        };
    }

    /** Closes a read-only channel, ignoring close failures. */
    private static void closeQuietly(AsynchronousFileChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // The channel was opened for reading only, so there is nothing to flush
            // and no caller-visible state is lost if close fails.
        }
    }

    /**
     * Demonstrate differences
     *
     * <p>Writes {@code data.txt} in the working directory, then reads it once
     * synchronously and once asynchronously, printing timings and results.
     */
    public static void main(String[] args) throws Exception {
        Path path = Path.of("data.txt");
        Files.writeString(path, "Hello, Async I/O!");

        // Synchronous - blocks
        long start = System.nanoTime();
        String syncResult = readFileSync(path);
        long syncTime = System.nanoTime() - start;
        System.out.println("Sync result: " + syncResult);
        System.out.println("Sync time: " + (syncTime / 1_000_000) + "ms");

        // Asynchronous - returns immediately
        start = System.nanoTime();
        Future<String> asyncFuture = readFileAsync(path);
        long submitTime = System.nanoTime() - start;
        System.out.println("Async submit time: " + (submitTime / 1_000_000) + "ms");

        // Do other work while I/O happens
        System.out.println("Doing other work...");

        // Get result when needed
        String asyncResult = asyncFuture.get();
        System.out.println("Async result: " + asyncResult);
    }
}
Asynchronous Channel Types:
/**
 * Overview of async channel types
 *
 * <p>Each example opens its channel in a try-with-resources statement so the
 * channel is closed even when an operation in between throws.
 */
public class AsyncChannelTypes {

    /**
     * AsynchronousFileChannel - file I/O
     *
     * <p>Opens (creating if necessary) {@code example.txt} in the working directory
     * for reading and writing, reports that the channel is open, then closes it.
     *
     * @throws Exception if the file cannot be opened or closed
     */
    public static void fileChannelExample() throws Exception {
        Path path = Path.of("example.txt");
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE)) {
            System.out.println("File channel opened: " + channel.isOpen());
        }
    }

    /**
     * AsynchronousSocketChannel - network client
     *
     * <p>Connects to {@code example.com:80} and blocks until the connection attempt
     * finishes.
     *
     * @throws Exception if the channel cannot be opened or the connection fails
     */
    public static void socketChannelExample() throws Exception {
        try (AsynchronousSocketChannel channel = AsynchronousSocketChannel.open()) {
            // Async connect
            Future<Void> connectFuture = channel.connect(
                    new InetSocketAddress("example.com", 80));
            connectFuture.get(); // Wait for connection
            System.out.println("Connected: " + channel.isOpen());
        }
    }

    /**
     * AsynchronousServerSocketChannel - network server
     *
     * <p>Binds to {@code localhost:8080} and starts a single asynchronous accept.
     *
     * @throws Exception if the channel cannot be opened or bound
     */
    public static void serverChannelExample() throws Exception {
        try (AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("localhost", 8080));
            System.out.println("Server listening on port 8080");
            // Async accept. The example does not wait for a client: the server is
            // closed on leaving this block, which aborts the pending accept.
            Future<AsynchronousSocketChannel> acceptFuture = server.accept();
        }
    }
}
Future-Based Pattern
Basic Future Operations:
/**
 * Working with Future for async operations
 *
 * <p>Every example closes its channel through try-with-resources, so a failed or
 * interrupted read does not leak the underlying file handle.
 */
public class FutureBasedAsync {

    /**
     * Read file with Future
     *
     * <p>Reads up to 1024 bytes from the start of the file and prints the byte
     * count and the decoded content.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the read fails
     */
    public static void readWithFuture(Path path) throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // Submit operation
            Future<Integer> future = channel.read(buffer, 0);
            // Check status
            System.out.println("Is done? " + future.isDone());
            // Wait for completion
            Integer bytesRead = future.get();
            System.out.println("Bytes read: " + bytesRead);
            buffer.flip();
            System.out.println("Content: " +
                    StandardCharsets.UTF_8.decode(buffer).toString());
        }
    }

    /**
     * Read with timeout
     *
     * <p>Waits at most five seconds for the read; on timeout the operation is
     * cancelled.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the read fails
     */
    public static void readWithTimeout(Path path) throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            Future<Integer> future = channel.read(buffer, 0);
            try {
                // Wait max 5 seconds
                Integer bytesRead = future.get(5, TimeUnit.SECONDS);
                System.out.println("Read completed: " + bytesRead + " bytes");
            } catch (TimeoutException e) {
                System.err.println("Operation timed out");
                future.cancel(true);
            }
        }
    }

    /**
     * Multiple concurrent reads
     *
     * <p>Submits one read per 1024-byte chunk of the file, then waits for all of
     * them and prints the total number of bytes read.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or any read fails
     */
    public static void concurrentReads(Path path) throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ)) {
            long fileSize = Files.size(path);
            int chunkSize = 1024;
            List<Future<Integer>> futures = new ArrayList<>();
            List<ByteBuffer> buffers = new ArrayList<>();
            // Start multiple reads
            for (long position = 0; position < fileSize; position += chunkSize) {
                ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
                buffers.add(buffer);
                futures.add(channel.read(buffer, position));
            }
            // Wait for all completions
            int totalRead = 0;
            for (Future<Integer> future : futures) {
                int bytesRead = future.get();
                // A read reports -1 at end of file (for example when the file shrank
                // after its size was sampled); that must not reduce the total.
                if (bytesRead > 0) {
                    totalRead += bytesRead;
                }
            }
            System.out.println("Total bytes read: " + totalRead);
        }
    }

    /**
     * Cancel operation
     *
     * <p>Starts a 1 MiB read and cancels it if it has not finished after 100 ms.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the wait is interrupted
     */
    public static void cancelOperation(Path path) throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); // Large read
            Future<Integer> future = channel.read(buffer, 0);
            // Cancel after 100ms
            Thread.sleep(100);
            if (!future.isDone()) {
                boolean cancelled = future.cancel(true);
                System.out.println("Cancelled: " + cancelled);
            }
        }
    }
}
CompletionHandler Pattern
Basic Handler Implementation:
/**
 * CompletionHandler for callback-based async I/O
 *
 * <p>Each example closes its channel from the callback on every outcome and waits
 * for the callback with a latch rather than a fixed sleep, so the method returns as
 * soon as the I/O has finished (and after at most one second).
 */
public class HandlerBasedAsync {

    /** Longest time, in seconds, an example waits for its callback chain. */
    private static final long CALLBACK_WAIT_SECONDS = 1;

    /**
     * Read with completion handler
     *
     * <p>Reads up to 1024 bytes from the start of the file and prints the result
     * from the callback.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the wait is interrupted
     */
    public static void readWithHandler(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        CountDownLatch done = new CountDownLatch(1);
        // Handler receives result
        CompletionHandler<Integer, ByteBuffer> handler =
                new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer bytesRead, ByteBuffer attachment) {
                        try {
                            System.out.println("Read completed: " + bytesRead + " bytes");
                            attachment.flip();
                            String content = StandardCharsets.UTF_8
                                    .decode(attachment).toString();
                            System.out.println("Content: " + content);
                        } finally {
                            release(channel, done);
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        try {
                            System.err.println("Read failed: " + exc.getMessage());
                        } finally {
                            release(channel, done);
                        }
                    }
                };
        // Submit with handler
        channel.read(buffer, 0, buffer, handler);
        // Main thread continues immediately
        System.out.println("Read submitted, continuing...");
        // Keep program alive until the callback has run
        done.await(CALLBACK_WAIT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Write with completion handler
     *
     * <p>Writes {@code content} as UTF-8 at position 0. The file is opened with
     * WRITE and CREATE only, so an existing longer file keeps its bytes beyond the
     * written range.
     *
     * @param path    file to write
     * @param content text to write
     * @throws Exception if the file cannot be opened or the wait is interrupted
     */
    public static void writeWithHandler(Path path, String content) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE);
        ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
        CountDownLatch done = new CountDownLatch(1);
        CompletionHandler<Integer, String> handler =
                new CompletionHandler<Integer, String>() {
                    @Override
                    public void completed(Integer bytesWritten, String message) {
                        try {
                            System.out.println("Write completed: " + bytesWritten + " bytes");
                            System.out.println("Message: " + message);
                        } finally {
                            release(channel, done);
                        }
                    }

                    @Override
                    public void failed(Throwable exc, String message) {
                        try {
                            System.err.println("Write failed: " + exc.getMessage());
                        } finally {
                            release(channel, done);
                        }
                    }
                };
        channel.write(buffer, 0, "Write operation", handler);
        done.await(CALLBACK_WAIT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Chained operations with handlers
     *
     * <p>Reads the first 1024 bytes, processes them, then issues a second read at
     * position 1024 from inside the first callback. The channel is closed when the
     * chain ends, whichever step ends it.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the wait is interrupted
     */
    public static void chainedOperations(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        CountDownLatch done = new CountDownLatch(1);
        // First read
        channel.read(buffer, 0, null, new CompletionHandler<Integer, Object>() {
            @Override
            public void completed(Integer result, Object attachment) {
                boolean chained = false;
                try {
                    System.out.println("First read: " + result + " bytes");
                    buffer.flip();
                    processData(buffer);
                    buffer.clear();
                    // Second read at different position
                    channel.read(buffer, 1024, null, new CompletionHandler<Integer, Object>() {
                        @Override
                        public void completed(Integer result, Object attachment) {
                            try {
                                System.out.println("Second read: " + result + " bytes");
                            } finally {
                                release(channel, done);
                            }
                        }

                        @Override
                        public void failed(Throwable exc, Object attachment) {
                            try {
                                System.err.println("Second read failed: " + exc.getMessage());
                            } finally {
                                release(channel, done);
                            }
                        }
                    });
                    chained = true;
                } finally {
                    // If the second read was never submitted, nobody else will
                    // close the channel or release the waiting caller.
                    if (!chained) {
                        release(channel, done);
                    }
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                try {
                    System.err.println("First read failed: " + exc.getMessage());
                } finally {
                    release(channel, done);
                }
            }
        });
        done.await(CALLBACK_WAIT_SECONDS, TimeUnit.SECONDS);
    }

    /** Reports how many bytes are available in the buffer. */
    private static void processData(ByteBuffer buffer) {
        System.out.println("Processing " + buffer.remaining() + " bytes");
    }

    /** Closes the channel and then signals the caller waiting on {@code done}. */
    private static void release(AsynchronousFileChannel channel, CountDownLatch done) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            done.countDown();
        }
    }
}
Thread Pool Management
Custom Thread Pool:
/**
 * Managing thread pools for async I/O
 *
 * <p>Channels and executors created by these examples are released on every path,
 * including when a read throws.
 */
public class ThreadPoolManagement {

    /**
     * Use default thread pool
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the read fails
     */
    public static void defaultThreadPool(Path path) throws Exception {
        // Uses default system thread pool
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ)) {
            System.out.println("Using default thread pool");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer, 0).get();
        }
    }

    /**
     * Use custom thread pool
     *
     * <p>Runs one read on a dedicated four-thread pool whose threads are named
     * {@code AsyncIO-N}. The pool is shut down afterwards even if the read fails;
     * its threads are non-daemon and would otherwise keep the JVM alive.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the read fails
     */
    public static void customThreadPool(Path path) throws Exception {
        // Create dedicated thread pool
        AtomicInteger counter = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
            Thread thread = new Thread(r);
            thread.setName("AsyncIO-" + counter.incrementAndGet());
            thread.setDaemon(false);
            return thread;
        });
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                Set.of(StandardOpenOption.READ),
                executor)) {
            System.out.println("Using custom thread pool");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer, 0).get();
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Configure thread pool for workload
     *
     * <p>Builds a pool with {@code ioThreads} core threads, up to twice that many
     * threads in total, a 60 second keep-alive, a work queue bounded at 100 tasks
     * and a caller-runs rejection policy. Threads are named {@code OptimizedIO-N}
     * and run slightly above normal priority.
     *
     * @param ioThreads number of core threads; must be positive
     * @return the configured executor; the caller is responsible for shutting it down
     * @throws IllegalArgumentException if {@code ioThreads} is not positive
     */
    public static ExecutorService createOptimizedPool(int ioThreads) {
        if (ioThreads <= 0) {
            throw new IllegalArgumentException("ioThreads must be positive: " + ioThreads);
        }
        AtomicInteger counter = new AtomicInteger(0);
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setName("OptimizedIO-" + counter.incrementAndGet());
            thread.setDaemon(false);
            thread.setPriority(Thread.NORM_PRIORITY + 1);
            return thread;
        };
        return new ThreadPoolExecutor(
                ioThreads, // Core threads
                ioThreads * 2, // Max threads
                60L, TimeUnit.SECONDS, // Keep alive
                new LinkedBlockingQueue<>(100), // Work queue
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy() // Backpressure
        );
    }

    /**
     * Monitor thread pool usage
     *
     * <p>Opens {@code data.txt} ten times on a shared four-thread pool, submits one
     * read per channel, prints pool statistics, then waits for the reads and
     * releases every channel and the pool.
     *
     * @throws Exception if the file cannot be opened or a read fails
     */
    public static void monitorThreadPool() throws Exception {
        ThreadPoolExecutor executor = (ThreadPoolExecutor)
                Executors.newFixedThreadPool(4);
        Path path = Path.of("data.txt");
        List<AsynchronousFileChannel> channels = new ArrayList<>();
        List<Future<Integer>> reads = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                        Set.of(StandardOpenOption.READ),
                        executor);
                channels.add(channel);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                reads.add(channel.read(buffer, 0));
            }
            System.out.println("Active threads: " + executor.getActiveCount());
            System.out.println("Pool size: " + executor.getPoolSize());
            System.out.println("Queue size: " + executor.getQueue().size());
            // Let the reads finish before their channels are closed.
            for (Future<Integer> read : reads) {
                read.get();
            }
        } finally {
            for (AsynchronousFileChannel channel : channels) {
                try {
                    channel.close();
                } catch (IOException e) {
                    System.err.println("Failed to close channel: " + e.getMessage());
                }
            }
            executor.shutdown();
        }
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
Error Handling
Robust Error Handling:
/**
 * Comprehensive error handling patterns
 */
public class AsyncErrorHandling {

    /**
     * Handle errors with Future
     *
     * <p>Reads up to 1024 bytes and reports each failure mode separately: failure
     * to open, I/O error during the read, timeout (after five seconds) and
     * interruption. No checked exception escapes.
     *
     * @param path file to read
     */
    public static void futureErrorHandling(Path path) {
        AsynchronousFileChannel channel;
        try {
            channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        } catch (IOException e) {
            System.err.println("Failed to open channel: " + e.getMessage());
            return;
        }
        // From here on the channel is open; a failure while closing it must not be
        // reported as a failure to open, so close goes through closeQuietly.
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Future<Integer> future = channel.read(buffer, 0);
        try {
            Integer bytesRead = future.get(5, TimeUnit.SECONDS);
            System.out.println("Success: " + bytesRead + " bytes");
        } catch (ExecutionException e) {
            // I/O error occurred
            System.err.println("I/O error: " + e.getCause().getMessage());
        } catch (TimeoutException e) {
            System.err.println("Operation timed out");
            future.cancel(true);
        } catch (InterruptedException e) {
            System.err.println("Operation interrupted");
            Thread.currentThread().interrupt();
        } finally {
            closeQuietly(channel);
        }
    }

    /**
     * Handle errors with CompletionHandler
     *
     * <p>Reads up to 1024 bytes and classifies a failure by exception type. Waits
     * at most one second for the callback before returning.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the wait is interrupted
     */
    public static void handlerErrorHandling(Path path) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        CountDownLatch done = new CountDownLatch(1);
        CompletionHandler<Integer, Void> handler = new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) {
                try {
                    System.out.println("Success: " + result + " bytes");
                } finally {
                    closeQuietly(channel);
                    done.countDown();
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                try {
                    // Most specific type first: AsynchronousCloseException is a
                    // ClosedChannelException, which in turn is an IOException.
                    if (exc instanceof AsynchronousCloseException) {
                        System.err.println("Channel was closed");
                    } else if (exc instanceof ClosedChannelException) {
                        System.err.println("Channel is closed");
                    } else if (exc instanceof IOException) {
                        System.err.println("I/O error: " + exc.getMessage());
                    } else {
                        System.err.println("Unexpected error: " + exc.getMessage());
                    }
                } finally {
                    closeQuietly(channel);
                    done.countDown();
                }
            }
        };
        channel.read(buffer, 0, null, handler);
        // Returns as soon as the callback has run instead of always sleeping.
        done.await(1, TimeUnit.SECONDS);
    }

    /** Closes the channel if it is non-null and open, ignoring close failures. */
    private static void closeQuietly(AsynchronousFileChannel channel) {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (IOException ignored) {
            // Best-effort cleanup of a read-only channel: nothing to recover.
        }
    }

    /**
     * Retry failed operations
     *
     * <p>Wraps another handler. On a retryable failure (an {@link IOException} that
     * is not a {@link ClosedChannelException}) it waits and re-runs
     * {@code retryOperation}, up to {@code maxRetries} times; otherwise the failure
     * is passed to the delegate. The wait doubles with each attempt (100 ms,
     * 200 ms, 400 ms, ...) up to a fixed cap.
     *
     * <p>The wait happens on the thread that invoked {@link #failed}. The retry
     * counter is a plain field, so one instance should serve a single chain of
     * attempts at a time.
     *
     * @param <V> result type of the I/O operation
     * @param <A> attachment type
     */
    public static class RetryHandler<V, A> implements CompletionHandler<V, A> {
        /** Delay before the first retry. */
        private static final long BASE_BACKOFF_MILLIS = 100L;
        /** Largest doubling exponent; caps the delay at 100 ms * 2^6 = 6.4 s. */
        private static final int MAX_BACKOFF_SHIFT = 6;

        private final CompletionHandler<V, A> delegate;
        private final int maxRetries;
        private int retryCount = 0;
        private final Runnable retryOperation;

        /**
         * @param delegate       handler that receives the final outcome
         * @param maxRetries     maximum number of retries before giving up
         * @param retryOperation action that resubmits the I/O operation
         */
        public RetryHandler(CompletionHandler<V, A> delegate,
                            int maxRetries,
                            Runnable retryOperation) {
            this.delegate = delegate;
            this.maxRetries = maxRetries;
            this.retryOperation = retryOperation;
        }

        @Override
        public void completed(V result, A attachment) {
            delegate.completed(result, attachment);
        }

        @Override
        public void failed(Throwable exc, A attachment) {
            if (retryCount >= maxRetries || !isRetryable(exc)) {
                delegate.failed(exc, attachment);
                return;
            }
            retryCount++;
            System.out.println("Retry attempt " + retryCount);
            try {
                Thread.sleep(backoffMillis(retryCount)); // Exponential backoff
                retryOperation.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                delegate.failed(exc, attachment);
            }
        }

        /** Delay before the given retry attempt (1-based), doubling each time. */
        private static long backoffMillis(int attempt) {
            int shift = Math.min(Math.max(attempt - 1, 0), MAX_BACKOFF_SHIFT);
            return BASE_BACKOFF_MILLIS << shift;
        }

        private boolean isRetryable(Throwable exc) {
            return exc instanceof IOException &&
                    !(exc instanceof ClosedChannelException);
        }
    }
}
Best Practices
1. Choose Appropriate Pattern:
// Future for simple operations: submit the read, then block in get() for the result
Future<Integer> future = channel.read(buffer, 0);
Integer result = future.get();
// CompletionHandler for high throughput: the result is delivered to the handler,
// together with the attachment passed here, so no get() call is needed
channel.read(buffer, 0, attachment, handler);
2. Manage Thread Pools:
// Use dedicated pool for I/O-heavy workloads
// The pool is sized to the number of available processors and handed to the
// channel at open time.
// NOTE(review): the snippet never shuts ioPool down; the owner of the pool must do so.
ExecutorService ioPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
path, Set.of(StandardOpenOption.READ), ioPool);
3. Handle Errors Properly:
// Bound the wait with a timeout; on either failure the pending operation is cancelled.
// NOTE(review): Future.get(timeout, unit) also throws InterruptedException, which
// this fragment does not catch; the enclosing method must declare or handle it.
try {
result = future.get(timeout, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// Handle specific errors
future.cancel(true);
}
4. Close Resources:
// try-with-resources: "..." stands for the AsynchronousFileChannel.open(...) call
try (AsynchronousFileChannel channel = ...) {
// Automatic resource management
} // Closed automatically
5. Avoid Blocking in Handlers:
// CompletionHandler callback that hands the result to an executor instead of
// processing it inline.
// NOTE(review): `executor` and `processResult` are not defined in this fragment;
// presumably a field and a method of the enclosing handler class.
@Override
public void completed(Integer result, Object attachment) {
// Process quickly or delegate to another thread
executor.submit(() -> processResult(result));
}
These fundamentals provide the foundation for building scalable asynchronous I/O applications in Java.