10.4 Real-World Patterns and Applications

Production patterns for building scalable, responsive systems with reactive programming.

Pattern 1: User Interface with Reactive Streams

Problem: Handle rapid user input, maintain responsive UI, prevent memory leaks.

public class ReactiveUIController {
    private final CompositeDisposable disposables = new CompositeDisposable();

    private final PublishSubject<String> searchInput = PublishSubject.create();
    private final PublishSubject<SortOrder> sortInput = PublishSubject.create();
    private final PublishSubject<FilterOptions> filterInput = PublishSubject.create();

    private final SearchService searchService;
    private final UIEventBus eventBus;

    public void initializeUI() {
        // Debounce search input (wait for user to stop typing)
        disposables.add(
            searchInput
                .debounce(300, TimeUnit.MILLISECONDS)
                .filter(term -> term.length() > 2)  // Minimum 3 characters
                .distinctUntilChanged()
                .flatMap(term -> performSearch(term)
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.mainThread())
                    .onErrorResumeNext(error -> {
                        System.err.println("Search failed: " + error);
                        return Observable.just(Collections.emptyList());
                    })
                )
                .subscribe(
                    results -> updateSearchResults(results),
                    error -> showSearchError(error)
                )
        );

        // Combine search, sort, and filter
        disposables.add(
            Observable.combineLatest(
                searchInput.startWith(""),
                sortInput.startWith(SortOrder.RELEVANCE),
                filterInput.startWith(FilterOptions.ALL),
                (search, sort, filter) -> new SearchQuery(search, sort, filter)
            )
                .debounce(500, TimeUnit.MILLISECONDS)
                .flatMap(query -> performComplexSearch(query))
                .subscribe(
                    results -> displayResults(results),
                    error -> System.err.println("Error: " + error)
                )
        );

        // Reactive autocomplete
        disposables.add(
            searchInput
                .debounce(200, TimeUnit.MILLISECONDS)
                .filter(term -> term.length() > 0)
                .flatMap(term -> fetchAutocompleteSuggestions(term)
                    .subscribeOn(Schedulers.io())
                    .timeout(2, TimeUnit.SECONDS)
                    .onErrorResumeNext(Observable.empty())
                )
                .observeOn(Schedulers.mainThread())
                .subscribe(
                    suggestions -> showAutocompleteSuggestions(suggestions)
                )
        );
    }

    public void onUserInput(String text) {
        searchInput.onNext(text);
    }

    public void onSortChange(SortOrder order) {
        sortInput.onNext(order);
    }

    public void onFilterChange(FilterOptions filter) {
        filterInput.onNext(filter);
    }

    public void cleanup() {
        disposables.dispose();
    }

    private Observable<List<SearchResult>> performSearch(String term) {
        return Observable.fromCallable(() -> 
            searchService.search(term)
        );
    }

    private Observable<List<SearchResult>> performComplexSearch(SearchQuery query) {
        return Observable.fromCallable(() ->
            searchService.complexSearch(query)
        );
    }

    private Observable<List<String>> fetchAutocompleteSuggestions(String prefix) {
        return Observable.fromCallable(() ->
            searchService.suggest(prefix)
        );
    }

    private void updateSearchResults(List<SearchResult> results) {
        System.out.println("Showing " + results.size() + " results");
    }

    private void displayResults(List<SearchResult> results) {
        System.out.println("Displaying results");
    }

    private void showAutocompleteSuggestions(List<String> suggestions) {
        System.out.println("Suggestions: " + suggestions);
    }

    private void showSearchError(Throwable error) {
        System.err.println("Search error: " + error);
    }

    // Supporting types
    record SearchResult(String id, String title, String summary) {}
    record SearchQuery(String term, SortOrder sort, FilterOptions filter) {}
    enum SortOrder { RELEVANCE, DATE, POPULARITY }
    record FilterOptions(String category, String dateRange) {
        static FilterOptions ALL = new FilterOptions(null, null);
    }
}

Pattern 2: Data Stream Processing Pipeline

Problem: Process high-volume data stream with backpressure, batch processing, and failure recovery.

public class DataProcessingPipeline {
    private final SourceConnector sourceConnector;
    private final TransformationService transformationService;
    private final ValidationService validationService;
    private final DestinationConnector destinationConnector;
    private final MetricsService metricsService;

    public record DataRecord(String id, Map<String, Object> fields) {}
    public record ProcessingMetrics(long processed, long errors, long skipped, long duration) {}

    public Flowable<ProcessingMetrics> processPipeline(int batchSize) {
        return sourceConnector.getDataStream()
            // Step 1: Batch for efficiency
            .buffer(batchSize)
            // Step 2: Transform data
            .flatMap(batch -> transformBatch(batch)
                .subscribeOn(Schedulers.io())
            )
            // Step 3: Validate
            .filter(record -> {
                boolean valid = validationService.validate(record);
                if (!valid) {
                    metricsService.incrementSkipped();
                }
                return valid;
            })
            // Step 4: Enrich with metadata
            .map(this::enrichRecord)
            // Step 5: Handle backpressure - drop oldest if buffer full
            .onBackpressureBuffer(
                1000,  // Buffer size
                () -> metricsService.incrementDropped(),
                BackpressureOverflowStrategy.DROP_OLDEST
            )
            // Step 6: Persist with retry
            .flatMap(record -> persistRecord(record)
                .subscribeOn(Schedulers.io())
                .retry(3)
                .doOnError(error -> {
                    metricsService.recordError(error);
                    System.err.println("Failed to persist: " + error);
                })
                .onErrorResumeNext(Flowable.empty())
            )
            // Step 7: Emit metrics periodically
            .buffer(10000, TimeUnit.MILLISECONDS)
            .map(batch -> metricsService.getMetrics());
    }

    private Single<List<DataRecord>> transformBatch(List<DataRecord> batch) {
        return Single.create(emitter -> {
            try {
                List<DataRecord> transformed = batch.stream()
                    .map(transformationService::transform)
                    .collect(Collectors.toList());

                emitter.onSuccess(transformed);

            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }

    private DataRecord enrichRecord(DataRecord record) {
        Map<String, Object> enriched = new HashMap<>(record.fields());
        enriched.put("_processed_at", System.currentTimeMillis());
        enriched.put("_version", "1.0");
        return new DataRecord(record.id(), enriched);
    }

    private Single<DataRecord> persistRecord(DataRecord record) {
        return Single.fromCallable(() -> {
            destinationConnector.write(record);
            metricsService.incrementProcessed();
            return record;
        });
    }
}

Pattern 3: Resilient API Client

Problem: Handle timeouts, retries, rate limiting, and circuit breaking for external APIs.

public class ResilientAPIClient {
    private final HttpClient httpClient;
    private final RateLimiter rateLimiter;
    private final CircuitBreaker circuitBreaker;
    private final RetryStrategy retryStrategy;
    private final Logger logger;

    public record APIResponse(int statusCode, String body) {}

    public Single<APIResponse> get(String url) {
        return Single.create(emitter -> {
            if (!circuitBreaker.isOpen()) {
                performRequest(url, emitter);
            } else {
                emitter.onError(new CircuitBreakerOpenException(
                    "Circuit breaker open for: " + url
                ));
            }
        })
            .subscribeOn(Schedulers.io());
    }

    private void performRequest(String url, SingleEmitter<APIResponse> emitter) {
        rateLimiter.acquire()
            .subscribe(
                () -> {
                    try {
                        APIResponse response = executeRequest(url);

                        if (response.statusCode() >= 500) {
                            // Server error - eligible for circuit breaker
                            circuitBreaker.recordFailure();
                            emitter.onError(
                                new ServerErrorException(
                                    "Server error: " + response.statusCode()
                                )
                            );
                        } else if (response.statusCode() >= 400) {
                            // Client error - don't retry
                            emitter.onError(
                                new ClientErrorException(
                                    "Client error: " + response.statusCode()
                                )
                            );
                        } else {
                            // Success
                            circuitBreaker.recordSuccess();
                            emitter.onSuccess(response);
                        }
                    } catch (Exception e) {
                        circuitBreaker.recordFailure();
                        emitter.onError(e);
                    }
                },
                emitter::onError
            );
    }

    private APIResponse executeRequest(String url) throws Exception {
        // Actual HTTP request with timeout
        return retryStrategy.execute(
            () -> httpClient.get(url)
                .timeout(5, TimeUnit.SECONDS)
        );
    }

    public Single<APIResponse> getWithFallback(String url, String fallbackUrl) {
        return get(url)
            .onErrorResumeNext(error -> {
                logger.warn("Primary API failed, using fallback: " + error.getMessage());
                return get(fallbackUrl);
            });
    }

    public static class CircuitBreaker {
        private volatile int failureCount = 0;
        private volatile boolean open = false;
        private static final int FAILURE_THRESHOLD = 5;
        private static final long OPEN_DURATION = 60000;  // 1 minute

        public void recordSuccess() {
            failureCount = 0;
            open = false;
        }

        public void recordFailure() {
            failureCount++;
            if (failureCount >= FAILURE_THRESHOLD) {
                open = true;
                // Schedule recovery attempt
                scheduleHalfOpen();
            }
        }

        public boolean isOpen() {
            return open;
        }

        private void scheduleHalfOpen() {
            Observable.timer(OPEN_DURATION, TimeUnit.MILLISECONDS)
                .subscribe(ignore -> {
                    failureCount = 0;
                    open = false;
                });
        }
    }

    public static class RateLimiter {
        private final long requestInterval;  // Millis between requests
        private volatile long lastRequestTime = 0;

        public RateLimiter(int requestsPerSecond) {
            this.requestInterval = 1000L / requestsPerSecond;
        }

        public Single<Void> acquire() {
            return Single.create(emitter -> {
                long elapsed = System.currentTimeMillis() - lastRequestTime;
                long remaining = requestInterval - elapsed;

                if (remaining > 0) {
                    Observable.timer(remaining, TimeUnit.MILLISECONDS)
                        .subscribe(
                            ignore -> {
                                lastRequestTime = System.currentTimeMillis();
                                emitter.onSuccess(null);
                            },
                            emitter::onError
                        );
                } else {
                    lastRequestTime = System.currentTimeMillis();
                    emitter.onSuccess(null);
                }
            });
        }
    }
}

Pattern 4: Reactive Database Access

Problem: Non-blocking database queries with pooling and result streaming.

public class ReactiveDataRepository {
    private final R2DBCPool connectionPool;

    public record User(String id, String name, String email) {}
    public record Page<T>(List<T> items, int total, int pageNumber, int pageSize) {}

    // Single result
    public Single<User> getUserById(String userId) {
        return Mono.from(
            connectionPool.getConnection()
                .flatMapMany(connection ->
                    connection.createStatement(
                        "SELECT id, name, email FROM users WHERE id = $1"
                    )
                        .bind(0, userId)
                        .execute()
                )
                .flatMap(result -> result.map(row ->
                    new User(
                        row.get("id", String.class),
                        row.get("name", String.class),
                        row.get("email", String.class)
                    )
                ))
                .take(1)
        )
            .toSingle();
    }

    // Stream results
    public Flowable<User> getAllUsers() {
        return Flowable.create(emitter ->
            connectionPool.getConnection()
                .flatMapMany(connection ->
                    connection.createStatement("SELECT id, name, email FROM users")
                        .execute()
                )
                .flatMap(result -> result.map(row ->
                    new User(
                        row.get("id", String.class),
                        row.get("name", String.class),
                        row.get("email", String.class)
                    )
                ))
                .subscribe(
                    emitter::onNext,
                    emitter::onError,
                    emitter::onComplete
                ),
            BackpressureStrategy.BUFFER
        );
    }

    // Paginated results
    public Single<Page<User>> getUsersPage(int pageNumber, int pageSize) {
        int offset = (pageNumber - 1) * pageSize;

        return Single.zip(
            // Fetch page data
            Flowable.create(emitter ->
                connectionPool.getConnection()
                    .flatMapMany(connection ->
                        connection.createStatement(
                            "SELECT id, name, email FROM users " +
                            "LIMIT $1 OFFSET $2"
                        )
                            .bind(0, pageSize)
                            .bind(1, offset)
                            .execute()
                    )
                    .flatMap(result -> result.map(row ->
                        new User(
                            row.get("id", String.class),
                            row.get("name", String.class),
                            row.get("email", String.class)
                        )
                    ))
                    .toList(),
                BackpressureStrategy.BUFFER
            )
                .toSingle(),

            // Fetch total count
            Mono.from(
                connectionPool.getConnection()
                    .flatMapMany(connection ->
                        connection.createStatement("SELECT COUNT(*) as cnt FROM users")
                            .execute()
                    )
                    .flatMap(result -> result.map(row -> row.get("cnt", Integer.class)))
                    .take(1)
            )
                .toSingle(),

            // Combine results
            (items, total) -> new Page<>(items, total, pageNumber, pageSize)
        );
    }

    // Batch insert with transaction
    public Completable insertUsers(List<User> users) {
        return Completable.create(emitter ->
            connectionPool.getConnection()
                .flatMapMany(connection ->
                    connection.beginTransaction()
                        .then(Mono.fromIterable(users)
                            .flatMap(user ->
                                connection.createStatement(
                                    "INSERT INTO users (id, name, email) VALUES ($1, $2, $3)"
                                )
                                    .bind(0, user.id())
                                    .bind(1, user.name())
                                    .bind(2, user.email())
                                    .execute()
                            )
                            .then(connection.commitTransaction())
                        )
                        .onErrorResume(e ->
                            connection.rollbackTransaction()
                                .then(Mono.error(e))
                        )
                )
                .subscribe(
                    ignore -> emitter.onComplete(),
                    emitter::onError
                )
        );
    }

    // Stream-based processing
    public Completable processAllUsersInBatches(int batchSize, 
                                               Function<List<User>, Completable> processor) {
        return getAllUsers()
            .buffer(batchSize)
            .flatMapCompletable(processor);
    }
}

Pattern 5: Complex Event Processing

Problem: Aggregate events from multiple sources with time windows and correlation.

public class EventProcessor {
    private final PublishSubject<Event> eventStream = PublishSubject.create();

    public record Event(String type, String sourceId, long timestamp, Map<String, Object> data) {}
    public record CorrelatedEvents(String correlationId, List<Event> events) {}

    // Correlate events within time window
    public Observable<CorrelatedEvents> correlateEventsWithinWindow(
            String correlationKey,
            long windowDuration,
            TimeUnit unit) {

        return eventStream
            // Window events by time
            .window(windowDuration, unit)
            // Group by correlation key within each window
            .flatMap(window -> window
                .groupBy(event -> (String) event.data().get(correlationKey))
                .flatMap(group -> group
                    .toList()
                    .map(events -> new CorrelatedEvents(group.getKey(), events))
                )
            );
    }

    // Detect event sequence pattern
    public Observable<CorrelatedEvents> detectPattern(
            String... expectedSequence) {

        return eventStream
            .scan(
                new ArrayList<Event>(),
                (sequence, event) -> {
                    sequence.add(event);
                    if (sequence.size() > expectedSequence.length) {
                        sequence.remove(0);  // Keep window size
                    }
                    return sequence;
                }
            )
            .filter(sequence ->
                sequence.size() == expectedSequence.length &&
                IntStream.range(0, sequence.size())
                    .allMatch(i -> sequence.get(i).type().equals(expectedSequence[i]))
            )
            .map(sequence -> new CorrelatedEvents(
                UUID.randomUUID().toString(),
                sequence
            ));
    }

    // Detect anomalies
    public Observable<Event> detectAnomalies(
            String eventType,
            int expectedCount,
            long windowDuration,
            TimeUnit unit) {

        return eventStream
            .filter(event -> event.type().equals(eventType))
            .window(windowDuration, unit)
            .flatMap(window -> window
                .count()
                .filter(count -> count < expectedCount)  // Anomaly: too few events
                .flatMapObservable(ignored -> window)
            );
    }

    // Rate-based metrics
    public Observable<Long> calculateEventRate(String eventType, long periodMillis) {
        return eventStream
            .filter(event -> event.type().equals(eventType))
            .buffer(periodMillis, TimeUnit.MILLISECONDS)
            .map(List::size);
    }

    public void publishEvent(Event event) {
        eventStream.onNext(event);
    }
}

Best Practices for Production Systems

1. Monitor Resource Usage:

disposable = observable
    .doOnNext(item -> {
        if (item % 1000 == 0) {
            System.out.println("Processed: " + item);
            Runtime rt = Runtime.getRuntime();
            System.out.println("Memory: " + 
                (rt.totalMemory() - rt.freeMemory()) / 1024 / 1024 + "MB");
        }
    })
    .subscribe(this::process);

2. Proper Resource Cleanup:

public class ReactiveService implements AutoCloseable {
    private final CompositeDisposable disposables = new CompositeDisposable();

    public void start() {
        disposables.add(observable.subscribe(...));
    }

    @Override
    public void close() {
        disposables.dispose();
        System.out.println("Service cleaned up");
    }
}

// Usage
try (var service = new ReactiveService()) {
    service.start();
    // Service running
} // Automatically cleaned up

3. Metrics and Monitoring:

observable
    .doOnSubscribe(subscription -> metrics.incrementSubscription())
    .doOnNext(item -> metrics.incrementItemProcessed())
    .doOnError(error -> metrics.incrementError(error.getClass()))
    .doOnComplete(() -> metrics.incrementCompletion())
    .subscribe(this::process);

4. Graceful Degradation:

primaryData()
    .onErrorResumeNext(secondaryData())
    .onErrorResumeNext(cachedData())
    .onErrorResumeNext(Single.just(DEFAULT_DATA))
    .subscribe(this::use);

5. Test with TestScheduler for Time:

@Test
public void testWithTimeControl() {
    TestScheduler scheduler = new TestScheduler();

    Observable<Long> delayed = Observable.interval(1, TimeUnit.SECONDS, scheduler)
        .take(5);

    TestObserver<Long> observer = delayed.test();

    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);

    observer.assertValues(0, 1, 2, 3, 4);
    observer.assertComplete();
}