17.3 Asynchronous Operations
Master non-blocking HTTP operations with CompletableFuture.
sendAsync Basics
Async Request Execution:
import java.net.URI;
import java.net.http.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Asynchronous HTTP helpers built on {@link HttpClient#sendAsync}.
 *
 * <p>Every method returns a {@link CompletableFuture} instead of blocking for
 * the response. A malformed URL is rejected synchronously with
 * {@link IllegalArgumentException} while the request is being built, before
 * anything is sent.
 */
public class AsyncHTTPOperations {

    /**
     * Single client shared by all methods. Building a new client for every
     * request throws away its connection pool after one use.
     */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Sends a GET request and yields only the response body.
     *
     * @param url absolute http(s) URL
     * @return future completing with the body text, whatever the status code
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> asyncGet(String url) {
        return asyncGetFull(url).thenApply(HttpResponse::body);
    }

    /**
     * Sends a GET request and yields the full response (status, headers, body).
     *
     * @param url absolute http(s) URL
     * @return future completing with the complete response
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<HttpResponse<String>> asyncGetFull(
            String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .GET()
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString());
    }

    /**
     * Sends {@code body} as a POST request declared as JSON.
     *
     * @param url  absolute http(s) URL
     * @param body request payload; sent with
     *             {@code Content-Type: application/json}
     * @return future completing with the response body
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> asyncPost(String url,
            String body) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .header("Content-Type", "application/json")
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body);
    }

    /**
     * Starts one GET per URL and gathers the bodies once all have finished.
     *
     * @param urls URLs to fetch; an empty list yields an empty result
     * @return future completing with the bodies in the same order as
     *         {@code urls}, or failing if any single request fails
     * @throws IllegalArgumentException if any URL is not a valid URI
     */
    public static CompletableFuture<List<String>> asyncGetMultiple(
            List<String> urls) {
        List<CompletableFuture<String>> futures = urls.stream()
                .map(AsyncHTTPOperations::asyncGet)
                .toList();
        return CompletableFuture.allOf(
                        futures.toArray(new CompletableFuture<?>[0]))
                // allOf has completed, so join() cannot block here.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .toList());
    }
}
CompletableFuture Chaining
Building Async Pipelines:
/**
 * Examples of composing follow-up work onto an asynchronous HTTP response.
 *
 * <p>A malformed URL passed directly to a method is rejected synchronously
 * with {@link IllegalArgumentException}; failures of the request itself are
 * reported through the returned future.
 */
public class AsyncChaining {

    /** Single client shared by all methods so connections can be reused. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Fetches a URL and upper-cases the body.
     *
     * @param url absolute http(s) URL
     * @return future completing with the upper-cased response body
     */
    public static CompletableFuture<String> fetchAndTransform(String url) {
        return fetchBody(url).thenApply(String::toUpperCase);
    }

    /**
     * Runs two requests in sequence, passing the first body to the second as
     * the {@code id} query parameter.
     *
     * <p>The first body is URL-encoded before being appended. Without that,
     * any body containing a space, newline or other reserved character makes
     * {@link URI#create} throw and the whole chain fail.
     *
     * @param url1 URL of the first request
     * @param url2 base URL of the second request; {@code ?id=...} is appended
     * @return future completing with the body of the second response
     */
    public static CompletableFuture<String> dependentRequests(
            String url1, String url2) {
        return fetchBody(url1)
                .thenCompose(firstResult -> {
                    String id = java.net.URLEncoder.encode(
                            firstResult,
                            java.nio.charset.StandardCharsets.UTF_8);
                    return fetchBody(url2 + "?id=" + id);
                });
    }

    /**
     * Fetches a URL and runs the body through parse, validate and enrich
     * steps, in that order.
     *
     * @param url absolute http(s) URL
     * @return future completing with the processed data
     */
    public static CompletableFuture<ProcessedData> fetchAndProcess(
            String url) {
        return fetchBody(url)
                .thenApply(AsyncChaining::parseData)
                .thenApply(AsyncChaining::validateData)
                .thenApply(AsyncChaining::enrichData);
    }

    /**
     * Fetches a URL, substituting {@code fallback} if the request fails.
     *
     * @param url      absolute http(s) URL
     * @param fallback value to complete with when the request fails
     * @return future that completes with the body or with {@code fallback};
     *         the failure message is written to standard error
     */
    public static CompletableFuture<String> withFallback(String url,
            String fallback) {
        return fetchBody(url)
                .exceptionally(throwable -> {
                    System.err.println("Failed to fetch: " + throwable.getMessage());
                    return fallback;
                });
    }

    /** Sends a GET request to {@code url} and yields the response body. */
    private static CompletableFuture<String> fetchBody(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .GET()
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body);
    }

    /** Wraps the raw body; no actual JSON parsing happens here. */
    private static ProcessedData parseData(String json) {
        return new ProcessedData(json);
    }

    /** Placeholder validation step; returns its argument unchanged. */
    private static ProcessedData validateData(ProcessedData data) {
        return data;
    }

    /** Placeholder enrichment step; returns its argument unchanged. */
    private static ProcessedData enrichData(ProcessedData data) {
        return data;
    }

    /** Holder for the text produced by the processing pipeline. */
    static class ProcessedData {
        String data;

        ProcessedData(String data) {
            this.data = data;
        }
    }
}
Error Handling
Handling Async Errors:
/**
 * Error-handling patterns for asynchronous HTTP calls.
 *
 * <p>A failure reaching a dependent stage is normally wrapped in a
 * {@link java.util.concurrent.CompletionException}, so every handler here
 * inspects the unwrapped cause rather than the wrapper.
 */
public class AsyncErrorHandling {

    /** Single client shared by all methods so connections can be reused. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Fetches a URL and converts any failure into a short description.
     *
     * @param url absolute http(s) URL
     * @return future that never fails: it completes with the body, or with
     *         {@code "Timeout error"}, {@code "Connection error"} or
     *         {@code "Unknown error: ..."}
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> withExceptionHandling(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .GET()
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body)
                .exceptionally(throwable -> {
                    Throwable cause = unwrap(throwable);
                    if (cause instanceof HttpTimeoutException) {
                        return "Timeout error";
                    } else if (cause instanceof java.net.ConnectException) {
                        return "Connection error";
                    } else {
                        return "Unknown error: " + cause.getMessage();
                    }
                });
    }

    /**
     * Fetches a URL and turns 4xx/5xx responses into failures.
     *
     * @param url absolute http(s) URL
     * @return future completing with the body for any status below 400,
     *         failing with {@link IllegalArgumentException} for 4xx and
     *         {@link RuntimeException} for 5xx and above
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> handleStatusCodes(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .GET()
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(response -> {
                    int status = response.statusCode();
                    if (status >= 400 && status < 500) {
                        throw new IllegalArgumentException(
                                "Client error: " + status);
                    }
                    if (status >= 500) {
                        throw new RuntimeException(
                                "Server error: " + status);
                    }
                    // 2xx, plus any status below 400 that was not a success.
                    return response.body();
                });
    }

    /**
     * Fetches a URL with a 10-second request timeout, accepting only status
     * 200 and describing every failure as text.
     *
     * @param url absolute http(s) URL
     * @return future that never fails: it completes with the body, or with
     *         {@code "Request timed out"},
     *         {@code "Could not connect to server"} or {@code "Error: ..."}
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> robustFetch(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(java.time.Duration.ofSeconds(10))
                .GET()
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(response -> {
                    if (response.statusCode() != 200) {
                        throw new RuntimeException(
                                "HTTP " + response.statusCode());
                    }
                    return response.body();
                })
                .exceptionally(throwable -> {
                    Throwable cause = unwrap(throwable);
                    if (cause instanceof HttpTimeoutException) {
                        return "Request timed out";
                    } else if (cause instanceof java.net.ConnectException) {
                        return "Could not connect to server";
                    } else {
                        return "Error: " + cause.getMessage();
                    }
                });
    }

    /**
     * Strips {@code CompletionException}/{@code ExecutionException} wrappers
     * to reach the exception that actually describes the failure.
     */
    private static Throwable unwrap(Throwable throwable) {
        Throwable current = throwable;
        while ((current instanceof java.util.concurrent.CompletionException
                || current instanceof java.util.concurrent.ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
Retry Patterns
Resilient Async Operations:
/**
 * Retry and circuit-breaker helpers for asynchronous HTTP calls.
 */
public class RetryStrategies {

    /** Single client shared by all retry chains so connections can be reused. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Fetches a URL, retrying with a delay that doubles after each failed
     * attempt, starting at one second.
     *
     * @param url        absolute http(s) URL
     * @param maxRetries total number of attempts, including the first one
     * @return future completing with the body of the first 2xx response
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> retryWithBackoff(
            String url, int maxRetries) {
        // A multiplier of 1 would keep the delay constant, which is not
        // exponential backoff; 2 doubles it each time.
        return retryWithBackoff(url, maxRetries, 2,
                java.time.Duration.ofSeconds(1));
    }

    /**
     * Fetches a URL, retrying transient failures with configurable backoff.
     *
     * <p>Transport failures and 5xx responses are retried. Any other non-2xx
     * response fails immediately with {@code RuntimeException("HTTP <code>")}.
     * When the attempts run out the future fails with
     * {@code RuntimeException("All retries exhausted")} whose cause is the
     * last failure.
     *
     * @param url               absolute http(s) URL
     * @param maxRetries        total number of attempts, including the first
     * @param backoffMultiplier factor applied to the delay after each retry
     * @param initialDelay      wait before the first retry
     * @return future completing with the body of the first 2xx response
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> retryWithBackoff(
            String url, int maxRetries, int backoffMultiplier,
            java.time.Duration initialDelay) {
        return retryInternal(CLIENT, url, maxRetries, 1,
                backoffMultiplier, initialDelay.toMillis());
    }

    /**
     * Performs attempt number {@code attempt} and, if it fails in a retryable
     * way, schedules the next attempt after {@code delayMs}.
     *
     * <p>Success and failure are handled in one {@code handle} stage so each
     * failure is considered for retry exactly once, and nothing blocks while
     * waiting for the delay.
     */
    private static CompletableFuture<String> retryInternal(
            HttpClient client, String url, int maxRetries,
            int attempt, int backoffMultiplier, long delayMs) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .GET()
                .build();
        long nextDelayMs = delayMs * backoffMultiplier;
        return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .<CompletableFuture<String>>handle((response, error) -> {
                    Throwable failure;
                    if (error != null) {
                        failure = error;
                    } else {
                        int status = response.statusCode();
                        if (status >= 200 && status < 300) {
                            return CompletableFuture.completedFuture(
                                    response.body());
                        }
                        failure = new RuntimeException("HTTP " + status);
                        if (status < 500) {
                            // Repeating the same request will not fix a
                            // client-side error, so fail without retrying.
                            return CompletableFuture.failedFuture(failure);
                        }
                    }
                    if (attempt >= maxRetries) {
                        return CompletableFuture.failedFuture(
                                new RuntimeException(
                                        "All retries exhausted", failure));
                    }
                    return delay(delayMs)
                            .thenCompose(ignored -> retryInternal(
                                    client, url, maxRetries, attempt + 1,
                                    backoffMultiplier, nextDelayMs));
                })
                .thenCompose(next -> next);
    }

    /** Returns a future that completes after the given number of milliseconds. */
    private static CompletableFuture<Void> delay(long milliseconds) {
        return CompletableFuture.runAsync(() -> { },
                CompletableFuture.delayedExecutor(
                        milliseconds,
                        java.util.concurrent.TimeUnit.MILLISECONDS));
    }

    /**
     * Counts consecutive failures and rejects calls while too many have
     * happened recently.
     *
     * <p>The breaker opens once {@code failureThreshold} consecutive failures
     * are recorded and stays open until {@code resetTimeMs} milliseconds have
     * passed since the latest one. Any success resets the counter.
     */
    public static class CircuitBreaker {
        private final AtomicInteger failures = new AtomicInteger(0);
        private final int threshold;
        private final long resetTimeMs;
        // Written from completion callbacks on other threads, hence volatile.
        private volatile long lastFailureTime = 0;

        /**
         * @param failureThreshold consecutive failures that open the breaker
         * @param resetTimeMs      how long the breaker stays open, in ms
         */
        public CircuitBreaker(int failureThreshold,
                long resetTimeMs) {
            this.threshold = failureThreshold;
            this.resetTimeMs = resetTimeMs;
        }

        /**
         * Tracks the outcome of an operation that has already been started.
         *
         * <p>Because the caller created {@code operation} beforehand, an open
         * breaker can only hide its result, not prevent the work. Use
         * {@link #executeLazily} when the call itself must be skipped.
         *
         * @param operation the running operation
         * @return {@code operation}'s outcome, or a future failed with
         *         {@code RuntimeException("Circuit breaker is open")}
         */
        public <T> CompletableFuture<T> execute(
                CompletableFuture<T> operation) {
            if (isOpen()) {
                return rejected();
            }
            return track(operation);
        }

        /**
         * Starts the operation only if the breaker is closed.
         *
         * @param operation factory invoked at most once, and not at all while
         *                  the breaker is open; an exception it throws is
         *                  recorded as a failure
         * @return the operation's outcome, or a future failed with
         *         {@code RuntimeException("Circuit breaker is open")}
         */
        public <T> CompletableFuture<T> executeLazily(
                java.util.function.Supplier<CompletableFuture<T>> operation) {
            if (isOpen()) {
                return rejected();
            }
            CompletableFuture<T> started;
            try {
                started = operation.get();
            } catch (RuntimeException e) {
                recordFailure();
                return CompletableFuture.failedFuture(e);
            }
            return track(started);
        }

        /** Reports whether calls are rejected; resets the counter once the open period is over. */
        private boolean isOpen() {
            if (failures.get() < threshold) {
                return false;
            }
            long timeSinceLastFailure =
                    System.currentTimeMillis() - lastFailureTime;
            if (timeSinceLastFailure < resetTimeMs) {
                return true;
            }
            failures.set(0);
            return false;
        }

        /** Records the operation's outcome once it completes. */
        private <T> CompletableFuture<T> track(CompletableFuture<T> operation) {
            return operation.whenComplete((result, exception) -> {
                if (exception != null) {
                    recordFailure();
                } else {
                    failures.set(0);
                }
            });
        }

        /** Stores the timestamp before the count so a reader never pairs a fresh count with a stale time. */
        private void recordFailure() {
            lastFailureTime = System.currentTimeMillis();
            failures.incrementAndGet();
        }

        private static <T> CompletableFuture<T> rejected() {
            return CompletableFuture.failedFuture(
                    new RuntimeException("Circuit breaker is open"));
        }
    }
}
Concurrent Requests
Handling Multiple Async Operations:
/**
 * Patterns for running several HTTP requests concurrently.
 */
public class ConcurrentRequests {

    /** Single client shared by all requests so connections can be reused. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Starts one GET per URL and gathers the bodies once all have finished.
     *
     * @param urls URLs to fetch; an empty list yields an empty result
     * @return future completing with the bodies in the order of {@code urls},
     *         or failing if any request fails
     */
    public static CompletableFuture<List<String>> fetchAll(
            List<String> urls) {
        List<CompletableFuture<String>> futures = urls.stream()
                .map(ConcurrentRequests::fetchAsync)
                .toList();
        return collect(futures);
    }

    /**
     * Starts one GET per URL and yields the body of the first request that
     * succeeds.
     *
     * <p>{@code CompletableFuture.anyOf} is not used because it settles on
     * the first completion even when that completion is a failure.
     *
     * @param urls URLs to fetch
     * @return future completing with the first successful body; it fails
     *         with the last error once every request has failed, and with
     *         {@link IllegalArgumentException} if {@code urls} is empty
     */
    public static CompletableFuture<String> fetchFirstSuccess(
            List<String> urls) {
        if (urls.isEmpty()) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("urls must not be empty"));
        }
        CompletableFuture<String> winner = new CompletableFuture<>();
        java.util.concurrent.atomic.AtomicInteger stillPossible =
                new java.util.concurrent.atomic.AtomicInteger(urls.size());
        for (String url : urls) {
            fetchAsync(url).whenComplete((body, error) -> {
                if (error == null) {
                    winner.complete(body);
                } else if (stillPossible.decrementAndGet() == 0) {
                    // Every request has failed; surface the last failure.
                    winner.completeExceptionally(error);
                }
            });
        }
        return winner;
    }

    /**
     * Fetches all URLs with at most {@code concurrency} requests in flight.
     *
     * <p>Each task blocks a thread of the default asynchronous pool while it
     * waits for a permit and for the response, so the effective parallelism
     * is also bounded by that pool's size.
     *
     * @param urls        URLs to fetch; an empty list yields an empty result
     * @param concurrency maximum simultaneous requests; must be at least 1
     * @return future completing with the bodies in the order of {@code urls}
     * @throws IllegalArgumentException if {@code concurrency} is below 1,
     *         which would otherwise leave every task waiting forever
     */
    public static CompletableFuture<List<String>> fetchWithRateLimit(
            List<String> urls, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException(
                    "concurrency must be at least 1: " + concurrency);
        }
        Semaphore semaphore = new Semaphore(concurrency);
        List<CompletableFuture<String>> futures = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(
                        () -> fetchWithPermit(semaphore, url)))
                .toList();
        return collect(futures);
    }

    /** Waits for every future and returns the results in list order. */
    private static CompletableFuture<List<String>> collect(
            List<CompletableFuture<String>> futures) {
        return CompletableFuture.allOf(
                        futures.toArray(new CompletableFuture<?>[0]))
                // allOf has completed, so join() cannot block here.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .toList());
    }

    /**
     * Fetches {@code url} while holding one permit. The permit is acquired
     * outside the try/finally so an interrupted acquire never releases a
     * permit that was not obtained.
     */
    private static String fetchWithPermit(Semaphore semaphore, String url) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        try {
            return fetchSync(url);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            semaphore.release();
        }
    }

    /** Sends a GET request without blocking and yields the response body. */
    private static CompletableFuture<String> fetchAsync(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .GET()
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body);
    }

    /** Sends a GET request, blocking until the response body is available. */
    private static String fetchSync(String url)
            throws java.io.IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .GET()
                .build();
        HttpResponse<String> response = CLIENT.send(request,
                HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
Timeouts in Async
Async Timeout Handling:
/**
 * Three ways to bound how long an asynchronous HTTP call may take.
 */
public class AsyncTimeouts {

    /** Single client shared by all methods so connections can be reused. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Applies the timeout to the request itself, so the HTTP client enforces
     * it.
     *
     * @param url            absolute http(s) URL
     * @param timeoutSeconds request timeout; must be positive
     * @return future completing with the body, or failing when the request
     *         times out
     * @throws IllegalArgumentException if {@code url} is not a valid URI or
     *         {@code timeoutSeconds} is not positive
     */
    public static CompletableFuture<String> withRequestTimeout(
            String url, int timeoutSeconds) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(java.time.Duration.ofSeconds(timeoutSeconds))
                .GET()
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body);
    }

    /**
     * Applies the timeout to the returned future with {@code orTimeout}.
     *
     * <p>{@code orTimeout} only fails the future it is called on. The
     * exchange is therefore cancelled explicitly once the result settles, so
     * a timed-out request does not keep running in the background.
     *
     * @param url            absolute http(s) URL
     * @param timeoutSeconds time allowed before the future fails
     * @return future completing with the body, or failing with
     *         {@link java.util.concurrent.TimeoutException}
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> withOrTimeout(
            String url, int timeoutSeconds) {
        CompletableFuture<HttpResponse<String>> exchange = get(url);
        CompletableFuture<String> result = exchange
                .thenApply(HttpResponse::body)
                .orTimeout(timeoutSeconds,
                        java.util.concurrent.TimeUnit.SECONDS);
        return abortExchangeOnceSettled(result, exchange);
    }

    /**
     * Completes with {@code defaultValue} if no response arrives in time.
     *
     * <p>As with {@link #withOrTimeout}, the exchange is cancelled once the
     * result settles so an abandoned request is not left running.
     *
     * @param url            absolute http(s) URL
     * @param defaultValue   value used when the timeout elapses first
     * @param timeoutSeconds time allowed before the default is used
     * @return future completing with the body or with {@code defaultValue}
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public static CompletableFuture<String> withCompleteOnTimeout(
            String url, String defaultValue, int timeoutSeconds) {
        CompletableFuture<HttpResponse<String>> exchange = get(url);
        CompletableFuture<String> result = exchange
                .thenApply(HttpResponse::body)
                .completeOnTimeout(defaultValue,
                        timeoutSeconds,
                        java.util.concurrent.TimeUnit.SECONDS);
        return abortExchangeOnceSettled(result, exchange);
    }

    /** Starts a plain GET request for {@code url}. */
    private static CompletableFuture<HttpResponse<String>> get(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .GET()
                .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString());
    }

    /**
     * Cancels {@code exchange} as soon as {@code result} is settled.
     * Cancelling an exchange that has already finished has no effect.
     */
    private static <T> CompletableFuture<T> abortExchangeOnceSettled(
            CompletableFuture<T> result, CompletableFuture<?> exchange) {
        result.whenComplete((value, error) -> exchange.cancel(true));
        return result;
    }
}
Cancellation
Cancelling Async Operations:
/**
 * Cancellation patterns for asynchronous HTTP calls.
 *
 * <p>Both examples cancel the future returned by {@code sendAsync} itself.
 * Cancelling a stage derived from it (for example the result of
 * {@code thenApply}) only completes that derived stage: the request keeps
 * running and callbacks attached further upstream are not triggered.
 */
public class AsyncCancellation {

    /** Single client shared by both examples. */
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /**
     * Starts a request, waits 100 ms, cancels it and prints whether the
     * cancellation took effect.
     *
     * @throws Exception if the thread is interrupted while waiting
     */
    public static void demonstrateCancellation() throws Exception {
        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("https://slow-server.example.com/large-file"))
                .GET()
                .build();
        CompletableFuture<HttpResponse<String>> exchange =
                CLIENT.sendAsync(request,
                        HttpResponse.BodyHandlers.ofString());
        // Give the request a moment to get under way.
        Thread.sleep(100);
        // Cancel the exchange itself; false means it had already completed.
        boolean cancelled = exchange.cancel(true);
        System.out.println("Cancelled: " + cancelled);
    }

    /**
     * Starts a request with a completion callback, then cancels it so the
     * callback observes the cancellation.
     *
     * @throws Exception declared for parity with
     *         {@link #demonstrateCancellation()}
     */
    public static void cancellationWithCleanup() throws Exception {
        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("https://example.com/data"))
                .GET()
                .build();
        CompletableFuture<HttpResponse<String>> exchange =
                CLIENT.sendAsync(request,
                        HttpResponse.BodyHandlers.ofString());
        exchange
                .thenApply(HttpResponse::body)
                .whenComplete((result, exception) -> {
                    if (exception != null) {
                        System.out.println("Operation cancelled or failed");
                    }
                });
        // Cancelling at the source lets the failure flow through the
        // dependent stages, so the callback above runs.
        exchange.cancel(true);
    }
}
Best Practices
1. Use sendAsync for Non-Blocking I/O:
CompletableFuture<String> future = client.sendAsync(request,
HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body);
2. Chain Operations with thenApply/thenCompose:
client.sendAsync(request1, ...)
.thenApply(HttpResponse::body)
.thenCompose(result -> client.sendAsync(request2, ...));
3. Handle Errors Properly:
client.sendAsync(request, ...)
.thenApply(HttpResponse::body)
.exceptionally(throwable -> "fallback value");
4. Implement Retries with Exponential Backoff:
// Use retry strategy for transient failures
retryWithBackoff(url, maxAttempts);
5. Apply Timeouts:
client.sendAsync(request, ...)
.orTimeout(10, TimeUnit.SECONDS);
These async patterns enable building responsive, non-blocking HTTP applications in Java.