9.2 Task Scope Policies and Error Handling

Different task scope policies provide different concurrency semantics. Understanding when to use each is crucial for building robust applications.

ShutdownOnFailure Policy

Cancels the remaining subtasks as soon as one fails:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        System.out.println("Task 1 starting");
        Thread.sleep(100);
        throw new RuntimeException("Task 1 failed");
    });

    var task2 = scope.fork(() -> {
        System.out.println("Task 2 starting");
        Thread.sleep(5000);  // Long operation
        return "Task 2 result";
    });

    scope.join();
    System.out.println("Join complete");
    scope.throwIfFailed();

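} catch (ExecutionException e) {
    // throwIfFailed() wraps Task 1's exception in an ExecutionException
    System.err.println("Caught: " + e.getCause().getMessage());
    // Task 2 was interrupted and cancelled
} catch (InterruptedException e) {
    // The owner thread was interrupted while waiting in join()
    Thread.currentThread().interrupt();
}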

// Output:
// Task 1 starting
// Task 2 starting
// Join complete
// Caught: Task 1 failed
// (Task 2 interrupted and cancelled)

When to Use:

// Perfect for "all or nothing" scenarios
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var validateUser = scope.fork(() -> validateUserData(user));
    var checkInventory = scope.fork(() -> checkStock(items));
    var verifyPayment = scope.fork(() -> verifyPaymentMethod(payment));

    scope.join();
    scope.throwIfFailed();

    // All passed - proceed with order
    processOrder(user, items, payment);
}

Error Propagation:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        throw new IllegalArgumentException("Invalid input");
    });
    var task2 = scope.fork(() -> {
        throw new IOException("Network error");
    });

    scope.join();

    try {
        scope.throwIfFailed();
    } catch (ExecutionException e) {
        // Only the first failure is reported, wrapped in an ExecutionException
        Throwable cause = e.getCause();
        System.err.println("Error: " + cause.getClass().getSimpleName() + 
            " - " + cause.getMessage());
    }
}
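
Because the failure arrives wrapped in an ExecutionException, callers usually branch on getCause() rather than on the wrapper itself. The following is a minimal sketch of that unwrapping step; FailureUnwrapping and describe are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

final class FailureUnwrapping {
    // Hypothetical helper: turns the wrapped cause into a short description.
    static String describe(ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof IOException io) {
            return "I/O failure: " + io.getMessage();
        }
        if (cause instanceof IllegalArgumentException iae) {
            return "Bad input: " + iae.getMessage();
        }
        return "Unexpected: " + cause;
    }

    public static void main(String[] args) {
        var wrapped = new ExecutionException(new IOException("Network error"));
        System.out.println(describe(wrapped));
    }
}
```

In a real handler the branches would typically map to different recovery actions (retry, reject the request, rethrow) instead of strings.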

ShutdownOnSuccess Policy

Race semantics - cancels the remaining subtasks as soon as one succeeds:

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    // Fork multiple tasks competing for result
    var task1 = scope.fork(() -> {
        System.out.println("Fetching from server 1...");
        Thread.sleep(1000);
        return "Result from server 1";
    });

    var task2 = scope.fork(() -> {
        System.out.println("Fetching from server 2...");
        Thread.sleep(500);
        return "Result from server 2";
    });

    var task3 = scope.fork(() -> {
        System.out.println("Fetching from server 3...");
        Thread.sleep(2000);
        return "Result from server 3";
    });

    scope.join();

    // Get first successful result
    String result = scope.result();
    System.out.println("Got: " + result);  // Server 2's result (fastest)
}

// Output:
// Fetching from server 1...
// Fetching from server 2...
// Fetching from server 3...
// Got: Result from server 2
// (Other tasks cancelled)

When to Use:

// Perfect for finding quickest successful result
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<CacheEntry>()) {
    // Try multiple cache servers
    var localCache = scope.fork(() -> queryLocalCache(key));
    var remoteCache1 = scope.fork(() -> queryRemoteCache1(key));
    var remoteCache2 = scope.fork(() -> queryRemoteCache2(key));

    scope.join();

    CacheEntry entry = scope.result();  // First to succeed, not merely first to finish
    return entry;
}
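
The "first success wins" semantics can be tried out without preview features by using the long-standing ExecutorService.invokeAny, which also returns the first successful result and cancels the rest. This is an analogy for experimenting with the policy, not StructuredTaskScope itself; FirstSuccessRace, race, and server are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FirstSuccessRace {
    // Runs all tasks concurrently and returns the first successful result.
    // A task that fails early does not win the race.
    static String race(List<Callable<String>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            return pool.invokeAny(tasks);
        } finally {
            pool.shutdownNow(); // interrupt anything still running
        }
    }

    // Simulated server call: sleeps, then either fails or returns a result.
    static Callable<String> server(String name, long delayMillis, boolean fails) {
        return () -> {
            Thread.sleep(delayMillis);
            if (fails) {
                throw new IllegalStateException(name + " unavailable");
            }
            return "Result from " + name;
        };
    }
}
```

Calling race with a fast-failing server 1, a slower healthy server 2, and a much slower server 3 yields server 2's result, which mirrors how ShutdownOnSuccess ignores failures as long as at least one subtask succeeds.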

Handling Failures and Successes:

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    var primaryDns = scope.fork(() -> resolveDns("primary.ns"));
    var secondaryDns = scope.fork(() -> resolveDns("secondary.ns"));
    var tertiaryDns = scope.fork(() -> resolveDns("tertiary.ns"));

    try {
        scope.join();
        String result = scope.result();  // First successful
        System.out.println("DNS resolved: " + result);
    } catch (ExecutionException e) {
        // No server succeeded; e wraps one of the failures
        System.err.println("All DNS servers failed");
        throw e;
    }
}

Custom Task Scope Policies

Custom Scope Implementation:

import java.util.concurrent.StructuredTaskScope;
import java.util.*;

public class CustomTaskScope<T> extends StructuredTaskScope<T> {
    // Subtask.exception() returns Throwable, so failures are stored as Throwable
    private final List<Throwable> allExceptions = Collections.synchronizedList(new ArrayList<>());
    private final int maxFailures;

    public CustomTaskScope(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        // Called when each subtask completes, possibly from several threads at once
        if (subtask.state() == Subtask.State.FAILED) {
            allExceptions.add(subtask.exception());
            System.out.println("Task failed: " + subtask.exception().getMessage());

            // Shutdown after N failures
            if (allExceptions.size() >= maxFailures) {
                System.out.println("Max failures reached, shutting down");
                this.shutdown();
            }
        } else {
            System.out.println("Task succeeded: " + subtask.get());
        }
    }

    public List<Throwable> getAllExceptions() {
        return new ArrayList<>(allExceptions);
    }
}

// Usage
try (var scope = new CustomTaskScope<String>(3)) {
    // Will continue until 3 failures
    var task1 = scope.fork(() -> operation1());
    var task2 = scope.fork(() -> operation2());
    var task3 = scope.fork(() -> operation3());
    var task4 = scope.fork(() -> operation4());

    scope.join();

    List<Throwable> errors = scope.getAllExceptions();
    if (!errors.isEmpty()) {
        System.out.println("Encountered " + errors.size() + " errors");
    }
}
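
Since handleComplete can run on several subtask threads at once, the "shut down after N failures" decision is easier to reason about when the counting is isolated in a small thread-safe helper. The sketch below is one way to do that; FailureBudget and recordFailure are hypothetical names, and the class deliberately has no dependency on StructuredTaskScope.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

final class FailureBudget {
    private final int maxFailures;
    private final AtomicInteger count = new AtomicInteger();
    private final List<Throwable> failures = new CopyOnWriteArrayList<>();

    FailureBudget(int maxFailures) {
        if (maxFailures < 1) {
            throw new IllegalArgumentException("maxFailures must be >= 1");
        }
        this.maxFailures = maxFailures;
    }

    // Records a failure. Returns true only for the single call that
    // reaches the limit, so the caller triggers shutdown exactly once.
    boolean recordFailure(Throwable t) {
        failures.add(t);
        return count.incrementAndGet() == maxFailures;
    }

    // Immutable snapshot of everything recorded so far.
    List<Throwable> failures() {
        return List.copyOf(failures);
    }
}
```

A custom scope could call recordFailure from handleComplete and invoke shutdown() when it returns true.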

Partial Success Scope:

public class PartialSuccessScope<T> extends StructuredTaskScope<T> {
    private final List<T> successResults = Collections.synchronizedList(new ArrayList<>());
    // Subtask.exception() returns Throwable, so failures are stored as Throwable
    private final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.FAILED) {
            failures.add(subtask.exception());
        } else {
            successResults.add(subtask.get());
        }
    }

    public List<T> getSuccesses() {
        return new ArrayList<>(successResults);
    }

    public List<Throwable> getFailures() {
        return new ArrayList<>(failures);
    }
}

// Usage
try (var scope = new PartialSuccessScope<String>()) {
    scope.fork(() -> workThatMayFail1());
    scope.fork(() -> workThatMayFail2());
    scope.fork(() -> workThatMayFail3());
    scope.fork(() -> workThatMayFail4());

    scope.join();

    List<String> successes = scope.getSuccesses();
    List<Throwable> failures = scope.getFailures();

    System.out.println("Succeeded: " + successes.size());
    System.out.println("Failed: " + failures.size());
}
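
Once successes and failures have been collected, the caller still has to decide whether the partial result is good enough. One possible approach, sketched here under the assumption that a minimum number of successes is required, is to throw a single exception that carries every collected failure as a suppressed exception. PartialResults and requireAtLeast are hypothetical names.

```java
import java.util.List;

final class PartialResults {
    // Returns the successes if there are at least `required` of them;
    // otherwise throws, attaching all collected failures as suppressed
    // exceptions so none of them is lost.
    static <T> List<T> requireAtLeast(int required,
                                      List<T> successes,
                                      List<? extends Throwable> failures) {
        if (successes.size() >= required) {
            return successes;
        }
        var ex = new IllegalStateException(
            "Only " + successes.size() + " of " + required
                + " required subtasks succeeded");
        failures.forEach(ex::addSuppressed);
        throw ex;
    }
}
```

The suppressed exceptions then show up in the stack trace and remain available through getSuppressed().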

Error Recovery Strategies

Retry Failed Tasks:

public <T> T executeWithRetry(Supplier<T> operation, int maxRetries) throws Exception {
    ExecutionException lastFailure = null;

    for (int attempt = 0; attempt <= maxRetries; attempt++) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(operation::get);
            scope.join();
            scope.throwIfFailed();
            return task.get();
        } catch (ExecutionException e) {
            // Retry only task failures; an InterruptedException propagates to the caller
            lastFailure = e;
            System.out.println("Attempt " + (attempt + 1) + " failed: " + e.getCause());

            if (attempt < maxRetries) {
                Thread.sleep(100L << attempt);  // Exponential backoff: 100 ms, 200 ms, 400 ms, ...
            }
        }
    }

    throw lastFailure;
}

// Usage
String result = executeWithRetry(
    () -> unstableService.fetchData(),
    3  // Retry up to 3 times
);
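
The delay between retries is worth isolating in a small pure function so it can be capped and tested independently of the retry loop. A minimal sketch of capped exponential backoff follows; Backoff, delayMillis, and the parameter names are hypothetical.

```java
final class Backoff {
    // Delay before retry number `attempt` (0-based): the base delay is
    // doubled once per previous attempt and never exceeds maxMillis.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0 || baseMillis <= 0 || maxMillis <= 0) {
            throw new IllegalArgumentException("arguments must be positive (attempt >= 0)");
        }
        long delay = baseMillis;
        // Stop doubling as soon as the cap is reached
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

With a base of 100 ms and a cap of 5000 ms, attempts 0 through 3 wait 100, 200, 400, and 800 ms, and later attempts settle at the 5000 ms cap.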

Fallback on Failure:

public <T> T executeWithFallback(
        Supplier<T> primary,
        Supplier<T> fallback) throws Exception {

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var primaryTask = scope.fork(primary::get);

        scope.join();
        scope.throwIfFailed();
        return primaryTask.get();

    } catch (ExecutionException e) {
        // Fall back only when the primary task itself failed
        System.out.println("Primary failed, using fallback: " + e.getCause());

        try (var fallbackScope = new StructuredTaskScope.ShutdownOnFailure()) {
            var fallbackTask = fallbackScope.fork(fallback::get);
            fallbackScope.join();
            fallbackScope.throwIfFailed();
            return fallbackTask.get();
        }
    }
}

// Usage
Database database = executeWithFallback(
    () -> primaryDatabase.connect(),
    () -> replicaDatabase.connect()
);

Cascade Failures:

public record OperationResult<T>(T value, List<Exception> errors, boolean success) {}

public <T> OperationResult<T> executeWithCascade(
        Supplier<T> operation1,
        Supplier<T> operation2,
        Supplier<T> operation3) {

    List<Exception> allErrors = new ArrayList<>();

    // Try operation 1
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task = scope.fork(operation1::get);
        scope.join();
        scope.throwIfFailed();
        return new OperationResult<>(task.get(), allErrors, true);
    } catch (Exception e) {
        allErrors.add(e);
        System.out.println("Operation 1 failed, trying 2");
    }

    // Try operation 2
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task = scope.fork(operation2::get);
        scope.join();
        scope.throwIfFailed();
        return new OperationResult<>(task.get(), allErrors, true);
    } catch (Exception e) {
        allErrors.add(e);
        System.out.println("Operation 2 failed, trying 3");
    }

    // Try operation 3
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task = scope.fork(operation3::get);
        scope.join();
        scope.throwIfFailed();
        return new OperationResult<>(task.get(), allErrors, true);
    } catch (Exception e) {
        allErrors.add(e);
        return new OperationResult<>(null, allErrors, false);
    }
}
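
The three near-identical blocks above follow one pattern: try each operation in order, record its error, and stop at the first success. That control flow can be sketched as a loop over any number of operations. To keep the sketch runnable without preview features it calls each Supplier directly instead of forking it in a scope; Cascade, Outcome, and firstSuccessful are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class Cascade {
    record Outcome<T>(T value, List<Exception> errors, boolean success) {}

    // Tries each operation in order and stops at the first one that
    // succeeds. Errors from earlier attempts are kept in the outcome.
    @SafeVarargs
    static <T> Outcome<T> firstSuccessful(Supplier<T>... operations) {
        List<Exception> errors = new ArrayList<>();
        for (Supplier<T> operation : operations) {
            try {
                return new Outcome<>(operation.get(), List.copyOf(errors), true);
            } catch (RuntimeException e) {
                errors.add(e); // remember the failure and move on
            }
        }
        return new Outcome<>(null, List.copyOf(errors), false);
    }
}
```

For example, Cascade.<String>firstSuccessful(primary, replica, cache) returns the replica's value together with the primary's error when only the primary fails.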

Cancellation and Timeouts

Automatic Cancellation:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        System.out.println("Task 1 starting");
        Thread.sleep(1000);
        // Under ShutdownOnFailure, only a failure triggers cancellation
        throw new IllegalStateException("Task 1 failed");
    });

    var task2 = scope.fork(() -> {
        System.out.println("Task 2 starting");
        try {
            Thread.sleep(5000);
            return "Task 2 done";
        } catch (InterruptedException e) {
            System.out.println("Task 2 cancelled!");
            throw e;
        }
    });

    scope.join();
    // task1's failure shut the scope down, so task2 was interrupted
    // and ended with InterruptedException long before its 5 seconds elapsed
}
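
Cancellation is delivered as a thread interrupt, so it only takes effect promptly if the subtask cooperates. Blocking calls such as Thread.sleep react on their own, but a CPU-bound loop has to poll the interrupt flag itself. The sketch below shows such a loop using plain threads; InterruptibleWork and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

final class InterruptibleWork {
    // CPU-bound loop with no blocking calls: it must check the interrupt
    // flag itself, otherwise a cancellation request would go unnoticed.
    static long countUntilInterrupted() {
        long iterations = 0;
        while (!Thread.currentThread().isInterrupted()) {
            iterations++;
        }
        return iterations;
    }

    // Starts the loop on a worker thread, interrupts it, and reports
    // whether the worker stopped within the grace period.
    static boolean stopsWhenInterrupted() throws InterruptedException {
        AtomicLong result = new AtomicLong(-1);
        Thread worker = new Thread(() -> result.set(countUntilInterrupted()));
        worker.start();
        Thread.sleep(50);      // let the worker spin for a moment
        worker.interrupt();    // same signal a scope shutdown would send
        worker.join(2000);
        return !worker.isAlive() && result.get() >= 0;
    }
}
```

A subtask written without such a check keeps running after the scope shuts down, and the scope's close() then has to wait for it to finish.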

Timeout Pattern:

public <T> T executeWithTimeout(Supplier<T> operation, Duration timeout) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task = scope.fork(operation::get);

        // joinUntil() waits until the deadline at most. If the deadline
        // passes first it throws TimeoutException, and closing the scope
        // then cancels the unfinished subtask. No monitor thread is needed;
        // shutdown() may only be called by the owner or a thread in the scope.
        scope.joinUntil(Instant.now().plus(timeout));
        scope.throwIfFailed();
        return task.get();
    }
}
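
When several steps share one time budget, it helps to fix a single absolute deadline up front and derive the remaining time from it at each step, rather than restarting the clock for every step. A minimal sketch of such a helper follows; Deadline is a hypothetical type, and the Clock parameter exists only to make it testable.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

record Deadline(Instant at, Clock clock) {
    // Fixes the deadline once, relative to the clock's current instant.
    static Deadline after(Duration timeout, Clock clock) {
        return new Deadline(clock.instant().plus(timeout), clock);
    }

    // Time left until the deadline, never negative.
    Duration remaining() {
        Duration left = Duration.between(clock.instant(), at);
        return left.isNegative() ? Duration.ZERO : left;
    }

    boolean expired() {
        return remaining().isZero();
    }
}
```

In production code the clock would typically be Clock.systemUTC(), and the at() instant is the value a deadline-based wait would receive.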

Real-World Example: Service Orchestration

import java.util.*;
import java.util.concurrent.*;

public class OrderProcessingOrchestrator {
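    private final UserService userService;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final NotificationService notificationService;
    private final ShippingService shippingService;

    // Final fields must be assigned, so the services are injected here
    public OrderProcessingOrchestrator(UserService userService,
                                       InventoryService inventoryService,
                                       PaymentService paymentService,
                                       NotificationService notificationService,
                                       ShippingService shippingService) {
        this.userService = userService;
        this.inventoryService = inventoryService;
        this.paymentService = paymentService;
        this.notificationService = notificationService;
        this.shippingService = shippingService;
    }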

    public OrderResult processOrder(OrderRequest request) throws Exception {
        // All services called in parallel with structured concurrency
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Parallel validation and data loading
            var userTask = scope.fork(() -> 
                userService.validateAndLoad(request.userId()));

            var inventoryTask = scope.fork(() -> 
                inventoryService.checkAndReserve(request.items()));

            var paymentTask = scope.fork(() -> 
                paymentService.authorize(request.paymentInfo()));

            // Wait for all critical operations
            scope.join();
            scope.throwIfFailed();

            // All succeeded
            User user = userTask.get();
            InventoryReservation inventory = inventoryTask.get();
            PaymentAuthorization payment = paymentTask.get();

            // Process order
            Order order = createOrder(user, inventory, payment);

            // Send notifications (best-effort, don't fail on error)
            sendNotifications(order);

            // Schedule shipping
            shippingService.scheduleShipment(order);

            return new OrderResult(true, order.id(), null);

        } catch (Exception e) {
            // Any critical operation failed
            System.err.println("Order processing failed: " + e.getMessage());
            return new OrderResult(false, null, e.getMessage());
        }
    }

    private void sendNotifications(Order order) {
        // Non-critical - don't fail the entire operation
        try (var scope = new StructuredTaskScope<Void>()) {
            scope.fork(() -> {
                notificationService.sendOrderConfirmation(order.userId(), order);
                return null;
            });

            scope.fork(() -> {
                notificationService.sendInventoryUpdate(order.items());
                return null;
            });

            scope.join();

            // Ignore failures in notifications
        } catch (Exception e) {
            System.err.println("Notification error: " + e.getMessage());
        }
    }

    private Order createOrder(User user, InventoryReservation inventory, 
                             PaymentAuthorization payment) {
        return new Order(
            UUID.randomUUID().toString(),
            user.id(),
            inventory.items(),
            payment.transactionId(),
            Order.Status.CONFIRMED
        );
    }

    // Data classes
    record OrderRequest(String userId, List<OrderItem> items, PaymentInfo paymentInfo) {}
    record OrderItem(String productId, int quantity) {}
    record PaymentInfo(String method, String amount) {}
    record User(String id, String name, String email) {}
    record InventoryReservation(String reservationId, List<OrderItem> items) {}
    record PaymentAuthorization(String transactionId, String status) {}
    record Order(String id, String userId, List<OrderItem> items, 
                 String paymentId, Status status) {
        enum Status { PENDING, CONFIRMED, SHIPPED, DELIVERED }
    }
    record OrderResult(boolean success, String orderId, String error) {}

    // Service interfaces
    interface UserService {
        User validateAndLoad(String userId) throws Exception;
    }
    interface InventoryService {
        InventoryReservation checkAndReserve(List<OrderItem> items) throws Exception;
    }
    interface PaymentService {
        PaymentAuthorization authorize(PaymentInfo info) throws Exception;
    }
    interface NotificationService {
        void sendOrderConfirmation(String userId, Order order) throws Exception;
        void sendInventoryUpdate(List<OrderItem> items) throws Exception;
    }
    interface ShippingService {
        void scheduleShipment(Order order) throws Exception;
    }
}

Best Practices

  1. Choose Right Policy

    // All or nothing
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { }
    
    // First success (race)
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) { }
    
  2. Always Call join() Before Results

    scope.join();
    scope.throwIfFailed();
    T result = task.get();
    
  3. Handle Exceptions Properly

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // tasks
    } catch (Exception e) {
        // Handle the first failure (wrapped in ExecutionException) or an interrupt
    }
    
  4. Don't Suppress Errors

    // Bad
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // ...
    } catch (Exception e) {
        // Ignored!
    }
    
    // Good
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // ...
    } catch (Exception e) {
        logger.error("Operation failed", e);
        throw e;
    }
    
  5. Use for Coordination, Not Long-Term

    // Good - scope for single operation
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // Fetch related data
    }
    
    // Avoid - long-lived scope
    StructuredTaskScope<?> scope = ...;
    // ... some application runs ...