9.4 Real-World Patterns and Applications
Structured concurrency enables elegant solutions to complex concurrency problems. These patterns demonstrate how to apply structured concurrency in production scenarios.
Pattern 1: Multi-Source Data Aggregation
Problem: Fetch data from multiple services, combine results, handle failures gracefully.
/**
 * Builds a {@link UserProfile} by combining data from several backing services.
 *
 * <p>User and order data are mandatory and are fetched concurrently; if either call fails the
 * whole request fails. Recommendations and analytics are optional: when they cannot be
 * obtained, the profile is still returned with an empty fallback in their place.
 *
 * <p>Uses the {@code StructuredTaskScope} preview API (JEP 453).
 */
public class DataAggregationService {
    private static final Logger logger = LoggerFactory.getLogger(DataAggregationService.class);

    /** Upper bound, in milliseconds, on how long the optional recommendations call may take. */
    private static final long RECOMMENDATIONS_TIMEOUT_MILLIS = 1000;

    /** Number of recent orders included in a profile. */
    private static final int RECENT_ORDER_LIMIT = 10;

    private final UserService userService;
    private final OrderService orderService;
    private final RecommendationService recommendationService;
    private final AnalyticsService analyticsService;

    /**
     * Creates the service from its four collaborators.
     *
     * @param userService source of mandatory user data
     * @param orderService source of mandatory order data
     * @param recommendationService source of optional recommendations
     * @param analyticsService source of optional analytics
     */
    public DataAggregationService(
            UserService userService,
            OrderService orderService,
            RecommendationService recommendationService,
            AnalyticsService analyticsService) {
        this.userService = userService;
        this.orderService = orderService;
        this.recommendationService = recommendationService;
        this.analyticsService = analyticsService;
    }

    /** Aggregated view of a user returned by {@link #getUserProfile(String)}. */
    public record UserProfile(
            User user,
            List<Order> recentOrders,
            List<String> recommendations,
            UserAnalytics analytics
    ) {}

    /**
     * Fetches the mandatory data in parallel, then adds the optional data.
     *
     * @param userId identifier of the user whose profile is built
     * @return the assembled profile; optional parts fall back to empty values
     * @throws Exception if the user or order lookup fails, or the calling thread is interrupted
     */
    public UserProfile getUserProfile(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Critical data - must succeed
            var userTask = scope.fork(() -> userService.getUser(userId));
            var ordersTask = scope.fork(() -> orderService.getRecentOrders(userId, RECENT_ORDER_LIMIT));

            scope.join();
            scope.throwIfFailed();

            // Both subtasks succeeded, so get() returns immediately.
            User user = userTask.get();
            List<Order> orders = ordersTask.get();

            // Optional data - use fallback on failure
            List<String> recommendations = getOptionalRecommendations(userId);
            UserAnalytics analytics = getOptionalAnalytics(userId);

            return new UserProfile(user, orders, recommendations, analytics);
        } catch (Exception e) {
            logger.error("Failed to build user profile for {}", userId, e);
            throw e;
        }
    }

    /**
     * Fetches recommendations, giving up after {@link #RECOMMENDATIONS_TIMEOUT_MILLIS}.
     *
     * @return the recommendations, or an empty list on failure, timeout or interruption
     */
    private List<String> getOptionalRecommendations(String userId) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(() -> recommendationService.getRecommendations(userId));

            // joinUntil is the scope's own timeout mechanism. A separate watchdog thread may not
            // call shutdown(): only the owner or a thread forked in the scope is allowed to.
            // On timeout, leaving the try block closes the scope and cancels the subtask.
            scope.joinUntil(java.time.Instant.now().plusMillis(RECOMMENDATIONS_TIMEOUT_MILLIS));
            scope.throwIfFailed();
            return task.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; optional data simply falls back.
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        } catch (Exception e) {
            logger.warn("Could not fetch recommendations", e);
            return Collections.emptyList(); // Fallback to empty
        }
    }

    /**
     * Fetches analytics without a deadline.
     *
     * @return the analytics, or {@link UserAnalytics#empty()} on failure or interruption
     */
    private UserAnalytics getOptionalAnalytics(String userId) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(() -> analyticsService.getUserAnalytics(userId));
            scope.join();
            scope.throwIfFailed();
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return UserAnalytics.empty();
        } catch (Exception e) {
            logger.warn("Could not fetch analytics", e);
            return UserAnalytics.empty();
        }
    }

    // Supporting classes
    record User(String id, String name, String email) {}

    record Order(String id, LocalDateTime createdAt, BigDecimal total) {}

    record UserAnalytics(int totalOrders, BigDecimal totalSpent, LocalDate lastOrderDate) {
        /** Placeholder used when analytics are unavailable: no orders, zero spend, no date. */
        static UserAnalytics empty() {
            return new UserAnalytics(0, BigDecimal.ZERO, null);
        }
    }

    interface UserService {
        User getUser(String userId) throws Exception;
    }

    interface OrderService {
        List<Order> getRecentOrders(String userId, int limit) throws Exception;
    }

    interface RecommendationService {
        List<String> getRecommendations(String userId) throws Exception;
    }

    interface AnalyticsService {
        UserAnalytics getUserAnalytics(String userId) throws Exception;
    }
}
Pattern 2: Load Balancing with Failover
Problem: Distribute requests across multiple backends, fail over on error, and use the fastest responder.
public class LoadBalancingService {
private final List<BackendService> backends;
private final Logger logger = LoggerFactory.getLogger(LoadBalancingService.class);
public LoadBalancingService(List<BackendService> backends) {
this.backends = backends;
}
// Race-to-complete pattern with fallback
public <T> T executeWithLoadBalancing(Function<BackendService, T> operation)
throws Exception {
if (backends.isEmpty()) {
throw new IllegalStateException("No backends available");
}
// First try: race the fastest backend
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
List<var> tasks = new ArrayList<>();
for (BackendService backend : backends) {
var task = scope.fork(() -> {
try {
T result = operation.apply(backend);
logger.info("Backend {} succeeded", backend.getName());
return result;
} catch (Exception e) {
logger.debug("Backend {} failed", backend.getName(), e);
throw e;
}
});
tasks.add(task);
}
try {
scope.join();
T result = scope.result();
logger.info("Request completed successfully");
return result;
} catch (Exception e) {
// All backends failed
logger.error("All backends failed for operation", e);
throw new ServiceUnavailableException("All backends down");
}
}
}
// Sticky session with automatic failover
public <T> T executeWithPrimaryFailover(
Function<BackendService, T> operation,
int primaryIndex) throws Exception {
BackendService primary = backends.get(primaryIndex);
List<BackendService> secondaries = new ArrayList<>(backends);
secondaries.remove(primaryIndex);
try {
// Try primary first with timeout
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> operation.apply(primary));
Thread watchdog = new Thread(() -> {
try {
Thread.sleep(2000); // 2 second timeout for primary
scope.shutdown();
} catch (InterruptedException e) {
// Completed
}
});
watchdog.setDaemon(true);
watchdog.start();
scope.join();
scope.throwIfFailed();
logger.info("Primary {} succeeded", primary.getName());
return task.resultNow();
}
} catch (Exception e) {
logger.warn("Primary {} failed, using secondary", primary.getName(), e);
// Try secondaries in order
for (BackendService secondary : secondaries) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> operation.apply(secondary));
scope.join();
scope.throwIfFailed();
logger.info("Secondary {} succeeded", secondary.getName());
return task.resultNow();
}
}
throw new ServiceUnavailableException("All backends unavailable");
}
}
public interface BackendService {
String getName();
<T> T execute(Function<?, T> operation) throws Exception;
}
}
Pattern 3: Parallel Batch Processing
Problem: Process items in parallel batches, maintain ordering, handle partial failures.
/**
 * Splits a list into fixed-size batches, processes the batches concurrently with retries,
 * and returns the per-item results in the same order as the input.
 *
 * <p>Processing is fail-fast: once a batch has exhausted its retries the remaining batches
 * are cancelled and {@link BatchProcessingException} is thrown.
 *
 * <p>Uses the {@code StructuredTaskScope} preview API (JEP 453).
 *
 * @param <T> input item type
 * @param <R> result item type
 */
public class BatchProcessor<T, R> {
    /** Delay before the first retry; doubled for each further retry. */
    private static final long BASE_BACKOFF_MILLIS = 100;

    /** Caps the doubling so the delay never exceeds {@code BASE_BACKOFF_MILLIS << 6}. */
    private static final int MAX_BACKOFF_SHIFT = 6;

    private final int batchSize;
    private final Function<List<T>, List<R>> processor;
    private final int maxRetries;

    /**
     * @param batchSize maximum number of items per batch; must be positive
     * @param processor maps one batch of inputs to its results
     * @param maxRetries number of additional attempts after the first; must not be negative
     * @throws IllegalArgumentException if {@code batchSize <= 0} or {@code maxRetries < 0}
     */
    public BatchProcessor(int batchSize,
                          Function<List<T>, List<R>> processor,
                          int maxRetries) {
        // A non-positive batch size would make createBatches loop forever.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        // A negative retry count would skip every attempt and leave nothing to report.
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative: " + maxRetries);
        }
        this.batchSize = batchSize;
        this.processor = processor;
        this.maxRetries = maxRetries;
    }

    /**
     * Processes every item and returns the results in input order.
     *
     * @param items items to process; may be empty
     * @return the concatenated batch results, ordered by batch
     * @throws BatchProcessingException if at least one batch failed after all retries
     * @throws Exception if a subtask failed for another reason or the caller was interrupted
     */
    public List<R> processAll(List<T> items) throws Exception {
        List<List<T>> batches = createBatches(items);

        // One slot per batch. Each subtask writes only its own slot, so the final order does
        // not depend on which batch finishes first.
        List<List<R>> slots = Collections.synchronizedList(
                new ArrayList<List<R>>(Collections.<List<R>>nCopies(batches.size(), null)));
        List<Integer> failedBatches = Collections.synchronizedList(new ArrayList<>());

        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            for (int i = 0; i < batches.size(); i++) {
                final int currentBatch = i;
                final List<T> batch = batches.get(i);
                scope.fork(() -> {
                    System.out.println("Processing batch " + currentBatch);
                    List<R> batchResults;
                    try {
                        batchResults = retryBatch(batch, maxRetries);
                    } catch (InterruptedException e) {
                        // Cancelled because another batch failed: not a failure of this batch.
                        throw e;
                    } catch (Exception e) {
                        System.err.println("Batch " + currentBatch + " failed after retries");
                        failedBatches.add(currentBatch);
                        throw e;
                    }
                    if (batchResults == null) {
                        throw new IllegalStateException(
                                "Processor returned null for batch " + currentBatch);
                    }
                    slots.set(currentBatch, batchResults);
                    return null;
                });
            }

            scope.join();

            if (!failedBatches.isEmpty()) {
                // Sorted so the message does not depend on completion order.
                List<Integer> sorted = new ArrayList<>(failedBatches);
                Collections.sort(sorted);
                throw new BatchProcessingException(
                        "Failed batches: " + sorted,
                        scope.exception().orElse(null));
            }
            // Surfaces any other subtask failure so partial results are never returned.
            scope.throwIfFailed();
        } catch (Exception e) {
            System.err.println("Batch processing failed: " + e.getMessage());
            throw e;
        }

        List<R> results = new ArrayList<>(items.size());
        for (List<R> slot : slots) {
            results.addAll(slot);
        }
        return results;
    }

    /**
     * Applies the processor to one batch, retrying with exponential backoff.
     *
     * @throws InterruptedException if interrupted while waiting between attempts
     * @throws Exception the last failure once all attempts are used up
     */
    private List<R> retryBatch(List<T> batch, int maxRetries)
            throws Exception {
        Exception lastException = null;

        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return processor.apply(batch);
            } catch (Exception e) {
                lastException = e;
                if (attempt < maxRetries) {
                    System.out.println("Batch attempt " + (attempt + 1) +
                            " failed, retrying...");
                    // Exponential backoff: 100 ms, 200 ms, 400 ms, ... capped at 6.4 s.
                    Thread.sleep(BASE_BACKOFF_MILLIS << Math.min(attempt, MAX_BACKOFF_SHIFT));
                } else {
                    System.out.println("Batch attempt " + (attempt + 1) +
                            " failed, giving up");
                }
            }
        }

        // The constructor guarantees at least one attempt, so lastException is set here.
        throw lastException;
    }

    /** Partitions {@code items} into consecutive sub-list views of at most {@code batchSize}. */
    private List<List<T>> createBatches(List<T> items) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            batches.add(items.subList(i, end));
        }
        return batches;
    }

    /** Thrown when one or more batches still fail after all retries. */
    public static class BatchProcessingException extends Exception {
        public BatchProcessingException(String message) {
            super(message);
        }

        public BatchProcessingException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}
// Usage example
/** Demonstrates {@code BatchProcessor} by labelling the integers 0 to 999 in batches of 100. */
public class BatchProcessingExample {
    public static void main(String[] args) throws Exception {
        final int batchSize = 100;
        final int maxRetries = 2;

        // Boxed so the values can be handed to the generic processor.
        List<Integer> inputs = IntStream.range(0, 1000).boxed().collect(Collectors.toList());

        BatchProcessor<Integer, String> batchProcessor =
                new BatchProcessor<>(batchSize, BatchProcessingExample::labelAll, maxRetries);

        List<String> labelled = batchProcessor.processAll(inputs);
        System.out.println("Processed " + labelled.size() + " items");
    }

    /** Maps every value {@code i} in the batch to the string {@code "Item-" + i}. */
    private static List<String> labelAll(List<Integer> batch) {
        return batch.stream()
                .map(value -> "Item-" + value)
                .collect(Collectors.toList());
    }
}
Pattern 4: Circuit Breaker with Structured Concurrency
Problem: Prevent cascading failures by wrapping requests with timeout and fallback.
/**
 * Wraps an operation with a timeout and a fallback, and stops calling the operation for a
 * while once it has failed repeatedly.
 *
 * <p>States: {@code CLOSED} (calls pass through), {@code OPEN} (calls go straight to the
 * fallback) and {@code HALF_OPEN} (one or more probe calls are allowed; success closes the
 * circuit, failure re-opens it).
 *
 * <p>Instances may be shared between threads: state transitions are made under a private lock.
 *
 * <p>Uses the {@code StructuredTaskScope} preview API (JEP 453).
 *
 * @param <T> result type of the protected operation
 */
public class CircuitBreaker<T> {
    /** Time the circuit stays OPEN before a probe request is allowed. */
    private static final long HALF_OPEN_DELAY_MILLIS = 5000;

    private final Supplier<T> operation;
    private final Supplier<T> fallback;
    private final Duration timeout;
    private final int failureThreshold;

    /** Guards {@link #failureCount} and all writes to {@link #state}. */
    private final Object stateLock = new Object();
    private int failureCount = 0;
    /** Volatile so {@link #execute()} can read it without taking the lock. */
    private volatile State state = State.CLOSED;

    enum State { CLOSED, OPEN, HALF_OPEN }

    /**
     * @param operation the protected call
     * @param fallback supplies the value returned when the operation is skipped or fails
     * @param timeout maximum time a single call to {@code operation} may take
     * @param failureThreshold consecutive failures after which the circuit opens
     */
    public CircuitBreaker(
            Supplier<T> operation,
            Supplier<T> fallback,
            Duration timeout,
            int failureThreshold) {
        this.operation = operation;
        this.fallback = fallback;
        this.timeout = timeout;
        this.failureThreshold = failureThreshold;
    }

    /**
     * Runs the operation, or the fallback when the circuit is open or the operation fails.
     *
     * @return the operation's result, or the fallback value
     * @throws InterruptedException if the calling thread is interrupted while waiting; this is
     *     not counted as a failure of the operation
     * @throws Exception declared for compatibility; failures of the operation itself are
     *     converted into the fallback value
     */
    public T execute() throws Exception {
        if (state == State.OPEN) {
            System.out.println("Circuit OPEN - using fallback");
            return fallback.get();
        }

        try {
            T result = executeWithTimeout(operation, timeout);
            recordSuccess();
            return result;
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            recordFailure();
            // Use fallback
            System.out.println("Using fallback due to error: " + e.getMessage());
            return fallback.get();
        }
    }

    /** Resets the failure count and closes the circuit if this was a successful probe. */
    private void recordSuccess() {
        boolean recovered;
        synchronized (stateLock) {
            recovered = state == State.HALF_OPEN;
            if (recovered) {
                state = State.CLOSED;
            }
            failureCount = 0;
        }
        if (recovered) {
            System.out.println("Circuit CLOSED - operation recovered");
        }
    }

    /** Counts a failure and opens the circuit when the threshold is hit or a probe failed. */
    private void recordFailure() {
        int failures;
        boolean opened = false;
        synchronized (stateLock) {
            failures = ++failureCount;
            boolean shouldOpen = state == State.HALF_OPEN || failures >= failureThreshold;
            // Only the thread that performs the transition schedules the half-open timer.
            if (shouldOpen && state != State.OPEN) {
                state = State.OPEN;
                opened = true;
            }
        }
        System.err.println("Operation failed (" + failures +
                "/" + failureThreshold + ")");
        if (opened) {
            System.err.println("Circuit OPEN - too many failures");
            scheduleHalfOpenAfterDelay();
        }
    }

    /**
     * Runs {@code supplier} in a subtask and waits for it for at most {@code timeout}.
     *
     * @throws java.util.concurrent.TimeoutException if the deadline passes first
     * @throws Exception the exception thrown by {@code supplier}, if it failed
     */
    private <R> R executeWithTimeout(Supplier<R> supplier, Duration timeout)
            throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(supplier::get);

            // joinUntil is the scope's own deadline support. A separate watchdog thread may not
            // call shutdown(): only the owner or a thread forked in the scope is allowed to.
            scope.joinUntil(java.time.Instant.now().plus(timeout));

            // Rethrow the subtask's own exception rather than an ExecutionException wrapper.
            scope.throwIfFailed(CircuitBreaker::asException);
            return task.get();
        }
    }

    /** Returns {@code failure} itself when it is an {@code Exception}, otherwise wraps it. */
    private static Exception asException(Throwable failure) {
        return failure instanceof Exception ex ? ex : new RuntimeException(failure);
    }

    /** Starts a daemon thread that moves the circuit from OPEN to HALF_OPEN after the delay. */
    private void scheduleHalfOpenAfterDelay() {
        Thread scheduler = new Thread(() -> {
            try {
                Thread.sleep(HALF_OPEN_DELAY_MILLIS); // Wait before trying again
                boolean halfOpened = false;
                synchronized (stateLock) {
                    if (state == State.OPEN) {
                        state = State.HALF_OPEN;
                        halfOpened = true;
                    }
                }
                if (halfOpened) {
                    System.out.println("Circuit HALF_OPEN - allowing test request");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        scheduler.setDaemon(true);
        scheduler.start();
    }
}
// Usage
/** Issues ten requests through a {@code CircuitBreaker}, pausing half a second between them. */
public class CircuitBreakerExample {
    public static void main(String[] args) throws Exception {
        CircuitBreaker<String> circuit = new CircuitBreaker<>(
                () -> unstableRemoteService.fetchData(), // primary call - this might fail
                () -> "Cached data",                     // fallback value
                Duration.ofSeconds(2),                   // 2 second timeout
                3);                                      // open circuit after 3 failures

        // Multiple requests
        int request = 0;
        while (request < 10) {
            try {
                System.out.println("Got: " + circuit.execute());
            } catch (Exception e) {
                System.err.println("Request failed: " + e.getMessage());
            }
            Thread.sleep(500);
            request++;
        }
    }
}
Pattern 5: Hierarchical Task Composition
Problem: Build complex operations from simpler ones, maintain proper task hierarchy.
public class OrderFulfillmentService {
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final ShippingService shippingService;
private final NotificationService notificationService;
public record Order(String id, List<Item> items, String shippingAddress) {}
public record Item(String productId, int quantity) {}
public void fulfillOrder(Order order) throws Exception {
System.out.println("=== Fulfilling order " + order.id() + " ===");
// Level 1: Main orchestration
try (var mainScope = new StructuredTaskScope.ShutdownOnFailure()) {
// Process payment
var paymentTask = mainScope.fork(() -> {
System.out.println("Level 2: Processing payment");
return processPayment(order);
});
// Allocate inventory
var inventoryTask = mainScope.fork(() -> {
System.out.println("Level 2: Allocating inventory");
return allocateInventory(order);
});
mainScope.join();
mainScope.throwIfFailed();
String paymentId = paymentTask.resultNow();
String reservationId = inventoryTask.resultNow();
System.out.println("Level 2: Payment and inventory complete");
// Now arrange shipping (depends on successful payment + inventory)
String shipmentId = arrangeShipping(order, reservationId);
// Send confirmations (best-effort)
sendConfirmations(order, paymentId, reservationId, shipmentId);
System.out.println("Order " + order.id() + " fulfilled successfully");
} catch (Exception e) {
System.err.println("Order fulfillment failed: " + e.getMessage());
// Cleanup on failure
throw e;
}
}
private String processPayment(Order order) throws Exception {
// Level 3: Payment sub-operations
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var authTask = scope.fork(() -> {
System.out.println(" Level 3: Authorizing payment");
return authorizePayment(order);
});
var fraudCheckTask = scope.fork(() -> {
System.out.println(" Level 3: Checking fraud");
return checkFraud(order);
});
scope.join();
scope.throwIfFailed();
String authId = authTask.resultNow();
boolean isFraudulent = fraudCheckTask.resultNow();
if (isFraudulent) {
throw new IllegalStateException("Fraud detected");
}
return authId;
}
}
private String allocateInventory(Order order) throws Exception {
// Level 3: Inventory sub-operations
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<var> reservationTasks = new ArrayList<>();
for (Item item : order.items()) {
var task = scope.fork(() -> {
System.out.println(" Level 3: Reserving " +
item.quantity() + " of " + item.productId());
return reserveItem(item);
});
reservationTasks.add(task);
}
scope.join();
scope.throwIfFailed();
return "RESERVATION-" + UUID.randomUUID();
}
}
private String arrangeShipping(Order order, String reservationId) throws Exception {
System.out.println("Level 2: Arranging shipping");
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var pickTask = scope.fork(() -> {
System.out.println(" Level 3: Picking items");
return pickItems(reservationId);
});
var labelTask = scope.fork(() -> {
System.out.println(" Level 3: Creating shipping label");
return createShippingLabel(order.shippingAddress());
});
scope.join();
scope.throwIfFailed();
String pickId = pickTask.resultNow();
String labelId = labelTask.resultNow();
System.out.println(" Level 3: Scheduling pickup");
return schedulePickup(pickId, labelId);
}
}
private void sendConfirmations(Order order, String paymentId,
String reservationId, String shipmentId) {
// Non-critical - don't fail order if notifications fail
try (var scope = new StructuredTaskScope<Void>()) {
scope.fork(() -> {
notificationService.sendPaymentConfirmation(order, paymentId);
return null;
});
scope.fork(() -> {
notificationService.sendReservationConfirmation(order, reservationId);
return null;
});
scope.fork(() -> {
notificationService.sendShipmentConfirmation(order, shipmentId);
return null;
});
scope.join();
} catch (Exception e) {
System.err.println("Notification error (non-critical): " + e.getMessage());
}
}
// Helper methods
private String authorizePayment(Order order) { return "AUTH-" + UUID.randomUUID(); }
private boolean checkFraud(Order order) { return false; }
private String reserveItem(Item item) { return "RES-" + item.productId(); }
private String pickItems(String reservationId) { return "PICK-" + UUID.randomUUID(); }
private String createShippingLabel(String address) { return "LABEL-" + UUID.randomUUID(); }
private String schedulePickup(String pickId, String labelId) { return "SHIP-" + UUID.randomUUID(); }
// Service interfaces
interface PaymentService {}
interface InventoryService {}
interface ShippingService {}
interface NotificationService {
void sendPaymentConfirmation(Order order, String paymentId) throws Exception;
void sendReservationConfirmation(Order order, String reservationId) throws Exception;
void sendShipmentConfirmation(Order order, String shipmentId) throws Exception;
}
}
Best Practices Summary
1. Match Scope Policy to Requirements
- ShutdownOnFailure: All-or-nothing operations
- ShutdownOnSuccess: Racing for first result
- Custom: Complex scenarios requiring partial success
2. Always Use Try-With-Resources
// try-with-resources closes the scope on every exit path, normal or exceptional.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Guaranteed cleanup
}
3. Handle Errors Appropriately
// Wait for the subtasks first, then ask the scope whether any of them failed.
scope.join();
try {
scope.throwIfFailed();
} catch (Exception e) {
// Handle the failure. Assuming scope is a ShutdownOnFailure, this is an ExecutionException
// whose cause is the exception of the first subtask that failed - not an aggregate of all failures.
}
4. Design for Composability
// Nested scopes maintain proper hierarchy
// Each scope is joined by its owner before it is closed; closing a scope that has forked
// subtasks without joining them is an error.
try (var outer = new StructuredTaskScope.ShutdownOnFailure()) {
    outer.fork(() -> {
        try (var inner = new StructuredTaskScope.ShutdownOnFailure()) {
            // Structured hierarchy preserved: fork the inner subtasks here
            inner.join();
            inner.throwIfFailed();
        }
        return null; // fork() takes a Callable, so the lambda must return a value
    });
    outer.join();
    outer.throwIfFailed();
}
5. Monitor and Log
// Log each outcome inside the subtask, then rethrow so the scope still sees the failure.
scope.fork(() -> {
try {
T result = operation.get();
logger.info("Operation succeeded");
return result;
} catch (Exception e) {
logger.error("Operation failed", e);
throw e;
}
});