9.3 Cancellation and Timeouts

Managing task lifetime through cancellation and timeout mechanisms is essential for responsive applications.

Understanding Cancellation Semantics

Cooperative Cancellation:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        System.out.println("Task 1 starting");
        for (int i = 0; i < 1000; i++) {
            // Check cancellation periodically
            if (Thread.interrupted()) {
                System.out.println("Task 1 cancelled at iteration " + i);
                throw new InterruptedException();
            }

            doWork(i);
        }
        return "Task 1 complete";
    });

    var task2 = scope.fork(() -> {
        System.out.println("Task 2 starting");
        Thread.sleep(500);
        throw new RuntimeException("Task 2 failed");
    });

    scope.join();
    // When task2 fails, scope shuts down
    // task1 receives cancellation

} catch (Exception e) {
    System.out.println("Scope cancelled: " + e.getMessage());
}

// Output:
// Task 1 starting
// Task 2 starting
// Task 1 cancelled at iteration ...
// Scope cancelled: Task 2 failed

Immediate Cancellation vs Graceful Shutdown:

public class CancellationModes {
    // Cooperative - respects InterruptedException
    public static String cooperativeTask() throws InterruptedException {
        try {
            System.out.println("Working...");
            Thread.sleep(10000);  // Will be interrupted
            return "Done";
        } catch (InterruptedException e) {
            System.out.println("Interrupted gracefully");
            throw e;  // Propagate to scope
        }
    }

    // Non-cooperative - ignores interruption
    public static String nonCooperativeTask() {
        try {
            System.out.println("Busy working...");
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                if (i % 1000000 == 0) {
                    // No cancellation check - will not respond to interruption
                    doIntensiveWork();
                }
            }
            return "Done";
        } catch (Exception e) {
            return "Failed: " + e.getMessage();
        }
    }

    // Example usage
    public static void demonstrateCancellation() throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var cooperative = scope.fork(() -> cooperativeTask());
            var nonCooperative = scope.fork(() -> nonCooperativeTask());

            scope.join();

        } catch (Exception e) {
            System.out.println("One task failed, both cancelled");
            System.out.println("Cooperative responded quickly");
            System.out.println("Non-cooperative may take longer to stop");
        }
    }

    private static void doIntensiveWork() {
        // CPU-bound work
        for (int i = 0; i < 1000000; i++) {
            Math.sqrt(i);
        }
    }
}

Implementing Timeout Patterns

Simple Timeout with Timer:

public <T> T callWithTimeout(Callable<T> callable, Duration timeout) 
        throws Exception {

    long deadlineMillis = System.currentTimeMillis() + timeout.toMillis();

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var mainTask = scope.fork(callable::call);

        // Timeout watchdog in separate thread
        Thread watchdog = new Thread(() -> {
            try {
                long remaining = deadlineMillis - System.currentTimeMillis();
                if (remaining > 0) {
                    Thread.sleep(remaining);
                    System.out.println("Timeout reached, shutting down scope");
                    scope.shutdown();
                }
            } catch (InterruptedException e) {
                // Scope completed before timeout
            }
        });
        watchdog.setDaemon(true);
        watchdog.setName("Watchdog-" + System.nanoTime());
        watchdog.start();

        scope.join();

        if (mainTask.isDone() && !mainTask.hasException()) {
            return mainTask.resultNow();
        } else if (mainTask.hasException()) {
            throw mainTask.exception();
        } else {
            throw new TimeoutException("Operation exceeded timeout: " + timeout);
        }

    } finally {
        // Watchdog will be interrupted when scope exits
    }
}

// Usage
try {
    String result = callWithTimeout(
        () -> slowService.fetchData(),
        Duration.ofSeconds(5)
    );
    System.out.println("Got result: " + result);
} catch (TimeoutException e) {
    System.err.println("Operation timed out: " + e.getMessage());
}

Deadline-Based Timeout:

public <T> T executeBeforeDeadline(Callable<T> operation, Instant deadline) 
        throws Exception {

    Duration remaining = Duration.between(Instant.now(), deadline);
    if (remaining.isNegative()) {
        throw new TimeoutException("Deadline already passed");
    }

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task = scope.fork(operation::call);

        // Set watchdog for remaining time
        Timer timer = new Timer("DeadlineWatchdog", true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("Deadline reached, cancelling");
                scope.shutdown();
            }
        }, remaining.toMillis());

        try {
            scope.join();
            timer.cancel();

            if (task.hasException()) {
                throw task.exception();
            }
            return task.resultNow();
        } catch (Exception e) {
            timer.cancel();
            throw e;
        }
    }
}

// Usage
Instant deadline = Instant.now().plus(Duration.ofMinutes(5));
try {
    String result = executeBeforeDeadline(
        () -> processRequest(),
        deadline
    );
} catch (TimeoutException e) {
    System.err.println("Deadline exceeded");
}

Timeout with Fallback:

public <T> T executeWithTimeoutAndFallback(
        Callable<T> primary,
        Callable<T> fallback,
        Duration timeout) throws Exception {

    try {
        return callWithTimeout(primary::call, timeout);
    } catch (TimeoutException e) {
        System.out.println("Primary timed out, trying fallback");
        return fallback.call();
    }
}

// Usage
String data = executeWithTimeoutAndFallback(
    () -> primaryApi.fetch(),           // Fast API
    () -> cacheService.fetch(),         // Slower cache
    Duration.ofSeconds(2)
);

Handling InterruptedException

Proper Interrupt Handling:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task = scope.fork(() -> {
        System.out.println("Task starting");

        try {
            // Long operations with interruption points
            for (int i = 0; i < 100; i++) {
                // Blocking operation that respects interruption
                processItem(i);
                Thread.sleep(100);  // Interruption point
            }
            return "Completed successfully";

        } catch (InterruptedException e) {
            // Restore interrupt status
            Thread.currentThread().interrupt();
            System.out.println("Task interrupted at item " + i);
            throw e;
        }
    });

    scope.join();
    scope.throwIfFailed();
    return task.resultNow();
}

Non-Blocking with Cancellation Check:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task = scope.fork(() -> {
        System.out.println("CPU-intensive task starting");

        try {
            for (int i = 0; i < 1000000000; i++) {
                // Check cancellation periodically for CPU-bound work
                if (i % 100000 == 0) {
                    if (Thread.interrupted()) {
                        System.out.println("Cancellation requested");
                        throw new InterruptedException();
                    }
                }

                // Do expensive calculation
                double result = Math.sqrt(i) * Math.sin(i);
            }
            return "Completed";

        } catch (InterruptedException e) {
            System.out.println("Interrupted during computation");
            throw e;
        }
    });

    scope.join();
    scope.throwIfFailed();
}

Handling Interruption in Library Code:

public class InterruptionAwareUtil {
    // Good - checks for interruption
    public static String processLargeFile(Path file) throws InterruptedException {
        StringBuilder content = new StringBuilder();

        try (var lines = Files.lines(file)) {
            int count = 0;
            for (String line : (Iterable<String>) lines::iterator) {
                if (Thread.interrupted()) {
                    throw new InterruptedException("Processing cancelled");
                }

                content.append(line).append("\n");
                count++;

                if (count % 1000 == 0) {
                    System.out.println("Processed " + count + " lines");
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return content.toString();
    }

    // Bad - ignores interruption
    public static String ignoreInterruption(Path file) throws IOException {
        StringBuilder content = new StringBuilder();

        try (var lines = Files.lines(file)) {
            for (String line : (Iterable<String>) lines::iterator) {
                // No interruption check!
                content.append(line).append("\n");
            }
        }

        return content.toString();
    }
}

// Usage showing difference
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var good = scope.fork(() -> {
        try {
            return InterruptionAwareUtil.processLargeFile(largePath);
        } catch (InterruptedException e) {
            System.out.println("Good method responded to cancellation");
            throw e;
        }
    });

    scope.join();
    scope.throwIfFailed();
}

Cascading Timeouts

Timeout Through Structured Hierarchy:

public class TimeoutPropagation {
    // Parent timeout covers all children
    public String orchestrateWithTimeout() throws Exception {
        Duration parentTimeout = Duration.ofSeconds(10);

        try (var parentScope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task1 = parentScope.fork(() -> {
                // Child inherits parent's time remaining
                Duration childTimeout = parentTimeout.minusSeconds(1);  // Reserve time
                return executeChildTask(childTimeout);
            });

            var task2 = parentScope.fork(() -> {
                Duration childTimeout = parentTimeout.minusSeconds(1);
                return executeAnotherChild(childTimeout);
            });

            parentScope.join();
            parentScope.throwIfFailed();

            return task1.resultNow() + ", " + task2.resultNow();
        }
    }

    private String executeChildTask(Duration timeout) throws Exception {
        try (var childScope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Child operation with its own timeout
            var result = childScope.fork(() -> {
                Thread.sleep(500);
                return "Child 1 result";
            });

            // Set timeout for this scope
            setTimeoutForScope(childScope, timeout);

            childScope.join();
            childScope.throwIfFailed();
            return result.resultNow();
        }
    }

    private String executeAnotherChild(Duration timeout) throws Exception {
        try (var childScope = new StructuredTaskScope.ShutdownOnFailure()) {
            var result = childScope.fork(() -> {
                Thread.sleep(1000);
                return "Child 2 result";
            });

            setTimeoutForScope(childScope, timeout);

            childScope.join();
            childScope.throwIfFailed();
            return result.resultNow();
        }
    }

    private void setTimeoutForScope(StructuredTaskScope<?> scope, Duration timeout) {
        Thread watchdog = new Thread(() -> {
            try {
                Thread.sleep(timeout.toMillis());
                scope.shutdown();
            } catch (InterruptedException e) {
                // Completed before timeout
            }
        });
        watchdog.setDaemon(true);
        watchdog.start();
    }
}

Real-World Example: Request Processing with Deadline

import java.time.Instant;
import java.time.Duration;
import java.util.*;

public class RequestProcessor {
    private final AuthenticationService authService;
    private final DataService dataService;
    private final ValidationService validationService;
    private final CacheService cacheService;

    public record Request(String userId, Map<String, String> params) {}
    public record Response(boolean success, Object data, String error) {}

    private static final Duration TOTAL_BUDGET = Duration.ofSeconds(5);

    public Response processRequest(Request request) {
        Instant deadline = Instant.now().plus(TOTAL_BUDGET);

        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Track remaining time
            long startTime = System.currentTimeMillis();

            // Check cache first with short timeout
            var cacheTask = scope.fork(() -> {
                Duration remaining = calculateRemaining(deadline);
                if (remaining.isNegative()) {
                    throw new TimeoutException("Deadline passed before cache check");
                }
                return callWithTimeout(() -> 
                    cacheService.get(request.userId()),
                    Duration.ofMillis(500)
                );
            });

            // Authenticate in parallel with shorter timeout
            var authTask = scope.fork(() -> {
                Duration remaining = calculateRemaining(deadline);
                return callWithTimeout(() -> 
                    authService.authenticate(request.userId()),
                    remaining.minusSeconds(1)
                );
            });

            // Validate input in parallel
            var validationTask = scope.fork(() -> {
                Duration remaining = calculateRemaining(deadline);
                return callWithTimeout(() -> 
                    validationService.validate(request.params()),
                    remaining.minusSeconds(1)
                );
            });

            scope.join();
            scope.throwIfFailed();

            // Check if we still have time
            Duration remaining = calculateRemaining(deadline);
            if (remaining.isNegative()) {
                return new Response(false, null, "Exceeded deadline during parallel tasks");
            }

            // Cache hit
            Object cachedData = cacheTask.resultNow();
            if (cachedData != null) {
                System.out.println("Cache hit!");
                return new Response(true, cachedData, null);
            }

            // Authenticated and validated, fetch data
            String userId = authTask.resultNow();
            Map<String, String> validated = validationTask.resultNow();

            // Final data fetch with remaining time
            Object data = callWithTimeout(() ->
                dataService.fetch(userId, validated),
                remaining.minusMillis(500)  // Reserve 500ms for response building
            );

            return new Response(true, data, null);

        } catch (TimeoutException e) {
            return new Response(false, null, "Request exceeded " + TOTAL_BUDGET);
        } catch (Exception e) {
            return new Response(false, null, "Error: " + e.getMessage());
        }
    }

    private Duration calculateRemaining(Instant deadline) {
        return Duration.between(Instant.now(), deadline);
    }

    private <T> T callWithTimeout(java.util.concurrent.Callable<T> callable, 
                                  Duration timeout) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(callable::call);

            Thread watchdog = new Thread(() -> {
                try {
                    Thread.sleep(timeout.toMillis());
                    scope.shutdown();
                } catch (InterruptedException e) {
                    // Completed
                }
            });
            watchdog.setDaemon(true);
            watchdog.start();

            scope.join();

            if (task.hasException()) {
                throw task.exception();
            }
            return task.resultNow();
        }
    }

    // Service interfaces
    interface AuthenticationService {
        String authenticate(String userId) throws Exception;
    }
    interface DataService {
        Object fetch(String userId, Map<String, String> params) throws Exception;
    }
    interface ValidationService {
        Map<String, String> validate(Map<String, String> params) throws Exception;
    }
    interface CacheService {
        Object get(String key) throws Exception;
    }
}

Cancellation Best Practices

  1. Always Check Thread.interrupted():

    // Good - checks for cancellation
    while (condition) {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        doWork();
    }
    
    // Bad - ignores cancellation
    while (condition) {
        doWork();  // May run forever even if cancelled
    }
    
  2. Use Blocking Operations with Interruption:

    // Good - blocking operations respond to interruption
    Thread.sleep(1000);
    blockingQueue.take();
    inputStream.read();
    
    // Avoid - polling (wastes CPU)
    while (!done) {
        if (checkCondition()) break;
        Thread.sleep(10);
    }
    
  3. Restore Interrupt Status:

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // Restore!
        throw e;
    }
    
  4. Timeout Granularity:

    // Good - allows cancellation response
    Duration timeout = Duration.ofSeconds(5);
    
    // Bad - too long between checks
    Duration timeout = Duration.ofMinutes(10);
    
  5. Handle Partial Failure:

    // Cancel sibling tasks when one times out
    try {
        scope.join();
    } catch (TimeoutException e) {
        // Other tasks will be cancelled when scope exits
        System.err.println("Cancelling remaining tasks");
    }