9.2 Task Scope Policies and Error Handling
Different task scope policies provide different concurrency semantics. Understanding when to use each is crucial for building robust applications.
ShutdownOnFailure Policy
Fail-fast behavior - shuts down the scope and cancels the remaining subtasks on the first failure:
// Demonstrates ShutdownOnFailure: the first subtask to fail shuts the scope
// down, which interrupts every subtask that is still running.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        System.out.println("Task 1 starting");
        Thread.sleep(100);
        throw new RuntimeException("Task 1 failed");
    });
    var task2 = scope.fork(() -> {
        System.out.println("Task 2 starting");
        Thread.sleep(5000); // Long operation; interrupted once task 1 fails
        return "Task 2 result";
    });
    scope.join(); // Returns once all subtasks finish or the scope is shut down
    System.out.println("Join complete");
    scope.throwIfFailed(); // Throws ExecutionException wrapping task 1's exception
} catch (Exception e) {
    // throwIfFailed() wraps the subtask's exception in an ExecutionException,
    // so report the cause to get task 1's own message.
    Throwable cause = (e.getCause() != null) ? e.getCause() : e;
    System.err.println("Caught: " + cause.getMessage());
    // Task 2 was interrupted and cancelled
}
// Output:
// Task 1 starting
// Task 2 starting
// Join complete
// Caught: Task 1 failed
// (Task 2 interrupted and cancelled)
When to Use:
// Perfect for "all or nothing" scenarios
// "All or nothing": every check must pass before the order is processed.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var userCheck = scope.fork(() -> validateUserData(user));
    var stockCheck = scope.fork(() -> checkStock(items));
    var paymentCheck = scope.fork(() -> verifyPaymentMethod(payment));

    // Wait for the checks, then surface the first failure (if any).
    scope.join().throwIfFailed();

    // Reaching this point means every check succeeded - proceed with order.
    processOrder(user, items, payment);
}
Error Propagation:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> {
throw new IllegalArgumentException("Invalid input");
});
var task2 = scope.fork(() -> {
throw new IOException("Network error");
});
scope.join();
try {
scope.throwIfFailed();
} catch (Exception e) {
// Will throw one of the two exceptions (first encountered)
System.err.println("Error: " + e.getClass().getSimpleName() +
" - " + e.getMessage());
}
}
ShutdownOnSuccess Policy
Race pattern - returns the first successful result and cancels the remaining subtasks:
// Three subtasks race; the scope keeps the first successful result and
// cancels the others.
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    var server1 = scope.fork(() -> {
        System.out.println("Fetching from server 1...");
        Thread.sleep(1000);
        return "Result from server 1";
    });
    var server2 = scope.fork(() -> {
        System.out.println("Fetching from server 2...");
        Thread.sleep(500);
        return "Result from server 2";
    });
    var server3 = scope.fork(() -> {
        System.out.println("Fetching from server 3...");
        Thread.sleep(2000);
        return "Result from server 3";
    });

    // join() returns the scope itself, so the winner can be read directly.
    String fastest = scope.join().result();
    System.out.println("Got: " + fastest); // Server 2's result (fastest)
}
// Output:
// Fetching from server 1...
// Fetching from server 2...
// Fetching from server 3...
// Got: Result from server 2
// (Other tasks cancelled)
When to Use:
// Perfect for finding quickest successful result
// Query several cache servers at once and return whichever answers first.
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<CacheEntry>()) {
    var local = scope.fork(() -> queryLocalCache(key));
    var remoteA = scope.fork(() -> queryRemoteCache1(key));
    var remoteB = scope.fork(() -> queryRemoteCache2(key));

    // First successful responder wins; the other lookups are cancelled.
    return scope.join().result();
}
Handling Failures and Successes:
// Resolve against three name servers; the first successful answer wins.
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    var primaryDns = scope.fork(() -> resolveDns("primary.ns"));
    var secondaryDns = scope.fork(() -> resolveDns("secondary.ns"));
    var tertiaryDns = scope.fork(() -> resolveDns("tertiary.ns"));
    try {
        scope.join();
        String result = scope.result(); // First successful
        System.out.println("DNS resolved: " + result);
    } catch (java.util.concurrent.ExecutionException e) {
        // result() throws ExecutionException only when no subtask succeeded.
        // An InterruptedException from join() is deliberately not caught
        // here, so an interrupt is not misreported as "all servers failed".
        System.err.println("All DNS servers failed");
        throw e;
    }
}
Custom Task Scope Policies
Custom Scope Implementation:
import java.util.concurrent.StructuredTaskScope;
import java.util.*;
public class CustomTaskScope<T> extends StructuredTaskScope<T> {
private final List<Exception> allExceptions = Collections.synchronizedList(new ArrayList<>());
private final int maxFailures;
public CustomTaskScope(int maxFailures) {
this.maxFailures = maxFailures;
}
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
// Called when each subtask completes
if (subtask.hasException()) {
allExceptions.add(subtask.exception());
System.out.println("Task failed: " + subtask.exception().getMessage());
// Shutdown after N failures
if (allExceptions.size() >= maxFailures) {
System.out.println("Max failures reached, shutting down");
this.shutdown();
}
} else {
System.out.println("Task succeeded: " + subtask.resultNow());
}
}
public List<Exception> getAllExceptions() {
return new ArrayList<>(allExceptions);
}
}
// Usage: the scope keeps running until 3 subtasks have failed
try (var scope = new CustomTaskScope<String>(3)) {
    scope.fork(() -> operation1());
    scope.fork(() -> operation2());
    scope.fork(() -> operation3());
    scope.fork(() -> operation4());
    scope.join();

    // Report how many subtasks failed, if any did.
    int failureCount = scope.getAllExceptions().size();
    if (failureCount > 0) {
        System.out.println("Encountered " + failureCount + " errors");
    }
}
Partial Success Scope:
public class PartialSuccessScope<T> extends StructuredTaskScope<T> {
private final List<T> successResults = Collections.synchronizedList(new ArrayList<>());
private final List<Exception> failures = Collections.synchronizedList(new ArrayList<>());
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.hasException()) {
failures.add(subtask.exception());
} else {
successResults.add(subtask.resultNow());
}
}
public List<T> getSuccesses() {
return new ArrayList<>(successResults);
}
public List<Exception> getFailures() {
return new ArrayList<>(failures);
}
}
// Usage: run everything, then report how many subtasks succeeded and failed
try (var scope = new PartialSuccessScope<String>()) {
    scope.fork(() -> workThatMayFail1());
    scope.fork(() -> workThatMayFail2());
    scope.fork(() -> workThatMayFail3());
    scope.fork(() -> workThatMayFail4());
    scope.join();

    List<String> succeeded = scope.getSuccesses();
    List<Exception> failed = scope.getFailures();
    int successCount = succeeded.size();
    int failureCount = failed.size();
    System.out.println("Succeeded: " + successCount);
    System.out.println("Failed: " + failureCount);
}
Error Recovery Strategies
Retry Failed Tasks:
/**
 * Runs {@code operation} in its own task scope, retrying after a failure.
 *
 * @param operation  the work to run; invoked once per attempt
 * @param maxRetries number of retries after the first attempt (so at most
 *                   {@code maxRetries + 1} attempts); must not be negative
 * @return the result of the first attempt that succeeds
 * @throws IllegalArgumentException if {@code maxRetries} is negative
 * @throws InterruptedException     if the calling thread is interrupted;
 *                                  interruption is never retried
 * @throws Exception                the failure of the last attempt
 */
public <T> T executeWithRetry(Supplier<T> operation, int maxRetries) throws Exception {
    if (maxRetries < 0) {
        // Without this guard the loop never runs and "throw lastException" throws null.
        throw new IllegalArgumentException("maxRetries must be >= 0: " + maxRetries);
    }
    Exception lastException = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(operation::get);
            scope.join();
            scope.throwIfFailed();
            return task.get(); // Subtask exposes get(), not resultNow()
        } catch (InterruptedException e) {
            // Cancellation request: propagate instead of retrying.
            throw e;
        } catch (Exception e) {
            lastException = e;
            System.out.println("Attempt " + (attempt + 1) + " failed: " + e.getMessage());
            if (attempt < maxRetries) {
                // Exponential backoff: 100ms, 200ms, 400ms, ...
                // The shift is capped so the delay cannot overflow.
                Thread.sleep(100L << Math.min(attempt, 30));
            }
        }
    }
    throw lastException;
}
// Usage: first attempt plus up to 3 retries
Supplier<String> fetch = () -> unstableService.fetchData();
String result = executeWithRetry(fetch, 3);
Fallback on Failure:
/**
 * Runs {@code primary}; if it fails, runs {@code fallback} instead.
 *
 * @param primary  the preferred operation
 * @param fallback the operation to run when {@code primary} fails
 * @return the primary result, or the fallback result if the primary failed
 * @throws InterruptedException if the calling thread is interrupted; the
 *                              fallback is not attempted in that case
 * @throws Exception            the fallback's failure, with the primary's
 *                              failure attached as a suppressed exception
 */
public <T> T executeWithFallback(
        Supplier<T> primary,
        Supplier<T> fallback) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var primaryTask = scope.fork(primary::get);
        scope.join();
        scope.throwIfFailed();
        return primaryTask.get(); // Subtask exposes get(), not resultNow()
    } catch (InterruptedException e) {
        // Being interrupted is a cancellation request, not a primary failure.
        throw e;
    } catch (Exception e) {
        System.out.println("Primary failed, using fallback: " + e.getMessage());
        try (var fallbackScope = new StructuredTaskScope.ShutdownOnFailure()) {
            var fallbackTask = fallbackScope.fork(fallback::get);
            fallbackScope.join();
            fallbackScope.throwIfFailed();
            return fallbackTask.get();
        } catch (Exception fallbackFailure) {
            // Keep the primary failure visible to whoever handles the error.
            fallbackFailure.addSuppressed(e);
            throw fallbackFailure;
        }
    }
}
// Usage: connect to the primary database, falling back to the replica
Supplier<Database> connectPrimary = () -> primaryDatabase.connect();
Supplier<Database> connectReplica = () -> replicaDatabase.connect();
Database database = executeWithFallback(connectPrimary, connectReplica);
Cascade Failures:
/**
 * Outcome of a cascade of operations.
 *
 * @param value   the value component; may be {@code null}
 * @param errors  the errors collected along the way; stored as an
 *                unmodifiable snapshot, never {@code null}
 * @param success the success flag
 * @param <T>     type of the value
 */
public record OperationResult<T>(T value, List<Exception> errors, boolean success) {
    /** Takes a defensive copy so later changes to the caller's list cannot alter this result. */
    public OperationResult {
        errors = (errors == null) ? List.of() : List.copyOf(errors);
    }
}
/**
 * Tries the three operations in order and returns the first result that
 * succeeds, together with the errors of the attempts that failed before it.
 *
 * @param operation1 first operation to try; must not be {@code null}
 * @param operation2 tried when {@code operation1} fails; must not be {@code null}
 * @param operation3 tried when {@code operation2} fails; must not be {@code null}
 * @return a successful result holding the first value obtained, or a failed
 *         result (null value) holding every collected error
 */
public <T> OperationResult<T> executeWithCascade(
        Supplier<T> operation1,
        Supplier<T> operation2,
        Supplier<T> operation3) {
    List<Supplier<T>> operations = List.of(operation1, operation2, operation3);
    List<Exception> allErrors = new ArrayList<>();
    for (int i = 0; i < operations.size(); i++) {
        Supplier<T> operation = operations.get(i);
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(operation::get);
            scope.join();
            scope.throwIfFailed();
            // Subtask exposes get(), not resultNow()
            return new OperationResult<>(task.get(), allErrors, true);
        } catch (InterruptedException e) {
            // Cancellation: restore the interrupt status and stop cascading
            // instead of starting the next operation.
            Thread.currentThread().interrupt();
            allErrors.add(e);
            return new OperationResult<>(null, allErrors, false);
        } catch (Exception e) {
            allErrors.add(e);
            if (i + 1 < operations.size()) {
                System.out.println("Operation " + (i + 1) + " failed, trying " + (i + 2));
            }
        }
    }
    return new OperationResult<>(null, allErrors, false);
}
Cancellation and Timeouts
Automatic Cancellation:
// Demonstrates automatic cancellation: ShutdownOnFailure only cancels the
// remaining subtasks when one of them fails, so task 1 must fail for task 2
// to be interrupted.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        System.out.println("Task 1 starting");
        Thread.sleep(1000);
        // Failing here shuts the scope down and interrupts task 2.
        throw new IllegalStateException("Task 1 failed");
    });
    var task2 = scope.fork(() -> {
        System.out.println("Task 2 starting");
        try {
            Thread.sleep(5000);
            return "Task 2 done";
        } catch (InterruptedException e) {
            System.out.println("Task 2 cancelled!");
            throw e;
        }
    });
    scope.join();
    // task1 is FAILED; task2 was interrupted after the shutdown, so its
    // state stays UNAVAILABLE and it has no result to read.
}
Timeout Pattern:
/**
 * Runs {@code operation} in its own task scope and gives up once
 * {@code timeout} has elapsed.
 *
 * <p>Uses {@code joinUntil} instead of a hand-rolled monitor thread:
 * {@code shutdown()} may only be called by the scope owner or a thread
 * forked in the scope, so calling it from an unrelated thread fails with
 * {@code WrongThreadException}.
 *
 * @param operation the work to run
 * @param timeout   maximum time to wait for the operation
 * @return the operation's result
 * @throws java.util.concurrent.TimeoutException if the deadline passes first;
 *         the subtask is cancelled in that case
 * @throws Exception if the operation fails or the caller is interrupted
 */
public <T> T executeWithTimeout(Supplier<T> operation, Duration timeout) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task = scope.fork(operation::get);
        // Waits for the subtask or the deadline, whichever comes first. On
        // expiry the scope is shut down and TimeoutException is thrown.
        scope.joinUntil(java.time.Instant.now().plus(timeout));
        scope.throwIfFailed();
        return task.get(); // Subtask exposes get(), not resultNow()
    } catch (java.util.concurrent.TimeoutException e) {
        System.out.println("Timeout! Shutting down scope");
        throw e;
    }
}
Real-World Example: Service Orchestration
import java.util.*;
import java.util.concurrent.*;
/**
 * Orchestrates order processing: validates the user, reserves inventory and
 * authorizes payment in parallel, then creates the order, sends best-effort
 * notifications and schedules shipping.
 */
public class OrderProcessingOrchestrator {
    private final UserService userService;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final NotificationService notificationService;
    private final ShippingService shippingService;

    /**
     * Creates an orchestrator. The final fields must be assigned in a
     * constructor; without one the class does not compile.
     *
     * @throws NullPointerException if any service is {@code null}
     */
    public OrderProcessingOrchestrator(UserService userService,
                                       InventoryService inventoryService,
                                       PaymentService paymentService,
                                       NotificationService notificationService,
                                       ShippingService shippingService) {
        this.userService = Objects.requireNonNull(userService, "userService");
        this.inventoryService = Objects.requireNonNull(inventoryService, "inventoryService");
        this.paymentService = Objects.requireNonNull(paymentService, "paymentService");
        this.notificationService = Objects.requireNonNull(notificationService, "notificationService");
        this.shippingService = Objects.requireNonNull(shippingService, "shippingService");
    }

    /**
     * Processes an order.
     *
     * @param request the order to process
     * @return a successful result carrying the order id, or a failed result
     *         carrying the error message; failures are reported in the
     *         result rather than thrown
     */
    public OrderResult processOrder(OrderRequest request) throws Exception {
        // All services called in parallel with structured concurrency
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Parallel validation and data loading
            var userTask = scope.fork(() ->
                    userService.validateAndLoad(request.userId()));
            var inventoryTask = scope.fork(() ->
                    inventoryService.checkAndReserve(request.items()));
            var paymentTask = scope.fork(() ->
                    paymentService.authorize(request.paymentInfo()));

            // Wait for all critical operations
            scope.join();
            scope.throwIfFailed();

            // All succeeded. Subtask exposes get(), not resultNow().
            User user = userTask.get();
            InventoryReservation inventory = inventoryTask.get();
            PaymentAuthorization payment = paymentTask.get();

            // Process order
            Order order = createOrder(user, inventory, payment);

            // Send notifications (best-effort, don't fail on error)
            sendNotifications(order);

            // Schedule shipping
            shippingService.scheduleShipment(order);
            return new OrderResult(true, order.id(), null);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The exception is converted into a result below, so restore
                // the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            // Any critical operation failed
            System.err.println("Order processing failed: " + e.getMessage());
            return new OrderResult(false, null, e.getMessage());
        }
    }

    /**
     * Sends the order confirmation and the inventory update in parallel.
     * Failures of individual notifications are ignored, and an exception
     * from the scope itself is only logged.
     */
    private void sendNotifications(Order order) {
        // Non-critical - don't fail the entire operation
        try (var scope = new StructuredTaskScope<Void>()) {
            scope.fork(() -> {
                notificationService.sendOrderConfirmation(order.userId(), order);
                return null;
            });
            scope.fork(() -> {
                notificationService.sendInventoryUpdate(order.items());
                return null;
            });
            scope.join();
            // Ignore failures in notifications
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Swallowed here, so keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            System.err.println("Notification error: " + e.getMessage());
        }
    }

    /** Builds a CONFIRMED order with a random UUID as its id. */
    private Order createOrder(User user, InventoryReservation inventory,
                              PaymentAuthorization payment) {
        return new Order(
                UUID.randomUUID().toString(),
                user.id(),
                inventory.items(),
                payment.transactionId(),
                Order.Status.CONFIRMED
        );
    }

    // Data classes
    record OrderRequest(String userId, List<OrderItem> items, PaymentInfo paymentInfo) {}
    record OrderItem(String productId, int quantity) {}
    record PaymentInfo(String method, String amount) {}
    record User(String id, String name, String email) {}
    record InventoryReservation(String reservationId, List<OrderItem> items) {}
    record PaymentAuthorization(String transactionId, String status) {}
    record Order(String id, String userId, List<OrderItem> items,
                 String paymentId, Status status) {
        enum Status { PENDING, CONFIRMED, SHIPPED, DELIVERED }
    }
    record OrderResult(boolean success, String orderId, String error) {}

    // Service interfaces
    interface UserService {
        User validateAndLoad(String userId) throws Exception;
    }
    interface InventoryService {
        InventoryReservation checkAndReserve(List<OrderItem> items) throws Exception;
    }
    interface PaymentService {
        PaymentAuthorization authorize(PaymentInfo info) throws Exception;
    }
    interface NotificationService {
        void sendOrderConfirmation(String userId, Order order) throws Exception;
        void sendInventoryUpdate(List<OrderItem> items) throws Exception;
    }
    interface ShippingService {
        void scheduleShipment(Order order) throws Exception;
    }
}
Best Practices
Choose Right Policy
// All or nothing
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { }
// First success (race)
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) { }
Always Call join() Before Results
scope.join();
scope.throwIfFailed();
T result = task.resultNow();
Handle Exceptions Properly
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// tasks
} catch (Exception e) {
// Handle aggregated errors
}
Don't Suppress Errors
// Bad
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// ...
} catch (Exception e) {
// Ignored!
}
// Good
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// ...
} catch (Exception e) {
logger.error("Operation failed", e);
throw e;
}
Use for Coordination, Not Long-Term
// Good - scope for single operation
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fetch related data
}
// Avoid - long-lived scope
StructuredTaskScope<?> scope = ...;
// ... some application runs ...