10.3 Async Composition and Advanced Patterns
This section shows how to build complex asynchronous workflows from simpler reactive primitives.
Single and Completable
Single - Emit One Item or Error:
// Single succeeds with one value or fails
Single<String> fetchUser = Single.create(emitter -> {
try {
String user = userService.getUser("123");
emitter.onSuccess(user);
} catch (Exception e) {
emitter.onError(e);
}
});
// Usage
fetchUser.subscribe(
user -> System.out.println("User: " + user),
error -> System.err.println("Error: " + error)
);
// From Callable
Single<String> fromCallable = Single.fromCallable(() ->
expensiveOperation()
);
// Converting Observable to Single
Observable<String> stream = /* ... */;
Single<String> first = stream.firstOrError();
Single<String> last = stream.lastOrError();
Completable - No Value, Just Completion or Error:
// Completable succeeds or fails with no value
Completable writeFile = Completable.create(emitter -> {
try {
Files.write(path, data);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
// Usage
writeFile.subscribe(
() -> System.out.println("Written successfully"),
error -> System.err.println("Write failed: " + error)
);
// From action
Completable fromAction = Completable.fromAction(() -> {
// Do something, throw on error
updateDatabase();
});
// Merge multiple completables
Completable.mergeArray(
writeUserFile(),
writeLogsFile(),
updateCache()
).subscribe(
() -> System.out.println("All done"),
error -> System.err.println("Some task failed: " + error)
);
Observable, Single, and Completable Hierarchy:
public class AsyncTypeComparison {
// Observable - 0, 1, or N items
Observable<String> stream = Observable.just("A", "B", "C");
// Single - exactly 1 item, or an error
Single<String> single = Single.just("Result");
// Completable - no items, just success/failure
Completable task = Completable.fromAction(() -> {
System.out.println("Task complete");
});
// Conversions
void conversions() {
// Observable to Single
Single<String> first = stream.firstOrError();
// Single to Observable
Observable<String> asObservable = single.toObservable();
// Single to Completable
Completable asCompletable = single.ignoreElement();
}
}
Sequential Composition
FlatMap - Dependent Operations:
// First fetch user, then their orders, then order details
Single<User> getUser(String userId) { /* ... */ }
Single<List<Order>> getUserOrders(String userId) { /* ... */ }
Single<OrderDetails> getOrderDetails(String orderId) { /* ... */ }
Single<OrderSummary> getCompleteOrderSummary(String userId) {
return getUser(userId)
.flatMap(user ->
getUserOrders(user.id())
.flatMap(orders -> {
if (orders.isEmpty()) {
return Single.just(new OrderSummary(user, Collections.emptyList()));
}
return Single.zip(
orders.stream()
.map(order -> getOrderDetails(order.id()))
.collect(Collectors.toList()),
results -> {
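// zip passes an Object[]; cast each element, because the array itself
// cannot be cast to OrderDetails[] (that throws ClassCastException)
List<OrderDetails> details = Arrays.stream(results)
.map(result -> (OrderDetails) result)
.collect(Collectors.toList());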
return new OrderSummary(user, details);
}
);
})
);
}
// Usage
getCompleteOrderSummary("user123")
.subscribe(
summary -> System.out.println(summary),
error -> System.err.println("Error: " + error)
);
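The combiner passed to the Iterable overload of Single.zip receives an Object[], and that array cannot be cast to a typed array even when every element has the expected type. The sketch below uses plain JDK code (no RxJava) to show the failing cast and a per-element conversion. ZipResults, toTypedList, and arrayCastFails are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ZipResults {
    private ZipResults() {}

    /** Copies an Object[] (as handed to a zip combiner) into a typed list, casting element by element. */
    static <T> List<T> toTypedList(Object[] results, Class<T> type) {
        List<T> typed = new ArrayList<>(results.length);
        for (Object result : results) {
            typed.add(type.cast(result)); // fails only if an element really has the wrong type
        }
        return typed;
    }

    /** Returns true if casting the array itself to String[] fails, as it does for a genuine Object[]. */
    static boolean arrayCastFails(Object[] results) {
        try {
            String[] ignored = (String[]) results;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object[] results = {"a", "b"};
        System.out.println(arrayCastFails(results));
        System.out.println(toTypedList(results, String.class));
    }
}
```

Inside a zip combiner the same idea applies: convert the elements one by one rather than casting the array.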
Concat - Sequential Execution:
// Execute operations in order, wait for each to complete
Completable.concatArray( // concat() accepts an Iterable or Publisher; concatArray is the varargs form
saveUserChanges(),
updateSearchIndex(),
syncWithRemoteServer()
)
.subscribe(
() -> System.out.println("All operations completed"),
error -> System.err.println("Failed: " + error)
);
// With Flowable
Flowable.concat(
fetchUsersFromCache(),
fetchUsersFromDatabase(),
fetchUsersFromRemoteAPI()
)
.firstOrError() // Get first non-empty result
.subscribe(this::displayUsers);
Ordered Execution with Error Handling:
public Single<DeploymentResult> deployApplication(Application app) {
return Single.create(emitter -> {
try {
System.out.println("1. Building...");
build(app);
System.out.println("2. Testing...");
runTests(app);
System.out.println("3. Deploying...");
deployToServer(app);
emitter.onSuccess(new DeploymentResult(true, "Deployment successful"));
} catch (BuildException e) {
System.err.println("Build failed at step 1");
emitter.onError(e);
} catch (TestException e) {
System.err.println("Tests failed at step 2");
emitter.onError(e);
} catch (DeployException e) {
System.err.println("Deployment failed at step 3");
// Rollback
rollbackDeployment(app);
emitter.onError(e);
}
});
}
// Reactive version with operators
public Single<DeploymentResult> deployApplicationReactive(Application app) {
return Single.fromCallable(() -> {
System.out.println("1. Building...");
return build(app);
})
.flatMap(built -> Single.fromCallable(() -> {
System.out.println("2. Testing...");
runTests(app);
return built;
}))
.flatMap(tested -> Single.fromCallable(() -> {
System.out.println("3. Deploying...");
return deployToServer(app);
}))
.map(deployed -> new DeploymentResult(true, "Deployment successful"))
.onErrorResumeNext(error -> {
System.err.println("Deployment failed: " + error.getMessage());
return Single.fromCallable(() -> {
rollbackDeployment(app);
return new DeploymentResult(false, error.getMessage());
});
});
}
Parallel Composition
Parallel - Multiple Concurrent Operations:
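// Fetch multiple resources concurrently. zip itself adds no concurrency: each
// source must be asynchronous (e.g. via subscribeOn) for the calls to overlap.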
Single<UserData> user = fetchUser("123");
Single<List<Order>> orders = fetchOrders("123");
Single<List<Notification>> notifications = fetchNotifications("123");
// Combine results
Single<Dashboard> dashboard = Single.zip(
user,
orders,
notifications,
(userData, orderList, notificationList) ->
new Dashboard(userData, orderList, notificationList)
);
dashboard.subscribe(
dash -> System.out.println(dash),
error -> System.err.println("Failed to load dashboard: " + error)
);
// With many sources (up to 9)
Single<ReportData> report = Single.zip(
getSalesData(),
getMarketingData(),
getFinanceData(),
getHRData(),
getITData(),
getOperationsData(),
getCustomerData(),
getProductData(),
getCompetitorData(),
(sales, marketing, finance, hr, it, ops, cust, prod, comp) ->
new ReportData(sales, marketing, finance, hr, it, ops, cust, prod, comp)
);
Parallel Flowable Processing:
public void processLargeDatasetInParallel(List<String> items) {
Flowable.fromIterable(items)
.parallel(4) // 4 parallel workers
.runOn(Schedulers.io())
.map(item -> processItem(item))
.sequential() // Combine back to Flowable
.subscribe(
result -> System.out.println(result),
error -> System.err.println("Error: " + error)
);
}
// Parallel with custom concurrency
Flowable.fromIterable(items)
.parallel(
Runtime.getRuntime().availableProcessors() // Use all CPUs
)
.runOn(Schedulers.computation())
.map(this::cpuIntensiveWork)
.sequential()
.subscribe(this::handleResult);
CombineLatest - React to Latest Values:
// A Single emits only once, so there is no stream of latest values to track;
// zip combines the one result from each source and updates the UI once
Single<User> userStream = /* ... */;
Single<Session> sessionStream = /* ... */;
Single.zip(
userStream,
sessionStream,
(user, session) -> new UserSession(user, session)
)
.subscribe(this::updateUI);
// With Flowable (for streams)
Flowable<String> searchTerms = userInputStream;
Flowable<FilterOptions> filterOptions = optionsStream;
Flowable.combineLatest(
searchTerms,
filterOptions,
(term, filters) -> new SearchQuery(term, filters)
)
.debounce(300, TimeUnit.MILLISECONDS)
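// switchMap cancels the previous search when a new query arrives, so stale results are never displayed
.switchMap(query -> performSearch(query)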
.subscribeOn(Schedulers.io())
)
.subscribe(results -> displayResults(results));
Advanced Error Recovery
Retry Strategies:
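// Retry with exponential backoff (at most 3 retries).
// Note: once the range is exhausted the handler completes, and Single.retryWhen
// then fails with NoSuchElementException rather than the last upstream error.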
Single<String> resilientFetch = fetchData()
.retryWhen(errors -> errors
.zipWith(
Flowable.range(1, 3),
(error, retryCount) -> retryCount
)
.flatMap(retryCount ->
Flowable.timer(
(long) Math.pow(2, retryCount) * 100, // 200ms, 400ms, 800ms
TimeUnit.MILLISECONDS
)
)
);
// Retry with custom condition
Single<String> conditionalRetry = fetchData()
.retryWhen(errors -> errors
.flatMap(error -> {
if (!(error instanceof TransientException)) {
// Pass non-transient errors through unchanged instead of retrying
return Flowable.<Long>error(error);
}
System.out.println("Transient error, retrying: " + error.getMessage());
return Flowable.timer(1, TimeUnit.SECONDS);
})
);
// Simple retry count
fetchData()
.retry(3) // Retry up to 3 times
.subscribe(
this::handleSuccess,
error -> System.err.println("Failed after retries: " + error)
);
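The backoff delays in the first retry example come from the expression (long) Math.pow(2, retryCount) * 100, where the cast applies to the result of Math.pow before the multiplication. The sketch below works through the same arithmetic with an integer shift instead of floating point. BackoffDelays and delayMillis are hypothetical names, and the upper bound of 30 is an arbitrary guard chosen for this illustration.

```java
final class BackoffDelays {
    private BackoffDelays() {}

    /** Delay before the given retry (1-based): baseMillis * 2^retryCount. */
    static long delayMillis(int retryCount, long baseMillis) {
        if (retryCount < 1 || retryCount > 30) {
            throw new IllegalArgumentException("retryCount out of range: " + retryCount);
        }
        return baseMillis * (1L << retryCount); // 1L << n is 2^n as a long
    }

    public static void main(String[] args) {
        // With a 100 ms base, retries 1..3 wait 200, 400, and 800 ms.
        for (int retry = 1; retry <= 3; retry++) {
            System.out.println("retry " + retry + " after " + delayMillis(retry, 100) + " ms");
        }
    }
}
```

A helper like this could supply the delay passed to Flowable.timer, which keeps the schedule testable separately from the retry wiring.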
Fallback and Recovery:
// Try primary, fallback to secondary
Single<String> primarySource = fetchFromPrimaryAPI();
Single<String> fallbackSource = fetchFromCacheAPI();
Single<String> resilient = primarySource
.onErrorResumeNext(error -> {
System.out.println("Primary failed: " + error.getMessage());
return fallbackSource;
});
// Chain multiple fallbacks
Single<String> multiLevelFallback = primarySource
.onErrorResumeNext(error1 -> secondarySource
.onErrorResumeNext(error2 -> tertiarySource
.onErrorResumeNext(error3 -> Single.error(
new AggregateException("All sources failed",
error1, error2, error3)
))
)
);
// Custom error handling
Single<String> customErrorHandler = fetchData()
.onErrorResumeNext(error -> {
if (error instanceof NetworkException) {
System.out.println("Network error, using cache");
return getCachedData();
} else if (error instanceof ValidationException) {
System.out.println("Validation error, using default");
return Single.just(DEFAULT_VALUE);
} else {
return Single.error(error);
}
});
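AggregateException in the multi-level fallback is neither a JDK class nor an RxJava class, so it has to be supplied by the application. One minimal way to define it, assuming the goal is simply to keep every underlying failure visible, is to attach the causes as suppressed exceptions. This is a sketch, not the definition used by the original code. RxJava's own CompositeException serves a similar purpose.

```java
/** Hypothetical exception that bundles several failures under one message. */
final class AggregateException extends RuntimeException {
    AggregateException(String message, Throwable... causes) {
        super(message);
        for (Throwable cause : causes) {
            addSuppressed(cause); // each underlying failure shows up in the stack trace
        }
    }

    public static void main(String[] args) {
        AggregateException e = new AggregateException(
                "All sources failed",
                new RuntimeException("primary"),
                new RuntimeException("secondary"));
        System.out.println(e.getMessage() + " (" + e.getSuppressed().length + " causes)");
    }
}
```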
Resource Management:
// Single.using ties a resource's lifetime to the subscription, much like try-finally
Single<Result> managed = Single.using(
this::openResource, // Create resource
resource -> performWork(resource), // Use resource
resource -> resource.close() // Clean up
);
// Example with database
Single<List<User>> queryDatabase = Single.using(
() -> dataSource.getConnection(),
connection -> Single.fromCallable(() -> {
// try-with-resources closes the Statement and ResultSet even if the query fails
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM users")) {
List<User> users = new ArrayList<>();
while (rs.next()) {
users.add(User.fromResultSet(rs));
}
return users;
}
}),
Connection::close
);
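Single.using follows the same open, use, always-close lifecycle as a try-with-resources statement, with the difference that the resource is opened per subscription and released when the Single terminates or is disposed. As a rough plain-JDK analogy (not RxJava code), the sketch below records the order of events. UsingLifecycle, LoggedResource, and withResource are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class UsingLifecycle {
    private UsingLifecycle() {}

    /** Hypothetical resource that records its lifecycle events in a shared log. */
    static final class LoggedResource implements AutoCloseable {
        private final List<String> log;

        LoggedResource(List<String> log) {
            this.log = log;
            log.add("open");
        }

        @Override
        public void close() {
            log.add("close");
        }
    }

    /** Opens a resource, runs the work, and closes the resource whether or not the work throws. */
    static <R> R withResource(List<String> log, Function<LoggedResource, R> work) {
        try (LoggedResource resource = new LoggedResource(log)) {
            log.add("work");
            return work.apply(resource);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        withResource(log, resource -> "done");
        System.out.println(log);
    }
}
```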
Real-World Example: Request Processing Pipeline
public class RequestProcessor {
private final AuthService authService;
private final ValidatorService validatorService;
private final BusinessLogicService businessService;
private final PersistenceService persistenceService;
private final NotificationService notificationService;
public record Request(String userId, String action, Map<String, Object> payload) {}
public record ProcessingResult(boolean success, String message, Object data) {}
// Complete request processing pipeline
public Single<ProcessingResult> processRequest(Request request) {
return Single.just(request)
// Step 1: Authenticate
.flatMap(req -> {
System.out.println("1. Authenticating user: " + req.userId());
return authService.authenticate(req.userId())
.map(token -> new AuthContext(req, token));
})
// Step 2: Validate
.flatMap(ctx -> {
System.out.println("2. Validating request");
return validatorService.validate(ctx.request.action(), ctx.request.payload())
.map(validated -> new ValidatedContext(ctx.request, ctx.token, validated));
})
// Step 3: Execute business logic
.flatMap(ctx -> {
System.out.println("3. Processing business logic");
return businessService.execute(
ctx.request.action(),
ctx.request.payload()
)
.subscribeOn(Schedulers.io())
.map(result -> new ExecutionContext(ctx.request, ctx.token, result));
})
// Step 4: Persist result
.flatMap(ctx -> {
System.out.println("4. Persisting result");
return persistenceService.save(ctx.result)
.map(saved -> ctx);
})
// Step 5: Send notification (best-effort)
.flatMap(ctx -> {
System.out.println("5. Sending notification");
return notificationService.notify(ctx.request.userId(), ctx.result)
.map(ignored -> ctx)
// Ignore notification errors. Single.just(null) would throw, because RxJava forbids null items.
.onErrorReturnItem(ctx);
})
// Final result
.map(ctx -> new ProcessingResult(
true,
"Request processed successfully",
ctx.result
))
// Error handling
.onErrorResumeNext(error -> {
System.err.println("Processing failed: " + error.getMessage());
if (error instanceof AuthenticationException) {
return Single.just(new ProcessingResult(
false,
"Authentication failed",
null
));
} else if (error instanceof ValidationException) {
return Single.just(new ProcessingResult(
false,
"Validation failed: " + error.getMessage(),
null
));
} else {
return Single.just(new ProcessingResult(
false,
"Unexpected error: " + error.getMessage(),
null
));
}
});
}
// Context classes for passing data through pipeline
record AuthContext(Request request, String token) {}
record ValidatedContext(Request request, String token, boolean valid) {}
record ExecutionContext(Request request, String token, Object result) {}
}
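The error-handling branch at the end of processRequest is plain decision logic, so it can be extracted into an ordinary function and unit-tested without subscribing to anything. The sketch below shows that idea under stated assumptions: ErrorMapping and messageFor are hypothetical names, and the two nested exception classes are stand-ins for the AuthenticationException and ValidationException types the pipeline refers to.

```java
final class ErrorMapping {
    private ErrorMapping() {}

    // Hypothetical stand-ins for the exception types used by the pipeline.
    static class AuthenticationException extends RuntimeException {
        AuthenticationException(String message) { super(message); }
    }

    static class ValidationException extends RuntimeException {
        ValidationException(String message) { super(message); }
    }

    /** Maps a pipeline failure to a user-facing message, mirroring the branches in processRequest. */
    static String messageFor(Throwable error) {
        if (error instanceof AuthenticationException) {
            return "Authentication failed";
        } else if (error instanceof ValidationException) {
            return "Validation failed: " + error.getMessage();
        }
        return "Unexpected error: " + error.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(messageFor(new ValidationException("missing field")));
    }
}
```

The reactive pipeline could then build its failed ProcessingResult from the returned message, leaving a single branch inside onErrorResumeNext.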
Testing Reactive Code
TestObserver and TestSubscriber:
@Test
public void testObservableTransformation() {
TestObserver<Integer> test = Observable.range(1, 10)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.test();
test.assertValues(4, 8, 12, 16, 20);
test.assertComplete();
test.assertNoErrors();
}
@Test
public void testFlowableBackpressure() {
TestSubscriber<Integer> test = Flowable.range(1, 100)
// Unbounded buffer: a capacity of 10 would overflow, because range emits all 100 items at once
.onBackpressureBuffer()
.test(10); // Request only 10 items
test.assertValueCount(10);
test.assertNotComplete();
test.request(20);
test.assertValueCount(30);
}
@Test
public void testSingleWithError() {
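// The explicit <String> is needed; otherwise the chain is inferred as Single<Object>
TestObserver<String> test = Single.<String>error(
new IllegalArgumentException("Invalid input")
)
.onErrorResumeNext(error -> Single.just("Default"))
.test();
test.assertValue("Default");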
test.assertComplete();
}
@Test
public void testCompletableSequence() {
TestObserver<Void> test = Completable.concatArray(
Completable.complete(),
Completable.complete(),
Completable.error(new RuntimeException("Step 3 failed"))
)
.test();
test.assertNotComplete();
test.assertError(RuntimeException.class);
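// The predicate form works across RxJava versions; assertErrorMessage was removed in RxJava 3
test.assertError(error -> "Step 3 failed".equals(error.getMessage()));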
}
Best Practices
1. Use Appropriate Type:
// Use Observable for streams
Observable<UserAction> userActions = ...;
// Use Single for one-off operations
Single<User> fetchUser = ...;
// Use Completable for operations that return no value, only success or failure
Completable saveChanges = ...;
2. Avoid Nested Subscriptions:
// Bad - nested subscribe
observable.subscribe(item -> {
anotherObservable.subscribe(value -> {
// Callback hell
doSomething(item, value);
});
});
// Good - use flatMap/flatMapSingle
observable.flatMap(item ->
anotherObservable.map(value -> new Pair<>(item, value))
)
.subscribe(pair -> doSomething(pair.item, pair.value));
3. Handle Errors Appropriately:
// Always provide error handler
observable.subscribe(
this::onNext,
this::onError, // Don't forget this!
this::onComplete
);
4. Use Correct Schedulers:
// I/O operations
apiCall()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // from RxAndroid; core RxJava has no main-thread scheduler
.subscribe();
// CPU-intensive
computation()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();