10.3 Async Composition and Advanced Patterns

Building complex asynchronous workflows from simpler reactive primitives.

Single and Completable

Single - Emit One Item or Error:

// Single succeeds with one value or fails
Single<String> fetchUser = Single.create(emitter -> {
    try {
        String user = userService.getUser("123");
        emitter.onSuccess(user);
    } catch (Exception e) {
        emitter.onError(e);
    }
});

// Usage
fetchUser.subscribe(
    user -> System.out.println("User: " + user),
    error -> System.err.println("Error: " + error)
);

// From Callable
Single<String> fromCallable = Single.fromCallable(() ->
    expensiveOperation()
);

// Converting Observable to Single
Observable<String> stream = /* ... */;
Single<String> first = stream.firstOrError();
Single<String> last = stream.lastOrError();

Completable - No Value, Just Completion or Error:

// Completable succeeds or fails with no value
Completable writeFile = Completable.create(emitter -> {
    try {
        Files.write(path, data);
        emitter.onComplete();
    } catch (Exception e) {
        emitter.onError(e);
    }
});

// Usage
writeFile.subscribe(
    () -> System.out.println("Written successfully"),
    error -> System.err.println("Write failed: " + error)
);

// From action
Completable fromAction = Completable.fromAction(() -> {
    // Do something, throw on error
    updateDatabase();
});

// Merge multiple completables
Completable.mergeArray(
    writeUserFile(),
    writeLogsFile(),
    updateCache()
).subscribe(
    () -> System.out.println("All done"),
    error -> System.err.println("Some task failed: " + error)
);

Observable, Single, and Completable Compared:

public class AsyncTypeComparison {
    // Observable - 0, 1, or N items
    Observable<String> stream = Observable.just("A", "B", "C");

    // Single - exactly 1 item
    Single<String> single = Single.just("Result");

    // Completable - no items, just success/failure
    Completable task = Completable.fromAction(() -> {
        System.out.println("Task complete");
    });

    // Conversions
    void conversions() {
        // Observable to Single
        Single<String> first = stream.firstOrError();

        // Single to Observable
        Observable<String> asObservable = single.toObservable();

        // Single to Completable
        Completable asCompletable = single.ignoreElement();
    }
}

Sequential Composition

FlatMap - Dependent Operations:

// First fetch user, then their orders, then order details
Single<User> getUser(String userId) { /* ... */ }
Single<List<Order>> getUserOrders(String userId) { /* ... */ }
Single<OrderDetails> getOrderDetails(String orderId) { /* ... */ }

Single<OrderSummary> getCompleteOrderSummary(String userId) {
    return getUser(userId)
        .flatMap(user -> 
            getUserOrders(user.id())
                .flatMap(orders -> {
                    if (orders.isEmpty()) {
                        return Single.just(new OrderSummary(user, Collections.emptyList()));
                    }

                    return Single.zip(
                        orders.stream()
                            .map(order -> getOrderDetails(order.id()))
                            .collect(Collectors.toList()),
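                        results -> {
                            // zip passes an Object[]; cast each element, since casting
                            // the array itself throws ClassCastException
                            List<OrderDetails> details = Arrays.stream(results)
                                .map(OrderDetails.class::cast)
                                .collect(Collectors.toList());
                            return new OrderSummary(user, details);
                        }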
                    );
                })
        );
}

// Usage
getCompleteOrderSummary("user123")
    .subscribe(
        summary -> System.out.println(summary),
        error -> System.err.println("Error: " + error)
    );
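
When Single.zip is given a collection of sources, its combiner receives the results as an Object[] rather than a typed array. The plain-Java sketch below (no RxJava required) illustrates why the array cannot be cast as a whole and how an element-wise cast recovers a typed list. The class name ZipResultCasting and the one-field OrderDetails record are hypothetical stand-ins, not part of the chapter's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ZipResultCasting {
    // Hypothetical stand-in for the chapter's OrderDetails type
    record OrderDetails(String orderId) {}

    // Converts the Object[] handed to a zip combiner into a typed list
    static List<OrderDetails> toDetails(Object[] results) {
        return Arrays.stream(results)
            .map(OrderDetails.class::cast)
            .collect(Collectors.toList());
    }

    // Returns true when casting the array itself fails, which is the case
    // for any array created as Object[], whatever its elements are
    static boolean arrayCastFails(Object[] results) {
        try {
            OrderDetails[] ignored = (OrderDetails[]) results;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object[] results = { new OrderDetails("o-1"), new OrderDetails("o-2") };
        System.out.println("Array cast fails: " + arrayCastFails(results));
        System.out.println("Typed list size: " + toDetails(results).size());
    }
}
```

The element-wise cast still fails fast with a ClassCastException if a source ever emits an unexpected type, which is usually the desired behavior.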

Concat - Sequential Execution:

// Execute operations in order, wait for each to complete
// (concatArray is the varargs form; concat takes an Iterable or Publisher)
Completable.concatArray(
    saveUserChanges(),
    updateSearchIndex(),
    syncWithRemoteServer()
)
    .subscribe(
        () -> System.out.println("All operations completed"),
        error -> System.err.println("Failed: " + error)
    );

// With Flowable
Flowable.concat(
    fetchUsersFromCache(),
    fetchUsersFromDatabase(),
    fetchUsersFromRemoteAPI()
)
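    .firstOrError()  // First item emitted; a later source is subscribed only if the earlier ones complete empty
    .subscribe(
        this::displayUsers,
        error -> System.err.println("No users available: " + error)
    );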

Ordered Execution with Error Handling:

public Single<DeploymentResult> deployApplication(Application app) {
    return Single.create(emitter -> {
        try {
            System.out.println("1. Building...");
            build(app);

            System.out.println("2. Testing...");
            runTests(app);

            System.out.println("3. Deploying...");
            deployToServer(app);

            emitter.onSuccess(new DeploymentResult(true, "Deployment successful"));

        } catch (BuildException e) {
            System.err.println("Build failed at step 1");
            emitter.onError(e);
        } catch (TestException e) {
            System.err.println("Tests failed at step 2");
            emitter.onError(e);
        } catch (DeployException e) {
            System.err.println("Deployment failed at step 3");
            // Rollback
            rollbackDeployment(app);
            emitter.onError(e);
        }
    });
}

// Reactive version with operators
// Note: this variant rolls back on any failure, not only on DeployException
public Single<DeploymentResult> deployApplicationReactive(Application app) {
    return Single.fromCallable(() -> {
        System.out.println("1. Building...");
        return build(app);
    })
        .flatMap(built -> Single.fromCallable(() -> {
            System.out.println("2. Testing...");
            runTests(app);
            return built;
        }))
        .flatMap(tested -> Single.fromCallable(() -> {
            System.out.println("3. Deploying...");
            return deployToServer(app);
        }))
        .map(deployed -> new DeploymentResult(true, "Deployment successful"))
        .onErrorResumeNext(error -> {
            System.err.println("Deployment failed: " + error.getMessage());
            return Single.fromCallable(() -> {
                rollbackDeployment(app);
                return new DeploymentResult(false, error.getMessage());
            });
        });
}

Parallel Composition

Parallel - Multiple Concurrent Operations:

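// Fetch multiple resources in parallel.
// zip only subscribes to each source; subscribeOn moves each one onto its own
// I/O thread so that synchronous sources actually run concurrently.
Single<UserData> user = fetchUser("123").subscribeOn(Schedulers.io());
Single<List<Order>> orders = fetchOrders("123").subscribeOn(Schedulers.io());
Single<List<Notification>> notifications = fetchNotifications("123").subscribeOn(Schedulers.io());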

// Combine results
Single<Dashboard> dashboard = Single.zip(
    user,
    orders,
    notifications,
    (userData, orderList, notificationList) -> 
        new Dashboard(userData, orderList, notificationList)
);

dashboard.subscribe(
    dash -> System.out.println(dash),
    error -> System.err.println("Failed to load dashboard: " + error)
);

// With many sources (up to 9)
Single<ReportData> report = Single.zip(
    getSalesData(),
    getMarketingData(),
    getFinanceData(),
    getHRData(),
    getITData(),
    getOperationsData(),
    getCustomerData(),
    getProductData(),
    getCompetitorData(),
    (sales, marketing, finance, hr, it, ops, cust, prod, comp) ->
        new ReportData(sales, marketing, finance, hr, it, ops, cust, prod, comp)
);

Parallel Flowable Processing:

public void processLargeDatasetInParallel(List<String> items) {
    Flowable.fromIterable(items)
        .parallel(4)  // 4 parallel workers
        .runOn(Schedulers.io())
        .map(item -> processItem(item))
        .sequential()  // Combine back to Flowable (original item order is not guaranteed)
        .subscribe(
            result -> System.out.println(result),
            error -> System.err.println("Error: " + error)
        );
}

// Parallel with custom concurrency
Flowable.fromIterable(items)
    .parallel(
        Runtime.getRuntime().availableProcessors()  // Use all CPUs
    )
    .runOn(Schedulers.computation())
    .map(this::cpuIntensiveWork)
    .sequential()
    .subscribe(this::handleResult);

CombineLatest - React to Latest Values:

// A Single emits only once, so zip is enough to pair one user with one session
Single<User> userStream = /* ... */;
Single<Session> sessionStream = /* ... */;

Single.zip(
    userStream,
    sessionStream,
    (user, session) -> new UserSession(user, session)
)
    .subscribe(this::updateUI);

// With Flowable (for streams)
Flowable<String> searchTerms = userInputStream;
Flowable<FilterOptions> filterOptions = optionsStream;

Flowable.combineLatest(
    searchTerms,
    filterOptions,
    (term, filters) -> new SearchQuery(term, filters)
)
    .debounce(300, TimeUnit.MILLISECONDS)
    // switchMap cancels a stale search when a newer query arrives
    .switchMap(query -> performSearch(query)
        .subscribeOn(Schedulers.io())
    )
    .subscribe(results -> displayResults(results));

Advanced Error Recovery

Retry Strategies:

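// Retry with exponential backoff: up to 3 retries, then propagate the last error
Single<String> resilientFetch = fetchData()
    .retryWhen(errors -> errors
        .zipWith(
            Flowable.range(1, 4),  // the 4th slot marks "retries exhausted"
            (error, attempt) -> attempt <= 3
                ? Flowable.timer(
                    (long) Math.pow(2, attempt) * 100,  // 200ms, 400ms, 800ms
                    TimeUnit.MILLISECONDS
                )
                // Without this branch the handler would simply complete
                // and the original error would not reach the subscriber
                : Flowable.<Long>error(error)
        )
        .flatMap(delay -> delay)
    );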

// Retry with custom condition
Single<String> conditionalRetry = fetchData()
    .retryWhen(errors -> errors
        .flatMap(error -> {
            if (!(error instanceof TransientException)) {
                // Propagate non-transient errors instead of completing the handler
                return Flowable.<Long>error(error);
            }
            System.out.println("Transient error, retrying: " + error.getMessage());
            return Flowable.timer(1, TimeUnit.SECONDS);
        })
    );

// Simple retry count
fetchData()
    .retry(3)  // Retry up to 3 times
    .subscribe(
        this::handleSuccess,
        error -> System.err.println("Failed after retries: " + error)
    );
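
The delays in the exponential backoff example follow 2^n * 100 ms for retry n. As a quick check of that arithmetic, here is a small JDK-only sketch; the class and method names are hypothetical, and it uses a bit shift in place of Math.pow, which gives the same values for small n.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    // Delay before the given retry (1-based): 2^retryCount * 100 ms
    static long delayMillis(int retryCount) {
        if (retryCount < 1 || retryCount > 30) {
            throw new IllegalArgumentException("retryCount must be in 1..30");
        }
        return (1L << retryCount) * 100;
    }

    // Delays for retries 1..maxRetries, in order
    static List<Long> schedule(int maxRetries) {
        List<Long> delays = new ArrayList<>();
        for (int retry = 1; retry <= maxRetries; retry++) {
            delays.add(delayMillis(retry));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(3));  // prints [200, 400, 800]
    }
}
```

In practice a cap on the maximum delay and some random jitter are common additions, but they are outside what the example above shows.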

Fallback and Recovery:

// Try primary, fallback to secondary
Single<String> primarySource = fetchFromPrimaryAPI();
Single<String> fallbackSource = fetchFromCacheAPI();

Single<String> resilient = primarySource
    .onErrorResumeNext(error -> {
        System.out.println("Primary failed: " + error.getMessage());
        return fallbackSource;
    });

// Chain multiple fallbacks
Single<String> multiLevelFallback = primarySource
    .onErrorResumeNext(error1 -> secondarySource
        .onErrorResumeNext(error2 -> tertiarySource
            .onErrorResumeNext(error3 -> Single.error(
                new AggregateException("All sources failed", 
                    error1, error2, error3)
            ))
        )
    );

// Custom error handling
Single<String> customErrorHandler = fetchData()
    .onErrorResumeNext(error -> {
        if (error instanceof NetworkException) {
            System.out.println("Network error, using cache");
            return getCachedData();
        } else if (error instanceof ValidationException) {
            System.out.println("Validation error, using default");
            return Single.just(DEFAULT_VALUE);
        } else {
            return Single.error(error);
        }
    });

Resource Management:

// Single.using ties a resource's lifetime to the Single (similar to try-with-resources)
Single<Result> managed = Single.using(
    this::openResource,      // Create resource
    resource -> performWork(resource),  // Use resource
    resource -> resource.close()  // Clean up
);

// Example with database
Single<List<User>> queryDatabase = Single.using(
    () -> dataSource.getConnection(),
    connection -> Single.fromCallable(() -> {
        // try-with-resources closes the Statement and ResultSet before the connection
        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM users")) {
            List<User> users = new ArrayList<>();
            while (rs.next()) {
                users.add(User.fromResultSet(rs));
            }
            return users;
        }
    }),
    Connection::close
);

Real-World Example: Request Processing Pipeline

public class RequestProcessor {
    private final AuthService authService;
    private final ValidatorService validatorService;
    private final BusinessLogicService businessService;
    private final PersistenceService persistenceService;
    private final NotificationService notificationService;

    public record Request(String userId, String action, Map<String, Object> payload) {}
    public record ProcessingResult(boolean success, String message, Object data) {}

    // Complete request processing pipeline
    public Single<ProcessingResult> processRequest(Request request) {
        return Single.just(request)
            // Step 1: Authenticate
            .flatMap(req -> {
                System.out.println("1. Authenticating user: " + req.userId());
                return authService.authenticate(req.userId())
                    .map(token -> new AuthContext(req, token));
            })
            // Step 2: Validate
            .flatMap(ctx -> {
                System.out.println("2. Validating request");
                return validatorService.validate(ctx.request.action(), ctx.request.payload())
                    .map(validated -> new ValidatedContext(ctx.request, ctx.token, validated));
            })
            // Step 3: Execute business logic
            .flatMap(ctx -> {
                System.out.println("3. Processing business logic");
                return businessService.execute(
                    ctx.request.action(),
                    ctx.request.payload()
                )
                    .subscribeOn(Schedulers.io())
                    .map(result -> new ExecutionContext(ctx.request, ctx.token, result));
            })
            // Step 4: Persist result
            .flatMap(ctx -> {
                System.out.println("4. Persisting result");
                return persistenceService.save(ctx.result)
                    .map(saved -> ctx);
            })
            // Step 5: Send notification (best-effort)
            .flatMap(ctx -> {
                System.out.println("5. Sending notification");
                return notificationService.notify(ctx.request.userId(), ctx.result)
                    .map(ignored -> ctx)
                    // Ignore notification errors; Single.just(null) is not allowed in RxJava
                    .onErrorReturnItem(ctx);
            })
            // Final result
            .map(ctx -> new ProcessingResult(
                true,
                "Request processed successfully",
                ctx.result
            ))
            // Error handling
            .onErrorResumeNext(error -> {
                System.err.println("Processing failed: " + error.getMessage());

                if (error instanceof AuthenticationException) {
                    return Single.just(new ProcessingResult(
                        false,
                        "Authentication failed",
                        null
                    ));
                } else if (error instanceof ValidationException) {
                    return Single.just(new ProcessingResult(
                        false,
                        "Validation failed: " + error.getMessage(),
                        null
                    ));
                } else {
                    return Single.just(new ProcessingResult(
                        false,
                        "Unexpected error: " + error.getMessage(),
                        null
                    ));
                }
            });
    }

    // Context classes for passing data through pipeline
    record AuthContext(Request request, String token) {}
    record ValidatedContext(Request request, String token, boolean valid) {}
    record ExecutionContext(Request request, String token, Object result) {}
}
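
The three error branches in processRequest differ only in the message they produce, so the mapping can be pulled into a plain function and tested without RxJava. The sketch below is one possible shape, not the chapter's implementation: the class name FailureMapping is hypothetical, and the two exception classes are stand-ins for the ones the pipeline references.

```java
final class FailureMapping {
    // Hypothetical stand-ins for the exception types used in the pipeline
    static class AuthenticationException extends RuntimeException {
        AuthenticationException(String message) { super(message); }
    }

    static class ValidationException extends RuntimeException {
        ValidationException(String message) { super(message); }
    }

    // Same shape as the ProcessingResult record in RequestProcessor
    record ProcessingResult(boolean success, String message, Object data) {}

    // Maps a pipeline failure to a user-facing result, mirroring the three branches
    static ProcessingResult toFailure(Throwable error) {
        String message;
        if (error instanceof AuthenticationException) {
            message = "Authentication failed";
        } else if (error instanceof ValidationException) {
            message = "Validation failed: " + error.getMessage();
        } else {
            message = "Unexpected error: " + error.getMessage();
        }
        return new ProcessingResult(false, message, null);
    }

    public static void main(String[] args) {
        ProcessingResult result = toFailure(new ValidationException("missing field"));
        System.out.println(result.message());  // prints Validation failed: missing field
    }
}
```

With a helper like this, the final operator in the pipeline could likely shrink to .onErrorReturn(FailureMapping::toFailure), at the cost of moving the System.err logging elsewhere (for example into doOnError).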

Testing Reactive Code

TestObserver and TestSubscriber:

@Test
public void testObservableTransformation() {
    TestObserver<Integer> test = Observable.range(1, 10)
        .filter(n -> n % 2 == 0)
        .map(n -> n * 2)
        .test();

    test.assertValues(4, 8, 12, 16, 20);
    test.assertComplete();
    test.assertNoErrors();
}

@Test
public void testFlowableBackpressure() {
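    // Flowable.range honors downstream demand, so no buffering operator is needed;
    // a bounded onBackpressureBuffer(10) here would overflow and signal an error
    TestSubscriber<Integer> test = Flowable.range(1, 100)
        .test(10);  // Request only 10 items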

    test.assertValueCount(10);
    test.assertNotComplete();

    test.request(20);
    test.assertValueCount(30);
}

@Test
public void testSingleWithError() {
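    // The explicit <String> is needed: without it the chain is inferred as Single<Object>
    TestObserver<String> test = Single.<String>error(
        new IllegalArgumentException("Invalid input")
    )
        .onErrorResumeNext(error -> Single.just("Default"))
        .test();

    test.assertValue("Default");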
    test.assertComplete();
}

@Test
public void testCompletableSequence() {
    TestObserver<Void> test = Completable.concatArray(
        Completable.complete(),
        Completable.complete(),
        Completable.error(new RuntimeException("Step 3 failed"))
    )
        .test();

    test.assertNotComplete();
    test.assertError(RuntimeException.class);
    test.assertErrorMessage("Step 3 failed");
}

Best Practices

1. Use Appropriate Type:

// Use Observable for streams
Observable<UserAction> userActions = ...;

// Use Single for one-off operations
Single<User> fetchUser = ...;

// Use Completable for operations that produce no value, only success or failure
Completable saveChanges = ...;

2. Avoid Nested Subscriptions:

// Bad - nested subscribe
observable.subscribe(item -> {
    anotherObservable.subscribe(value -> {
        // Callback hell
        doSomething(item, value);
    });
});

// Good - use flatMap/flatMapSingle
observable.flatMap(item ->
    anotherObservable.map(value -> new Pair<>(item, value))
)
    .subscribe(pair -> doSomething(pair.item, pair.value));

3. Handle Errors Appropriately:

// Always provide error handler
observable.subscribe(
    this::onNext,
    this::onError,  // Don't forget this!
    this::onComplete
);

4. Use Correct Schedulers:

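// RxJava itself has no main-thread scheduler. AndroidSchedulers.mainThread()
// comes from RxAndroid; on other platforms, observe on your UI toolkit's scheduler.

// I/O operations
apiCall()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::handleResult, this::onError);

// CPU-intensive
computation()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::handleResult, this::onError);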