9.1 Structured Concurrency Fundamentals

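Structured concurrency treats a group of related tasks as a single unit of work with a well-defined lifetime: subtasks are forked inside a scope, the parent waits for them in that same scope, and failures and cancellation flow along the resulting parent-child relationships. Unstructured concurrency, by contrast, lets tasks outlive the code that started them. The examples in this section use the StructuredTaskScope preview API as shipped in JDK 21 through JDK 24, which requires the --enable-preview flag.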

What is Structured Concurrency?

Traditional Unstructured Concurrency:

// Unstructured - no clear lifetime
ExecutorService executor = Executors.newFixedThreadPool(10);

// Submit tasks but no guarantee of completion
Future<User> userFuture = executor.submit(() -> fetchUser(id));
Future<List<Order>> ordersFuture = executor.submit(() -> fetchOrders(id));

// Must manually coordinate completion
User user = userFuture.get();  // Blocks with no timeout; may throw ExecutionException
List<Order> orders = ordersFuture.get();  // Independent error handling

// May forget to shut down
// executor.shutdown();  // Easy to forget!
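
To make the lifetime problem concrete, here is a small self-contained sketch that uses only stable java.util.concurrent classes. The class name and the two tasks are hypothetical stand-ins for fetchUser and fetchOrders. It shows that once one task has failed, nothing tells its sibling to stop, so the sibling runs to completion anyway.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class UnstructuredLeakDemo {

    /** Returns true if the sibling ran to completion although the first task had failed. */
    static boolean siblingOutlivesFailure() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicBoolean siblingFinished = new AtomicBoolean(false);
        try {
            // Hypothetical stand-in for fetchUser: fails immediately
            Callable<String> failingFetch = () -> {
                throw new IllegalStateException("user lookup failed");
            };
            // Hypothetical stand-in for fetchOrders: slow, but succeeds
            Callable<String> slowFetch = () -> {
                Thread.sleep(200);
                siblingFinished.set(true);
                return "orders";
            };
            Future<String> userFuture = executor.submit(failingFetch);
            Future<String> ordersFuture = executor.submit(slowFetch);

            boolean firstFailed = false;
            try {
                userFuture.get();
            } catch (ExecutionException e) {
                firstFailed = true;  // the overall operation can no longer succeed
            }

            // Nobody cancelled the sibling, so it finishes work that is no longer needed
            boolean siblingCancelled = ordersFuture.isCancelled();
            ordersFuture.get();
            return firstFailed && !siblingCancelled && siblingFinished.get();
        } finally {
            executor.shutdown();  // without this, the idle pool threads keep the JVM alive
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sibling outlived failure: " + siblingOutlivesFailure());
    }
}
```

Cancelling the sibling by hand is possible, but every call site has to remember to do it; that bookkeeping is what a scope takes over.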

Structured Concurrency:

// Structured - clear lifetime and error handling
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Create subtasks within scope
    var userTask = scope.fork(() -> fetchUser(id));
    var ordersTask = scope.fork(() -> fetchOrders(id));

    // Wait for all subtasks
    scope.join();

    // Propagate the first failure, if any, as an ExecutionException
    scope.throwIfFailed();

    // Get results (Subtask.get() is valid once join() has returned)
    User user = userTask.get();
    List<Order> orders = ordersTask.get();

    return new Profile(user, orders);
}
// Leaving the try block closes the scope, which waits for every subtask thread to finish

Key Benefits

1. Explicit Task Lifecycle:

// Parent task controls when children start and stop
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Tasks created and managed within scope
    var task1 = scope.fork(() -> work1());
    var task2 = scope.fork(() -> work2());
    var task3 = scope.fork(() -> work3());

    scope.join();  // Parent waits for all children
    // When scope exits, all children are guaranteed complete
}

2. Automatic Error Propagation:

// A failure in any child is recorded by the scope and surfaced to the parent by throwIfFailed()
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        throw new RuntimeException("Task 1 failed");
    });
    var task2 = scope.fork(() -> result2());

    scope.join();
    scope.throwIfFailed();  // Throws ExecutionException wrapping the first failure

} catch (Exception e) {
    // Handle error from any child task
    System.err.println("Child task failed: " + e.getMessage());
}

3. Unified Cancellation:

// If one child fails, all remaining children are cancelled (interrupted)
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> quickTask());
    var task2 = scope.fork(() -> {
        Thread.sleep(10000);  // Long-running
        return result();
    });

    // If task1 fails, task2 is automatically cancelled
    scope.join();
    scope.throwIfFailed();
}
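
What this automatic cancellation amounts to can be approximated with stable java.util.concurrent classes. The sketch below is only an illustration under that assumption, not how the JDK implements ShutdownOnFailure; runAllOrFail and the two demo tasks are hypothetical names. It waits for tasks in completion order, and on the first failure it interrupts the rest and then waits for the worker threads to exit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class FailFastSketch {

    /** Runs all tasks; on the first failure it interrupts the rest and rethrows. */
    static <T> List<T> runAllOrFail(List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorCompletionService<T> completion = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(completion.submit(task));
            }
            // Consume completions in finishing order; get() throws on the first failure
            for (int i = 0; i < futures.size(); i++) {
                completion.take().get();
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            // Interrupt whatever is still running (a no-op for finished tasks),
            // then wait for the worker threads, much as closing a scope would
            for (Future<T> future : futures) {
                future.cancel(true);
            }
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    /** Returns true if the slow task was interrupted after its sibling failed. */
    static boolean slowTaskIsCancelled() throws InterruptedException {
        CountDownLatch slowStarted = new CountDownLatch(1);
        AtomicBoolean slowInterrupted = new AtomicBoolean(false);

        Callable<String> slow = () -> {
            slowStarted.countDown();
            try {
                Thread.sleep(10_000);  // long-running work
                return "result";
            } catch (InterruptedException e) {
                slowInterrupted.set(true);
                throw e;
            }
        };
        Callable<String> failing = () -> {
            slowStarted.await();  // fail only once the slow task is running
            throw new IllegalStateException("Failed");
        };

        try {
            runAllOrFail(List.of(slow, failing));
            return false;
        } catch (ExecutionException e) {
            return slowInterrupted.get() && e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("slow task cancelled: " + slowTaskIsCancelled());
    }
}
```

Note that cancellation is delivered as a thread interrupt, so a task only stops early if it reaches a blocking call or checks its interrupt status.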

4. Resource Safety:

// Guarantees: all children complete before scope closes
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var dbConnection = scope.fork(() -> openDatabase());
    var fileHandle = scope.fork(() -> openFile());

    scope.join();
    scope.throwIfFailed();

    // Both resources obtained and ready
    Database db = dbConnection.get();
    File file = fileHandle.get();

} // No subtask thread outlives this block; closing db and file remains the caller's job

Task Scope Policies

ShutdownOnFailure (Most Common):

// Cancels all siblings on first failure
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        Thread.sleep(100);
        throw new Exception("Failed");
    });
    var task2 = scope.fork(() -> {
        Thread.sleep(5000);  // Will be cancelled
        return "result";
    });

    scope.join();
    // task2 was cancelled (interrupted) because task1 failed
    scope.throwIfFailed();

} catch (Exception e) {
    // ExecutionException wrapping task1's exception
}

ShutdownOnSuccess:

// Cancels all siblings as soon as one succeeds (the tasks race; first result wins)
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    var task1 = scope.fork(() -> {
        Thread.sleep(1000);
        return "slow-result";
    });
    var task2 = scope.fork(() -> {
        return "fast-result";
    });

    scope.join();

    // Returns first successful result
    String result = scope.result();  // "fast-result"
    // task1 cancelled automatically
}
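
A similar first-result-wins pattern exists on the stable API as ExecutorService.invokeAny, which returns the result of one task that completed successfully and cancels the others. The sketch below mirrors the two tasks above; the class and method names are hypothetical, and the slow task sleeps longer than in the original so that the fast one reliably wins.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FirstSuccessSketch {

    /** Races a slow and a fast task and returns whichever result arrives first. */
    static String firstResult() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<String> slow = () -> {
                Thread.sleep(5_000);
                return "slow-result";
            };
            Callable<String> fast = () -> "fast-result";

            // Blocks until one task succeeds; the remaining task is cancelled on return
            return executor.invokeAny(List.of(slow, fast));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstResult());
    }
}
```

Unlike a scope, invokeAny needs the complete task list up front and does not tie the tasks' lifetime to a lexical block.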

Custom Policy:

// Define custom shutdown behavior
public class CustomTaskScope<T> extends StructuredTaskScope<T> {
    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        // Called once for each subtask as it completes
        if (subtask.state() == Subtask.State.FAILED) {
            // Log error but let the other subtasks continue
            System.err.println("Task failed: " + subtask.exception());
        }
    }
}

try (var scope = new CustomTaskScope<String>()) {
    var task1 = scope.fork(() -> mayFail());
    var task2 = scope.fork(() -> mayFail());

    scope.join();  // Both run regardless of failures
}

Creating and Forking Tasks

Basic Fork:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // fork returns a Subtask handle
    Subtask<String> task = scope.fork(() -> {
        System.out.println("Running in: " + Thread.currentThread());
        return "Result";
    });

    scope.join();
    scope.throwIfFailed();

    String result = task.get();
    System.out.println("Got: " + result);
}

Multiple Tasks:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<User> userTask = scope.fork(() -> database.getUser("123"));
    Subtask<List<Order>> ordersTask = scope.fork(() -> database.getOrders("123"));
    Subtask<List<Payment>> paymentsTask = scope.fork(() -> database.getPayments("123"));

    scope.join();
    scope.throwIfFailed();

    User user = userTask.get();
    List<Order> orders = ordersTask.get();
    List<Payment> payments = paymentsTask.get();
}

Tasks with Parameters:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Parameter from outer scope
    String id = "user-123";

    Subtask<User> task = scope.fork(() -> {
        // Can capture from outer scope (must be effectively final)
        return fetchUserData(id);
    });

    scope.join();
    scope.throwIfFailed();

    User user = task.get();
}

Real-World Example: User Profile Loading

import java.util.*;
import java.util.concurrent.*;

public class UserProfileService {
    private final UserRepository userRepository;
    private final OrderService orderService;
    private final PreferenceService preferenceService;
    private final AnalyticsService analyticsService;

    public UserProfileService(
            UserRepository userRepository,
            OrderService orderService,
            PreferenceService preferenceService,
            AnalyticsService analyticsService) {
        this.userRepository = userRepository;
        this.orderService = orderService;
        this.preferenceService = preferenceService;
        this.analyticsService = analyticsService;
    }

    /**
     * Load complete user profile with all related data in parallel.
     * All components load concurrently, with clear error propagation.
     */
    public UserProfile loadProfile(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Fork all data loading tasks in parallel
            var userTask = scope.fork(() -> 
                loadUser(userId));

            var ordersTask = scope.fork(() -> 
                loadOrders(userId));

            var preferencesTask = scope.fork(() -> 
                loadPreferences(userId));

            var statisticsTask = scope.fork(() -> 
                loadStatistics(userId));

            // Wait for all to complete
            scope.join();

            // If any subtask failed, this throws an ExecutionException wrapping the first failure
            scope.throwIfFailed();

            // All succeeded - collect results
            User user = userTask.get();
            List<Order> orders = ordersTask.get();
            Preferences preferences = preferencesTask.get();
            UserStatistics stats = statisticsTask.get();

            return new UserProfile(user, orders, preferences, stats);

        } catch (Exception e) {
            // Error from any component
            System.err.println("Failed to load profile for " + userId + 
                ": " + e.getMessage());
            throw e;
        }
    }

    private User loadUser(String userId) {
        System.out.println("Loading user: " + userId);
        return userRepository.findById(userId);
    }

    private List<Order> loadOrders(String userId) throws InterruptedException {
        System.out.println("Loading orders: " + userId);
        // Simulate I/O; an interrupt (cancellation by the scope) ends the subtask here
        Thread.sleep(100);
        return orderService.findByUserId(userId);
    }

    private Preferences loadPreferences(String userId) throws InterruptedException {
        System.out.println("Loading preferences: " + userId);
        Thread.sleep(50);  // Simulate I/O
        return preferenceService.findByUserId(userId);
    }

    private UserStatistics loadStatistics(String userId) throws InterruptedException {
        System.out.println("Loading statistics: " + userId);
        Thread.sleep(150);  // Simulate I/O
        return analyticsService.getUserStats(userId);
    }

    // Data classes
    record User(String id, String name, String email) {}
    record Order(String id, String status, double total) {}
    record Preferences(String theme, String language, boolean notifications) {}
    record UserStatistics(int totalOrders, double totalSpent, int daysActive) {}

    record UserProfile(
        User user,
        List<Order> orders,
        Preferences preferences,
        UserStatistics statistics
    ) {}

    // Service interfaces
    interface UserRepository {
        User findById(String id);
    }

    interface OrderService {
        List<Order> findByUserId(String userId);
    }

    interface PreferenceService {
        Preferences findByUserId(String userId);
    }

    interface AnalyticsService {
        UserStatistics getUserStats(String userId);
    }
}

Handling Partial Success

Collecting All Results:

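try (var scope = new StructuredTaskScope<Result>()) {
    // Keep the Subtask handles; the scope itself does not expose them
    List<Subtask<Result>> subtasks = List.of(
        scope.fork(() -> operation1()),
        scope.fork(() -> operation2()),
        scope.fork(() -> operation3())
    );

    scope.join();

    // Inspect every outcome; get() is only valid for successful subtasks
    for (Subtask<Result> subtask : subtasks) {
        switch (subtask.state()) {
            case SUCCESS -> System.out.println("Result: " + subtask.get());
            case FAILED -> System.err.println("Failed: " + subtask.exception());
            case UNAVAILABLE -> System.err.println("Did not complete");
        }
    }
}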

Filtering Successful Tasks:

try (var scope = new StructuredTaskScope<String>()) {
    var tasks = List.of(
        scope.fork(() -> tryFetch1()),
        scope.fork(() -> tryFetch2()),
        scope.fork(() -> tryFetch3())
    );

    scope.join();

    // Get only successful results
    List<String> successful = tasks.stream()
        .filter(t -> t.state() == Subtask.State.SUCCESS)
        .map(Subtask::get)
        .toList();

    System.out.println("Got " + successful.size() + " successful results");
}

Nested Structured Concurrency

Parent-Child Task Hierarchy:

try (var parentScope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Parent task that itself contains subtasks
    var parentTask = parentScope.fork(() -> {
        try (var childScope = new StructuredTaskScope.ShutdownOnFailure()) {
            var child1 = childScope.fork(() -> childWork1());
            var child2 = childScope.fork(() -> childWork2());

            childScope.join();
            childScope.throwIfFailed();

            return "Nested result: " + child1.get() + ", " + child2.get();
        }
    });

    parentScope.join();
    parentScope.throwIfFailed();

    String result = parentTask.get();
    System.out.println(result);
}

Service Composition:

// Parent service coordinates child services
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Each service may internally use structured concurrency
    var userServiceTask = scope.fork(() -> 
        userService.fetchEnrichedUser(userId));

    var analyticsServiceTask = scope.fork(() -> 
        analyticsService.getUserInsights(userId));

    var notificationServiceTask = scope.fork(() -> 
        notificationService.getUserNotifications(userId));

    scope.join();
    scope.throwIfFailed();

    // Each service handled its own internal concurrency
    EnrichedUser user = userServiceTask.get();
    Insights insights = analyticsServiceTask.get();
    Notifications notifications = notificationServiceTask.get();
}

Virtual Threads Integration

Structured Concurrency with Virtual Threads:

// Natural fit - structured scope + virtual thread per task
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var tasks = userIds.stream()
        .map(userId -> scope.fork(() -> {
            // Each fork runs in a virtual thread
            return loadUserData(userId);
        }))
        .toList();

    scope.join();
    scope.throwIfFailed();

    // All users loaded in parallel on virtual threads
    List<UserData> allUsers = tasks.stream()
        .map(Subtask::get)
        .toList();
}

Automatic Virtual Thread Spawning:

// By default, each fork() starts its subtask in a new virtual thread
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    for (String url : urlList) {
        scope.fork(() -> {
            // Runs in dedicated virtual thread
            return httpClient.get(url);
        });
    }

    scope.join();
    scope.throwIfFailed();
}
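
For comparison, a similar fan-out can be sketched on the stable API with Executors.newVirtualThreadPerTaskExecutor(), which is final since JDK 21 and needs no preview flag. The class below is a hypothetical illustration: loadUserData is a stand-in that simply reports which kind of thread ran it. It starts one virtual thread per task and waits for all of them, but unlike a scope it does not cancel siblings when one task fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class VirtualThreadFanOut {

    // Hypothetical stand-in for loadUserData: reports which kind of thread ran it
    static String loadUserData(String userId) throws InterruptedException {
        Thread.sleep(50);  // simulate blocking I/O
        String kind = Thread.currentThread().isVirtual() ? "virtual" : "platform";
        return userId + ":" + kind;
    }

    static List<String> loadAll(List<String> userIds) throws Exception {
        // One new virtual thread per submitted task; close() waits for all of them
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (String userId : userIds) {
                Callable<String> task = () -> loadUserData(userId);
                futures.add(executor.submit(task));
            }
            // Collect results in submission order
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(loadAll(List.of("u1", "u2", "u3")));
        // prints [u1:virtual, u2:virtual, u3:virtual]
    }
}
```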

Best Practices

  1. Always Use Try-With-Resources

    // Guarantees scope cleanup
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // tasks
    } // Scope automatically cleaned up
    
  2. Call join() Before Accessing Results

    // Bad - results not ready
    var task = scope.fork(() -> work());
    String result = task.get();  // Throws IllegalStateException: join() has not been called

    // Good - wait first
    var task = scope.fork(() -> work());
    scope.join();
    String result = task.get();  // Safe, provided the subtask succeeded
    
  3. Choose Appropriate Policy

    // ShutdownOnFailure - the usual choice when every result is needed; cancels all on first failure
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { }
    
    // ShutdownOnSuccess - cancel when first succeeds (race)
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) { }
    
  4. Don't Create Long-Lived Scopes

    // Bad - scope held for too long
    StructuredTaskScope<?> scope = new StructuredTaskScope.ShutdownOnFailure();
    // ... some time passes ...
    scope.fork(() -> task());
    
    // Good - scope exists only as needed
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        scope.fork(() -> task());
    }
    
  5. Leverage Scope for Resource Management

    // The scope guarantees every acquisition has finished (or been cancelled)
    // before the block exits; it does not close what the subtasks return
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var db = scope.fork(() -> acquireDatabase());
        var cache = scope.fork(() -> acquireCache());
        var queue = scope.fork(() -> acquireQueue());
    
        scope.join();
        scope.throwIfFailed();
        // All three resources successfully acquired
    }
    // On failure, any resource that was already acquired must still be released explicitly
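
One way to make the release step explicit is sketched below using stable java.util.concurrent classes, so it runs without the preview flag. Everything in it is hypothetical: Resource is a toy AutoCloseable that only records whether it was closed, and acquireAll is an illustrative helper, not part of any library. It acquires all resources concurrently and, if any acquisition fails, closes the ones that did succeed before rethrowing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AcquireAllOrRelease {

    /** Hypothetical resource that only records whether it has been closed. */
    static final class Resource implements AutoCloseable {
        final String name;
        volatile boolean closed;

        Resource(String name) { this.name = name; }

        @Override
        public void close() { closed = true; }
    }

    /** Acquires concurrently; if any acquisition fails, closes the rest and rethrows. */
    static List<Resource> acquireAll(List<Callable<Resource>> acquisitions)
            throws InterruptedException, ExecutionException {
        // Assumes a non-empty list: a fixed pool needs at least one thread
        ExecutorService executor = Executors.newFixedThreadPool(acquisitions.size());
        try {
            // invokeAll returns only after every task has completed
            List<Future<Resource>> futures = executor.invokeAll(acquisitions);
            List<Resource> acquired = new ArrayList<>();
            ExecutionException failure = null;
            for (Future<Resource> future : futures) {
                try {
                    acquired.add(future.get());
                } catch (ExecutionException e) {
                    if (failure == null) failure = e;  // remember the first failure
                }
            }
            if (failure != null) {
                for (Resource resource : acquired) resource.close();
                throw failure;
            }
            return acquired;
        } finally {
            executor.shutdown();
        }
    }

    /** Returns true if the two acquired resources were closed after the third failed. */
    static boolean releasesOnFailure() throws InterruptedException {
        Resource db = new Resource("db");
        Resource cache = new Resource("cache");
        Callable<Resource> acquireDb = () -> db;
        Callable<Resource> acquireCache = () -> cache;
        Callable<Resource> acquireQueue = () -> {
            throw new IllegalStateException("queue unavailable");
        };
        try {
            acquireAll(List.of(acquireDb, acquireCache, acquireQueue));
            return false;
        } catch (ExecutionException e) {
            return db.closed && cache.closed;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("released on failure: " + releasesOnFailure());
    }
}
```

On success the caller owns the returned resources and is expected to close them, for example with try-with-resources.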