17.3 Asynchronous Operations

Master non-blocking HTTP operations with CompletableFuture.

sendAsync Basics

Async Request Execution:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.*;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Asynchronous HTTP operations built on {@code HttpClient.sendAsync}.
 *
 * <p>Every method sends through one shared {@link HttpClient}. A client, once
 * built, is immutable and can send any number of requests, so creating a new
 * one per call (and therefore one per URL in {@link #asyncGetMultiple}) only
 * multiplies the resources each client holds.
 */
public class AsyncHTTPOperations {

    /** Single client reused by every request issued from this class. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Simple async GET
     *
     * @param url absolute {@code http} or {@code https} URL
     * @return future completing with the response body as text; the status
     *         code is not inspected, so an error page is returned like any
     *         other body
     * @throws IllegalArgumentException if {@code url} is not a valid URI or
     *         uses an unsupported scheme (thrown before anything is sent)
     */
    public static CompletableFuture<String> asyncGet(String url) {
        return asyncGetFull(url).thenApply(HttpResponse::body);
    }

    /**
     * Async request with detailed response
     *
     * @param url absolute {@code http} or {@code https} URL
     * @return future completing with the full response (status, headers, body)
     * @throws IllegalArgumentException if {@code url} is not a valid URI or
     *         uses an unsupported scheme
     */
    public static CompletableFuture<HttpResponse<String>> asyncGetFull(
            String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .GET()
            .build();

        return send(request);
    }

    /**
     * Async POST
     *
     * @param url  absolute {@code http} or {@code https} URL
     * @param body request payload, sent with {@code Content-Type: application/json}
     * @return future completing with the response body as text
     * @throws IllegalArgumentException if {@code url} is not a valid URI or
     *         uses an unsupported scheme
     */
    public static CompletableFuture<String> asyncPost(String url,
                                                      String body) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .header("Content-Type", "application/json")
            .build();

        return send(request).thenApply(HttpResponse::body);
    }

    /**
     * Parallel async requests
     *
     * <p>All requests are started before any result is awaited. The returned
     * future fails if any single request fails.
     *
     * @param urls URLs to fetch; may be empty
     * @return future completing with the bodies in the same order as
     *         {@code urls} (an empty list for empty input)
     * @throws IllegalArgumentException if any URL is invalid; this is raised
     *         synchronously while the requests are being created
     */
    public static CompletableFuture<List<String>> asyncGetMultiple(
            List<String> urls) {
        List<CompletableFuture<String>> futures = urls.stream()
            .map(AsyncHTTPOperations::asyncGet)
            .toList();

        // Wildcard-typed array avoids the raw-type/unchecked warning.
        return CompletableFuture.allOf(
                futures.toArray(CompletableFuture<?>[]::new))
            // allOf has completed, so join() cannot block here.
            .thenApply(done -> futures.stream()
                .map(CompletableFuture::join)
                .toList());
    }

    /** Sends {@code request} on the shared client, reading the body as text. */
    private static CompletableFuture<HttpResponse<String>> send(
            HttpRequest request) {
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString());
    }
}

CompletableFuture Chaining

Building Async Pipelines:

/**
 * Chain async operations
 *
 * <p>Shows how to transform, sequence and recover from HTTP calls by composing
 * the {@link CompletableFuture} returned by {@code HttpClient.sendAsync}.
 */
public class AsyncChaining {

    /** One client shared by all methods instead of a new one per call. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Transform response
     *
     * @param url absolute {@code http} or {@code https} URL
     * @return future completing with the response body converted to upper case
     */
    public static CompletableFuture<String> fetchAndTransform(String url) {
        return fetchBody(URI.create(url))
            // Locale.ROOT keeps the result independent of the JVM's default
            // locale (e.g. the Turkish dotted/dotless "i" mapping).
            .thenApply(body -> body.toUpperCase(Locale.ROOT));
    }

    /**
     * Dependent requests (sequential)
     *
     * <p>Fetches {@code url1}, then requests {@code url2} with the first body
     * passed as the {@code id} query parameter.
     *
     * @param url1 URL whose body becomes the {@code id} value
     * @param url2 base URL of the second request (without a query string)
     * @return future completing with the body of the second response
     */
    public static CompletableFuture<String> dependentRequests(
            String url1, String url2) {
        return fetchBody(URI.create(url1))
            .thenCompose(firstResult ->
                // Use first result to build second request
                fetchBody(buildDependentUri(url2, firstResult)));
    }

    /**
     * Multiple operations on response
     *
     * @param url absolute {@code http} or {@code https} URL
     * @return future completing with the body after parse, validate and
     *         enrich have been applied in that order
     */
    public static CompletableFuture<ProcessedData> fetchAndProcess(
            String url) {
        return fetchBody(URI.create(url))
            .thenApply(AsyncChaining::parseData)
            .thenApply(AsyncChaining::validateData)
            .thenApply(AsyncChaining::enrichData);
    }

    /**
     * Recover from failure
     *
     * @param url      absolute {@code http} or {@code https} URL
     * @param fallback value to return when the request fails
     * @return future completing with the response body, or with
     *         {@code fallback} if the request completed exceptionally
     */
    public static CompletableFuture<String> withFallback(String url,
                                                         String fallback) {
        return fetchBody(URI.create(url))
            .exceptionally(throwable -> {
                System.err.println("Failed to fetch: " + throwable.getMessage());
                return fallback;
            });
    }

    /**
     * Builds {@code baseUrl?id=<value>} with the value form-encoded as UTF-8.
     *
     * <p>The value comes from a response body, so it may contain spaces, line
     * breaks or reserved characters; unencoded, those either make
     * {@code URI.create} throw or alter the query string.
     */
    static URI buildDependentUri(String baseUrl, String id) {
        String encodedId = URLEncoder.encode(id, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "?id=" + encodedId);
    }

    /** Issues a GET on the shared client and yields the body as text. */
    private static CompletableFuture<String> fetchBody(URI uri) {
        HttpRequest request = HttpRequest.newBuilder(uri)
            .GET()
            .build();

        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body);
    }

    /** Wraps the raw response text; no actual parsing is performed here. */
    private static ProcessedData parseData(String json) {
        return new ProcessedData(json);
    }

    /** Placeholder validation step; currently returns its input unchanged. */
    private static ProcessedData validateData(ProcessedData data) {
        // Validate
        return data;
    }

    /** Placeholder enrichment step; currently returns its input unchanged. */
    private static ProcessedData enrichData(ProcessedData data) {
        // Enrich with additional information
        return data;
    }

    /** Simple holder for the text produced by the processing pipeline. */
    static class ProcessedData {
        String data;

        ProcessedData(String data) {
            this.data = data;
        }
    }
}

Error Handling

Handling Async Errors:

/**
 * Error handling in async operations
 *
 * <p>A stage that depends on a failed future sees the failure wrapped in a
 * {@link CompletionException}. The handlers here therefore unwrap to the
 * underlying cause before classifying it; testing the wrapper itself with
 * {@code instanceof} never matches the transport exception.
 */
public class AsyncErrorHandling {

    /** One client shared by all methods instead of a new one per call. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Basic exception handling
     *
     * @param url absolute {@code http} or {@code https} URL
     * @return future completing with the response body, or with a short
     *         description of the failure; it never completes exceptionally
     */
    public static CompletableFuture<String> withExceptionHandling(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .GET()
            .build();

        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body)
            .exceptionally(AsyncErrorHandling::describeFailure);
    }

    /**
     * Handle HTTP error status codes
     *
     * @param url absolute {@code http} or {@code https} URL
     * @return future completing with the body for non-error statuses; it
     *         completes exceptionally with {@link IllegalArgumentException}
     *         for 4xx and {@link RuntimeException} for 5xx and above
     */
    public static CompletableFuture<String> handleStatusCodes(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .GET()
            .build();

        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(response ->
                bodyOrThrow(response.statusCode(), response.body()));
    }

    /**
     * Comprehensive error handling
     *
     * <p>Applies a 10 second request timeout, treats every status other than
     * 200 as a failure, and converts any failure into a descriptive string.
     *
     * @param url absolute {@code http} or {@code https} URL
     * @return future completing with the body on HTTP 200, otherwise with a
     *         message describing what went wrong
     */
    public static CompletableFuture<String> robustFetch(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .timeout(java.time.Duration.ofSeconds(10))
            .GET()
            .build();

        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(response -> {
                if (response.statusCode() != 200) {
                    throw new RuntimeException(
                        "HTTP " + response.statusCode());
                }
                return response.body();
            })
            .exceptionally(AsyncErrorHandling::describeRobustFailure);
    }

    /**
     * Maps a failure to the message returned by
     * {@link #withExceptionHandling}.
     */
    static String describeFailure(Throwable throwable) {
        Throwable cause = unwrap(throwable);

        if (cause instanceof HttpTimeoutException) {
            return "Timeout error";
        } else if (cause instanceof java.net.ConnectException) {
            return "Connection error";
        } else {
            return "Unknown error: " + cause.getMessage();
        }
    }

    /** Maps a failure to the message returned by {@link #robustFetch}. */
    static String describeRobustFailure(Throwable throwable) {
        Throwable cause = unwrap(throwable);

        if (cause instanceof HttpTimeoutException) {
            return "Request timed out";
        } else if (cause instanceof java.net.ConnectException) {
            return "Could not connect to server";
        } else {
            // Use the cause's message so an HTTP failure reads "HTTP 404"
            // rather than the wrapper's "java.lang.RuntimeException: HTTP 404".
            return "Error: " + cause.getMessage();
        }
    }

    /**
     * Returns {@code body} unless {@code status} is an error status.
     *
     * @throws IllegalArgumentException for 4xx statuses
     * @throws RuntimeException for statuses of 500 and above
     */
    static String bodyOrThrow(int status, String body) {
        if (status >= 400 && status < 500) {
            throw new IllegalArgumentException("Client error: " + status);
        }
        if (status >= 500) {
            throw new RuntimeException("Server error: " + status);
        }
        // 2xx, and any 1xx/3xx that reaches this point, yield the body as-is.
        return body;
    }

    /**
     * Strips {@link CompletionException} / {@link ExecutionException}
     * wrappers and returns the first underlying cause.
     */
    static Throwable unwrap(Throwable throwable) {
        Throwable current = throwable;
        while ((current instanceof CompletionException
                || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}

Retry Patterns

Resilient Async Operations:

/**
 * Retry strategies for async operations
 *
 * <p>Retries are scheduled without blocking any thread: each failed attempt
 * schedules the next one on a delayed executor and the returned future is
 * completed by whichever attempt finishes the chain.
 */
public class RetryStrategies {

    /** One client shared by every attempt instead of a new one per call. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Simple retry with exponential backoff
     *
     * <p>Waits 1 second before the first retry and doubles the wait after
     * every further failure.
     *
     * @param url        absolute {@code http} or {@code https} URL
     * @param maxRetries maximum number of attempts in total, including the
     *                   first one; values below 1 still perform one attempt
     * @return future completing with the body of the first 2xx response
     */
    public static CompletableFuture<String> retryWithBackoff(
            String url, int maxRetries) {
        return retryWithBackoff(url, maxRetries, 2,
            java.time.Duration.ofSeconds(1));
    }

    /**
     * Retry with configurable backoff
     *
     * <p>Transport failures and 5xx responses are retried. Any other non-2xx
     * status fails immediately with a {@link RuntimeException} whose message
     * is {@code "HTTP <status>"}, because repeating the same request is not
     * expected to change a client error. When every attempt has failed, the
     * future fails with a {@link RuntimeException} "All retries exhausted"
     * whose cause is the last failure.
     *
     * @param url               absolute {@code http} or {@code https} URL
     * @param maxRetries        maximum number of attempts in total, including
     *                          the first one
     * @param backoffMultiplier factor applied to the delay after each retry
     *                          (1 keeps the delay constant)
     * @param initialDelay      wait before the first retry
     * @return future completing with the body of the first 2xx response
     */
    public static CompletableFuture<String> retryWithBackoff(
            String url, int maxRetries, int backoffMultiplier,
            java.time.Duration initialDelay) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .GET()
            .build();

        return retryAsync(
            () -> CLIENT.sendAsync(request,
                    HttpResponse.BodyHandlers.ofString())
                .thenApply(RetryStrategies::bodyOrThrow),
            RetryStrategies::isRetryable,
            maxRetries, backoffMultiplier, initialDelay.toMillis());
    }

    /**
     * Runs {@code operation} until it succeeds, fails with a non-retryable
     * cause, or {@code maxAttempts} attempts have been made.
     *
     * @param operation         starts one attempt each time it is called
     * @param retryable         decides, from the unwrapped cause, whether a
     *                          failure is worth another attempt
     * @param maxAttempts       total attempts allowed, including the first
     * @param backoffMultiplier factor applied to the delay after each retry
     * @param initialDelayMs    wait before the first retry, in milliseconds
     * @return future completing with the first successful result
     */
    static <T> CompletableFuture<T> retryAsync(
            Supplier<CompletableFuture<T>> operation,
            Predicate<Throwable> retryable,
            int maxAttempts, int backoffMultiplier, long initialDelayMs) {
        return attempt(operation, retryable, maxAttempts, 1,
            backoffMultiplier, initialDelayMs);
    }

    /** Starts attempt number {@code attempt} and chains the follow-up step. */
    private static <T> CompletableFuture<T> attempt(
            Supplier<CompletableFuture<T>> operation,
            Predicate<Throwable> retryable,
            int maxAttempts, int attempt,
            int backoffMultiplier, long delayMs) {

        // handle() sees only this attempt's outcome. Failures of later
        // attempts flow through thenCompose untouched, so an outer level never
        // retries again after an inner level has already given up.
        CompletableFuture<CompletableFuture<T>> outcome = operation.get()
            .handle((value, error) -> afterAttempt(value, error, operation,
                retryable, maxAttempts, attempt, backoffMultiplier, delayMs));

        return outcome.thenCompose(next -> next);
    }

    /** Decides what follows one attempt: finish, fail, or schedule a retry. */
    private static <T> CompletableFuture<T> afterAttempt(
            T value, Throwable error,
            Supplier<CompletableFuture<T>> operation,
            Predicate<Throwable> retryable,
            int maxAttempts, int attempt,
            int backoffMultiplier, long delayMs) {

        if (error == null) {
            return CompletableFuture.completedFuture(value);
        }

        Throwable cause = unwrap(error);
        if (!retryable.test(cause)) {
            return CompletableFuture.failedFuture(cause);
        }
        if (attempt >= maxAttempts) {
            return CompletableFuture.failedFuture(
                new RuntimeException("All retries exhausted", cause));
        }

        // Wait, then try again with the delay scaled for the following round.
        return delay(delayMs)
            .thenCompose(ignored -> attempt(operation, retryable,
                maxAttempts, attempt + 1,
                backoffMultiplier, delayMs * backoffMultiplier));
    }

    /** Returns the body of a 2xx response; any other status is thrown. */
    private static String bodyOrThrow(HttpResponse<String> response) {
        int status = response.statusCode();
        if (status >= 200 && status < 300) {
            return response.body();
        }
        throw new HttpStatusException(status);
    }

    /** Transport failures and 5xx statuses are retried; other statuses are not. */
    private static boolean isRetryable(Throwable cause) {
        if (cause instanceof HttpStatusException) {
            return ((HttpStatusException) cause).status >= 500;
        }
        return true;
    }

    /** Strips {@link CompletionException} wrappers added by dependent stages. */
    private static Throwable unwrap(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof CompletionException
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns a future that completes after the given delay, without blocking. */
    private static CompletableFuture<Void> delay(long milliseconds) {
        return CompletableFuture.runAsync(() -> { },
            CompletableFuture.delayedExecutor(
                milliseconds, java.util.concurrent.TimeUnit.MILLISECONDS));
    }

    /** Failure raised for a non-2xx response; carries the status code. */
    private static final class HttpStatusException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        private final int status;

        HttpStatusException(int status) {
            super("HTTP " + status);
            this.status = status;
        }
    }

    /**
     * Retry with circuit breaker
     *
     * <p>Counts consecutive failures. Once the count reaches the threshold,
     * calls are rejected until {@code resetTimeMs} has passed since the last
     * recorded failure; a success resets the count.
     *
     * <p>NOTE(review): the open-check and the counter reset are separate
     * steps, so concurrent callers may briefly observe slightly stale state.
     */
    public static class CircuitBreaker {
        private final AtomicInteger failures = new AtomicInteger(0);
        private final int threshold;
        private final long resetTimeMs;
        // Written from completion callbacks and read by callers on other
        // threads, so it must be volatile to be visible across them.
        private volatile long lastFailureTime = 0;

        /**
         * @param failureThreshold consecutive failures that open the circuit
         * @param resetTimeMs      milliseconds after the last failure during
         *                         which calls are rejected
         */
        public CircuitBreaker(int failureThreshold,
                            long resetTimeMs) {
            this.threshold = failureThreshold;
            this.resetTimeMs = resetTimeMs;
        }

        /**
         * Tracks the outcome of an operation that has already been started.
         *
         * <p>Because {@code operation} is already running, an open circuit
         * can only reject its result, not prevent the call. Use
         * {@link #executeAsync(Supplier)} to avoid starting it at all.
         *
         * @param operation the in-flight operation
         * @return a future mirroring {@code operation}, or a future failed
         *         with "Circuit breaker is open" while the circuit is open
         */
        public <T> CompletableFuture<T> execute(
                CompletableFuture<T> operation) {
            if (isOpen()) {
                return rejected();
            }
            return track(operation);
        }

        /**
         * Starts the operation only if the circuit is closed.
         *
         * @param operation supplier that starts the operation when invoked
         * @return the tracked operation, or a future failed with
         *         "Circuit breaker is open" without invoking the supplier
         */
        public <T> CompletableFuture<T> executeAsync(
                Supplier<CompletableFuture<T>> operation) {
            if (isOpen()) {
                return rejected();
            }
            return track(operation.get());
        }

        /** Reports whether calls must be rejected; closes the circuit once the cool-down has elapsed. */
        private boolean isOpen() {
            if (failures.get() < threshold) {
                return false;
            }

            long timeSinceLastFailure =
                System.currentTimeMillis() - lastFailureTime;
            if (timeSinceLastFailure < resetTimeMs) {
                return true;
            }

            // Reset
            failures.set(0);
            return false;
        }

        /** Records success or failure of {@code operation} when it completes. */
        private <T> CompletableFuture<T> track(
                CompletableFuture<T> operation) {
            return operation
                .whenComplete((result, exception) -> {
                    if (exception != null) {
                        failures.incrementAndGet();
                        lastFailureTime = System.currentTimeMillis();
                    } else {
                        failures.set(0);
                    }
                });
        }

        private static <T> CompletableFuture<T> rejected() {
            return CompletableFuture.failedFuture(
                new RuntimeException("Circuit breaker is open"));
        }
    }
}

Concurrent Requests

Handling Multiple Async Operations:

/**
 * Concurrent async request patterns
 */
public class ConcurrentRequests {

    /** One client shared by every request instead of a new one per URL. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Fetch all URLs, wait for all
     *
     * @param urls URLs to fetch; may be empty
     * @return future completing with the bodies in the order of {@code urls};
     *         it fails if any single request fails
     */
    public static CompletableFuture<List<String>> fetchAll(
            List<String> urls) {
        List<CompletableFuture<String>> futures = urls.stream()
            .map(ConcurrentRequests::fetchAsync)
            .toList();

        return joinAll(futures);
    }

    /**
     * Fetch all URLs, return first success
     *
     * <p>Individual failures are ignored as long as another request can
     * still succeed. The requests that lose the race are not cancelled.
     *
     * @param urls URLs to fetch
     * @return future completing with the first body that arrives
     *         successfully; it fails with the last failure if every request
     *         fails, or with {@link IllegalArgumentException} if
     *         {@code urls} is empty
     */
    public static CompletableFuture<String> fetchFirstSuccess(
            List<String> urls) {
        List<CompletableFuture<String>> futures = urls.stream()
            .map(ConcurrentRequests::fetchAsync)
            .toList();

        return firstSuccessful(futures);
    }

    /**
     * Rate-limited concurrent requests
     *
     * <p>Each URL is fetched with a blocking send on an async task; a
     * semaphore keeps at most {@code concurrency} sends in progress.
     *
     * @param urls        URLs to fetch; may be empty
     * @param concurrency maximum number of simultaneous requests, at least 1
     * @return future completing with the bodies in the order of {@code urls}
     * @throws IllegalArgumentException if {@code concurrency} is below 1,
     *         which would otherwise leave every task waiting forever
     */
    public static CompletableFuture<List<String>> fetchWithRateLimit(
            List<String> urls, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException(
                "concurrency must be at least 1: " + concurrency);
        }

        Semaphore semaphore = new Semaphore(concurrency);

        List<CompletableFuture<String>> futures = urls.stream()
            .map(url -> CompletableFuture.supplyAsync(
                () -> fetchWithPermit(semaphore, url)))
            .toList();

        return joinAll(futures);
    }

    /**
     * Completes with the first successful result among {@code futures}.
     *
     * <p>{@code CompletableFuture.anyOf} is not used because it completes on
     * the first completion of any kind, including a failure, and never
     * completes when given no futures.
     */
    static <T> CompletableFuture<T> firstSuccessful(
            List<CompletableFuture<T>> futures) {
        if (futures.isEmpty()) {
            return CompletableFuture.failedFuture(
                new IllegalArgumentException(
                    "at least one request is required"));
        }

        CompletableFuture<T> winner = new CompletableFuture<>();
        // Counts down on failures only; reaching zero means nothing succeeded.
        AtomicInteger stillPossible = new AtomicInteger(futures.size());

        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error == null) {
                    winner.complete(value);
                } else if (stillPossible.decrementAndGet() == 0) {
                    winner.completeExceptionally(error);
                }
            });
        }

        return winner;
    }

    /** Waits for every future and collects the results in list order. */
    private static CompletableFuture<List<String>> joinAll(
            List<CompletableFuture<String>> futures) {
        return CompletableFuture.allOf(
                futures.toArray(CompletableFuture<?>[]::new))
            // allOf has completed, so join() cannot block here.
            .thenApply(done -> futures.stream()
                .map(CompletableFuture::join)
                .toList());
    }

    /** Performs one blocking fetch while holding a permit. */
    private static String fetchWithPermit(Semaphore semaphore, String url) {
        // Acquire outside the try/finally: if acquire() is interrupted no
        // permit is held, and releasing one anyway would raise the limit.
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        try {
            return fetchSync(url);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            semaphore.release();
        }
    }

    /** Issues a non-blocking GET and yields the body as text. */
    private static CompletableFuture<String> fetchAsync(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .GET()
            .build();

        return CLIENT.sendAsync(request,
            HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body);
    }

    /** Issues a blocking GET and returns the body as text. */
    private static String fetchSync(String url)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .GET()
            .build();

        HttpResponse<String> response = CLIENT.send(request,
            HttpResponse.BodyHandlers.ofString());

        return response.body();
    }
}

Timeouts in Async

Async Timeout Handling:

/**
 * Three ways to bound how long an asynchronous request may take.
 */
public class AsyncTimeouts {

    /**
     * Request timeout
     *
     * <p>The limit is set on the request itself via
     * {@code HttpRequest.Builder.timeout}.
     *
     * @param url            absolute {@code http} or {@code https} URL
     * @param timeoutSeconds request timeout in seconds
     * @return future completing with the response body as text
     */
    public static CompletableFuture<String> withRequestTimeout(
            String url, int timeoutSeconds) {
        HttpClient http = HttpClient.newHttpClient();

        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url));
        builder.timeout(java.time.Duration.ofSeconds(timeoutSeconds));
        builder.GET();
        HttpRequest timedGet = builder.build();

        CompletableFuture<HttpResponse<String>> pending =
            http.sendAsync(timedGet, HttpResponse.BodyHandlers.ofString());
        return pending.thenApply(HttpResponse::body);
    }

    /**
     * Future timeout with orTimeout
     *
     * <p>The request carries no timeout of its own; the limit is applied to
     * the returned future with {@code CompletableFuture.orTimeout}.
     *
     * @param url            absolute {@code http} or {@code https} URL
     * @param timeoutSeconds time allowed for the future to complete, in seconds
     * @return future completing with the response body as text
     */
    public static CompletableFuture<String> withOrTimeout(
            String url, int timeoutSeconds) {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest plainGet = buildGet(url);

        CompletableFuture<String> body = http
            .sendAsync(plainGet, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body);

        return body.orTimeout(
            timeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
    }

    /**
     * Complete with default on timeout
     *
     * <p>Uses {@code CompletableFuture.completeOnTimeout} so the caller gets
     * {@code defaultValue} if the body has not arrived in time.
     *
     * @param url            absolute {@code http} or {@code https} URL
     * @param defaultValue   value supplied when the time limit passes first
     * @param timeoutSeconds time allowed for the future to complete, in seconds
     * @return future completing with the response body or {@code defaultValue}
     */
    public static CompletableFuture<String> withCompleteOnTimeout(
            String url, String defaultValue, int timeoutSeconds) {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest plainGet = buildGet(url);

        CompletableFuture<String> body = http
            .sendAsync(plainGet, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body);

        return body.completeOnTimeout(
            defaultValue,
            timeoutSeconds,
            java.util.concurrent.TimeUnit.SECONDS);
    }

    /** Builds a GET request for {@code url} with no request-level timeout. */
    private static HttpRequest buildGet(String url) {
        URI target = URI.create(url);
        return HttpRequest.newBuilder(target).GET().build();
    }
}

Cancellation

Cancelling Async Operations:

/**
 * Demonstrates cancelling an in-flight asynchronous request.
 */
public class AsyncCancellation {

    /**
     * Cancel async operation
     *
     * <p>Starts a GET, waits 100 ms, cancels the future and prints whether
     * the cancellation took effect.
     *
     * @throws Exception if the pause before cancelling is interrupted
     */
    public static void demonstrateCancellation() throws Exception {
        HttpClient http = HttpClient.newHttpClient();

        URI target = URI.create("https://slow-server.example.com/large-file");
        HttpRequest download = HttpRequest.newBuilder(target).GET().build();

        CompletableFuture<HttpResponse<String>> exchange =
            http.sendAsync(download, HttpResponse.BodyHandlers.ofString());
        CompletableFuture<String> body = exchange.thenApply(HttpResponse::body);

        // Give the request a moment to get under way.
        Thread.sleep(100);

        // Then abandon it and report the outcome of cancel().
        boolean wasCancelled = body.cancel(true);
        System.out.println("Cancelled: " + wasCancelled);
    }

    /**
     * Cancel with cleanup
     *
     * <p>Attaches a completion callback that reports a cancelled or failed
     * operation, then cancels the resulting future straight away.
     *
     * @throws Exception declared for callers; nothing in the body throws a
     *         checked exception
     */
    public static void cancellationWithCleanup() throws Exception {
        HttpClient http = HttpClient.newHttpClient();

        URI target = URI.create("https://example.com/data");
        HttpRequest fetch = HttpRequest.newBuilder(target).GET().build();

        CompletableFuture<String> body = http
            .sendAsync(fetch, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body);

        CompletableFuture<String> observed = body.whenComplete((value, failure) -> {
            if (failure == null) {
                return;
            }
            System.out.println("Operation cancelled or failed");
        });

        // Cancel with cleanup
        observed.cancel(true);
    }
}

Best Practices

1. Use sendAsync for Non-Blocking I/O:

CompletableFuture<String> future = client.sendAsync(request,
    HttpResponse.BodyHandlers.ofString())
    .thenApply(HttpResponse::body);

2. Chain Operations with thenApply/thenCompose:

client.sendAsync(request1, ...)
    .thenApply(HttpResponse::body)
    .thenCompose(result -> client.sendAsync(request2, ...));

3. Handle Errors Properly:

client.sendAsync(request, ...)
    .exceptionally(throwable -> "fallback value");

4. Implement Retries with Exponential Backoff:

// Use retry strategy for transient failures
retryWithBackoff(url, maxAttempts);

5. Apply Timeouts:

client.sendAsync(request, ...)
    .orTimeout(10, TimeUnit.SECONDS);

These async patterns enable building responsive, non-blocking HTTP applications in Java.