10.4 Real-World Patterns and Applications
Production patterns for building scalable, responsive systems with reactive programming. The examples assume RxJava 3 (the database pattern also bridges to Project Reactor for R2DBC); a few operator names differ in RxJava 2, e.g. startWithItem was startWith.
Pattern 1: User Interface with Reactive Streams
Problem: Handle rapid user input, keep the UI responsive, and prevent subscription leaks.
public class ReactiveUIController {
private final CompositeDisposable disposables = new CompositeDisposable();
private final PublishSubject<String> searchInput = PublishSubject.create();
private final PublishSubject<SortOrder> sortInput = PublishSubject.create();
private final PublishSubject<FilterOptions> filterInput = PublishSubject.create();
private final SearchService searchService;
private final UIEventBus eventBus;
// RxJava ships no main-thread scheduler; inject one for your UI toolkit
// (e.g. AndroidSchedulers.mainThread() from RxAndroid, or JavaFxScheduler)
private final Scheduler uiScheduler;
public ReactiveUIController(SearchService searchService, UIEventBus eventBus, Scheduler uiScheduler) {
this.searchService = searchService;
this.eventBus = eventBus;
this.uiScheduler = uiScheduler;
}
public void initializeUI() {
// Debounce search input (wait for user to stop typing)
disposables.add(
searchInput
.debounce(300, TimeUnit.MILLISECONDS)
.filter(term -> term.length() > 2) // Minimum 3 characters
.distinctUntilChanged()
.switchMap(term -> performSearch(term) // switchMap cancels stale in-flight searches
.subscribeOn(Schedulers.io())
.onErrorResumeNext(error -> {
System.err.println("Search failed: " + error);
return Observable.just(Collections.<SearchResult>emptyList());
})
)
.observeOn(uiScheduler) // deliver results on the UI thread
.subscribe(
results -> updateSearchResults(results),
error -> showSearchError(error)
)
);
// Combine search, sort, and filter
disposables.add(
Observable.combineLatest(
searchInput.startWithItem(""),
sortInput.startWithItem(SortOrder.RELEVANCE),
filterInput.startWithItem(FilterOptions.ALL),
(search, sort, filter) -> new SearchQuery(search, sort, filter)
)
.debounce(500, TimeUnit.MILLISECONDS)
.switchMap(query -> performComplexSearch(query) // drop superseded queries
.subscribeOn(Schedulers.io())
)
.observeOn(uiScheduler)
.subscribe(
results -> displayResults(results),
error -> System.err.println("Error: " + error)
)
);
// Reactive autocomplete
disposables.add(
searchInput
.debounce(200, TimeUnit.MILLISECONDS)
.filter(term -> !term.isEmpty())
.switchMap(term -> fetchAutocompleteSuggestions(term) // keep only the newest prefix
.subscribeOn(Schedulers.io())
.timeout(2, TimeUnit.SECONDS)
.onErrorResumeNext(error -> Observable.empty())
)
.observeOn(uiScheduler)
.subscribe(
suggestions -> showAutocompleteSuggestions(suggestions)
)
);
}
public void onUserInput(String text) {
searchInput.onNext(text);
}
public void onSortChange(SortOrder order) {
sortInput.onNext(order);
}
public void onFilterChange(FilterOptions filter) {
filterInput.onNext(filter);
}
public void cleanup() {
disposables.dispose();
}
private Observable<List<SearchResult>> performSearch(String term) {
return Observable.fromCallable(() ->
searchService.search(term)
);
}
private Observable<List<SearchResult>> performComplexSearch(SearchQuery query) {
return Observable.fromCallable(() ->
searchService.complexSearch(query)
);
}
private Observable<List<String>> fetchAutocompleteSuggestions(String prefix) {
return Observable.fromCallable(() ->
searchService.suggest(prefix)
);
}
private void updateSearchResults(List<SearchResult> results) {
System.out.println("Showing " + results.size() + " results");
}
private void displayResults(List<SearchResult> results) {
System.out.println("Displaying results");
}
private void showAutocompleteSuggestions(List<String> suggestions) {
System.out.println("Suggestions: " + suggestions);
}
private void showSearchError(Throwable error) {
System.err.println("Search error: " + error);
}
// Supporting types
record SearchResult(String id, String title, String summary) {}
record SearchQuery(String term, SortOrder sort, FilterOptions filter) {}
enum SortOrder { RELEVANCE, DATE, POPULARITY }
record FilterOptions(String category, String dateRange) {
static final FilterOptions ALL = new FilterOptions(null, null);
}
}
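Typical wiring is sketched below; the SearchService and UIEventBus implementations are assumed, and Schedulers.trampoline() stands in for a real UI-thread scheduler in this console demo.
// Hypothetical wiring; searchService and eventBus are assumed to exist
ReactiveUIController controller = new ReactiveUIController(
searchService, eventBus, Schedulers.trampoline());
controller.initializeUI();
// Simulated typing: keystrokes arriving within 300 ms coalesce into one search
for (String keystroke : List.of("r", "re", "rea", "reactive")) {
controller.onUserInput(keystroke);
}
controller.onSortChange(SortOrder.DATE); // re-runs the combined query
controller.onFilterChange(new FilterOptions("books", null));
// Dispose on teardown, or the subjects keep the controller reachable
controller.cleanup();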
Pattern 2: Data Stream Processing Pipeline
Problem: Process a high-volume data stream with backpressure handling, batching, and failure recovery.
public class DataProcessingPipeline {
private final SourceConnector sourceConnector;
private final TransformationService transformationService;
private final ValidationService validationService;
private final DestinationConnector destinationConnector;
private final MetricsService metricsService;
public record DataRecord(String id, Map<String, Object> fields) {}
public record ProcessingMetrics(long processed, long errors, long skipped, long duration) {}
public Flowable<ProcessingMetrics> processPipeline(int batchSize) {
return sourceConnector.getDataStream()
// Step 1: Batch for efficiency
.buffer(batchSize)
// Step 2: Transform each batch, then flatten back to individual records
.flatMap(batch -> transformBatch(batch)
.flattenAsFlowable(list -> list) // Single<List<T>> -> Flowable<T>
.subscribeOn(Schedulers.io())
)
// Step 3: Validate
.filter(record -> {
boolean valid = validationService.validate(record);
if (!valid) {
metricsService.incrementSkipped();
}
return valid;
})
// Step 4: Enrich with metadata
.map(this::enrichRecord)
// Step 5: Handle backpressure - drop oldest if buffer full
.onBackpressureBuffer(
1000, // Buffer size
() -> metricsService.incrementDropped(),
BackpressureOverflowStrategy.DROP_OLDEST
)
// Step 6: Persist with retry
.flatMap(record -> persistRecord(record)
.subscribeOn(Schedulers.io())
.retry(3)
.doOnError(error -> {
metricsService.recordError(error);
System.err.println("Failed to persist: " + error);
})
.toFlowable() // Single -> Flowable so a failed record can be skipped
.onErrorResumeNext(error -> Flowable.empty())
)
// Step 7: Emit a metrics snapshot every 10 seconds (buffer contents are ignored)
.buffer(10, TimeUnit.SECONDS)
.map(ignored -> metricsService.getMetrics());
}
private Single<List<DataRecord>> transformBatch(List<DataRecord> batch) {
// fromCallable routes a thrown exception to onError automatically
return Single.fromCallable(() -> batch.stream()
.map(transformationService::transform)
.collect(Collectors.toList()));
}
private DataRecord enrichRecord(DataRecord record) {
Map<String, Object> enriched = new HashMap<>(record.fields());
enriched.put("_processed_at", System.currentTimeMillis());
enriched.put("_version", "1.0");
return new DataRecord(record.id(), enriched);
}
private Single<DataRecord> persistRecord(DataRecord record) {
return Single.fromCallable(() -> {
destinationConnector.write(record);
metricsService.incrementProcessed();
return record;
});
}
}
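Driving the pipeline is a single subscription; a minimal sketch, assuming the connectors and services are injected elsewhere:
DataProcessingPipeline pipeline = new DataProcessingPipeline(/* injected collaborators */);
Disposable run = pipeline.processPipeline(500) // batches of 500 records
.subscribe(
metrics -> System.out.printf("processed=%d errors=%d skipped=%d%n",
metrics.processed(), metrics.errors(), metrics.skipped()),
error -> System.err.println("Pipeline terminated: " + error)
);
// On shutdown: cancel the subscription, which propagates upstream
run.dispose();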
Pattern 3: Resilient API Client
Problem: Handle timeouts, retries, rate limiting, and circuit breaking for external APIs.
public class ResilientAPIClient {
private final HttpClient httpClient;
private final RateLimiter rateLimiter;
private final CircuitBreaker circuitBreaker;
private final RetryStrategy retryStrategy;
private final Logger logger;
public record APIResponse(int statusCode, String body) {}
public Single<APIResponse> get(String url) {
return Single.create(emitter -> {
if (!circuitBreaker.isOpen()) {
performRequest(url, emitter);
} else {
emitter.onError(new CircuitBreakerOpenException(
"Circuit breaker open for: " + url
));
}
})
.subscribeOn(Schedulers.io());
}
private void performRequest(String url, SingleEmitter<APIResponse> emitter) {
rateLimiter.acquire()
.subscribe(
() -> {
try {
APIResponse response = executeRequest(url);
if (response.statusCode() >= 500) {
// Server error - eligible for circuit breaker
circuitBreaker.recordFailure();
emitter.onError(
new ServerErrorException(
"Server error: " + response.statusCode()
)
);
} else if (response.statusCode() >= 400) {
// Client error - don't retry
emitter.onError(
new ClientErrorException(
"Client error: " + response.statusCode()
)
);
} else {
// Success
circuitBreaker.recordSuccess();
emitter.onSuccess(response);
}
} catch (Exception e) {
circuitBreaker.recordFailure();
emitter.onError(e);
}
},
emitter::onError
);
}
private APIResponse executeRequest(String url) throws Exception {
// Hypothetical blocking client wrapper; the fluent timeout call is illustrative
return retryStrategy.execute(
() -> httpClient.get(url)
.timeout(5, TimeUnit.SECONDS)
);
}
public Single<APIResponse> getWithFallback(String url, String fallbackUrl) {
return get(url)
.onErrorResumeNext(error -> {
logger.warn("Primary API failed, using fallback: " + error.getMessage());
return get(fallbackUrl);
});
}
public static class CircuitBreaker {
// AtomicInteger: incrementing a volatile int is not atomic under concurrent failures
private final AtomicInteger failureCount = new AtomicInteger();
private volatile boolean open = false;
private static final int FAILURE_THRESHOLD = 5;
private static final long OPEN_DURATION_MS = 60_000; // 1 minute
public void recordSuccess() {
failureCount.set(0);
open = false;
}
public void recordFailure() {
if (failureCount.incrementAndGet() >= FAILURE_THRESHOLD) {
open = true;
scheduleHalfOpen(); // Schedule a recovery attempt
}
}
public boolean isOpen() {
return open;
}
private void scheduleHalfOpen() {
Observable.timer(OPEN_DURATION_MS, TimeUnit.MILLISECONDS)
.subscribe(ignore -> {
failureCount.set(0);
open = false;
});
}
}
public static class RateLimiter {
private final long requestInterval; // Millis between requests
private volatile long lastRequestTime = 0;
public RateLimiter(int requestsPerSecond) {
this.requestInterval = 1000L / requestsPerSecond;
}
// Completable, not Single<Void>: RxJava forbids null, so onSuccess(null) would throw
public Completable acquire() {
return Completable.create(emitter -> {
long elapsed = System.currentTimeMillis() - lastRequestTime;
long remaining = requestInterval - elapsed;
if (remaining > 0) {
Observable.timer(remaining, TimeUnit.MILLISECONDS)
.subscribe(
ignore -> {
lastRequestTime = System.currentTimeMillis();
emitter.onComplete();
},
emitter::onError
);
} else {
lastRequestTime = System.currentTimeMillis();
emitter.onComplete();
}
});
}
}
}
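A usage sketch, assuming the collaborators are wired up; note that a circuit-breaker rejection surfaces as an ordinary onError signal, so callers handle it like any other failure:
ResilientAPIClient client = new ResilientAPIClient(/* injected collaborators */);
client.getWithFallback(
"https://api.example.com/v1/data", // primary and fallback URLs are illustrative
"https://backup.example.com/v1/data")
.subscribe(
response -> System.out.println("HTTP " + response.statusCode()),
error -> {
if (error instanceof CircuitBreakerOpenException) {
System.err.println("Backing off: " + error.getMessage());
} else {
System.err.println("Both endpoints failed: " + error);
}
}
);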
Pattern 4: Reactive Database Access
Problem: Non-blocking database queries with pooling and result streaming.
public class ReactiveDataRepository {
// R2DBC exposes Reactive Streams publishers, so the Reactor types below (Mono/Flux)
// bridge into RxJava via Single/Flowable/Completable.fromPublisher.
// R2DBCPool is a hypothetical wrapper whose getConnection() returns Mono<Connection>.
private final R2DBCPool connectionPool;
public record User(String id, String name, String email) {}
public record Page<T>(List<T> items, int total, int pageNumber, int pageSize) {}
// Single result
public Single<User> getUserById(String userId) {
// Single.fromPublisher bridges Reactor's Mono into RxJava; Mono.from takes the
// first row. Connection release is elided for brevity; production code should
// wrap the connection in Mono.usingWhen.
return Single.fromPublisher(
Mono.from(
connectionPool.getConnection()
.flatMapMany(connection ->
connection.createStatement(
"SELECT id, name, email FROM users WHERE id = $1"
)
.bind(0, userId)
.execute()
)
.flatMap(result -> result.map(row ->
new User(
row.get("id", String.class),
row.get("name", String.class),
row.get("email", String.class)
)
))
)
);
}
// Stream results
public Flowable<User> getAllUsers() {
// Flowable.fromPublisher preserves Reactive Streams backpressure end-to-end;
// a hand-rolled create() with a nested subscribe() would leak that subscription
return Flowable.fromPublisher(
connectionPool.getConnection()
.flatMapMany(connection ->
connection.createStatement("SELECT id, name, email FROM users")
.execute()
)
.flatMap(result -> result.map(row ->
new User(
row.get("id", String.class),
row.get("name", String.class),
row.get("email", String.class)
)
))
);
}
// Paginated results
public Single<Page<User>> getUsersPage(int pageNumber, int pageSize) {
int offset = (pageNumber - 1) * pageSize;
return Single.zip(
// Fetch page data; collectList() turns the Flux of rows into a Mono<List<User>>
Single.fromPublisher(
connectionPool.getConnection()
.flatMapMany(connection ->
connection.createStatement(
"SELECT id, name, email FROM users " +
"LIMIT $1 OFFSET $2"
)
.bind(0, pageSize)
.bind(1, offset)
.execute()
)
.flatMap(result -> result.map(row ->
new User(
row.get("id", String.class),
row.get("name", String.class),
row.get("email", String.class)
)
))
.collectList()
),
// Fetch total count; COUNT(*) is a 64-bit value in most databases
Single.fromPublisher(
Mono.from(
connectionPool.getConnection()
.flatMapMany(connection ->
connection.createStatement("SELECT COUNT(*) AS cnt FROM users")
.execute()
)
.flatMap(result -> result.map(row -> row.get("cnt", Long.class)))
)
),
// Combine results
(items, total) -> new Page<>(items, total.intValue(), pageNumber, pageSize)
);
}
// Batch insert with transaction
public Completable insertUsers(List<User> users) {
// Completable.fromPublisher completes when the transaction commits.
// (Connection release again elided; see the note on getUserById.)
return Completable.fromPublisher(
connectionPool.getConnection()
.flatMapMany(connection ->
Mono.from(connection.beginTransaction())
.thenMany(Flux.fromIterable(users) // Flux, not Mono, for a collection
.concatMap(user ->
connection.createStatement(
"INSERT INTO users (id, name, email) VALUES ($1, $2, $3)"
)
.bind(0, user.id())
.bind(1, user.name())
.bind(2, user.email())
.execute()
)
.flatMap(Result::getRowsUpdated) // each Result must be consumed
)
.thenEmpty(connection.commitTransaction())
.onErrorResume(e ->
Mono.from(connection.rollbackTransaction())
.then(Mono.error(e))
)
)
);
}
// Stream-based processing (Function here is RxJava's
// io.reactivex.rxjava3.functions.Function, not java.util.function.Function)
public Completable processAllUsersInBatches(int batchSize,
Function<List<User>, Completable> processor) {
return getAllUsers()
.buffer(batchSize)
.flatMapCompletable(processor);
}
}
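A usage sketch against a hypothetical initialized pool; blockingAwait appears only to keep the demo alive and has no place in production reactive code:
ReactiveDataRepository repo = new ReactiveDataRepository(/* initialized pool */);
// Page through users without loading the whole table
repo.getUsersPage(1, 20)
.subscribe(page -> System.out.printf("page %d: %d of %d users%n",
page.pageNumber(), page.items().size(), page.total()));
// Process every user in batches of 100
repo.processAllUsersInBatches(100, batch ->
Completable.fromAction(() ->
System.out.println("processing " + batch.size() + " users")))
.blockingAwait();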
Pattern 5: Complex Event Processing
Problem: Aggregate events from multiple sources with time windows and correlation.
public class EventProcessor {
private final PublishSubject<Event> eventStream = PublishSubject.create();
public record Event(String type, String sourceId, long timestamp, Map<String, Object> data) {}
public record CorrelatedEvents(String correlationId, List<Event> events) {}
// Correlate events within time window
public Observable<CorrelatedEvents> correlateEventsWithinWindow(
String correlationKey,
long windowDuration,
TimeUnit unit) {
return eventStream
// Window events by time
.window(windowDuration, unit)
// Group by correlation key within each window
.flatMap(window -> window
.groupBy(event -> (String) event.data().get(correlationKey))
.flatMapSingle(group -> group // toList() returns a Single, hence flatMapSingle
.toList()
.map(events -> new CorrelatedEvents(group.getKey(), events))
)
);
}
// Detect event sequence pattern
public Observable<CorrelatedEvents> detectPattern(
String... expectedSequence) {
return eventStream
.scan(
Collections.<Event>emptyList(),
(sequence, event) -> {
// Copy per step: scan must not mutate its accumulator, or every observer
// downstream sees the same ever-changing list
List<Event> next = new ArrayList<>(sequence);
next.add(event);
if (next.size() > expectedSequence.length) {
next.remove(0); // Keep a sliding window of the last N events
}
return next;
}
)
.filter(sequence ->
sequence.size() == expectedSequence.length &&
IntStream.range(0, sequence.size())
.allMatch(i -> sequence.get(i).type().equals(expectedSequence[i]))
)
.map(sequence -> new CorrelatedEvents(
UUID.randomUUID().toString(),
sequence
));
}
// Detect anomalies
public Observable<Event> detectAnomalies(
String eventType,
int expectedCount,
long windowDuration,
TimeUnit unit) {
return eventStream
.filter(event -> event.type().equals(eventType))
// buffer, not window: a window is consumed once by count(), so re-reading it
// afterwards would emit nothing
.buffer(windowDuration, unit)
.filter(batch -> batch.size() < expectedCount) // Anomaly: too few events
.flatMapIterable(batch -> batch);
}
// Rate-based metrics
public Observable<Long> calculateEventRate(String eventType, long periodMillis) {
return eventStream
.filter(event -> event.type().equals(eventType))
.buffer(periodMillis, TimeUnit.MILLISECONDS)
.map(batch -> (long) batch.size()); // events per period, widened to long
}
public void publishEvent(Event event) {
eventStream.onNext(event);
}
}
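In use, consumers subscribe to the derived streams before events are published; the event names and payloads below are illustrative:
EventProcessor processor = new EventProcessor();
// Fire when a login -> purchase -> logout sequence completes
processor.detectPattern("login", "purchase", "logout")
.subscribe(match -> System.out.println(
"Funnel completed, correlation " + match.correlationId()));
// Fire when fewer than 5 heartbeats arrive in any 10-second window
processor.detectAnomalies("heartbeat", 5, 10, TimeUnit.SECONDS)
.subscribe(event -> System.err.println(
"Missed heartbeats from " + event.sourceId()));
processor.publishEvent(new EventProcessor.Event(
"login", "user-1", System.currentTimeMillis(), Map.of("session", (Object) "s-42")));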
Best Practices for Production Systems
1. Monitor Resource Usage:
disposable = observable
.doOnNext(item -> {
if (item % 1000 == 0) {
System.out.println("Processed: " + item);
Runtime rt = Runtime.getRuntime();
System.out.println("Memory: " +
(rt.totalMemory() - rt.freeMemory()) / 1024 / 1024 + "MB");
}
})
.subscribe(this::process);
2. Proper Resource Cleanup:
public class ReactiveService implements AutoCloseable {
private final CompositeDisposable disposables = new CompositeDisposable();
public void start() {
disposables.add(observable.subscribe(...));
}
@Override
public void close() {
disposables.dispose();
System.out.println("Service cleaned up");
}
}
// Usage
try (var service = new ReactiveService()) {
service.start();
// Service running
} // Automatically cleaned up
3. Metrics and Monitoring:
observable
.doOnSubscribe(subscription -> metrics.incrementSubscription())
.doOnNext(item -> metrics.incrementItemProcessed())
.doOnError(error -> metrics.incrementError(error.getClass()))
.doOnComplete(() -> metrics.incrementCompletion())
.subscribe(this::process);
4. Graceful Degradation:
primaryData()
.onErrorResumeNext(error -> secondaryData()) // function form works in RxJava 2 and 3
.onErrorResumeNext(error -> cachedData())
.onErrorResumeNext(error -> Single.just(DEFAULT_DATA))
.subscribe(this::use);
5. Test with TestScheduler for Time:
@Test
public void testWithTimeControl() {
TestScheduler scheduler = new TestScheduler();
Observable<Long> delayed = Observable.interval(1, TimeUnit.SECONDS, scheduler)
.take(5);
TestObserver<Long> observer = delayed.test();
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
observer.assertValues(0L, 1L, 2L, 3L, 4L); // interval emits Long, not Integer
observer.assertComplete();
}
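The same technique verifies time-based operators such as Pattern 1's debounced search input; a minimal sketch:
@Test
public void testDebounceCoalescesKeystrokes() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<String> input = PublishSubject.create();
TestObserver<String> observer = input
.debounce(300, TimeUnit.MILLISECONDS, scheduler)
.test();
input.onNext("r");
input.onNext("re");
input.onNext("reactive"); // all typed within one debounce window
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
observer.assertValues("reactive"); // only the final keystroke survives
}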