8.4 Real-World Applications and Patterns

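This section explores practical applications of virtual threads through complete, runnable examples. Parts explicitly marked as simplified (such as HTTP and JSON parsing) should be replaced with a hardened library before production use.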

Pattern 1: High-Throughput Web Server

import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;

/**
 * Minimal blocking HTTP/1.1 server that handles every accepted connection on its own
 * virtual thread.
 *
 * <p>The server reads the request headers (plus whatever body bytes arrive in the same
 * reads), passes the parsed request to a {@link RequestHandler}, writes a single response
 * and closes the connection. It does not implement keep-alive, chunked encoding or
 * Content-Length based body reading.
 */
public class VirtualThreadWebServer {
    /** Charset used to decode requests and encode responses. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Marks the end of the HTTP header section. */
    private static final String HEADER_TERMINATOR = "\r\n\r\n";

    /** Upper bound on bytes buffered while waiting for the header terminator. */
    private static final int MAX_HEADER_BYTES = 64 * 1024;

    private final int port;
    private final RequestHandler requestHandler;
    private volatile boolean running = true;

    /** Listening channel while {@link #start()} is running; lets {@link #stop()} unblock accept(). */
    private volatile ServerSocketChannel serverChannel;

    /**
     * @param port           TCP port to listen on
     * @param requestHandler callback that turns a request into a response
     */
    public VirtualThreadWebServer(int port, RequestHandler requestHandler) {
        this.port = port;
        this.requestHandler = requestHandler;
    }

    /**
     * Binds the port and accepts connections until {@link #stop()} is called.
     * Blocks the calling thread for the lifetime of the server.
     *
     * @throws IOException if the socket cannot be opened or bound, or accept fails
     *                     for a reason other than a requested stop
     */
    public void start() throws IOException {
        try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
            serverChannel = serverSocket;
            serverSocket.bind(new InetSocketAddress(port));
            serverSocket.configureBlocking(true);

            System.out.println("Server started on port " + port);

            while (running) {
                SocketChannel client;
                try {
                    // Accept connection (blocking is fine)
                    client = serverSocket.accept();
                } catch (ClosedChannelException e) {
                    // stop() closes the channel to wake this thread up; anything else is a real error.
                    if (!running) {
                        break;
                    }
                    throw e;
                }

                // Spawn virtual thread per request
                Thread.startVirtualThread(() -> handleClient(client));
            }
        } finally {
            serverChannel = null;
        }
    }

    /** Serves one connection: read, dispatch, respond, and always close the channel. */
    private void handleClient(SocketChannel client) {
        try {
            HttpRequest request;
            try {
                request = readRequest(client);
            } catch (IllegalArgumentException e) {
                // Malformed input is the client's fault, not a server failure.
                sendError(client, 400, "Bad Request");
                return;
            }

            // Process request (may involve database, external APIs, etc.)
            HttpResponse response = requestHandler.handle(request);

            writeResponse(client, response);

        } catch (Exception e) {
            System.err.println("Error handling client: " + e.getMessage());
            sendError(client, 500, "Internal Server Error");
        } finally {
            try {
                client.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if close fails; the connection is gone either way.
            }
        }
    }

    /**
     * Reads from the channel until the header terminator has been seen or the peer stops
     * sending, then parses what was received.
     *
     * @throws IOException              on read failure or when the headers exceed the size limit
     * @throws IllegalArgumentException if the received text is not a parsable request
     */
    private HttpRequest readRequest(SocketChannel client) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(8192);
        // Collect raw bytes and decode once at the end so multi-byte characters that
        // straddle two reads are not corrupted.
        ByteArrayOutputStream received = new ByteArrayOutputStream();

        while (client.read(buffer) > 0) {
            buffer.flip();
            received.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
            buffer.clear();

            // ISO-8859-1 maps bytes 1:1 to chars, so this is an exact byte-level search.
            if (received.toString(java.nio.charset.StandardCharsets.ISO_8859_1).contains(HEADER_TERMINATOR)) {
                break;
            }
            if (received.size() > MAX_HEADER_BYTES) {
                throw new IOException("Request headers exceed " + MAX_HEADER_BYTES + " bytes");
            }
        }

        return HttpRequest.parse(received.toString(UTF_8));
    }

    /** Writes the serialized response, looping until every byte has been handed to the socket. */
    private void writeResponse(SocketChannel client, HttpResponse response) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(response.toBytes());
        while (out.hasRemaining()) {
            client.write(out);
        }
    }

    /** Best-effort error response; failures are ignored because the connection is being torn down. */
    private void sendError(SocketChannel client, int statusCode, String message) {
        try {
            writeResponse(client, HttpResponse.error(statusCode, message));
        } catch (IOException ignored) {
            // The client may already have disconnected.
        }
    }

    /**
     * Requests shutdown. Closes the listening channel so a thread blocked in
     * {@code accept()} returns immediately instead of waiting for the next client.
     */
    public void stop() {
        running = false;
        ServerSocketChannel channel = serverChannel;
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException ignored) {
                // Closing is only a wake-up signal here.
            }
        }
    }

    // Supporting classes

    /**
     * Parsed request. {@code headers} holds everything before the blank line (including the
     * request line); {@code body} holds whatever followed it in the received text.
     */
    record HttpRequest(String method, String path, String headers, String body) {
        /**
         * Parses raw request text.
         *
         * @param requestStr text received from the client
         * @return the parsed request; {@code body} is empty when no blank line was received
         * @throws IllegalArgumentException if the text is empty or the request line has
         *                                  fewer than two space-separated parts
         */
        static HttpRequest parse(String requestStr) {
            if (requestStr == null || requestStr.isBlank()) {
                throw new IllegalArgumentException("Empty HTTP request");
            }

            int headerEnd = requestStr.indexOf(HEADER_TERMINATOR);
            String head = headerEnd >= 0 ? requestStr.substring(0, headerEnd) : requestStr;
            String body = headerEnd >= 0
                ? requestStr.substring(headerEnd + HEADER_TERMINATOR.length())
                : "";

            int firstLineEnd = head.indexOf("\r\n");
            String firstLine = firstLineEnd >= 0 ? head.substring(0, firstLineEnd) : head;
            String[] requestLine = firstLine.split(" ");
            if (requestLine.length < 2) {
                throw new IllegalArgumentException("Malformed request line: " + firstLine);
            }

            return new HttpRequest(requestLine[0], requestLine[1], head, body);
        }
    }

    /** Response to serialize back to the client. */
    record HttpResponse(int statusCode, String statusText, String contentType, String body) {
        static HttpResponse ok(String body) {
            return new HttpResponse(200, "OK", "text/plain", body);
        }

        static HttpResponse json(String json) {
            return new HttpResponse(200, "OK", "application/json", json);
        }

        static HttpResponse error(int statusCode, String message) {
            return new HttpResponse(statusCode, getStatusText(statusCode), "text/plain", message);
        }

        /**
         * Serializes the status line, headers and body as UTF-8.
         * Content-Length is the encoded byte count, not the character count.
         */
        byte[] toBytes() {
            byte[] bodyBytes = body.getBytes(UTF_8);
            String head = "HTTP/1.1 " + statusCode + " " + statusText + "\r\n"
                + "Content-Type: " + contentType + "\r\n"
                + "Content-Length: " + bodyBytes.length + "\r\n"
                + "\r\n";
            byte[] headBytes = head.getBytes(UTF_8);

            byte[] wire = new byte[headBytes.length + bodyBytes.length];
            System.arraycopy(headBytes, 0, wire, 0, headBytes.length);
            System.arraycopy(bodyBytes, 0, wire, headBytes.length, bodyBytes.length);
            return wire;
        }

        private static String getStatusText(int statusCode) {
            return switch (statusCode) {
                case 200 -> "OK";
                case 400 -> "Bad Request";
                case 404 -> "Not Found";
                case 500 -> "Internal Server Error";
                default -> "Error";
            };
        }
    }

    /** Application callback invoked once per request on the connection's virtual thread. */
    interface RequestHandler {
        HttpResponse handle(HttpRequest request);
    }

    // Example usage
    public static void main(String[] args) throws IOException {
        RequestHandler handler = request -> {
            // Simulate database query or external API call
            try {
                Thread.sleep(100);  // Virtual thread parks efficiently
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            return HttpResponse.ok("Hello from virtual thread: " +
                Thread.currentThread().getName());
        };

        VirtualThreadWebServer server = new VirtualThreadWebServer(8080, handler);
        server.start();
    }
}

Pattern 2: Database Connection Pool with Virtual Threads

import java.sql.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Runs JDBC work concurrently on virtual threads. Every task borrows its own connection
 * from the supplied {@link DataSource}, so the number of simultaneously open connections
 * equals the number of in-flight tasks.
 */
public class VirtualThreadDatabaseService {
    private final DataSource dataSource;

    /** @param dataSource source of JDBC connections used by every query */
    public VirtualThreadDatabaseService(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Fetches the given users concurrently, one query per id.
     *
     * @param userIds ids to look up
     * @return found users keyed by id, in the iteration order of {@code userIds};
     *         ids with no matching row are omitted
     * @throws Exception if any lookup fails (the failure is wrapped in an ExecutionException)
     *                   or the calling thread is interrupted
     */
    public Map<String, User> fetchUsers(List<String> userIds) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Only this thread touches the bookkeeping maps, so plain insertion-ordered
            // maps are sufficient.
            Map<String, Future<User>> futures = new LinkedHashMap<>();

            for (String userId : userIds) {
                futures.put(userId, executor.submit(() -> fetchUser(userId)));
            }

            Map<String, User> users = new LinkedHashMap<>();
            for (Map.Entry<String, Future<User>> entry : futures.entrySet()) {
                User user = entry.getValue().get();
                if (user != null) {
                    users.put(entry.getKey(), user);
                }
            }

            return users;
        }
    }

    /**
     * Loads one user by id.
     *
     * @return the user, or {@code null} when no row matches
     * @throws RuntimeException wrapping the SQLException if the query fails
     */
    private User fetchUser(String userId) {
        // Each virtual thread gets its own connection
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT id, name, email FROM users WHERE id = ?"
             )) {

            stmt.setString(1, userId);

            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    return new User(
                        rs.getString("id"),
                        rs.getString("name"),
                        rs.getString("email")
                    );
                }
                return null;
            }

        } catch (SQLException e) {
            throw new RuntimeException("Failed to fetch user: " + userId, e);
        }
    }

    /**
     * Inserts users in batches of 100, each batch in its own transaction on its own
     * virtual thread. Batches are independent: a failed batch does not undo others.
     *
     * @throws Exception if any batch fails or the calling thread is interrupted
     */
    public void insertUsers(List<User> users) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            int batchSize = 100;
            List<Future<Void>> futures = new ArrayList<>();

            for (int i = 0; i < users.size(); i += batchSize) {
                int end = Math.min(i + batchSize, users.size());
                List<User> batch = users.subList(i, end);

                futures.add(executor.submit(() -> {
                    insertBatch(batch);
                    return null;
                }));
            }

            // Wait for all batches
            for (Future<Void> future : futures) {
                future.get();
            }
        }
    }

    /**
     * Inserts one batch inside a single transaction, rolling back on failure.
     *
     * @throws RuntimeException wrapping the SQLException that caused the failure
     */
    private void insertBatch(List<User> users) {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);

            try (PreparedStatement stmt = conn.prepareStatement(
                    "INSERT INTO users (id, name, email) VALUES (?, ?, ?)")) {

                for (User user : users) {
                    stmt.setString(1, user.id());
                    stmt.setString(2, user.name());
                    stmt.setString(3, user.email());
                    stmt.addBatch();
                }

                stmt.executeBatch();
                conn.commit();

            } catch (SQLException e) {
                // Keep the original failure as the primary exception even if rollback also fails.
                try {
                    conn.rollback();
                } catch (SQLException rollbackFailure) {
                    e.addSuppressed(rollbackFailure);
                }
                throw e;
            }

        } catch (SQLException e) {
            throw new RuntimeException("Failed to insert batch", e);
        }
    }

    /**
     * Loads user, orders and preferences concurrently under one structured scope;
     * the first failing subtask cancels the others and is rethrown.
     *
     * @throws Exception if any subtask fails or the calling thread is interrupted
     */
    public UserProfile fetchUserProfile(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var userTask = scope.fork(() -> fetchUser(userId));
            var ordersTask = scope.fork(() -> fetchUserOrders(userId));
            var preferencesTask = scope.fork(() -> fetchUserPreferences(userId));

            scope.join();
            scope.throwIfFailed();

            return new UserProfile(
                userTask.get(),
                ordersTask.get(),
                preferencesTask.get()
            );
        }
    }

    /**
     * Loads all orders for a user.
     *
     * @return the orders, possibly empty
     * @throws RuntimeException wrapping the SQLException if the query fails
     */
    private List<Order> fetchUserOrders(String userId) {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT id, total, status FROM orders WHERE user_id = ?"
             )) {

            stmt.setString(1, userId);

            try (ResultSet rs = stmt.executeQuery()) {
                List<Order> orders = new ArrayList<>();
                while (rs.next()) {
                    orders.add(new Order(
                        rs.getString("id"),
                        rs.getBigDecimal("total"),
                        rs.getString("status")
                    ));
                }
                return orders;
            }

        } catch (SQLException e) {
            throw new RuntimeException("Failed to fetch orders", e);
        }
    }

    /**
     * Loads a user's preferences as key/value pairs.
     *
     * @return the preferences, possibly empty
     * @throws RuntimeException wrapping the SQLException if the query fails
     */
    private Map<String, String> fetchUserPreferences(String userId) {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT key, value FROM user_preferences WHERE user_id = ?"
             )) {

            stmt.setString(1, userId);

            try (ResultSet rs = stmt.executeQuery()) {
                Map<String, String> prefs = new HashMap<>();
                while (rs.next()) {
                    prefs.put(rs.getString("key"), rs.getString("value"));
                }
                return prefs;
            }

        } catch (SQLException e) {
            throw new RuntimeException("Failed to fetch preferences", e);
        }
    }

    record User(String id, String name, String email) {}
    record Order(String id, java.math.BigDecimal total, String status) {}
    record UserProfile(User user, List<Order> orders, Map<String, String> preferences) {}

    /** Minimal connection source so the example does not depend on javax.sql. */
    interface DataSource {
        Connection getConnection() throws SQLException;
    }
}

Pattern 3: Microservices Fan-Out

import java.net.URI;
import java.net.http.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

/**
 * Builds a dashboard by calling several backend services concurrently, one virtual
 * thread per call. A service that fails or misses the overall deadline contributes a
 * default value instead of failing the whole dashboard.
 */
public class MicroserviceAggregator {
    /** Overall time budget for collecting all service responses. */
    private static final Duration DASHBOARD_TIMEOUT = Duration.ofSeconds(10);

    private final HttpClient httpClient;
    private final Map<String, URI> serviceUrls;

    /**
     * @param serviceUrls base URI per service name ("user-service", "order-service",
     *                    "notification-service", "analytics-service", "recommendation-service")
     */
    public MicroserviceAggregator(Map<String, URI> serviceUrls) {
        this.httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();
        this.serviceUrls = serviceUrls;
    }

    /**
     * Fans out to all services and assembles whatever arrived within the time budget.
     *
     * @param userId user whose dashboard is requested
     * @return dashboard data; parts whose service failed or timed out are {@code null}
     *         (single objects) or an empty list (collections)
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public DashboardData fetchDashboard(String userId) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            try {
                // Fan out to all services concurrently
                Future<UserInfo> userInfoFuture = executor.submit(() ->
                    fetchUserInfo(userId));

                Future<List<Order>> ordersFuture = executor.submit(() ->
                    fetchOrders(userId));

                Future<List<Notification>> notificationsFuture = executor.submit(() ->
                    fetchNotifications(userId));

                Future<AccountStats> statsFuture = executor.submit(() ->
                    fetchAccountStats(userId));

                Future<List<Recommendation>> recommendationsFuture = executor.submit(() ->
                    fetchRecommendations(userId));

                // One shared deadline: each wait only consumes what is left of the budget.
                long deadlineNanos = System.nanoTime() + DASHBOARD_TIMEOUT.toNanos();

                return new DashboardData(
                    awaitOrDefault("user-service", userInfoFuture, deadlineNanos, null),
                    awaitOrDefault("order-service", ordersFuture, deadlineNanos, List.of()),
                    awaitOrDefault("notification-service", notificationsFuture, deadlineNanos, List.of()),
                    awaitOrDefault("analytics-service", statsFuture, deadlineNanos, null),
                    awaitOrDefault("recommendation-service", recommendationsFuture, deadlineNanos, List.of())
                );
            } finally {
                // Interrupt stragglers so the executor's close() does not wait past the deadline.
                executor.shutdownNow();
            }
        }
    }

    private UserInfo fetchUserInfo(String userId) {
        return callService(
            serviceUrls.get("user-service"),
            "/users/" + userId,
            UserInfo.class
        );
    }

    private List<Order> fetchOrders(String userId) {
        return callServiceList(
            serviceUrls.get("order-service"),
            "/orders?userId=" + userId,
            Order.class
        );
    }

    private List<Notification> fetchNotifications(String userId) {
        return callServiceList(
            serviceUrls.get("notification-service"),
            "/notifications?userId=" + userId,
            Notification.class
        );
    }

    private AccountStats fetchAccountStats(String userId) {
        return callService(
            serviceUrls.get("analytics-service"),
            "/stats/" + userId,
            AccountStats.class
        );
    }

    private List<Recommendation> fetchRecommendations(String userId) {
        return callServiceList(
            serviceUrls.get("recommendation-service"),
            "/recommendations?userId=" + userId,
            Recommendation.class
        );
    }

    /**
     * Performs a blocking GET and parses the body.
     *
     * @return the parsed response (currently always {@code null} because parsing is a stub)
     * @throws RuntimeException if the service is not configured, the call fails, the
     *                          status is not 200, or the thread is interrupted
     */
    private <T> T callService(URI baseUri, String path, Class<T> responseType) {
        try {
            if (baseUri == null) {
                throw new IllegalStateException("No base URI configured for " + path);
            }

            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(baseUri + path))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();

            HttpResponse<String> response = httpClient.send(
                request,
                HttpResponse.BodyHandlers.ofString()
            );

            if (response.statusCode() != 200) {
                throw new RuntimeException("Service returned: " + response.statusCode());
            }

            // Parse JSON response (simplified - use Jackson in production)
            return parseJson(response.body(), responseType);

        } catch (InterruptedException e) {
            // Preserve the interrupt so cancellation keeps propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while calling service: " + baseUri + path, e);
        } catch (Exception e) {
            throw new RuntimeException("Failed to call service: " + baseUri + path, e);
        }
    }

    /**
     * Calls a list endpoint. Simplified: treats the response as a single item.
     *
     * @return a one-element list, or an empty list when nothing could be parsed
     */
    private <T> List<T> callServiceList(URI baseUri, String path, Class<T> itemType) {
        // Simplified - in production, properly parse JSON array
        T item = callService(baseUri, path, itemType);
        // List.of rejects nulls, and the stub parser returns null.
        return item == null ? List.of() : List.of(item);
    }

    private <T> T parseJson(String json, Class<T> type) {
        // Simplified - use Jackson ObjectMapper in production
        return null;
    }

    /**
     * Waits for a result until the shared deadline.
     *
     * @param label         service name used in diagnostics
     * @param deadlineNanos absolute deadline on the {@link System#nanoTime()} clock
     * @param defaultValue  value returned when the task failed or did not finish in time
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private <T> T awaitOrDefault(String label, Future<T> future, long deadlineNanos, T defaultValue)
            throws InterruptedException {
        long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
        try {
            return future.get(remainingNanos, TimeUnit.NANOSECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            System.err.println("Service timed out: " + label);
            return defaultValue;
        } catch (ExecutionException e) {
            System.err.println("Service failed: " + label + " - " + e.getCause());
            return defaultValue;
        }
    }

    record DashboardData(
        UserInfo userInfo,
        List<Order> orders,
        List<Notification> notifications,
        AccountStats stats,
        List<Recommendation> recommendations
    ) {}

    record UserInfo(String id, String name, String email) {}
    record Order(String id, String status, double total) {}
    record Notification(String id, String message, boolean read) {}
    record AccountStats(int totalOrders, double totalSpent, int points) {}
    record Recommendation(String id, String title, String category) {}
}

Pattern 4: Parallel File Processing

import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

/**
 * Processes every {@code .txt} file under an input directory concurrently, one virtual
 * thread per file, and writes the results under an output directory using the same
 * relative layout.
 */
public class ParallelFileProcessor {
    private final Path inputDirectory;
    private final Path outputDirectory;

    /**
     * @param inputDirectory  root that is searched recursively for {@code .txt} files
     * @param outputDirectory root under which processed files are written
     */
    public ParallelFileProcessor(Path inputDirectory, Path outputDirectory) {
        this.inputDirectory = inputDirectory;
        this.outputDirectory = outputDirectory;
    }

    /**
     * Processes all matching files and reports per-file outcomes.
     *
     * @return a report containing one result per discovered file
     * @throws Exception if the directories cannot be created or walked, or the calling
     *                   thread is interrupted while waiting for results
     */
    public ProcessingReport processAll() throws Exception {
        Files.createDirectories(outputDirectory);

        List<Path> files;
        try (Stream<Path> paths = Files.walk(inputDirectory)) {
            files = paths
                .filter(Files::isRegularFile)
                .filter(p -> p.toString().endsWith(".txt"))
                .toList();
        }

        System.out.println("Processing " + files.size() + " files...");

        long startNanos = System.nanoTime();
        List<FileResult> results = new ArrayList<>(files.size());

        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<FileResult>> futures = files.stream()
                .map(file -> executor.submit(() -> processFile(file)))
                .toList();

            // futures and files share indices, so a failed task can still be attributed to its file.
            for (int i = 0; i < futures.size(); i++) {
                try {
                    results.add(futures.get(i).get());
                } catch (ExecutionException e) {
                    results.add(FileResult.failure(
                        relativeName(files.get(i)),
                        String.valueOf(e.getCause())
                    ));
                }
            }
        }

        long duration = elapsedMillis(startNanos);

        return new ProcessingReport(
            results.size(),
            results.stream().filter(FileResult::success).count(),
            results.stream().filter(r -> !r.success()).count(),
            duration,
            results
        );
    }

    /** Reads, transforms and writes one file; I/O failures are reported, not thrown. */
    private FileResult processFile(Path inputFile) {
        String name = relativeName(inputFile);
        try {
            long startNanos = System.nanoTime();

            String content = Files.readString(inputFile);
            String processed = processContent(content);

            // Mirror the input's relative path so same-named files in different
            // subdirectories do not overwrite each other.
            Path outputFile = outputDirectory.resolve(inputDirectory.relativize(inputFile));
            Path parent = outputFile.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            Files.writeString(outputFile, processed);

            long duration = elapsedMillis(startNanos);

            System.out.println("✓ Processed: " + name +
                " (" + duration + "ms)");

            return FileResult.success(
                name,
                content.length(),
                processed.length(),
                duration
            );

        } catch (IOException e) {
            System.err.println("✗ Failed: " + name +
                " - " + e.getMessage());

            return FileResult.failure(name, e.getMessage());
        }
    }

    /** Path of {@code file} relative to the input directory, used to identify it in reports. */
    private String relativeName(Path file) {
        return inputDirectory.relativize(file).toString();
    }

    private static long elapsedMillis(long startNanos) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    /**
     * Example transformation: upper-cases each line and prefixes it with a right-aligned
     * 1-based line number. Accepts any line terminator on input and emits the platform
     * line separator.
     */
    private String processContent(String content) {
        // \R matches \n, \r\n and \r, so Windows files do not keep a stray \r.
        String[] lines = content.split("\\R");
        StringBuilder result = new StringBuilder();

        for (int i = 0; i < lines.length; i++) {
            // Locale.ROOT keeps casing and digits independent of the machine's default locale.
            result.append(String.format(Locale.ROOT, "%4d: %s%n",
                i + 1, lines[i].toUpperCase(Locale.ROOT)));
        }

        return result.toString();
    }

    /** Outcome for one file; {@code error} is null on success, sizes are in chars. */
    record FileResult(
        boolean success,
        String filename,
        long inputSize,
        long outputSize,
        long durationMs,
        String error
    ) {
        static FileResult success(String filename, long inputSize,
                                 long outputSize, long durationMs) {
            return new FileResult(true, filename, inputSize, outputSize,
                                durationMs, null);
        }

        static FileResult failure(String filename, String error) {
            return new FileResult(false, filename, 0, 0, 0, error);
        }
    }

    /** Aggregate outcome of one {@link #processAll()} run. */
    record ProcessingReport(
        long totalFiles,
        long successCount,
        long failureCount,
        long totalDurationMs,
        List<FileResult> results
    ) {
        /** Prints totals, the average wall-clock time per file, and any failures. */
        public void printSummary() {
            System.out.println("\n=== Processing Report ===");
            System.out.println("Total files: " + totalFiles);
            System.out.println("Successful: " + successCount);
            System.out.println("Failed: " + failureCount);
            System.out.println("Total time: " + totalDurationMs + "ms");
            // Guard the division: an empty input directory is a valid run.
            long averageMs = totalFiles == 0 ? 0 : totalDurationMs / totalFiles;
            System.out.println("Average time per file: " +
                averageMs + "ms");

            if (failureCount > 0) {
                System.out.println("\nFailed files:");
                results.stream()
                    .filter(r -> !r.success())
                    .forEach(r -> System.out.println("  - " + r.filename() +
                        ": " + r.error()));
            }
        }
    }

    // Example usage
    public static void main(String[] args) throws Exception {
        ParallelFileProcessor processor = new ParallelFileProcessor(
            Path.of("input"),
            Path.of("output")
        );

        ProcessingReport report = processor.processAll();
        report.printSummary();
    }
}

Pattern 5: Rate-Limited API Client

import java.net.URI;
import java.net.http.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * HTTP client that caps the number of requests started per second. A semaphore holds the
 * permits for the current one-second window; a scheduled task tops it back up to the
 * configured maximum every second. Permits are consumed, never returned by requests.
 */
public class RateLimitedApiClient {
    private final HttpClient httpClient;
    private final int maxRequestsPerSecond;
    private final Semaphore rateLimiter;
    private final ScheduledExecutorService scheduler;
    private final AtomicInteger activeRequests = new AtomicInteger(0);

    /**
     * @param maxRequestsPerSecond number of requests allowed to start per second; must be positive
     * @throws IllegalArgumentException if {@code maxRequestsPerSecond} is not positive
     */
    public RateLimitedApiClient(int maxRequestsPerSecond) {
        if (maxRequestsPerSecond <= 0) {
            // With zero permits every request would block forever.
            throw new IllegalArgumentException(
                "maxRequestsPerSecond must be positive: " + maxRequestsPerSecond);
        }

        this.httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();
        this.maxRequestsPerSecond = maxRequestsPerSecond;
        this.rateLimiter = new Semaphore(maxRequestsPerSecond);
        // Daemon thread: a forgotten shutdown() must not keep the JVM alive.
        this.scheduler = Executors.newScheduledThreadPool(1, task -> {
            Thread thread = new Thread(task, "rate-limiter-refill");
            thread.setDaemon(true);
            return thread;
        });

        // Refill permits every second
        scheduler.scheduleAtFixedRate(
            this::refillPermits,
            1, 1, TimeUnit.SECONDS
        );
    }

    /** Tops the semaphore back up to {@code maxRequestsPerSecond} permits. */
    private void refillPermits() {
        int used = maxRequestsPerSecond - rateLimiter.availablePermits();
        if (used > 0) {
            rateLimiter.release(used);
        }
    }

    /**
     * Fetches all URLs concurrently, subject to the rate limit.
     *
     * @param urls URLs to GET
     * @return one response per URL, in the same order; failures are reported in the
     *         response rather than thrown
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public List<ApiResponse> fetchBatch(List<String> urls) throws Exception {
        System.out.println("Fetching " + urls.size() + " URLs with rate limit: " +
            maxRequestsPerSecond + " req/s");

        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<ApiResponse>> futures = urls.stream()
                .map(url -> executor.submit(() -> fetch(url)))
                .toList();

            List<ApiResponse> responses = new ArrayList<>(futures.size());
            for (Future<ApiResponse> future : futures) {
                responses.add(future.get());
            }

            return responses;
        }
    }

    /** Waits for a permit, performs the GET, and converts any failure into a failed response. */
    private ApiResponse fetch(String url) {
        try {
            // Acquire permit (blocks if rate limit reached)
            rateLimiter.acquire();
        } catch (InterruptedException e) {
            // No request was started, so the active counter must not be touched.
            Thread.currentThread().interrupt();
            System.err.println("Failed to fetch " + url + ": interrupted while waiting for permit");
            return new ApiResponse(url, 0, null, 0, false, "Interrupted while waiting for permit");
        }

        activeRequests.incrementAndGet();
        try {
            long startTime = System.currentTimeMillis();

            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();

            HttpResponse<String> response = httpClient.send(
                request,
                HttpResponse.BodyHandlers.ofString()
            );

            long duration = System.currentTimeMillis() - startTime;

            System.out.printf("[%d active] Fetched %s in %dms (status: %d)%n",
                activeRequests.get(), url, duration, response.statusCode());

            return new ApiResponse(
                url,
                response.statusCode(),
                response.body(),
                duration,
                true,
                null
            );

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Failed to fetch " + url + ": interrupted");
            return new ApiResponse(url, 0, null, 0, false, "Interrupted");

        } catch (Exception e) {
            System.err.println("Failed to fetch " + url + ": " + e.getMessage());
            return new ApiResponse(url, 0, null, 0, false, e.getMessage());

        } finally {
            activeRequests.decrementAndGet();
        }
    }

    /** Stops the permit-refill task. Requests waiting for a permit afterwards will not be released. */
    public void shutdown() {
        scheduler.shutdown();
    }

    /** Result of one request; {@code success} is false and {@code error} set when the call failed. */
    record ApiResponse(
        String url,
        int statusCode,
        String body,
        long durationMs,
        boolean success,
        String error
    ) {}

    // Example usage
    public static void main(String[] args) throws Exception {
        // Create client with 10 requests/second limit
        RateLimitedApiClient client = new RateLimitedApiClient(10);

        try {
            // Generate 50 URLs
            List<String> urls = Collections.nCopies(50, "https://httpbin.org/delay/1");

            long startTime = System.currentTimeMillis();
            List<ApiResponse> responses = client.fetchBatch(urls);
            long totalTime = System.currentTimeMillis() - startTime;

            System.out.println("\n=== Summary ===");
            System.out.println("Total requests: " + responses.size());
            System.out.println("Successful: " +
                responses.stream().filter(ApiResponse::success).count());
            System.out.println("Failed: " +
                responses.stream().filter(r -> !r.success()).count());
            System.out.println("Total time: " + totalTime + "ms");
            System.out.println("Expected time (rate limited): ~" +
                (50 / 10) + " seconds");
        } finally {
            // Always stop the refill task, even when the batch fails.
            client.shutdown();
        }
    }
}

Best Practices Summary

  1. Use Virtual Threads for I/O-Bound Tasks

    • Web servers handling concurrent requests
    • Database query parallelization
    • Microservice fan-out
    • File processing pipelines
  2. Combine with Structured Concurrency

    • Use StructuredTaskScope for parent-child relationships
    • Ensures all subtasks complete before the parent scope exits
    • Proper error propagation
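  3. Avoid Pooling Virtual Threads

    • Don't create fixed-size pools of virtual threads; they are cheap enough to create per task
    • Use newVirtualThreadPerTaskExecutor() instead
    • To cap concurrency (for example, database connections), guard the resource with a Semaphore rather than limiting threads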
  4. Handle Errors Properly

    • Catch and handle exceptions in each virtual thread
    • Use try-catch-finally for resource cleanup
    • Consider using Result types for error handling
  5. Monitor Performance

    • Track active virtual threads
    • Monitor pinning events
    • Use JFR for production profiling
  6. Consider Rate Limiting

    • Protect external APIs from overload
    • Use semaphores for rate limiting
    • Consider circuit breakers for resilience