9.3 Cancellation and Timeouts
Managing task lifetime through cancellation and timeout mechanisms is essential for responsive applications.
Understanding Cancellation Semantics
Cooperative Cancellation:
// Demonstrates cooperative cancellation: task2 fails, the scope shuts down,
// and task1 notices the resulting interrupt at its next periodic check.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        System.out.println("Task 1 starting");
        for (int i = 0; i < 1000; i++) {
            // Check cancellation periodically. Thread.interrupted() also clears the
            // flag, so the exception thrown below is what carries the cancellation on.
            if (Thread.interrupted()) {
                System.out.println("Task 1 cancelled at iteration " + i);
                throw new InterruptedException();
            }
            doWork(i);
        }
        return "Task 1 complete";
    });
    var task2 = scope.fork(() -> {
        System.out.println("Task 2 starting");
        Thread.sleep(500);
        throw new RuntimeException("Task 2 failed");
    });
    scope.join();
    // When task2 fails, scope shuts down
    // task1 receives cancellation
    // join() only waits; throwIfFailed() is what rethrows task2's failure so that
    // the catch block below actually runs.
    scope.throwIfFailed();
} catch (Exception e) {
    // throwIfFailed() wraps the subtask's exception; report the underlying cause
    Throwable failure = (e.getCause() != null) ? e.getCause() : e;
    System.out.println("Scope cancelled: " + failure.getMessage());
}
// Output (the two "starting" lines may appear in either order; the "cancelled"
// line assumes doWork keeps task1 busy for longer than task2's 500 ms sleep):
// Task 1 starting
// Task 2 starting
// Task 1 cancelled at iteration ...
// Scope cancelled: Task 2 failed
Immediate Cancellation vs Graceful Shutdown:
/**
 * Contrasts a task that reacts to interruption with one that never looks at
 * its interrupt status, and shows how each behaves when its scope shuts down.
 */
public class CancellationModes {

    /**
     * Cooperative task: blocks in {@link Thread#sleep(long)}, which throws
     * {@link InterruptedException} as soon as the thread is interrupted.
     *
     * @return {@code "Done"} if the sleep finishes undisturbed
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static String cooperativeTask() throws InterruptedException {
        try {
            System.out.println("Working...");
            Thread.sleep(10000); // Will be interrupted
            return "Done";
        } catch (InterruptedException e) {
            System.out.println("Interrupted gracefully");
            throw e; // Propagate to scope
        }
    }

    /**
     * Non-cooperative task: pure CPU loop with no interruption check, so an
     * interrupt has no effect and the loop always runs to completion.
     *
     * @return {@code "Done"} once the loop has finished
     */
    public static String nonCooperativeTask() {
        System.out.println("Busy working...");
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            if (i % 1000000 == 0) {
                // No cancellation check - will not respond to interruption
                doIntensiveWork();
            }
        }
        return "Done";
    }

    /**
     * Forks both task styles plus a short-lived subtask that fails, so the
     * scope really does shut down and interrupt the other two.
     *
     * @throws Exception declared for callers; failures inside the scope are
     *         reported on standard output rather than rethrown
     */
    public static void demonstrateCancellation() throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var cooperative = scope.fork(() -> cooperativeTask());
            var nonCooperative = scope.fork(() -> nonCooperativeTask());
            // Without a failing subtask nothing would ever trigger the shutdown
            scope.fork(() -> {
                Thread.sleep(100);
                throw new IllegalStateException("Simulated failure");
            });
            scope.join();
            // join() does not rethrow subtask failures; this call does
            scope.throwIfFailed();
        } catch (Exception e) {
            // Reached only after the scope has closed, i.e. after the
            // non-cooperative task has finally run to completion.
            System.out.println("One task failed, both cancelled");
            System.out.println("Cooperative responded quickly");
            System.out.println("Non-cooperative may take longer to stop");
        }
    }

    /** Burns CPU time; the computed values are deliberately discarded. */
    private static void doIntensiveWork() {
        // CPU-bound work
        for (int i = 0; i < 1000000; i++) {
            Math.sqrt(i);
        }
    }
}
Implementing Timeout Patterns
Simple Timeout with Timer:
/**
 * Runs {@code callable} in its own task scope and waits at most
 * {@code timeout} for it to finish.
 *
 * <p>The wait uses the scope's own timed join instead of a hand-rolled
 * watchdog thread: {@code shutdown()} is documented as callable only by the
 * scope owner or threads forked in the scope, and a watchdog that is never
 * interrupted outlives the call.
 *
 * @param callable the operation to run
 * @param timeout  maximum time to wait; a zero or negative value times out
 *                 as soon as the operation is not already finished
 * @return the operation's result
 * @throws TimeoutException if the operation did not finish in time
 * @throws Exception the exception thrown by the operation itself, or
 *         {@link InterruptedException} if the caller is interrupted
 */
public <T> T callWithTimeout(Callable<T> callable, Duration timeout)
        throws Exception {
    Instant deadline = Instant.now().plus(timeout);
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var mainTask = scope.fork(callable);
        try {
            scope.joinUntil(deadline);
        } catch (TimeoutException expired) {
            System.out.println("Timeout reached, shutting down scope");
            TimeoutException timedOut =
                    new TimeoutException("Operation exceeded timeout: " + timeout);
            timedOut.initCause(expired);
            // Leaving the try block closes the scope, which cancels the task
            throw timedOut;
        }
        // Rethrow the operation's own exception rather than a wrapper
        scope.throwIfFailed(cause ->
                cause instanceof Exception ex ? ex : new RuntimeException(cause));
        // get() is available whether fork returned a Future or a Subtask
        return mainTask.get();
    }
}
// Usage
try {
String result = callWithTimeout(
() -> slowService.fetchData(),
Duration.ofSeconds(5)
);
System.out.println("Got result: " + result);
} catch (TimeoutException e) {
System.err.println("Operation timed out: " + e.getMessage());
}
Deadline-Based Timeout:
/**
 * Runs {@code operation} and waits for it until the absolute
 * {@code deadline}.
 *
 * <p>The scope's timed join replaces the former {@code java.util.Timer}
 * watchdog, which ended by reading the result of an unfinished task instead
 * of reporting a timeout.
 *
 * @param operation the work to run
 * @param deadline  the instant by which the work must have finished
 * @return the operation's result
 * @throws TimeoutException if the deadline has already passed or is reached
 *         before the operation finishes
 * @throws Exception the exception thrown by the operation itself, or
 *         {@link InterruptedException} if the caller is interrupted
 */
public <T> T executeBeforeDeadline(Callable<T> operation, Instant deadline)
        throws Exception {
    // Fail fast instead of forking work that can no longer finish in time
    Duration remaining = Duration.between(Instant.now(), deadline);
    if (remaining.isNegative()) {
        throw new TimeoutException("Deadline already passed");
    }
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task = scope.fork(operation);
        try {
            scope.joinUntil(deadline);
        } catch (TimeoutException expired) {
            System.out.println("Deadline reached, cancelling");
            TimeoutException timedOut =
                    new TimeoutException("Deadline exceeded: " + deadline);
            timedOut.initCause(expired);
            // Closing the scope on the way out cancels the unfinished task
            throw timedOut;
        }
        // Rethrow the operation's own exception rather than a wrapper
        scope.throwIfFailed(cause ->
                cause instanceof Exception ex ? ex : new RuntimeException(cause));
        // get() is available whether fork returned a Future or a Subtask
        return task.get();
    }
}
// Usage: the request must be finished five minutes from now
Duration budget = Duration.ofMinutes(5);
Instant deadline = Instant.now().plus(budget);
try {
    String result = executeBeforeDeadline(() -> processRequest(), deadline);
} catch (TimeoutException e) {
    // Only the deadline case is handled here; other failures propagate
    System.err.println("Deadline exceeded");
}
Timeout with Fallback:
/**
 * Tries {@code primary} under a time limit and switches to {@code fallback}
 * only when the primary attempt times out.
 *
 * @param primary  operation attempted first, bounded by {@code timeout}
 * @param fallback operation run directly (no time limit) after a timeout
 * @param timeout  time allowed for the primary operation
 * @return the primary result, or the fallback result after a timeout
 * @throws Exception any non-timeout failure of the primary operation, or
 *         any failure of the fallback
 */
public <T> T executeWithTimeoutAndFallback(
        Callable<T> primary,
        Callable<T> fallback,
        Duration timeout) throws Exception {
    T outcome;
    try {
        outcome = callWithTimeout(primary::call, timeout);
    } catch (TimeoutException e) {
        // Only a timeout triggers the fallback; other failures propagate
        System.out.println("Primary timed out, trying fallback");
        outcome = fallback.call();
    }
    return outcome;
}
// Usage: ask the primary API first; the cache is consulted only after a timeout
Duration primaryLimit = Duration.ofSeconds(2);
String data = executeWithTimeoutAndFallback(
        () -> primaryApi.fetch(),   // primary source, limited to two seconds
        () -> cacheService.fetch(), // fallback source, run without a limit
        primaryLimit);
Handling InterruptedException
Proper Interrupt Handling:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> {
System.out.println("Task starting");
try {
// Long operations with interruption points
for (int i = 0; i < 100; i++) {
// Blocking operation that respects interruption
processItem(i);
Thread.sleep(100); // Interruption point
}
return "Completed successfully";
} catch (InterruptedException e) {
// Restore interrupt status
Thread.currentThread().interrupt();
System.out.println("Task interrupted at item " + i);
throw e;
}
});
scope.join();
scope.throwIfFailed();
return task.resultNow();
}
Non-Blocking with Cancellation Check:
// CPU-bound work has no blocking call to be interrupted in, so the task
// polls its own interrupt status once every 100000 iterations.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task = scope.fork(() -> {
        System.out.println("CPU-intensive task starting");
        try {
            int step = 0;
            while (step < 1000000000) {
                boolean atCheckpoint = step % 100000 == 0;
                // Thread.interrupted() is consulted only at checkpoints
                if (atCheckpoint && Thread.interrupted()) {
                    System.out.println("Cancellation requested");
                    throw new InterruptedException();
                }
                // The expensive calculation; its value is not used further
                double result = Math.sqrt(step) * Math.sin(step);
                step++;
            }
            return "Completed";
        } catch (InterruptedException e) {
            System.out.println("Interrupted during computation");
            throw e;
        }
    });
    scope.join();
    scope.throwIfFailed();
}
Handling Interruption in Library Code:
/**
 * Two ways of reading a file line by line: one that honours thread
 * interruption and one that does not.
 */
public class InterruptionAwareUtil {

    /**
     * Good - checks for interruption before every line.
     *
     * @param file the file to read
     * @return all lines of the file, each followed by {@code "\n"}
     * @throws InterruptedException if the thread is interrupted before or
     *         while the file is read
     * @throws java.io.UncheckedIOException if the file cannot be opened or read
     */
    public static String processLargeFile(Path file) throws InterruptedException {
        StringBuilder content = new StringBuilder();
        try (var lines = Files.lines(file)) {
            int count = 0;
            for (String line : (Iterable<String>) lines::iterator) {
                if (Thread.interrupted()) {
                    throw new InterruptedException("Processing cancelled");
                }
                content.append(line).append("\n");
                count++;
                if (count % 1000 == 0) {
                    System.out.println("Processed " + count + " lines");
                }
            }
        } catch (java.io.UncheckedIOException e) {
            // Read errors during iteration arrive already wrapped
            throwIfClosedByInterrupt(e.getCause());
            throw e;
        } catch (IOException e) {
            throwIfClosedByInterrupt(e);
            // Same exception type as read errors raised while iterating
            throw new java.io.UncheckedIOException(e);
        }
        return content.toString();
    }

    /**
     * Reports an interrupt that surfaced as a closed file channel as an
     * ordinary {@link InterruptedException}; does nothing for other errors.
     */
    private static void throwIfClosedByInterrupt(IOException e) throws InterruptedException {
        if (e instanceof java.nio.channels.ClosedByInterruptException) {
            // Clear the flag, matching the Thread.interrupted() check in the loop
            Thread.interrupted();
            InterruptedException cancelled = new InterruptedException("Processing cancelled");
            cancelled.initCause(e);
            throw cancelled;
        }
    }

    /**
     * Bad - ignores interruption: the loop has no interruption check of its own.
     *
     * @param file the file to read
     * @return all lines of the file, each followed by {@code "\n"}
     * @throws IOException if the file cannot be opened
     */
    public static String ignoreInterruption(Path file) throws IOException {
        StringBuilder content = new StringBuilder();
        try (var lines = Files.lines(file)) {
            for (String line : (Iterable<String>) lines::iterator) {
                // No interruption check!
                content.append(line).append("\n");
            }
        }
        return content.toString();
    }
}
// Usage: run the interruption-aware reader inside a scope so that a
// shutdown of the scope is reported by the task itself
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var good = scope.fork(() -> {
        String text;
        try {
            text = InterruptionAwareUtil.processLargeFile(largePath);
        } catch (InterruptedException e) {
            System.out.println("Good method responded to cancellation");
            throw e;
        }
        return text;
    });
    scope.join();
    scope.throwIfFailed();
}
Cascading Timeouts
Timeout Through Structured Hierarchy:
/**
 * Shows one overall deadline flowing from a parent scope down to child
 * scopes, each child receiving slightly less than the time still remaining.
 */
public class TimeoutPropagation {

    /**
     * Runs two children in parallel under a single ten-second deadline.
     *
     * @return both child results joined by {@code ", "}
     * @throws Exception if a child fails or times out, or if the parent
     *         deadline is reached first ({@code TimeoutException})
     */
    // Parent timeout covers all children
    public String orchestrateWithTimeout() throws Exception {
        Duration parentTimeout = Duration.ofSeconds(10);
        Instant parentDeadline = Instant.now().plus(parentTimeout);
        try (var parentScope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Each child computes its budget when it starts, from the time
            // actually left, so a late start gets a shorter budget
            var task1 = parentScope.fork(() ->
                    executeChildTask(remainingChildBudget(parentDeadline)));
            var task2 = parentScope.fork(() ->
                    executeAnotherChild(remainingChildBudget(parentDeadline)));
            // The parent enforces its own deadline as well
            parentScope.joinUntil(parentDeadline);
            parentScope.throwIfFailed();
            // get() is available whether fork returned a Future or a Subtask
            return task1.get() + ", " + task2.get();
        }
    }

    /** Time left until the parent deadline, minus a one-second reserve. */
    private static Duration remainingChildBudget(Instant parentDeadline) {
        return Duration.between(Instant.now(), parentDeadline).minusSeconds(1); // Reserve time
    }

    private String executeChildTask(Duration timeout) throws Exception {
        return runChild("Child 1 result", 500, timeout);
    }

    private String executeAnotherChild(Duration timeout) throws Exception {
        return runChild("Child 2 result", 1000, timeout);
    }

    /**
     * Runs one simulated child operation in its own scope, bounded by
     * {@code timeout}. A negative budget times out immediately.
     */
    private String runChild(String result, long workMillis, Duration timeout)
            throws Exception {
        try (var childScope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = childScope.fork(() -> {
                Thread.sleep(workMillis);
                return result;
            });
            // Timed join replaces the former watchdog thread
            childScope.joinUntil(Instant.now().plus(timeout));
            childScope.throwIfFailed();
            return task.get();
        }
    }
}
Real-World Example: Request Processing with Deadline
import java.time.Instant;
import java.time.Duration;
import java.util.*;
/**
 * Handles a request under one overall time budget: cache lookup,
 * authentication and validation run in parallel, followed by a data fetch
 * that uses whatever time is left.
 */
public class RequestProcessor {
    private final AuthenticationService authService;
    private final DataService dataService;
    private final ValidationService validationService;
    private final CacheService cacheService;

    public record Request(String userId, Map<String, String> params) {}
    public record Response(boolean success, Object data, String error) {}

    private static final Duration TOTAL_BUDGET = Duration.ofSeconds(5);

    /**
     * Creates a processor over the given collaborators.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public RequestProcessor(AuthenticationService authService,
                            DataService dataService,
                            ValidationService validationService,
                            CacheService cacheService) {
        this.authService = Objects.requireNonNull(authService, "authService");
        this.dataService = Objects.requireNonNull(dataService, "dataService");
        this.validationService = Objects.requireNonNull(validationService, "validationService");
        this.cacheService = Objects.requireNonNull(cacheService, "cacheService");
    }

    /**
     * Processes {@code request} within {@link #TOTAL_BUDGET}.
     *
     * @param request the request to process
     * @return a successful response carrying cached or freshly fetched data,
     *         or a failed response describing the timeout or error; this
     *         method does not throw for task failures
     */
    public Response processRequest(Request request) {
        Instant deadline = Instant.now().plus(TOTAL_BUDGET);
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Check cache first with short timeout
            var cacheTask = scope.fork(() -> {
                Duration remaining = calculateRemaining(deadline);
                if (remaining.isNegative()) {
                    throw new TimeoutException("Deadline passed before cache check");
                }
                return callWithTimeout(() ->
                        cacheService.get(request.userId()),
                        Duration.ofMillis(500)
                );
            });
            // Authenticate in parallel with shorter timeout
            var authTask = scope.fork(() -> {
                Duration remaining = calculateRemaining(deadline);
                return callWithTimeout(() ->
                        authService.authenticate(request.userId()),
                        remaining.minusSeconds(1)
                );
            });
            // Validate input in parallel
            var validationTask = scope.fork(() -> {
                Duration remaining = calculateRemaining(deadline);
                return callWithTimeout(() ->
                        validationService.validate(request.params()),
                        remaining.minusSeconds(1)
                );
            });
            scope.join();
            // Unwrap the failure so a subtask's TimeoutException reaches the
            // dedicated catch block below instead of the generic one
            scope.throwIfFailed(RequestProcessor::asException);
            // Check if we still have time
            Duration remaining = calculateRemaining(deadline);
            if (remaining.isNegative()) {
                return new Response(false, null, "Exceeded deadline during parallel tasks");
            }
            // Cache hit
            // get() is available whether fork returned a Future or a Subtask
            Object cachedData = cacheTask.get();
            if (cachedData != null) {
                System.out.println("Cache hit!");
                return new Response(true, cachedData, null);
            }
            // Authenticated and validated, fetch data
            String userId = authTask.get();
            Map<String, String> validated = validationTask.get();
            // Final data fetch with remaining time
            Object data = callWithTimeout(() ->
                    dataService.fetch(userId, validated),
                    remaining.minusMillis(500) // Reserve 500ms for response building
            );
            return new Response(true, data, null);
        } catch (TimeoutException e) {
            return new Response(false, null, "Request exceeded " + TOTAL_BUDGET);
        } catch (Exception e) {
            return new Response(false, null, "Error: " + e.getMessage());
        }
    }

    /** Time from now until {@code deadline}; negative once it has passed. */
    private Duration calculateRemaining(Instant deadline) {
        return Duration.between(Instant.now(), deadline);
    }

    /**
     * Runs {@code callable} in its own scope and waits at most
     * {@code timeout}; a zero or negative timeout expires immediately unless
     * the task has already finished.
     *
     * @throws TimeoutException if the task does not finish in time
     * @throws Exception the task's own exception, or
     *         {@link InterruptedException} if the caller is interrupted
     */
    private <T> T callWithTimeout(java.util.concurrent.Callable<T> callable,
                                  Duration timeout) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var task = scope.fork(callable);
            try {
                // Timed join replaces the former watchdog thread, which was
                // never stopped and never produced a TimeoutException
                scope.joinUntil(Instant.now().plus(timeout));
            } catch (TimeoutException expired) {
                TimeoutException timedOut =
                        new TimeoutException("Operation exceeded timeout: " + timeout);
                timedOut.initCause(expired);
                throw timedOut;
            }
            scope.throwIfFailed(RequestProcessor::asException);
            return task.get();
        }
    }

    /** Returns {@code cause} itself when it is an exception, otherwise wraps it. */
    private static Exception asException(Throwable cause) {
        return cause instanceof Exception ex ? ex : new RuntimeException(cause);
    }

    // Service interfaces
    interface AuthenticationService {
        String authenticate(String userId) throws Exception;
    }
    interface DataService {
        Object fetch(String userId, Map<String, String> params) throws Exception;
    }
    interface ValidationService {
        Map<String, String> validate(Map<String, String> params) throws Exception;
    }
    interface CacheService {
        Object get(String key) throws Exception;
    }
}
Cancellation Best Practices
Always Check Thread.interrupted():
// Good - checks for cancellation
while (condition) {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    doWork();
}
// Bad - ignores cancellation
while (condition) {
    doWork(); // May run forever even if cancelled
}
Use Blocking Operations with Interruption:
// Good - blocking operations respond to interruption
Thread.sleep(1000);
blockingQueue.take();
inputStream.read();
// Avoid - polling (wastes CPU)
while (!done) {
    if (checkCondition()) break;
    Thread.sleep(10);
}
Restore Interrupt Status:
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Restore!
    throw e;
}
Timeout Granularity:
// Good - allows cancellation response
Duration timeout = Duration.ofSeconds(5);
// Bad - too long between checks
Duration timeout = Duration.ofMinutes(10);
Handle Partial Failure:
// Cancel sibling tasks when one times out try { scope.join(); } catch (TimeoutException e) { // Other tasks will be cancelled when scope exits System.err.println("Cancelling remaining tasks"); }