8.4 Real-World Applications and Patterns
This section explores practical applications of virtual threads with complete, production-ready examples.
Pattern 1: High-Throughput Web Server
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;
public class VirtualThreadWebServer {
private final int port;
private final RequestHandler requestHandler;
private volatile boolean running = true;
public VirtualThreadWebServer(int port, RequestHandler requestHandler) {
this.port = port;
this.requestHandler = requestHandler;
}
public void start() throws IOException {
try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
serverSocket.bind(new InetSocketAddress(port));
serverSocket.configureBlocking(true);
System.out.println("Server started on port " + port);
while (running) {
// Accept connection (blocking is fine)
SocketChannel client = serverSocket.accept();
// Spawn virtual thread per request
Thread.startVirtualThread(() -> handleClient(client));
}
}
}
private void handleClient(SocketChannel client) {
try {
// Read request
HttpRequest request = readRequest(client);
// Process request (may involve database, external APIs, etc.)
HttpResponse response = requestHandler.handle(request);
// Send response
writeResponse(client, response);
} catch (Exception e) {
System.err.println("Error handling client: " + e.getMessage());
sendError(client, 500, "Internal Server Error");
} finally {
try {
client.close();
} catch (IOException e) {
// Ignore close errors
}
}
}
private HttpRequest readRequest(SocketChannel client) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(8192);
StringBuilder requestBuilder = new StringBuilder();
while (client.read(buffer) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
requestBuilder.append((char) buffer.get());
}
buffer.clear();
// Check if we've read complete headers
String request = requestBuilder.toString();
if (request.contains("\r\n\r\n")) {
break;
}
}
return HttpRequest.parse(requestBuilder.toString());
}
private void writeResponse(SocketChannel client, HttpResponse response) throws IOException {
String responseStr = String.format(
"HTTP/1.1 %d %s\r\n" +
"Content-Type: %s\r\n" +
"Content-Length: %d\r\n" +
"\r\n" +
"%s",
response.statusCode(),
response.statusText(),
response.contentType(),
response.body().length(),
response.body()
);
client.write(ByteBuffer.wrap(responseStr.getBytes()));
}
private void sendError(SocketChannel client, int statusCode, String message) {
try {
writeResponse(client, HttpResponse.error(statusCode, message));
} catch (IOException e) {
// Ignore
}
}
public void stop() {
running = false;
}
// Supporting classes
record HttpRequest(String method, String path, String headers, String body) {
static HttpRequest parse(String requestStr) {
String[] lines = requestStr.split("\r\n");
String[] requestLine = lines[0].split(" ");
return new HttpRequest(
requestLine[0],
requestLine[1],
requestStr.substring(0, requestStr.indexOf("\r\n\r\n")),
requestStr.substring(requestStr.indexOf("\r\n\r\n") + 4)
);
}
}
record HttpResponse(int statusCode, String statusText, String contentType, String body) {
static HttpResponse ok(String body) {
return new HttpResponse(200, "OK", "text/plain", body);
}
static HttpResponse json(String json) {
return new HttpResponse(200, "OK", "application/json", json);
}
static HttpResponse error(int statusCode, String message) {
return new HttpResponse(statusCode, getStatusText(statusCode), "text/plain", message);
}
private static String getStatusText(int statusCode) {
return switch (statusCode) {
case 200 -> "OK";
case 404 -> "Not Found";
case 500 -> "Internal Server Error";
default -> "Error";
};
}
}
interface RequestHandler {
HttpResponse handle(HttpRequest request);
}
// Example usage
public static void main(String[] args) throws IOException {
RequestHandler handler = request -> {
// Simulate database query or external API call
try {
Thread.sleep(100); // Virtual thread parks efficiently
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return HttpResponse.ok("Hello from virtual thread: " +
Thread.currentThread().getName());
};
VirtualThreadWebServer server = new VirtualThreadWebServer(8080, handler);
server.start();
}
}
Pattern 2: Database Connection Pool with Virtual Threads
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
public class VirtualThreadDatabaseService {
private final DataSource dataSource;
public VirtualThreadDatabaseService(DataSource dataSource) {
this.dataSource = dataSource;
}
// Process multiple queries concurrently
public Map<String, User> fetchUsers(List<String> userIds) throws Exception {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// Submit all queries concurrently
Map<String, Future<User>> futures = new ConcurrentHashMap<>();
for (String userId : userIds) {
futures.put(userId, executor.submit(() -> fetchUser(userId)));
}
// Collect results
Map<String, User> users = new ConcurrentHashMap<>();
for (Map.Entry<String, Future<User>> entry : futures.entrySet()) {
User user = entry.getValue().get();
if (user != null) {
users.put(entry.getKey(), user);
}
}
return users;
}
}
private User fetchUser(String userId) {
// Each virtual thread gets its own connection
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT id, name, email FROM users WHERE id = ?"
)) {
stmt.setString(1, userId);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
return new User(
rs.getString("id"),
rs.getString("name"),
rs.getString("email")
);
}
return null;
} catch (SQLException e) {
throw new RuntimeException("Failed to fetch user: " + userId, e);
}
}
// Batch insert with virtual threads
public void insertUsers(List<User> users) throws Exception {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// Insert in batches for better performance
int batchSize = 100;
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < users.size(); i += batchSize) {
int end = Math.min(i + batchSize, users.size());
List<User> batch = users.subList(i, end);
futures.add(executor.submit(() -> {
insertBatch(batch);
return null;
}));
}
// Wait for all batches
for (Future<Void> future : futures) {
future.get();
}
}
}
private void insertBatch(List<User> users) {
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
try (PreparedStatement stmt = conn.prepareStatement(
"INSERT INTO users (id, name, email) VALUES (?, ?, ?)")) {
for (User user : users) {
stmt.setString(1, user.id());
stmt.setString(2, user.name());
stmt.setString(3, user.email());
stmt.addBatch();
}
stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
conn.rollback();
throw e;
}
} catch (SQLException e) {
throw new RuntimeException("Failed to insert batch", e);
}
}
// Aggregate data from multiple tables
public UserProfile fetchUserProfile(String userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fetch from multiple tables concurrently
var userTask = scope.fork(() -> fetchUser(userId));
var ordersTask = scope.fork(() -> fetchUserOrders(userId));
var preferencesTask = scope.fork(() -> fetchUserPreferences(userId));
scope.join();
scope.throwIfFailed();
return new UserProfile(
userTask.get(),
ordersTask.get(),
preferencesTask.get()
);
}
}
private List<Order> fetchUserOrders(String userId) {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT id, total, status FROM orders WHERE user_id = ?"
)) {
stmt.setString(1, userId);
ResultSet rs = stmt.executeQuery();
List<Order> orders = new ArrayList<>();
while (rs.next()) {
orders.add(new Order(
rs.getString("id"),
rs.getBigDecimal("total"),
rs.getString("status")
));
}
return orders;
} catch (SQLException e) {
throw new RuntimeException("Failed to fetch orders", e);
}
}
private Map<String, String> fetchUserPreferences(String userId) {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT key, value FROM user_preferences WHERE user_id = ?"
)) {
stmt.setString(1, userId);
ResultSet rs = stmt.executeQuery();
Map<String, String> prefs = new HashMap<>();
while (rs.next()) {
prefs.put(rs.getString("key"), rs.getString("value"));
}
return prefs;
} catch (SQLException e) {
throw new RuntimeException("Failed to fetch preferences", e);
}
}
record User(String id, String name, String email) {}
record Order(String id, java.math.BigDecimal total, String status) {}
record UserProfile(User user, List<Order> orders, Map<String, String> preferences) {}
interface DataSource {
Connection getConnection() throws SQLException;
}
}
Pattern 3: Microservices Fan-Out
import java.net.URI;
import java.net.http.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
public class MicroserviceAggregator {
private final HttpClient httpClient;
private final Map<String, URI> serviceUrls;
public MicroserviceAggregator(Map<String, URI> serviceUrls) {
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
this.serviceUrls = serviceUrls;
}
// Aggregate data from multiple microservices
public DashboardData fetchDashboard(String userId) throws Exception {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// Fan out to all services concurrently
Future<UserInfo> userInfoFuture = executor.submit(() ->
fetchUserInfo(userId));
Future<List<Order>> ordersFuture = executor.submit(() ->
fetchOrders(userId));
Future<List<Notification>> notificationsFuture = executor.submit(() ->
fetchNotifications(userId));
Future<AccountStats> statsFuture = executor.submit(() ->
fetchAccountStats(userId));
Future<List<Recommendation>> recommendationsFuture = executor.submit(() ->
fetchRecommendations(userId));
// Wait for all with timeout
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getFuture(userInfoFuture)),
CompletableFuture.supplyAsync(() -> getFuture(ordersFuture)),
CompletableFuture.supplyAsync(() -> getFuture(notificationsFuture)),
CompletableFuture.supplyAsync(() -> getFuture(statsFuture)),
CompletableFuture.supplyAsync(() -> getFuture(recommendationsFuture))
);
try {
allFutures.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Some services might be slow, continue with what we have
System.err.println("Some services timed out");
}
return new DashboardData(
getFutureOrDefault(userInfoFuture, null),
getFutureOrDefault(ordersFuture, List.of()),
getFutureOrDefault(notificationsFuture, List.of()),
getFutureOrDefault(statsFuture, null),
getFutureOrDefault(recommendationsFuture, List.of())
);
}
}
private UserInfo fetchUserInfo(String userId) {
return callService(
serviceUrls.get("user-service"),
"/users/" + userId,
UserInfo.class
);
}
private List<Order> fetchOrders(String userId) {
return callServiceList(
serviceUrls.get("order-service"),
"/orders?userId=" + userId,
Order.class
);
}
private List<Notification> fetchNotifications(String userId) {
return callServiceList(
serviceUrls.get("notification-service"),
"/notifications?userId=" + userId,
Notification.class
);
}
private AccountStats fetchAccountStats(String userId) {
return callService(
serviceUrls.get("analytics-service"),
"/stats/" + userId,
AccountStats.class
);
}
private List<Recommendation> fetchRecommendations(String userId) {
return callServiceList(
serviceUrls.get("recommendation-service"),
"/recommendations?userId=" + userId,
Recommendation.class
);
}
private <T> T callService(URI baseUri, String path, Class<T> responseType) {
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUri + path))
.timeout(Duration.ofSeconds(5))
.GET()
.build();
HttpResponse<String> response = httpClient.send(
request,
HttpResponse.BodyHandlers.ofString()
);
if (response.statusCode() != 200) {
throw new RuntimeException("Service returned: " + response.statusCode());
}
// Parse JSON response (simplified - use Jackson in production)
return parseJson(response.body(), responseType);
} catch (Exception e) {
throw new RuntimeException("Failed to call service: " + baseUri + path, e);
}
}
private <T> List<T> callServiceList(URI baseUri, String path, Class<T> itemType) {
// Simplified - in production, properly parse JSON array
T item = callService(baseUri, path, itemType);
return List.of(item);
}
private <T> T parseJson(String json, Class<T> type) {
// Simplified - use Jackson ObjectMapper in production
return null;
}
private <T> T getFuture(Future<T> future) {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private <T> T getFutureOrDefault(Future<T> future, T defaultValue) {
try {
if (future.isDone()) {
return future.get();
}
} catch (Exception e) {
// Ignore errors, return default
}
return defaultValue;
}
record DashboardData(
UserInfo userInfo,
List<Order> orders,
List<Notification> notifications,
AccountStats stats,
List<Recommendation> recommendations
) {}
record UserInfo(String id, String name, String email) {}
record Order(String id, String status, double total) {}
record Notification(String id, String message, boolean read) {}
record AccountStats(int totalOrders, double totalSpent, int points) {}
record Recommendation(String id, String title, String category) {}
}
Pattern 4: Parallel File Processing
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class ParallelFileProcessor {
private final Path inputDirectory;
private final Path outputDirectory;
public ParallelFileProcessor(Path inputDirectory, Path outputDirectory) {
this.inputDirectory = inputDirectory;
this.outputDirectory = outputDirectory;
}
public ProcessingReport processAll() throws Exception {
// Create output directory
Files.createDirectories(outputDirectory);
// Find all files to process
List<Path> files;
try (Stream<Path> paths = Files.walk(inputDirectory)) {
files = paths
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".txt"))
.toList();
}
System.out.println("Processing " + files.size() + " files...");
long startTime = System.currentTimeMillis();
List<FileResult> results = new ArrayList<>();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// Process all files concurrently
List<Future<FileResult>> futures = files.stream()
.map(file -> executor.submit(() -> processFile(file)))
.toList();
// Collect results
for (Future<FileResult> future : futures) {
try {
results.add(future.get());
} catch (Exception e) {
results.add(FileResult.failure(null, e.getMessage()));
}
}
}
long duration = System.currentTimeMillis() - startTime;
return new ProcessingReport(
results.size(),
results.stream().filter(FileResult::success).count(),
results.stream().filter(r -> !r.success()).count(),
duration,
results
);
}
private FileResult processFile(Path inputFile) {
try {
long startTime = System.currentTimeMillis();
// Read file
String content = Files.readString(inputFile);
// Process content (example: uppercase and add line numbers)
String processed = processContent(content);
// Write output
Path outputFile = outputDirectory.resolve(inputFile.getFileName());
Files.writeString(outputFile, processed);
long duration = System.currentTimeMillis() - startTime;
System.out.println("✓ Processed: " + inputFile.getFileName() +
" (" + duration + "ms)");
return FileResult.success(
inputFile.getFileName().toString(),
content.length(),
processed.length(),
duration
);
} catch (IOException e) {
System.err.println("✗ Failed: " + inputFile.getFileName() +
" - " + e.getMessage());
return FileResult.failure(
inputFile.getFileName().toString(),
e.getMessage()
);
}
}
private String processContent(String content) {
// Example processing: add line numbers and convert to uppercase
String[] lines = content.split("\n");
StringBuilder result = new StringBuilder();
for (int i = 0; i < lines.length; i++) {
result.append(String.format("%4d: %s%n", i + 1, lines[i].toUpperCase()));
}
return result.toString();
}
record FileResult(
boolean success,
String filename,
long inputSize,
long outputSize,
long durationMs,
String error
) {
static FileResult success(String filename, long inputSize,
long outputSize, long durationMs) {
return new FileResult(true, filename, inputSize, outputSize,
durationMs, null);
}
static FileResult failure(String filename, String error) {
return new FileResult(false, filename, 0, 0, 0, error);
}
}
record ProcessingReport(
long totalFiles,
long successCount,
long failureCount,
long totalDurationMs,
List<FileResult> results
) {
public void printSummary() {
System.out.println("\n=== Processing Report ===");
System.out.println("Total files: " + totalFiles);
System.out.println("Successful: " + successCount);
System.out.println("Failed: " + failureCount);
System.out.println("Total time: " + totalDurationMs + "ms");
System.out.println("Average time per file: " +
(totalDurationMs / totalFiles) + "ms");
if (failureCount > 0) {
System.out.println("\nFailed files:");
results.stream()
.filter(r -> !r.success())
.forEach(r -> System.out.println(" - " + r.filename() +
": " + r.error()));
}
}
}
// Example usage
public static void main(String[] args) throws Exception {
ParallelFileProcessor processor = new ParallelFileProcessor(
Path.of("input"),
Path.of("output")
);
ProcessingReport report = processor.processAll();
report.printSummary();
}
}
Pattern 5: Rate-Limited API Client
import java.net.URI;
import java.net.http.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class RateLimitedApiClient {
private final HttpClient httpClient;
private final int maxRequestsPerSecond;
private final Semaphore rateLimiter;
private final ScheduledExecutorService scheduler;
private final AtomicInteger activeRequests = new AtomicInteger(0);
public RateLimitedApiClient(int maxRequestsPerSecond) {
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();
this.maxRequestsPerSecond = maxRequestsPerSecond;
this.rateLimiter = new Semaphore(maxRequestsPerSecond);
this.scheduler = Executors.newScheduledThreadPool(1);
// Refill permits every second
scheduler.scheduleAtFixedRate(
this::refillPermits,
1, 1, TimeUnit.SECONDS
);
}
private void refillPermits() {
int used = maxRequestsPerSecond - rateLimiter.availablePermits();
if (used > 0) {
rateLimiter.release(used);
}
}
public List<ApiResponse> fetchBatch(List<String> urls) throws Exception {
System.out.println("Fetching " + urls.size() + " URLs with rate limit: " +
maxRequestsPerSecond + " req/s");
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<ApiResponse>> futures = urls.stream()
.map(url -> executor.submit(() -> fetch(url)))
.toList();
// Collect results
List<ApiResponse> responses = new ArrayList<>();
for (Future<ApiResponse> future : futures) {
responses.add(future.get());
}
return responses;
}
}
private ApiResponse fetch(String url) {
try {
// Acquire permit (blocks if rate limit reached)
rateLimiter.acquire();
activeRequests.incrementAndGet();
long startTime = System.currentTimeMillis();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.timeout(Duration.ofSeconds(10))
.GET()
.build();
HttpResponse<String> response = httpClient.send(
request,
HttpResponse.BodyHandlers.ofString()
);
long duration = System.currentTimeMillis() - startTime;
System.out.printf("[%d active] Fetched %s in %dms (status: %d)%n",
activeRequests.get(), url, duration, response.statusCode());
return new ApiResponse(
url,
response.statusCode(),
response.body(),
duration,
true,
null
);
} catch (Exception e) {
System.err.println("Failed to fetch " + url + ": " + e.getMessage());
return new ApiResponse(url, 0, null, 0, false, e.getMessage());
} finally {
activeRequests.decrementAndGet();
}
}
public void shutdown() {
scheduler.shutdown();
}
record ApiResponse(
String url,
int statusCode,
String body,
long durationMs,
boolean success,
String error
) {}
// Example usage
public static void main(String[] args) throws Exception {
// Create client with 10 requests/second limit
RateLimitedApiClient client = new RateLimitedApiClient(10);
// Generate 50 URLs
List<String> urls = IntStream.range(0, 50)
.mapToObj(i -> "https://httpbin.org/delay/1")
.toList();
long startTime = System.currentTimeMillis();
List<ApiResponse> responses = client.fetchBatch(urls);
long totalTime = System.currentTimeMillis() - startTime;
System.out.println("\n=== Summary ===");
System.out.println("Total requests: " + responses.size());
System.out.println("Successful: " +
responses.stream().filter(ApiResponse::success).count());
System.out.println("Failed: " +
responses.stream().filter(r -> !r.success()).count());
System.out.println("Total time: " + totalTime + "ms");
System.out.println("Expected time (rate limited): ~" +
(50 / 10) + " seconds");
client.shutdown();
}
}
Best Practices Summary
Use Virtual Threads for I/O-Bound Tasks
- Web servers handling concurrent requests
- Database query parallelization
- Microservice fan-out
- File processing pipelines
Combine with Structured Concurrency
- Use
StructuredTaskScopefor parent-child relationships - Ensures all subtasks complete before parent
- Proper error propagation
- Use
Avoid Thread Pools
- Don't create fixed-size pools of virtual threads
- Use
newVirtualThreadPerTaskExecutor()instead
Handle Errors Properly
- Catch and handle exceptions in each virtual thread
- Use try-catch-finally for resource cleanup
- Consider using
Resulttypes for error handling
Monitor Performance
- Track active virtual threads
- Monitor pinning events
- Use JFR for production profiling
Consider Rate Limiting
- Protect external APIs from overload
- Use semaphores for rate limiting
- Consider circuit breakers for resilience