9.4 Real-World Patterns and Applications

Structured concurrency enables elegant solutions to complex concurrency problems. These patterns demonstrate how to apply structured concurrency in production scenarios.

Pattern 1: Multi-Source Data Aggregation

Problem: Fetch data from multiple services, combine results, handle failures gracefully.

public class DataAggregationService {
    private final UserService userService;
    private final OrderService orderService;
    private final RecommendationService recommendationService;
    private final AnalyticsService analyticsService;

    public record UserProfile(
        User user,
        List<Order> recentOrders,
        List<String> recommendations,
        UserAnalytics analytics
    ) {}

    // Parallel aggregation with required vs optional data
    public UserProfile getUserProfile(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Critical data - must succeed
            var userTask = scope.fork(() -> 
                userService.getUser(userId)
            );

            var ordersTask = scope.fork(() -> 
                orderService.getRecentOrders(userId, 10)
            );

            scope.join();
            scope.throwIfFailed();

            User user = userTask.resultNow();
            List<Order> orders = ordersTask.resultNow();

            // Optional data - use fallback on failure
            List<String> recommendations = getOptionalRecommendations(userId);
            UserAnalytics analytics = getOptionalAnalytics(userId);

            return new UserProfile(user, orders, recommendations, analytics);

        } catch (Exception e) {
            logger.error("Failed to build user profile for {}", userId, e);
            throw e;
        }
    }

    private List<String> getOptionalRecommendations(String userId) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(() -> 
                recommendationService.getRecommendations(userId)
            );

            // Short timeout for optional data
            Thread watchdog = new Thread(() -> {
                try {
                    Thread.sleep(1000);  // 1 second max for recommendations
                    scope.shutdown();
                } catch (InterruptedException e) {
                    // Completed
                }
            });
            watchdog.setDaemon(true);
            watchdog.start();

            scope.join();

            if (task.hasException() || !task.isDone()) {
                return Collections.emptyList();  // Fallback to empty
            }
            return task.resultNow();

        } catch (Exception e) {
            logger.warn("Could not fetch recommendations", e);
            return Collections.emptyList();
        }
    }

    private UserAnalytics getOptionalAnalytics(String userId) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(() -> 
                analyticsService.getUserAnalytics(userId)
            );

            scope.join();
            return task.hasException() ? UserAnalytics.empty() : task.resultNow();

        } catch (Exception e) {
            return UserAnalytics.empty();
        }
    }

    // Supporting classes
    record User(String id, String name, String email) {}
    record Order(String id, LocalDateTime createdAt, BigDecimal total) {}
    record UserAnalytics(int totalOrders, BigDecimal totalSpent, LocalDate lastOrderDate) {
        static UserAnalytics empty() {
            return new UserAnalytics(0, BigDecimal.ZERO, null);
        }
    }

    interface UserService {
        User getUser(String userId) throws Exception;
    }
    interface OrderService {
        List<Order> getRecentOrders(String userId, int limit) throws Exception;
    }
    interface RecommendationService {
        List<String> getRecommendations(String userId) throws Exception;
    }
    interface AnalyticsService {
        UserAnalytics getUserAnalytics(String userId) throws Exception;
    }
}

Pattern 2: Load Balancing with Failover

Problem: Distribute requests across multiple backends, failover on error, use fastest responder.

public class LoadBalancingService {
    private final List<BackendService> backends;
    private final Logger logger = LoggerFactory.getLogger(LoadBalancingService.class);

    public LoadBalancingService(List<BackendService> backends) {
        this.backends = backends;
    }

    // Race-to-complete pattern with fallback
    public <T> T executeWithLoadBalancing(Function<BackendService, T> operation) 
            throws Exception {

        if (backends.isEmpty()) {
            throw new IllegalStateException("No backends available");
        }

        // First try: race the fastest backend
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
            List<var> tasks = new ArrayList<>();

            for (BackendService backend : backends) {
                var task = scope.fork(() -> {
                    try {
                        T result = operation.apply(backend);
                        logger.info("Backend {} succeeded", backend.getName());
                        return result;
                    } catch (Exception e) {
                        logger.debug("Backend {} failed", backend.getName(), e);
                        throw e;
                    }
                });
                tasks.add(task);
            }

            try {
                scope.join();
                T result = scope.result();
                logger.info("Request completed successfully");
                return result;

            } catch (Exception e) {
                // All backends failed
                logger.error("All backends failed for operation", e);
                throw new ServiceUnavailableException("All backends down");
            }
        }
    }

    // Sticky session with automatic failover
    public <T> T executeWithPrimaryFailover(
            Function<BackendService, T> operation,
            int primaryIndex) throws Exception {

        BackendService primary = backends.get(primaryIndex);
        List<BackendService> secondaries = new ArrayList<>(backends);
        secondaries.remove(primaryIndex);

        try {
            // Try primary first with timeout
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                var task = scope.fork(() -> operation.apply(primary));

                Thread watchdog = new Thread(() -> {
                    try {
                        Thread.sleep(2000);  // 2 second timeout for primary
                        scope.shutdown();
                    } catch (InterruptedException e) {
                        // Completed
                    }
                });
                watchdog.setDaemon(true);
                watchdog.start();

                scope.join();
                scope.throwIfFailed();

                logger.info("Primary {} succeeded", primary.getName());
                return task.resultNow();
            }

        } catch (Exception e) {
            logger.warn("Primary {} failed, using secondary", primary.getName(), e);

            // Try secondaries in order
            for (BackendService secondary : secondaries) {
                try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                    var task = scope.fork(() -> operation.apply(secondary));
                    scope.join();
                    scope.throwIfFailed();

                    logger.info("Secondary {} succeeded", secondary.getName());
                    return task.resultNow();
                }
            }

            throw new ServiceUnavailableException("All backends unavailable");
        }
    }

    public interface BackendService {
        String getName();
        <T> T execute(Function<?, T> operation) throws Exception;
    }
}

Pattern 3: Parallel Batch Processing

Problem: Process items in parallel batches, maintain ordering, handle partial failures.

public class BatchProcessor<T, R> {
    private final int batchSize;
    private final Function<List<T>, List<R>> processor;
    private final int maxRetries;

    public BatchProcessor(int batchSize, 
                         Function<List<T>, List<R>> processor,
                         int maxRetries) {
        this.batchSize = batchSize;
        this.processor = processor;
        this.maxRetries = maxRetries;
    }

    public List<R> processAll(List<T> items) throws Exception {
        List<R> results = Collections.synchronizedList(new ArrayList<>());
        List<Integer> failedBatches = Collections.synchronizedList(new ArrayList<>());

        // Create batches
        List<List<T>> batches = createBatches(items);

        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            int batchIndex = 0;
            for (List<T> batch : batches) {
                int currentBatch = batchIndex++;

                scope.fork(() -> {
                    System.out.println("Processing batch " + currentBatch);

                    List<R> batchResults;
                    try {
                        batchResults = retryBatch(batch, maxRetries);
                    } catch (Exception e) {
                        System.err.println("Batch " + currentBatch + " failed after retries");
                        failedBatches.add(currentBatch);
                        throw e;
                    }

                    // Maintain order
                    synchronized (results) {
                        results.addAll(currentBatch * batchSize, batchResults);
                    }
                    return null;
                });
            }

            scope.join();

            if (!failedBatches.isEmpty()) {
                throw new BatchProcessingException(
                    "Failed batches: " + failedBatches
                );
            }

        } catch (Exception e) {
            System.err.println("Batch processing failed: " + e.getMessage());
            throw e;
        }

        return results;
    }

    private List<R> retryBatch(List<T> batch, int maxRetries) 
            throws Exception {
        Exception lastException = null;

        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return processor.apply(batch);
            } catch (Exception e) {
                lastException = e;
                System.out.println("Batch attempt " + (attempt + 1) + 
                    " failed, retrying...");

                if (attempt < maxRetries) {
                    Thread.sleep(100 * (attempt + 1));  // Exponential backoff
                }
            }
        }

        throw lastException;
    }

    private List<List<T>> createBatches(List<T> items) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            batches.add(items.subList(i, end));
        }
        return batches;
    }

    public static class BatchProcessingException extends Exception {
        public BatchProcessingException(String message) {
            super(message);
        }
    }
}

// Usage example
public class BatchProcessingExample {
    public static void main(String[] args) throws Exception {
        List<Integer> items = IntStream.range(0, 1000)
            .boxed()
            .collect(Collectors.toList());

        BatchProcessor<Integer, String> processor = new BatchProcessor<>(
            100,  // batch size
            batch -> batch.stream()
                .map(i -> "Item-" + i)
                .collect(Collectors.toList()),
            2     // max retries
        );

        List<String> results = processor.processAll(items);
        System.out.println("Processed " + results.size() + " items");
    }
}

Pattern 4: Circuit Breaker with Structured Concurrency

Problem: Prevent cascading failures by wrapping requests with timeout and fallback.

public class CircuitBreaker<T> {
    private final Supplier<T> operation;
    private final Supplier<T> fallback;
    private final Duration timeout;
    private final int failureThreshold;

    private volatile int failureCount = 0;
    private volatile State state = State.CLOSED;

    enum State { CLOSED, OPEN, HALF_OPEN }

    public CircuitBreaker(
        Supplier<T> operation,
        Supplier<T> fallback,
        Duration timeout,
        int failureThreshold) {

        this.operation = operation;
        this.fallback = fallback;
        this.timeout = timeout;
        this.failureThreshold = failureThreshold;
    }

    public T execute() throws Exception {
        if (state == State.OPEN) {
            System.out.println("Circuit OPEN - using fallback");
            return fallback.get();
        }

        try {
            T result = executeWithTimeout(operation, timeout);

            // Success - reset failure count
            if (state == State.HALF_OPEN) {
                System.out.println("Circuit CLOSED - operation recovered");
                state = State.CLOSED;
            }
            failureCount = 0;
            return result;

        } catch (Exception e) {
            failureCount++;
            System.err.println("Operation failed (" + failureCount + 
                "/" + failureThreshold + ")");

            if (failureCount >= failureThreshold) {
                state = State.OPEN;
                System.err.println("Circuit OPEN - too many failures");
                scheduleHalfOpenAfterDelay();
            }

            // Use fallback
            System.out.println("Using fallback due to error: " + e.getMessage());
            return fallback.get();
        }
    }

    private <R> R executeWithTimeout(Supplier<R> supplier, Duration timeout) 
            throws Exception {

        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(supplier::get);

            Thread watchdog = new Thread(() -> {
                try {
                    Thread.sleep(timeout.toMillis());
                    scope.shutdown();
                } catch (InterruptedException e) {
                    // Completed
                }
            });
            watchdog.setDaemon(true);
            watchdog.start();

            scope.join();

            if (task.hasException()) {
                throw task.exception();
            }
            return task.resultNow();
        }
    }

    private void scheduleHalfOpenAfterDelay() {
        Thread scheduler = new Thread(() -> {
            try {
                Thread.sleep(5000);  // Wait 5 seconds before trying again
                if (state == State.OPEN) {
                    state = State.HALF_OPEN;
                    System.out.println("Circuit HALF_OPEN - allowing test request");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        scheduler.setDaemon(true);
        scheduler.start();
    }
}

// Usage
/** Drives a {@code CircuitBreaker} with ten requests spaced half a second apart. */
public class CircuitBreakerExample {
    public static void main(String[] args) throws Exception {
        final int requestCount = 10;
        final long pauseMillis = 500;

        CircuitBreaker<String> breaker = new CircuitBreaker<>(
            // Primary call - this might fail
            () -> unstableRemoteService.fetchData(),
            // Fallback value
            () -> "Cached data",
            // 2 second timeout
            Duration.ofSeconds(2),
            // Open circuit after 3 failures
            3
        );

        // Multiple requests
        int request = 0;
        while (request < requestCount) {
            try {
                System.out.println("Got: " + breaker.execute());
            } catch (Exception e) {
                System.err.println("Request failed: " + e.getMessage());
            }

            Thread.sleep(pauseMillis);
            request++;
        }
    }
}

Pattern 5: Hierarchical Task Composition

Problem: Build complex operations from simpler ones, maintain proper task hierarchy.

public class OrderFulfillmentService {
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final ShippingService shippingService;
    private final NotificationService notificationService;

    public record Order(String id, List<Item> items, String shippingAddress) {}
    public record Item(String productId, int quantity) {}

    public void fulfillOrder(Order order) throws Exception {
        System.out.println("=== Fulfilling order " + order.id() + " ===");

        // Level 1: Main orchestration
        try (var mainScope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Process payment
            var paymentTask = mainScope.fork(() -> {
                System.out.println("Level 2: Processing payment");
                return processPayment(order);
            });

            // Allocate inventory
            var inventoryTask = mainScope.fork(() -> {
                System.out.println("Level 2: Allocating inventory");
                return allocateInventory(order);
            });

            mainScope.join();
            mainScope.throwIfFailed();

            String paymentId = paymentTask.resultNow();
            String reservationId = inventoryTask.resultNow();

            System.out.println("Level 2: Payment and inventory complete");

            // Now arrange shipping (depends on successful payment + inventory)
            String shipmentId = arrangeShipping(order, reservationId);

            // Send confirmations (best-effort)
            sendConfirmations(order, paymentId, reservationId, shipmentId);

            System.out.println("Order " + order.id() + " fulfilled successfully");

        } catch (Exception e) {
            System.err.println("Order fulfillment failed: " + e.getMessage());
            // Cleanup on failure
            throw e;
        }
    }

    private String processPayment(Order order) throws Exception {
        // Level 3: Payment sub-operations
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var authTask = scope.fork(() -> {
                System.out.println("  Level 3: Authorizing payment");
                return authorizePayment(order);
            });

            var fraudCheckTask = scope.fork(() -> {
                System.out.println("  Level 3: Checking fraud");
                return checkFraud(order);
            });

            scope.join();
            scope.throwIfFailed();

            String authId = authTask.resultNow();
            boolean isFraudulent = fraudCheckTask.resultNow();

            if (isFraudulent) {
                throw new IllegalStateException("Fraud detected");
            }

            return authId;
        }
    }

    private String allocateInventory(Order order) throws Exception {
        // Level 3: Inventory sub-operations
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            List<var> reservationTasks = new ArrayList<>();

            for (Item item : order.items()) {
                var task = scope.fork(() -> {
                    System.out.println("  Level 3: Reserving " + 
                        item.quantity() + " of " + item.productId());
                    return reserveItem(item);
                });
                reservationTasks.add(task);
            }

            scope.join();
            scope.throwIfFailed();

            return "RESERVATION-" + UUID.randomUUID();
        }
    }

    private String arrangeShipping(Order order, String reservationId) throws Exception {
        System.out.println("Level 2: Arranging shipping");

        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var pickTask = scope.fork(() -> {
                System.out.println("  Level 3: Picking items");
                return pickItems(reservationId);
            });

            var labelTask = scope.fork(() -> {
                System.out.println("  Level 3: Creating shipping label");
                return createShippingLabel(order.shippingAddress());
            });

            scope.join();
            scope.throwIfFailed();

            String pickId = pickTask.resultNow();
            String labelId = labelTask.resultNow();

            System.out.println("  Level 3: Scheduling pickup");
            return schedulePickup(pickId, labelId);
        }
    }

    private void sendConfirmations(Order order, String paymentId, 
                                  String reservationId, String shipmentId) {
        // Non-critical - don't fail order if notifications fail
        try (var scope = new StructuredTaskScope<Void>()) {
            scope.fork(() -> {
                notificationService.sendPaymentConfirmation(order, paymentId);
                return null;
            });

            scope.fork(() -> {
                notificationService.sendReservationConfirmation(order, reservationId);
                return null;
            });

            scope.fork(() -> {
                notificationService.sendShipmentConfirmation(order, shipmentId);
                return null;
            });

            scope.join();

        } catch (Exception e) {
            System.err.println("Notification error (non-critical): " + e.getMessage());
        }
    }

    // Helper methods
    private String authorizePayment(Order order) { return "AUTH-" + UUID.randomUUID(); }
    private boolean checkFraud(Order order) { return false; }
    private String reserveItem(Item item) { return "RES-" + item.productId(); }
    private String pickItems(String reservationId) { return "PICK-" + UUID.randomUUID(); }
    private String createShippingLabel(String address) { return "LABEL-" + UUID.randomUUID(); }
    private String schedulePickup(String pickId, String labelId) { return "SHIP-" + UUID.randomUUID(); }

    // Service interfaces
    interface PaymentService {}
    interface InventoryService {}
    interface ShippingService {}
    interface NotificationService {
        void sendPaymentConfirmation(Order order, String paymentId) throws Exception;
        void sendReservationConfirmation(Order order, String reservationId) throws Exception;
        void sendShipmentConfirmation(Order order, String shipmentId) throws Exception;
    }
}

Best Practices Summary

1. Match Scope Policy to Requirements

  • ShutdownOnFailure: All-or-nothing operations
  • ShutdownOnSuccess: Racing for first result
  • Custom: Complex scenarios requiring partial success

2. Always Use Try-With-Resources

// Declaring the scope as a try-with-resources resource means it is closed
// automatically when the block exits, whether normally or by exception.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Guaranteed cleanup
}

3. Handle Errors Appropriately

// Wait for the subtasks first; throwIfFailed() is only meaningful after join().
scope.join();
try {
    scope.throwIfFailed();
} catch (java.util.concurrent.ExecutionException e) {
    // Handle the failure: e.getCause() is the exception of the first subtask
    // that failed. Failures of other subtasks are not aggregated into it.
}

4. Design for Composability

// Nested scopes maintain proper hierarchy: the inner scope is owned by a
// subtask of the outer scope, so the inner subtasks finish before that
// subtask does.
try (var outer = new StructuredTaskScope.ShutdownOnFailure()) {
    outer.fork(() -> {
        try (var inner = new StructuredTaskScope.ShutdownOnFailure()) {
            // Structured hierarchy preserved: fork child subtasks on inner here

            // Every scope must be joined by its owner before it is closed.
            inner.join();
            inner.throwIfFailed();
            return null;  // fork() takes a Callable, so the lambda must return a value
        }
    });

    outer.join();
    outer.throwIfFailed();
}

5. Monitor and Log

// Wrap the subtask body so that both outcomes are logged from inside the subtask.
scope.fork(() -> {
    try {
        T result = operation.get();
        logger.info("Operation succeeded");
        return result;
    } catch (Exception e) {
        logger.error("Operation failed", e);
        // Rethrow after logging so the failure still propagates out of the subtask body.
        throw e;
    }
});