16.2 Asynchronous File Operations

Master efficient file I/O with AsynchronousFileChannel for high-performance applications.

Reading Files Asynchronously

Basic File Reading:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Asynchronous file reading patterns built on {@link AsynchronousFileChannel}.
 *
 * <p>Every method opens its own channel in a try-with-resources block, so the
 * channel is closed even when a read fails. Text is decoded as UTF-8, and the
 * multi-chunk methods decode once over the complete byte sequence so that a
 * multi-byte character straddling a chunk boundary is not corrupted.
 */
public class AsyncFileReading {

    /** Bytes requested per read in {@link #chunkedRead(Path)}. */
    private static final int SEQUENTIAL_CHUNK_SIZE = 8192;

    /** Bytes requested per read in {@link #parallelRead(Path)}. */
    private static final int PARALLEL_CHUNK_SIZE = 1024;

    /** Buffer capacity used by {@link #readWithHandler(Path)}. */
    private static final int HANDLER_BUFFER_SIZE = 1024;

    /**
     * Reads the whole file with a single asynchronous read and prints it.
     *
     * @param path file to read; must fit in a single {@code int}-sized buffer
     * @throws ArithmeticException if the file is larger than {@link Integer#MAX_VALUE} bytes
     * @throws Exception if the file cannot be opened or the read fails
     */
    public static void simpleRead(Path path) throws Exception {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            // toIntExact fails loudly instead of silently truncating a > 2 GiB size.
            ByteBuffer buffer = ByteBuffer.allocate(Math.toIntExact(Files.size(path)));

            Future<Integer> future = channel.read(buffer, 0);

            // Do other work while reading
            System.out.println("Reading file asynchronously...");

            // Wait for completion
            Integer bytesRead = future.get();

            buffer.flip();
            String content = StandardCharsets.UTF_8.decode(buffer).toString();

            System.out.println("Read " + bytesRead + " bytes: " + content);
        }
    }

    /**
     * Reads a file sequentially in fixed-size chunks and prints the decoded length.
     *
     * <p>The position advances by the number of bytes actually read, and the raw
     * bytes are collected and decoded once at the end; decoding each chunk on its
     * own would turn a character split across two chunks into replacement characters.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or a read fails
     */
    public static void chunkedRead(Path path) throws Exception {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ByteBuffer buffer = ByteBuffer.allocate(SEQUENTIAL_CHUNK_SIZE);
            long position = 0;

            while (true) {
                buffer.clear();
                int bytesRead = channel.read(buffer, position).get();
                if (bytesRead <= 0) {
                    break; // -1 signals end of file
                }
                bytes.write(buffer.array(), 0, bytesRead);
                position += bytesRead;
            }

            String content = new String(bytes.toByteArray(), StandardCharsets.UTF_8);
            System.out.println("Total content length: " + content.length());
        }
    }

    /**
     * Reads up to the first 1024 bytes of a file using a {@link CompletionHandler}
     * and prints them, blocking the caller until the handler has run.
     *
     * <p>The channel is closed by the calling thread after the handler signals
     * completion, so it is released on both the success and the failure path.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or the wait is interrupted
     */
    public static void readWithHandler(Path path) throws Exception {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(HANDLER_BUFFER_SIZE);
            CountDownLatch latch = new CountDownLatch(1);

            channel.read(buffer, 0, null, new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer bytesRead, Void attachment) {
                    try {
                        System.out.println("Read completed: " + bytesRead + " bytes");

                        buffer.flip();
                        String content = StandardCharsets.UTF_8.decode(buffer).toString();
                        System.out.println("Content: " + content);
                    } finally {
                        // Always release the waiting caller, even if printing fails.
                        latch.countDown();
                    }
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.err.println("Read failed: " + exc.getMessage());
                    latch.countDown();
                }
            });

            latch.await();
        }
    }

    /**
     * Issues one asynchronous read per 1024-byte chunk, waits for all of them,
     * and prints the number of decoded characters.
     *
     * <p>Chunk bytes are concatenated in file order and decoded once, so
     * characters that span chunk boundaries survive intact.
     *
     * @param path file to read; must be at most {@link Integer#MAX_VALUE} bytes
     * @throws ArithmeticException if the file is larger than {@link Integer#MAX_VALUE} bytes
     * @throws Exception if the file cannot be opened or a read fails
     */
    public static void parallelRead(Path path) throws Exception {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            int fileSize = Math.toIntExact(Files.size(path));

            List<Future<Integer>> futures = new ArrayList<>();
            List<ByteBuffer> buffers = new ArrayList<>();

            // Start all reads in parallel. The position is a long so that the
            // increment cannot overflow near Integer.MAX_VALUE.
            for (long position = 0; position < fileSize; position += PARALLEL_CHUNK_SIZE) {
                int size = (int) Math.min(PARALLEL_CHUNK_SIZE, fileSize - position);

                ByteBuffer buffer = ByteBuffer.allocate(size);
                buffers.add(buffer);
                futures.add(channel.read(buffer, position));
            }

            // Wait for all completions and stitch the chunks together in order.
            ByteBuffer combined = ByteBuffer.allocate(fileSize);
            for (int i = 0; i < futures.size(); i++) {
                futures.get(i).get();
                ByteBuffer chunk = buffers.get(i);
                chunk.flip();
                combined.put(chunk);
            }
            combined.flip();

            String content = StandardCharsets.UTF_8.decode(combined).toString();
            System.out.println("Read " + content.length() + " characters in parallel");
        }
    }
}

Writing Files Asynchronously

Basic File Writing:

/**
 * Asynchronous file writing patterns built on {@link AsynchronousFileChannel}.
 *
 * <p>Every method opens its own channel in a try-with-resources block, so the
 * channel is closed even when a write fails. A single asynchronous write may
 * transfer fewer bytes than the buffer holds, so each method keeps writing
 * until its buffer is drained. Files are opened with {@code WRITE} and
 * {@code CREATE} only: existing content beyond the written range is left in place.
 */
public class AsyncFileWriting {

    /** Bytes submitted per write in {@link #chunkedWrite(Path, byte[])}. */
    private static final int CHUNK_SIZE = 8192;

    /**
     * Writes {@code content} as UTF-8 starting at offset 0.
     *
     * @param path    file to write; created if missing, not truncated if present
     * @param content text to write
     * @throws Exception if the file cannot be opened or the write fails
     */
    public static void simpleWrite(Path path, String content) throws Exception {
        try (AsynchronousFileChannel channel = openForWrite(path)) {
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));

            long bytesWritten = writeFully(channel, buffer, 0);
            System.out.println("Wrote " + bytesWritten + " bytes");
        }
    }

    /**
     * Writes {@code content} as UTF-8 at the current end of the file.
     *
     * <p>The end offset is sampled once from the open channel, so concurrent
     * appenders to the same file are not coordinated by this method.
     *
     * @param path    file to append to; created if missing
     * @param content text to append
     * @throws Exception if the file cannot be opened or the write fails
     */
    public static void appendWrite(Path path, String content) throws Exception {
        try (AsynchronousFileChannel channel = openForWrite(path)) {
            long fileSize = channel.size();

            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));

            long bytesWritten = writeFully(channel, buffer, fileSize);
            System.out.println("Appended " + bytesWritten + " bytes");
        }
    }

    /**
     * Writes {@code data} from offset 0 in chunks of at most 8192 bytes, waiting
     * for each chunk before submitting the next.
     *
     * @param path file to write; created if missing
     * @param data bytes to write
     * @throws Exception if the file cannot be opened or a write fails
     */
    public static void chunkedWrite(Path path, byte[] data) throws Exception {
        try (AsynchronousFileChannel channel = openForWrite(path)) {
            long position = 0;

            for (int offset = 0; offset < data.length; offset += CHUNK_SIZE) {
                int length = Math.min(CHUNK_SIZE, data.length - offset);
                ByteBuffer buffer = ByteBuffer.wrap(data, offset, length);

                // writeFully guarantees the whole chunk lands before we move on,
                // so the file position and the array offset stay in step.
                position += writeFully(channel, buffer, position);
            }

            System.out.println("Wrote total " + position + " bytes");
        }
    }

    /**
     * Writes {@code content} as UTF-8 at offset 0 using a {@link CompletionHandler},
     * blocking the caller until the handler reports success or failure.
     *
     * <p>If a write completes short, the handler re-submits the remainder. The
     * channel is closed by the calling thread after the handler has finished, so
     * it is released on both the success and the failure path.
     *
     * @param path    file to write; created if missing
     * @param content text to write
     * @throws Exception if the file cannot be opened or the wait is interrupted
     */
    public static void writeWithHandler(Path path, String content) throws Exception {
        try (AsynchronousFileChannel channel = openForWrite(path)) {
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
            CountDownLatch latch = new CountDownLatch(1);

            // The attachment carries the file offset at which this write started.
            channel.write(buffer, 0, 0L, new CompletionHandler<Integer, Long>() {
                @Override
                public void completed(Integer bytesWritten, Long start) {
                    long next = start + bytesWritten;
                    if (buffer.hasRemaining()) {
                        try {
                            channel.write(buffer, next, next, this);
                        } catch (RuntimeException e) {
                            failed(e, next);
                        }
                        return;
                    }

                    System.out.println("Write completed: " + next + " bytes");
                    latch.countDown();
                }

                @Override
                public void failed(Throwable exc, Long start) {
                    System.err.println("Write failed: " + exc.getMessage());
                    latch.countDown();
                }
            });

            latch.await();
        }
    }

    /**
     * Writes each string back-to-back, submitting all writes before waiting on any.
     *
     * <p>Each string is encoded once; its byte length fixes the offset of the
     * next string, so the writes target disjoint regions and may complete in any order.
     *
     * @param path     file to write; created if missing
     * @param contents strings to write consecutively from offset 0
     * @throws Exception if the file cannot be opened or a write fails
     */
    public static void parallelWrite(Path path, String[] contents) throws Exception {
        try (AsynchronousFileChannel channel = openForWrite(path)) {
            List<Future<Integer>> futures = new ArrayList<>();
            List<ByteBuffer> buffers = new ArrayList<>();
            List<Long> starts = new ArrayList<>();
            long position = 0;

            for (String content : contents) {
                byte[] encoded = content.getBytes(StandardCharsets.UTF_8);
                ByteBuffer buffer = ByteBuffer.wrap(encoded);

                futures.add(channel.write(buffer, position));
                buffers.add(buffer);
                starts.add(position);

                position += encoded.length;
            }

            // Wait for all completions, finishing any write that came back short.
            long totalWritten = 0;
            for (int i = 0; i < futures.size(); i++) {
                int written = futures.get(i).get();
                totalWritten += written;
                totalWritten += writeFully(channel, buffers.get(i), starts.get(i) + written);
            }

            System.out.println("Total written: " + totalWritten + " bytes");
        }
    }

    /** Opens {@code path} for writing, creating the file if it does not exist. */
    private static AsynchronousFileChannel openForWrite(Path path) throws java.io.IOException {
        return AsynchronousFileChannel.open(path,
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE);
    }

    /**
     * Writes everything remaining in {@code buffer} starting at {@code position},
     * re-submitting after short writes, and returns the number of bytes written.
     */
    private static long writeFully(AsynchronousFileChannel channel, ByteBuffer buffer,
                                   long position)
            throws InterruptedException, ExecutionException {
        long start = position;
        while (buffer.hasRemaining()) {
            position += channel.write(buffer, position).get();
        }
        return position - start;
    }
}

Positioned I/O Operations

Random Access Pattern:

/**
 * Random access file operations built on {@link AsynchronousFileChannel}.
 *
 * <p>Every method opens its own channel in a try-with-resources block, so the
 * channel is closed even when an operation fails. Text is encoded and decoded
 * as UTF-8.
 */
public class AsyncPositionedIO {

    /**
     * Reads up to {@code length} bytes starting at {@code position} and decodes them.
     *
     * @param path     file to read
     * @param position byte offset to start reading from
     * @param length   maximum number of bytes to read
     * @return the decoded text; shorter than requested (possibly empty) when the
     *         end of the file is reached first
     * @throws Exception if the file cannot be opened or the read fails
     */
    public static String readAt(Path path, long position, int length)
            throws Exception {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(length);

            readFully(channel, buffer, position);

            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    /**
     * Writes {@code content} as UTF-8 starting at {@code position}.
     *
     * @param path     file to write; created if missing
     * @param position byte offset to start writing at
     * @param content  text to write
     * @throws Exception if the file cannot be opened or the write fails
     */
    public static void writeAt(Path path, long position, String content)
            throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                 StandardOpenOption.WRITE,
                 StandardOpenOption.CREATE)) {
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));

            long bytesWritten = writeFully(channel, buffer, position);

            System.out.println("Wrote " + bytesWritten + " bytes at position " + position);
        }
    }

    /**
     * Replaces one fixed-size record, printing the old contents first.
     *
     * <p>The new data is encoded as UTF-8 and zero-padded to {@code recordSize}.
     *
     * @param path         existing file holding fixed-size records
     * @param recordNumber zero-based index of the record to replace
     * @param newData      replacement text
     * @param recordSize   size of every record in bytes
     * @throws java.nio.BufferOverflowException if the encoded {@code newData}
     *         is longer than {@code recordSize}; the file is left unmodified
     * @throws Exception if the file cannot be opened or an operation fails
     */
    public static void updateRecord(Path path, int recordNumber,
                                   String newData, int recordSize) throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                 StandardOpenOption.READ,
                 StandardOpenOption.WRITE)) {
            // Widen before multiplying: int * int overflows for offsets past 2 GiB.
            long position = (long) recordNumber * recordSize;

            // Read current record
            ByteBuffer readBuffer = ByteBuffer.allocate(recordSize);
            readFully(channel, readBuffer, position);

            readBuffer.flip();
            String oldData = StandardCharsets.UTF_8.decode(readBuffer).toString();
            System.out.println("Old data: " + oldData);

            // Write new record
            ByteBuffer writeBuffer = ByteBuffer.allocate(recordSize);
            writeBuffer.put(newData.getBytes(StandardCharsets.UTF_8));

            // Pad if necessary
            while (writeBuffer.hasRemaining()) {
                writeBuffer.put((byte) 0);
            }

            writeBuffer.flip();

            long bytesWritten = writeFully(channel, writeBuffer, position);

            System.out.println("Updated record " + recordNumber +
                " with " + bytesWritten + " bytes");
        }
    }

    /**
     * Starts three 100-byte reads at offsets 0, 100 and 200 and waits for all of them.
     *
     * @param path file to read
     * @throws Exception if the file cannot be opened or a read fails
     */
    public static void scatterGatherPositioned(Path path) throws Exception {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            // Read from multiple positions
            ByteBuffer buffer1 = ByteBuffer.allocate(100);
            ByteBuffer buffer2 = ByteBuffer.allocate(100);
            ByteBuffer buffer3 = ByteBuffer.allocate(100);

            Future<Integer> f1 = channel.read(buffer1, 0);
            Future<Integer> f2 = channel.read(buffer2, 100);
            Future<Integer> f3 = channel.read(buffer3, 200);

            // Wait for all
            f1.get();
            f2.get();
            f3.get();

            System.out.println("Read from 3 positions concurrently");
        }
    }

    /** Reads into {@code buffer} from {@code position} until it is full or the file ends. */
    private static void readFully(AsynchronousFileChannel channel, ByteBuffer buffer,
                                  long position)
            throws InterruptedException, ExecutionException {
        while (buffer.hasRemaining()) {
            int bytesRead = channel.read(buffer, position).get();
            if (bytesRead <= 0) {
                break; // -1 signals end of file
            }
            position += bytesRead;
        }
    }

    /**
     * Writes everything remaining in {@code buffer} starting at {@code position},
     * re-submitting after short writes, and returns the number of bytes written.
     */
    private static long writeFully(AsynchronousFileChannel channel, ByteBuffer buffer,
                                   long position)
            throws InterruptedException, ExecutionException {
        long start = position;
        while (buffer.hasRemaining()) {
            position += channel.write(buffer, position).get();
        }
        return position - start;
    }
}

Concurrent File Access

Multiple Readers:

/**
 * Concurrent file access patterns built on {@link AsynchronousFileChannel}.
 *
 * <p>Channels and executors are released in {@code finally} /
 * try-with-resources blocks so a failing task does not leak them.
 */
public class ConcurrentFileAccess {

    /**
     * Splits the file into {@code numReaders} contiguous slices and reads each
     * slice on its own thread through its own channel. The last reader also
     * takes the remainder when the size is not evenly divisible.
     *
     * @param path       file to read
     * @param numReaders number of reader threads; must be positive
     * @throws IllegalArgumentException if {@code numReaders} is not positive
     * @throws Exception if the file size cannot be determined or the wait is interrupted
     */
    public static void multipleReaders(Path path, int numReaders)
            throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(numReaders);
        try {
            CountDownLatch latch = new CountDownLatch(numReaders);

            long fileSize = Files.size(path);
            long chunkSize = fileSize / numReaders;

            for (int i = 0; i < numReaders; i++) {
                final int readerId = i;
                final long startPos = i * chunkSize;
                final long length = (i == numReaders - 1) ?
                    (fileSize - startPos) : chunkSize;

                executor.submit(() -> {
                    try (AsynchronousFileChannel channel =
                             AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
                        ByteBuffer buffer = ByteBuffer.allocate(Math.toIntExact(length));

                        Future<Integer> future = channel.read(buffer, startPos);
                        Integer bytesRead = future.get();

                        System.out.println("Reader " + readerId +
                            " read " + bytesRead + " bytes");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                });
            }

            latch.await();
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Writes a fixed message through a shared channel, then has three readers
     * read it back concurrently through the same channel.
     *
     * <p>The blocking writer/reader tasks run on their own pool, separate from
     * the pool that services the channel's I/O completions: a task blocked in
     * {@code Future.get()} on the channel's pool could otherwise occupy the
     * thread needed to complete its own operation. The readers start only
     * after the writer task has finished, rather than after a fixed sleep.
     *
     * @param path file to write and read; created if missing
     * @throws Exception if the file cannot be opened or the wait is interrupted
     */
    public static void writerWithReaders(Path path) throws Exception {
        // Pool that services the channel's asynchronous operations.
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        // Pool for the blocking writer/reader tasks (1 writer + 3 readers).
        ExecutorService workers = Executors.newFixedThreadPool(4);

        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                 Set.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
                        StandardOpenOption.CREATE),
                 ioPool)) {

            // Writer
            Future<?> writer = workers.submit(() -> {
                try {
                    ByteBuffer buffer = ByteBuffer.wrap(
                        "Data from writer".getBytes(StandardCharsets.UTF_8));
                    while (buffer.hasRemaining()) {
                        channel.write(buffer, buffer.position()).get();
                    }
                    System.out.println("Writer completed");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            // Readers must observe the finished write, so wait for the writer task.
            writer.get();

            // Multiple readers
            CountDownLatch latch = new CountDownLatch(3);
            for (int i = 0; i < 3; i++) {
                final int readerId = i;
                workers.submit(() -> {
                    try {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        channel.read(buffer, 0).get();

                        buffer.flip();
                        String content = StandardCharsets.UTF_8.decode(buffer).toString();

                        System.out.println("Reader " + readerId +
                            " read: " + content);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                });
            }

            latch.await();
        } finally {
            workers.shutdown();
            ioPool.shutdown();
        }
    }
}

File Locking

Advisory Locking:

/**
 * File locking with {@link AsynchronousFileChannel}.
 *
 * <p>Every method opens its own channel in a try-with-resources block and
 * releases the lock in a {@code finally} / try-with-resources block, so
 * neither is leaked when an operation in the locked section fails.
 */
public class AsyncFileLocking {

    /**
     * Acquires an exclusive lock on the whole file, writes a marker at offset 0,
     * and releases the lock.
     *
     * @param path file to lock; created if missing
     * @throws Exception if the file cannot be opened, locked or written
     */
    public static void exclusiveLock(Path path) throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                 StandardOpenOption.READ,
                 StandardOpenOption.WRITE,
                 StandardOpenOption.CREATE)) {

            // Acquire exclusive lock (waits until it is granted)
            FileLock lock = channel.lock().get();
            try {
                System.out.println("Acquired exclusive lock");
                System.out.println("Lock valid: " + lock.isValid());
                System.out.println("Lock shared: " + lock.isShared());

                // Perform operations
                ByteBuffer buffer = ByteBuffer.wrap(
                    "Locked data".getBytes(StandardCharsets.UTF_8));
                channel.write(buffer, 0).get();
            } finally {
                // Release lock
                lock.release();
            }
            System.out.println("Released lock");
        }
    }

    /**
     * Acquires a shared lock on the whole file, reads up to 1024 bytes, and
     * releases the lock.
     *
     * @param path existing file to lock and read
     * @throws Exception if the file cannot be opened, locked or read
     */
    public static void sharedLock(Path path) throws Exception {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {

            // Acquire shared lock (third argument: true = shared, false = exclusive)
            FileLock lock = channel.lock(0, Long.MAX_VALUE, true).get();
            try {
                System.out.println("Acquired shared lock");

                // Read data
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                channel.read(buffer, 0).get();
            } finally {
                lock.release();
            }
        }
    }

    /**
     * Acquires an exclusive lock on one region of the file, writes a marker at
     * the start of that region, and releases the lock.
     *
     * @param path     existing file to lock
     * @param position first byte of the locked region
     * @param size     length of the locked region in bytes
     * @throws Exception if the file cannot be opened, locked or written
     */
    public static void regionLock(Path path, long position, long size)
            throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                 StandardOpenOption.READ,
                 StandardOpenOption.WRITE)) {

            // Lock specific region
            FileLock lock = channel.lock(position, size, false).get();
            try {
                System.out.println("Locked region: " + position + " to " +
                    (position + size));

                // Write to locked region
                ByteBuffer buffer = ByteBuffer.wrap(
                    "Region data".getBytes(StandardCharsets.UTF_8));
                channel.write(buffer, position).get();
            } finally {
                lock.release();
            }
        }
    }

    /**
     * Tries to acquire an exclusive whole-file lock, giving up after a timeout.
     *
     * <p>Failures other than the timeout are printed and reported as
     * {@code false}; the thread's interrupt flag is restored if the wait is
     * interrupted.
     *
     * @param path      existing file to lock
     * @param timeoutMs maximum time to wait for the lock, in milliseconds
     * @return {@code true} if the lock was acquired (and released again),
     *         {@code false} on timeout or any other failure
     */
    public static boolean tryLockWithTimeout(Path path, long timeoutMs) {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                 StandardOpenOption.READ,
                 StandardOpenOption.WRITE)) {

            Future<FileLock> lockFuture = channel.lock();

            try {
                FileLock lock = lockFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
                try {
                    System.out.println("Lock acquired");

                    // Use lock
                } finally {
                    lock.release();
                }
                return true;
            } catch (TimeoutException e) {
                System.out.println("Could not acquire lock within timeout");
                lockFuture.cancel(true);
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return false;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}

Real-World Example: Async Log Writer

High-Performance Log Writer:

/**
 * Asynchronous log writer that appends timestamped lines to a file.
 *
 * <p>Callers enqueue messages with {@link #writeLog(String)}; a single writer
 * thread drains the queue and writes each line at the tracked end-of-file
 * offset. The class implements {@link AutoCloseable} so it can be used in a
 * try-with-resources statement; {@link #close()} waits for every submitted
 * entry to be written before closing the channel.
 */
public class AsyncLogWriter implements AutoCloseable {
    private final AsynchronousFileChannel channel;
    /** Offset at which the next line will be written. */
    private final AtomicLong position;
    /** Single thread, so lines are written in submission order. */
    private final ExecutorService executor;
    private final Queue<LogEntry> pendingWrites;

    /**
     * Opens (or creates) the log file and positions the writer at its end.
     *
     * <p>{@code StandardOpenOption.APPEND} is deliberately not used:
     * {@link AsynchronousFileChannel} rejects it, because every write names its
     * position explicitly. Appending is done by starting at the current size.
     *
     * @param logFile log file to append to; created if missing
     * @throws IOException if the file cannot be opened or its size cannot be read
     */
    public AsyncLogWriter(Path logFile) throws IOException {
        AsynchronousFileChannel opened = AsynchronousFileChannel.open(logFile,
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE);
        long size;
        try {
            size = opened.size();
        } catch (IOException e) {
            // Do not leak the channel when construction fails half-way.
            try {
                opened.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        this.channel = opened;
        this.position = new AtomicLong(size);
        this.executor = Executors.newSingleThreadExecutor();
        this.pendingWrites = new ConcurrentLinkedQueue<>();
    }

    /**
     * Queues a log entry and schedules the writer thread to drain the queue.
     *
     * @param message text of the log entry
     * @return a future that completes once the writer thread has processed
     *         every entry queued up to that point, including this one; write
     *         failures are reported on {@code System.err} and do not fail the future
     * @throws RejectedExecutionException if the writer has been closed
     */
    public CompletableFuture<Void> writeLog(String message) {
        LogEntry entry = new LogEntry(message);
        pendingWrites.offer(entry);

        return CompletableFuture.runAsync(this::processWrites, executor);
    }

    /** Writes every queued entry, in queue order. */
    private void processWrites() {
        LogEntry entry;
        while ((entry = pendingWrites.poll()) != null) {
            writeEntry(entry);
        }
    }

    /**
     * Writes one entry at the current end offset and waits for it to finish.
     *
     * <p>Waiting here keeps lines ordered, lets short writes be completed, and
     * means no write is still in flight when {@link #close()} closes the channel.
     */
    private void writeEntry(LogEntry entry) {
        String line = entry.timestamp + " - " + entry.message + "\n";
        ByteBuffer buffer = ByteBuffer.wrap(
            line.getBytes(StandardCharsets.UTF_8));

        try {
            while (buffer.hasRemaining()) {
                int written = channel.write(buffer, position.get()).get();
                position.addAndGet(written);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Error writing log: " + e.getMessage());
        } catch (ExecutionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            System.err.println("Log write failed: " +
                cause.getMessage());
        } catch (RuntimeException e) {
            System.err.println("Error writing log: " + e.getMessage());
        }
    }

    /**
     * Flushes all submitted entries and closes the log file.
     *
     * <p>Stops accepting new work, waits for the writer thread to finish the
     * tasks already submitted, writes any entry that was queued but whose task
     * was rejected during shutdown, and then closes the channel. Calling this
     * method more than once is harmless.
     *
     * @throws IOException if closing the channel fails
     */
    @Override
    public void close() throws IOException {
        executor.shutdown();
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Skip the final drain if the wait above was cut short by an interrupt:
        // the writer thread may still be running and must stay the only writer.
        if (executor.isTerminated()) {
            processWrites();
        }

        channel.close();
    }

    /** A message paired with the local time at which it was submitted. */
    private static class LogEntry {
        final String timestamp;
        final String message;

        LogEntry(String message) {
            this.timestamp = LocalDateTime.now().toString();
            this.message = message;
        }
    }

    /**
     * Usage example: appends 100 entries to {@code application.log} in the
     * working directory.
     */
    public static void main(String[] args) throws Exception {
        Path logFile = Path.of("application.log");

        try (AsyncLogWriter logger = new AsyncLogWriter(logFile)) {
            // Write multiple log entries
            for (int i = 0; i < 100; i++) {
                logger.writeLog("Log entry " + i);
            }

            System.out.println("Log entries queued");
        }

        System.out.println("Logger closed");
    }
}

Best Practices

1. Use Positioned Operations:

// Efficient random access
channel.read(buffer, position);
channel.write(buffer, position);

2. Handle Partial Writes:

while (buffer.hasRemaining()) {
    int written = channel.write(buffer, position).get();
    position += written;
}

3. Use Appropriate Thread Pool:

ExecutorService ioPool = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors());
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
    path, Set.of(StandardOpenOption.READ), ioPool);

4. Lock When Necessary:

FileLock lock = channel.lock().get();
try {
    // Critical section
} finally {
    lock.release();
}

5. Close Properly:

try (AsynchronousFileChannel channel = ...) {
    // Operations
} // Auto-closed

These async file operations enable high-performance, non-blocking file I/O in Java applications.