9.1 Structured Concurrency Fundamentals
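Structured concurrency treats a group of related tasks as a single unit of work with explicit parent-child relationships, error propagation, and cancellation semantics. Subtasks cannot outlive the block of code that started them, unlike tasks submitted to a free-standing executor. The examples in this section use the StructuredTaskScope preview API in the form it took in JDK 21 (package java.util.concurrent, compiled and run with --enable-preview); the API has been revised in later previews.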
What is Structured Concurrency?
Traditional Unstructured Concurrency:
// Unstructured - no clear lifetime
ExecutorService executor = Executors.newFixedThreadPool(10);
// Submit tasks but no guarantee of completion
Future<User> userFuture = executor.submit(() -> fetchUser(id));
Future<List<Order>> ordersFuture = executor.submit(() -> fetchOrders(id));
// Must manually coordinate completion
User user = userFuture.get(); // Blocks until done; throws ExecutionException if fetchUser failed
List<Order> orders = ordersFuture.get(); // If the line above throws, this task keeps running unobserved
// May forget to shut down
// executor.shutdown(); // Easy to forget!
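To make the lifetime problem concrete, here is a minimal sketch that uses only the long-standing ExecutorService API. The class and method names (UnstructuredLeakDemo, slowTaskOutlivesFailure) are hypothetical and do not come from the text above. It illustrates that when one task fails and the caller gives up, nothing cancels the sibling task, which keeps running to completion.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; all names here are illustrative assumptions.
class UnstructuredLeakDemo {

    /** Returns true if the slow task ran to completion even though its sibling failed. */
    static boolean slowTaskOutlivesFailure() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicBoolean slowTaskFinished = new AtomicBoolean(false);

        Callable<String> failingTask = () -> {
            throw new IllegalStateException("user lookup failed");
        };
        Callable<String> slowTask = () -> {
            Thread.sleep(200);            // simulated I/O
            slowTaskFinished.set(true);   // reached only if nobody cancelled us
            return "orders";
        };

        try {
            Future<String> failing = executor.submit(failingTask);
            executor.submit(slowTask);
            try {
                failing.get();
            } catch (ExecutionException e) {
                // The caller abandons the operation here, but slowTask is not cancelled.
            }
            executor.shutdown();          // stops new submissions; running tasks continue
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return slowTaskFinished.get();
    }

    public static void main(String[] args) {
        System.out.println("Slow task finished after sibling failed: "
                + slowTaskOutlivesFailure());
    }
}
```

In a real service the abandoned task would go on holding a thread and any connections it uses. This is the kind of leak that a task scope is meant to rule out.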
Structured Concurrency:
// Structured - clear lifetime and error handling
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Create subtasks within scope
var userTask = scope.fork(() -> fetchUser(id));
var ordersTask = scope.fork(() -> fetchOrders(id));
// Wait for all subtasks
scope.join();
// Propagate any errors
scope.throwIfFailed();
// Get results (both subtasks succeeded)
User user = userTask.get();
List<Order> orders = ordersTask.get();
return new Profile(user, orders);
}
// close() returns only after every subtask has finished, so no thread outlives the block
Key Benefits
1. Explicit Task Lifecycle:
// Parent task controls when children start and stop
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Tasks created and managed within scope
var task1 = scope.fork(() -> work1());
var task2 = scope.fork(() -> work2());
var task3 = scope.fork(() -> work3());
scope.join(); // Parent waits for all children
// When scope exits, all children are guaranteed complete
}
2. Automatic Error Propagation:
// Errors from any child propagate to parent automatically
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> {
throw new RuntimeException("Task 1 failed");
});
var task2 = scope.fork(() -> result2());
scope.join();
scope.throwIfFailed(); // Throws ExecutionException wrapping the first failure
} catch (Exception e) {
// Handle error from any child task
System.err.println("Child task failed: " + e.getMessage());
}
3. Unified Cancellation:
// If one child fails, the scope cancels (interrupts) the others
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> quickTask());
var task2 = scope.fork(() -> {
Thread.sleep(10000); // Long-running
return result();
});
// If task1 fails, task2 is automatically cancelled
scope.join();
scope.throwIfFailed();
}
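The fail-fast behaviour described above can be approximated by hand with stable APIs, which helps show what the scope does on the caller's behalf. The sketch below is an assumption-laden illustration built on ExecutorCompletionService, not how StructuredTaskScope is implemented. FailFastSketch, runAllOrFail, and siblingIsInterruptedOnFailure are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; not the StructuredTaskScope implementation.
class FailFastSketch {

    /** Runs all tasks; on the first failure, interrupts the rest and rethrows. */
    static <T> List<T> runAllOrFail(List<Callable<T>> tasks) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletionService<T> completed = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(completed.submit(task));
            }
            // Consume completions in finishing order; get() throws on the first failure.
            for (int i = 0; i < tasks.size(); i++) {
                completed.take().get();
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get()); // already complete, returns immediately
            }
            return results;
        } finally {
            executor.shutdownNow();                         // interrupt tasks still running
            executor.awaitTermination(5, TimeUnit.SECONDS); // do not return while children run
        }
    }

    /** Returns true if the slow sibling was interrupted after the other task failed. */
    static boolean siblingIsInterruptedOnFailure() {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Callable<String> failing = () -> {
            throw new IllegalStateException("task 1 failed");
        };
        Callable<String> slow = () -> {
            try {
                Thread.sleep(10_000); // long-running work
                return "done";
            } catch (InterruptedException e) {
                interrupted.set(true); // cancellation arrives as an interrupt
                throw e;
            }
        };
        try {
            runAllOrFail(List.of(failing, slow));
            return false; // not expected: the failing task should abort the batch
        } catch (Exception e) {
            return interrupted.get();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sibling interrupted: " + siblingIsInterruptedOnFailure());
    }
}
```

Cancellation here is cooperative, as it is with a task scope: the slow task stops only because Thread.sleep responds to interruption. The amount of bookkeeping needed even in this small version is part of the case for letting the scope handle it.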
4. Resource Safety:
// Guarantees: all children complete before scope closes
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var dbConnection = scope.fork(() -> openDatabase());
var fileHandle = scope.fork(() -> openFile());
scope.join();
scope.throwIfFailed();
// Both subtasks succeeded, so both resources are available
Database db = dbConnection.get();
File file = fileHandle.get();
} // The scope guarantees both subtasks have finished; it does not close db or file, so the caller must still release them
Task Scope Policies
ShutdownOnFailure (Most Common):
// Cancels all siblings on first failure
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> {
Thread.sleep(100);
throw new Exception("Failed");
});
var task2 = scope.fork(() -> {
Thread.sleep(5000); // Will be cancelled
return "result";
});
scope.join();
scope.throwIfFailed(); // task1 failed first, so this throws; task2 was interrupted
} catch (Exception e) {
// On failure: an ExecutionException whose cause is task1's exception
}
ShutdownOnSuccess:
// Cancels all siblings as soon as one succeeds (a race: first success wins)
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
var task1 = scope.fork(() -> {
Thread.sleep(1000);
return "slow-result";
});
var task2 = scope.fork(() -> {
return "fast-result";
});
scope.join();
// Returns first successful result
String result = scope.result(); // "fast-result"
// task1 cancelled automatically
}
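For comparison, the stable ExecutorService.invokeAny method offers similar first-success semantics: it returns the result of one task that completed successfully and cancels the tasks that have not finished. The sketch below mirrors the example above under that assumption. FirstSuccessDemo and firstSuccessful are hypothetical names, and the code is an analogy rather than a drop-in replacement for a task scope.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; an analogy built on invokeAny, not StructuredTaskScope.
class FirstSuccessDemo {

    /** Races a slow and a fast task and returns whichever succeeds first. */
    static String firstSuccessful() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<String> slow = () -> {
                Thread.sleep(1000);
                return "slow-result";
            };
            Callable<String> fast = () -> "fast-result";
            // Blocks until one task succeeds; unfinished tasks are then cancelled.
            return executor.invokeAny(List.of(slow, fast));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // Thrown only if no task completed successfully.
            throw new IllegalStateException("all tasks failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccessful());
    }
}
```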
Custom Policy:
// Define custom shutdown behavior
public class CustomTaskScope<T> extends StructuredTaskScope<T> {
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
// Custom logic on task completion
if (subtask.state() == Subtask.State.FAILED) {
// Log error but continue
System.err.println("Task failed: " + subtask.exception());
}
}
}
try (var scope = new CustomTaskScope<String>()) {
var task1 = scope.fork(() -> mayFail());
var task2 = scope.fork(() -> mayFail());
scope.join(); // Both run regardless of failures
}
Creating and Forking Tasks
Basic Fork:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// fork returns a Subtask handle
Subtask<String> task = scope.fork(() -> {
System.out.println("Running in: " + Thread.currentThread());
return "Result";
});
scope.join();
scope.throwIfFailed();
String result = task.get();
System.out.println("Got: " + result);
}
Multiple Tasks:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<User> userTask = scope.fork(() -> database.getUser("123"));
Subtask<List<Order>> ordersTask = scope.fork(() -> database.getOrders("123"));
Subtask<List<Payment>> paymentsTask = scope.fork(() -> database.getPayments("123"));
scope.join();
scope.throwIfFailed();
User user = userTask.get();
List<Order> orders = ordersTask.get();
List<Payment> payments = paymentsTask.get();
}
Tasks with Parameters:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Parameter from outer scope
String id = "user-123";
Subtask<User> task = scope.fork(() -> {
// Can capture from outer scope (must be effectively final)
return fetchUserData(id);
});
scope.join();
scope.throwIfFailed();
User user = task.get();
}
Real-World Example: User Profile Loading
import java.util.*;
import java.util.concurrent.*;
public class UserProfileService {
private final UserRepository userRepository;
private final OrderService orderService;
private final PreferenceService preferenceService;
private final AnalyticsService analyticsService;
public UserProfileService(
UserRepository userRepository,
OrderService orderService,
PreferenceService preferenceService,
AnalyticsService analyticsService) {
this.userRepository = userRepository;
this.orderService = orderService;
this.preferenceService = preferenceService;
this.analyticsService = analyticsService;
}
/**
* Load complete user profile with all related data in parallel.
* All components load concurrently, with clear error propagation.
*/
public UserProfile loadProfile(String userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fork all data loading tasks in parallel
var userTask = scope.fork(() ->
loadUser(userId));
var ordersTask = scope.fork(() ->
loadOrders(userId));
var preferencesTask = scope.fork(() ->
loadPreferences(userId));
var statisticsTask = scope.fork(() ->
loadStatistics(userId));
// Wait for all to complete
scope.join();
// If any failed, this throws an ExecutionException wrapping the first failure
scope.throwIfFailed();
// All succeeded - collect results
User user = userTask.get();
List<Order> orders = ordersTask.get();
Preferences preferences = preferencesTask.get();
UserStatistics stats = statisticsTask.get();
return new UserProfile(user, orders, preferences, stats);
} catch (Exception e) {
// Error from any component
System.err.println("Failed to load profile for " + userId +
": " + e.getMessage());
throw e;
}
}
private User loadUser(String userId) {
System.out.println("Loading user: " + userId);
return userRepository.findById(userId);
}
private List<Order> loadOrders(String userId) throws InterruptedException {
System.out.println("Loading orders: " + userId);
// Simulate I/O; if the scope cancels this subtask, the interrupt propagates
// instead of being swallowed, so no service call is made after cancellation
Thread.sleep(100);
return orderService.findByUserId(userId);
}
private Preferences loadPreferences(String userId) throws InterruptedException {
System.out.println("Loading preferences: " + userId);
Thread.sleep(50); // Simulate I/O
return preferenceService.findByUserId(userId);
}
private UserStatistics loadStatistics(String userId) throws InterruptedException {
System.out.println("Loading statistics: " + userId);
Thread.sleep(150); // Simulate I/O
return analyticsService.getUserStats(userId);
}
// Data classes
record User(String id, String name, String email) {}
record Order(String id, String status, double total) {}
record Preferences(String theme, String language, boolean notifications) {}
record UserStatistics(int totalOrders, double totalSpent, int daysActive) {}
record UserProfile(
User user,
List<Order> orders,
Preferences preferences,
UserStatistics statistics
) {}
// Service interfaces
interface UserRepository {
User findById(String id);
}
interface OrderService {
List<Order> findByUserId(String userId);
}
interface PreferenceService {
Preferences findByUserId(String userId);
}
interface AnalyticsService {
UserStatistics getUserStats(String userId);
}
}
Handling Partial Success
Collecting All Results:
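try (var scope = new StructuredTaskScope<Result>()) {
// Keep the Subtask handles so each outcome can be inspected after join()
var tasks = List.of(
scope.fork(() -> operation1()),
scope.fork(() -> operation2()),
scope.fork(() -> operation3()));
scope.join();
// Inspect every outcome: get() is valid only for SUCCESS, exception() only for FAILED
for (var task : tasks) {
if (task.state() == Subtask.State.SUCCESS) {
System.out.println("Result: " + task.get());
} else if (task.state() == Subtask.State.FAILED) {
System.err.println("Failed: " + task.exception());
}
}
}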
Filtering Successful Tasks:
try (var scope = new StructuredTaskScope<String>()) {
var tasks = List.of(
scope.fork(() -> tryFetch1()),
scope.fork(() -> tryFetch2()),
scope.fork(() -> tryFetch3())
);
scope.join();
// Get only successful results
List<String> successful = tasks.stream()
.filter(t -> t.state() == Subtask.State.SUCCESS)
.map(Subtask::get)
.toList();
System.out.println("Got " + successful.size() + " successful results");
}
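The same "keep what succeeded" idea can be sketched with stable APIs for readers who cannot enable preview features. The example below uses ExecutorService.invokeAll, which waits for every task to finish, and then skips the futures that failed. PartialSuccessSketch, successfulResults, and demo are hypothetical names, and the three tasks are stand-ins rather than the tryFetch methods above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch using invokeAll instead of StructuredTaskScope.
class PartialSuccessSketch {

    /** Runs every task and returns the results of those that succeeded, in task order. */
    static <T> List<T> successfulResults(List<Callable<T>> tasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            List<T> results = new ArrayList<>();
            // invokeAll returns only when all tasks are done, successful or not.
            for (Future<T> future : executor.invokeAll(tasks)) {
                try {
                    results.add(future.get());
                } catch (ExecutionException e) {
                    // This task failed; skip it and keep the others.
                }
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    /** Example run: the middle task fails, the other two succeed. */
    static List<String> demo() {
        Callable<String> first = () -> "fetch-1";
        Callable<String> second = () -> {
            throw new IllegalStateException("fetch-2 failed");
        };
        Callable<String> third = () -> "fetch-3";
        try {
            return successfulResults(List.of(first, second, third));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return List.of();
        }
    }

    public static void main(String[] args) {
        System.out.println("Got " + demo().size() + " successful results");
    }
}
```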
Nested Structured Concurrency
Parent-Child Task Hierarchy:
try (var parentScope = new StructuredTaskScope.ShutdownOnFailure()) {
// Parent task that itself contains subtasks
var parentTask = parentScope.fork(() -> {
try (var childScope = new StructuredTaskScope.ShutdownOnFailure()) {
var child1 = childScope.fork(() -> childWork1());
var child2 = childScope.fork(() -> childWork2());
childScope.join();
childScope.throwIfFailed();
return "Nested result: " + child1.get() + ", " + child2.get();
}
});
parentScope.join();
parentScope.throwIfFailed();
String result = parentTask.get();
System.out.println(result);
}
Service Composition:
// Parent service coordinates child services
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Each service may internally use structured concurrency
var userServiceTask = scope.fork(() ->
userService.fetchEnrichedUser(userId));
var analyticsServiceTask = scope.fork(() ->
analyticsService.getUserInsights(userId));
var notificationServiceTask = scope.fork(() ->
notificationService.getUserNotifications(userId));
scope.join();
scope.throwIfFailed();
// Each service handled its own internal concurrency
EnrichedUser user = userServiceTask.get();
Insights insights = analyticsServiceTask.get();
Notifications notifications = notificationServiceTask.get();
}
Virtual Threads Integration
Structured Concurrency with Virtual Threads:
// Natural fit - structured scope + virtual thread per task
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var tasks = userIds.stream()
.map(userId -> scope.fork(() -> {
// Each fork runs in a virtual thread
return loadUserData(userId);
}))
.toList();
scope.join();
scope.throwIfFailed();
// All users loaded in parallel on virtual threads
List<UserData> allUsers = tasks.stream()
.map(Subtask::get)
.toList();
}
Automatic Virtual Thread Spawning:
// Each fork() automatically creates a virtual thread
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
for (String url : urlList) {
scope.fork(() -> {
// Runs in dedicated virtual thread
return httpClient.get(url);
});
}
scope.join();
scope.throwIfFailed();
}
Best Practices
Always Use Try-With-Resources
// Guarantees scope cleanup
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// tasks
} // Scope closed automatically, even if an exception is thrown
Call join() Before Accessing Results
// Bad - results not ready
var task = scope.fork(() -> work());
String result = task.get(); // Throws IllegalStateException: join() has not been called
// Good - wait first
var task = scope.fork(() -> work());
scope.join();
String result = task.get(); // Safe once the subtask has completed successfully
Choose Appropriate Policy
// ShutdownOnFailure - all subtasks must succeed; cancel the rest on first failure
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { }
// ShutdownOnSuccess - any one result suffices; cancel the rest on first success (race)
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) { }
Don't Create Long-Lived Scopes
// Bad - scope held open with no bounded lifetime
var scope = new StructuredTaskScope.ShutdownOnFailure();
// ... some time passes ...
scope.fork(() -> task());
// Good - scope exists only as long as the work it supervises
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
scope.fork(() -> task());
scope.join();
}
Leverage Scope for Resource Management
// Acquire several resources concurrently; proceed only if every acquisition succeeded
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var db = scope.fork(() -> acquireDatabase());
var cache = scope.fork(() -> acquireCache());
var queue = scope.fork(() -> acquireQueue());
scope.join();
scope.throwIfFailed();
// All three resources successfully acquired
} // The scope waits for the subtasks to finish; releasing the acquired resources remains the caller's job