16.2 Asynchronous File Operations
Master efficient file I/O with AsynchronousFileChannel for high-performance applications.
Reading Files Asynchronously
Basic File Reading:
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.*;
import java.util.concurrent.*;
/**
* Asynchronous file reading patterns
*/
public class AsyncFileReading {
/**
* Simple async read
*/
public static void simpleRead(Path path) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate((int) Files.size(path));
Future<Integer> future = channel.read(buffer, 0);
// Do other work while reading
System.out.println("Reading file asynchronously...");
// Wait for completion
Integer bytesRead = future.get();
buffer.flip();
String content = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println("Read " + bytesRead + " bytes: " + content);
channel.close();
}
/**
* Read large file in chunks
*/
public static void chunkedRead(Path path) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
long fileSize = Files.size(path);
int chunkSize = 8192;
StringBuilder content = new StringBuilder();
for (long position = 0; position < fileSize; position += chunkSize) {
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
Future<Integer> future = channel.read(buffer, position);
Integer bytesRead = future.get();
if (bytesRead > 0) {
buffer.flip();
content.append(StandardCharsets.UTF_8.decode(buffer));
}
}
System.out.println("Total content length: " + content.length());
channel.close();
}
/**
* Read with completion handler
*/
public static void readWithHandler(Path path) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
CountDownLatch latch = new CountDownLatch(1);
channel.read(buffer, 0, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesRead, Void attachment) {
System.out.println("Read completed: " + bytesRead + " bytes");
buffer.flip();
String content = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println("Content: " + content);
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
latch.countDown();
}
@Override
public void failed(Throwable exc, Void attachment) {
System.err.println("Read failed: " + exc.getMessage());
latch.countDown();
}
});
latch.await();
}
/**
* Parallel read from multiple positions
*/
public static void parallelRead(Path path) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
long fileSize = Files.size(path);
int chunkSize = 1024;
int numChunks = (int) Math.ceil((double) fileSize / chunkSize);
List<Future<Integer>> futures = new ArrayList<>();
ByteBuffer[] buffers = new ByteBuffer[numChunks];
// Start all reads in parallel
for (int i = 0; i < numChunks; i++) {
long position = i * chunkSize;
int size = (int) Math.min(chunkSize, fileSize - position);
ByteBuffer buffer = ByteBuffer.allocate(size);
buffers[i] = buffer;
Future<Integer> future = channel.read(buffer, position);
futures.add(future);
}
// Wait for all completions
StringBuilder content = new StringBuilder();
for (int i = 0; i < numChunks; i++) {
futures.get(i).get();
buffers[i].flip();
content.append(StandardCharsets.UTF_8.decode(buffers[i]));
}
System.out.println("Read " + content.length() + " characters in parallel");
channel.close();
}
}
Writing Files Asynchronously
Basic File Writing:
/**
* Asynchronous file writing patterns
*/
public class AsyncFileWriting {
/**
* Simple async write
*/
public static void simpleWrite(Path path, String content) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
Future<Integer> future = channel.write(buffer, 0);
Integer bytesWritten = future.get();
System.out.println("Wrote " + bytesWritten + " bytes");
channel.close();
}
/**
* Append to file
*/
public static void appendWrite(Path path, String content) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
long fileSize = Files.size(path);
ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
Future<Integer> future = channel.write(buffer, fileSize);
Integer bytesWritten = future.get();
System.out.println("Appended " + bytesWritten + " bytes");
channel.close();
}
/**
* Write large data in chunks
*/
public static void chunkedWrite(Path path, byte[] data) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
int chunkSize = 8192;
long position = 0;
for (int offset = 0; offset < data.length; offset += chunkSize) {
int length = Math.min(chunkSize, data.length - offset);
ByteBuffer buffer = ByteBuffer.wrap(data, offset, length);
Future<Integer> future = channel.write(buffer, position);
Integer bytesWritten = future.get();
position += bytesWritten;
}
System.out.println("Wrote total " + position + " bytes");
channel.close();
}
/**
* Write with completion handler
*/
public static void writeWithHandler(Path path, String content) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
CountDownLatch latch = new CountDownLatch(1);
channel.write(buffer, 0, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesWritten, Void attachment) {
System.out.println("Write completed: " + bytesWritten + " bytes");
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
latch.countDown();
}
@Override
public void failed(Throwable exc, Void attachment) {
System.err.println("Write failed: " + exc.getMessage());
latch.countDown();
}
});
latch.await();
}
/**
* Parallel writes to different positions
*/
public static void parallelWrite(Path path, String[] contents) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
List<Future<Integer>> futures = new ArrayList<>();
long position = 0;
for (String content : contents) {
ByteBuffer buffer = ByteBuffer.wrap(
content.getBytes(StandardCharsets.UTF_8));
Future<Integer> future = channel.write(buffer, position);
futures.add(future);
position += content.getBytes(StandardCharsets.UTF_8).length;
}
// Wait for all completions
int totalWritten = 0;
for (Future<Integer> future : futures) {
totalWritten += future.get();
}
System.out.println("Total written: " + totalWritten + " bytes");
channel.close();
}
}
Positioned I/O Operations
Random Access Pattern:
/**
* Random access file operations
*/
public class AsyncPositionedIO {
/**
* Read from specific position
*/
public static String readAt(Path path, long position, int length)
throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(length);
Future<Integer> future = channel.read(buffer, position);
Integer bytesRead = future.get();
buffer.flip();
String content = StandardCharsets.UTF_8.decode(buffer).toString();
channel.close();
return content;
}
/**
* Write at specific position
*/
public static void writeAt(Path path, long position, String content)
throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
Future<Integer> future = channel.write(buffer, position);
Integer bytesWritten = future.get();
System.out.println("Wrote " + bytesWritten + " bytes at position " + position);
channel.close();
}
/**
* Update specific record in file
*/
public static void updateRecord(Path path, int recordNumber,
String newData, int recordSize) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ,
StandardOpenOption.WRITE);
long position = recordNumber * recordSize;
// Read current record
ByteBuffer readBuffer = ByteBuffer.allocate(recordSize);
Future<Integer> readFuture = channel.read(readBuffer, position);
readFuture.get();
readBuffer.flip();
String oldData = StandardCharsets.UTF_8.decode(readBuffer).toString();
System.out.println("Old data: " + oldData);
// Write new record
ByteBuffer writeBuffer = ByteBuffer.allocate(recordSize);
byte[] dataBytes = newData.getBytes(StandardCharsets.UTF_8);
writeBuffer.put(dataBytes);
// Pad if necessary
while (writeBuffer.hasRemaining()) {
writeBuffer.put((byte) 0);
}
writeBuffer.flip();
Future<Integer> writeFuture = channel.write(writeBuffer, position);
Integer bytesWritten = writeFuture.get();
System.out.println("Updated record " + recordNumber +
" with " + bytesWritten + " bytes");
channel.close();
}
/**
* Scatter/gather with positions
*/
public static void scatterGatherPositioned(Path path) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
// Read from multiple positions
ByteBuffer buffer1 = ByteBuffer.allocate(100);
ByteBuffer buffer2 = ByteBuffer.allocate(100);
ByteBuffer buffer3 = ByteBuffer.allocate(100);
Future<Integer> f1 = channel.read(buffer1, 0);
Future<Integer> f2 = channel.read(buffer2, 100);
Future<Integer> f3 = channel.read(buffer3, 200);
// Wait for all
f1.get();
f2.get();
f3.get();
System.out.println("Read from 3 positions concurrently");
channel.close();
}
}
Concurrent File Access
Multiple Readers:
/**
* Concurrent file access patterns
*/
public class ConcurrentFileAccess {
/**
* Multiple readers reading same file
*/
public static void multipleReaders(Path path, int numReaders)
throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(numReaders);
CountDownLatch latch = new CountDownLatch(numReaders);
long fileSize = Files.size(path);
long chunkSize = fileSize / numReaders;
for (int i = 0; i < numReaders; i++) {
final int readerId = i;
final long startPos = i * chunkSize;
final long length = (i == numReaders - 1) ?
(fileSize - startPos) : chunkSize;
executor.submit(() -> {
try {
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate((int) length);
Future<Integer> future = channel.read(buffer, startPos);
Integer bytesRead = future.get();
System.out.println("Reader " + readerId +
" read " + bytesRead + " bytes");
channel.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
}
/**
* Single writer with multiple readers
*/
public static void writerWithReaders(Path path) throws Exception {
// Create shared channel with custom thread pool
ExecutorService ioPool = Executors.newFixedThreadPool(4);
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
Set.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.CREATE),
ioPool);
CountDownLatch latch = new CountDownLatch(4); // 1 writer + 3 readers
// Writer
ioPool.submit(() -> {
try {
ByteBuffer buffer = ByteBuffer.wrap(
"Data from writer".getBytes(StandardCharsets.UTF_8));
Future<Integer> future = channel.write(buffer, 0);
future.get();
System.out.println("Writer completed");
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
// Wait a bit for writer
Thread.sleep(100);
// Multiple readers
for (int i = 0; i < 3; i++) {
final int readerId = i;
ioPool.submit(() -> {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> future = channel.read(buffer, 0);
Integer bytesRead = future.get();
buffer.flip();
String content = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println("Reader " + readerId +
" read: " + content);
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
channel.close();
ioPool.shutdown();
}
}
File Locking
Advisory Locking:
/**
* File locking with async channels
*/
public class AsyncFileLocking {
/**
* Exclusive lock
*/
public static void exclusiveLock(Path path) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
// Try to acquire exclusive lock
Future<FileLock> lockFuture = channel.lock();
FileLock lock = lockFuture.get();
System.out.println("Acquired exclusive lock");
System.out.println("Lock valid: " + lock.isValid());
System.out.println("Lock shared: " + lock.isShared());
// Perform operations
ByteBuffer buffer = ByteBuffer.wrap("Locked data".getBytes());
channel.write(buffer, 0).get();
// Release lock
lock.release();
System.out.println("Released lock");
channel.close();
}
/**
* Shared lock
*/
public static void sharedLock(Path path) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ);
// Acquire shared lock (false = shared)
Future<FileLock> lockFuture = channel.lock(0, Long.MAX_VALUE, true);
FileLock lock = lockFuture.get();
System.out.println("Acquired shared lock");
// Read data
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer, 0).get();
lock.release();
channel.close();
}
/**
* Lock specific region
*/
public static void regionLock(Path path, long position, long size)
throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ,
StandardOpenOption.WRITE);
// Lock specific region
Future<FileLock> lockFuture = channel.lock(position, size, false);
FileLock lock = lockFuture.get();
System.out.println("Locked region: " + position + " to " +
(position + size));
// Write to locked region
ByteBuffer buffer = ByteBuffer.wrap("Region data".getBytes());
channel.write(buffer, position).get();
lock.release();
channel.close();
}
/**
* Try lock with timeout
*/
public static boolean tryLockWithTimeout(Path path, long timeoutMs) {
try {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ,
StandardOpenOption.WRITE);
Future<FileLock> lockFuture = channel.lock();
try {
FileLock lock = lockFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
System.out.println("Lock acquired");
// Use lock
lock.release();
channel.close();
return true;
} catch (TimeoutException e) {
System.out.println("Could not acquire lock within timeout");
lockFuture.cancel(true);
channel.close();
return false;
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
Real-World Example: Async Log Writer
High-Performance Log Writer:
/**
* Asynchronous log writer with buffering
*/
public class AsyncLogWriter {
private final AsynchronousFileChannel channel;
private final AtomicLong position;
private final ExecutorService executor;
private final Queue<LogEntry> pendingWrites;
public AsyncLogWriter(Path logFile) throws IOException {
this.channel = AsynchronousFileChannel.open(logFile,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
this.position = new AtomicLong(Files.size(logFile));
this.executor = Executors.newSingleThreadExecutor();
this.pendingWrites = new ConcurrentLinkedQueue<>();
}
/**
* Write log entry asynchronously
*/
public CompletableFuture<Void> writeLog(String message) {
LogEntry entry = new LogEntry(message);
pendingWrites.offer(entry);
return CompletableFuture.runAsync(() -> {
processWrites();
}, executor);
}
private void processWrites() {
LogEntry entry;
while ((entry = pendingWrites.poll()) != null) {
writeEntry(entry);
}
}
private void writeEntry(LogEntry entry) {
String line = entry.timestamp + " - " + entry.message + "\n";
ByteBuffer buffer = ByteBuffer.wrap(
line.getBytes(StandardCharsets.UTF_8));
long writePos = position.getAndAdd(buffer.remaining());
try {
channel.write(buffer, writePos, null,
new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesWritten, Void attachment) {
// Success
}
@Override
public void failed(Throwable exc, Void attachment) {
System.err.println("Log write failed: " +
exc.getMessage());
}
});
} catch (Exception e) {
System.err.println("Error writing log: " + e.getMessage());
}
}
/**
* Flush and close
*/
public void close() throws IOException {
// Wait for pending writes
while (!pendingWrites.isEmpty()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
executor.shutdown();
channel.close();
}
private static class LogEntry {
final String timestamp;
final String message;
LogEntry(String message) {
this.timestamp = LocalDateTime.now().toString();
this.message = message;
}
}
/**
* Usage example
*/
public static void main(String[] args) throws Exception {
Path logFile = Path.of("application.log");
try (AsyncLogWriter logger = new AsyncLogWriter(logFile)) {
// Write multiple log entries
for (int i = 0; i < 100; i++) {
logger.writeLog("Log entry " + i);
}
System.out.println("Log entries queued");
}
System.out.println("Logger closed");
}
}
Best Practices
1. Use Positioned Operations:
// Efficient random access
channel.read(buffer, position);
channel.write(buffer, position);
2. Handle Partial Writes:
while (buffer.hasRemaining()) {
int written = channel.write(buffer, position).get();
position += written;
}
3. Use Appropriate Thread Pool:
ExecutorService ioPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
path, Set.of(StandardOpenOption.READ), ioPool);
4. Lock When Necessary:
FileLock lock = channel.lock().get();
try {
// Critical section
} finally {
lock.release();
}
5. Close Properly:
try (AsynchronousFileChannel channel = ...) {
// Operations
} // Auto-closed
These async file operations enable high-performance, non-blocking file I/O in Java applications.