10.2 Backpressure and Stream Operators
Backpressure is the mechanism that lets a consumer tell a producer how many items it is ready to receive, so that a fast producer cannot overwhelm a slow consumer.
Understanding Backpressure
What is Backpressure?
// Producer generates data faster than consumer can handle
// Without backpressure - memory overflow, loss, or blocking
// With backpressure:
// Consumer signals: "Send me 10 items"
// Producer limits to 10 items
// Consumer processes and requests more
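import io.reactivex.rxjava3.core.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class BackpressureDemo {
    // Producer - a pull-based source that can emit as fast as items are requested.
    // (Observable cannot honor request(n), and Flowable.interval cannot slow down,
    // so a request-driven source such as rangeLong is used here.)
    Flowable<Long> fastProducer = Flowable.rangeLong(0, 1_000);

    // Consumer - processes about 100 items per second
    Subscriber<Long> slowConsumer = new Subscriber<Long>() {
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            // Request 10 items initially (backpressure signal)
            s.request(10);
        }

        @Override
        public void onNext(Long value) {
            try {
                Thread.sleep(10); // Slow processing (~100/sec)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                subscription.cancel();
                return;
            }
            processItem(value);
            // Request one more for each item processed
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Error: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Stream complete");
        }
    };

    private void processItem(Long value) {
        System.out.println("Processed: " + value);
    }

    public void demonstrateBackpressure() {
        fastProducer.subscribe(slowConsumer);
        // The producer emits only what was requested, so it never runs more than 10 items ahead
    }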
}
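The next example shows the producer side of the `request(n)` handshake. It is a minimal sketch built only on the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams `Publisher`/`Subscriber`/`Subscription` contract that `Flowable` implements. `RangePublisher` and `RecordingSubscriber` are hypothetical names. This is a simplified, single-threaded illustration, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical request-driven publisher: emits 0..count-1, but only as many as requested.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next;          // next value to emit
            private long demand;       // requested but not yet delivered
            private boolean emitting;  // true while the emission loop runs
            private boolean done;      // cancelled or terminated

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {          // Reactive Streams rule 3.9: non-positive requests are errors
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // Add to outstanding demand, capping at Long.MAX_VALUE ("unbounded")
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                if (emitting) return;  // re-entrant call from onNext: the running loop sees the new demand
                emitting = true;
                while (demand > 0 && next < count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Records what it receives and never requests on its own; the caller drives demand.
final class RecordingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    Flow.Subscription subscription;
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Integer item) { received.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }
}
```

Subscribing delivers nothing until `subscription.request(2)` is called, and then exactly two items arrive. The remaining values are not produced until more demand arrives. `Flowable` gives this guarantee and `Observable` does not.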
Two Approaches to Handling Fast Producers:
Backpressure-Aware (Flowable) - Producer respects consumer pace
// Flowable respects backpressure from its subscriber.
// A pull-based source such as rangeLong emits only what was requested;
// Flowable.interval cannot pause and would instead fail with MissingBackpressureException.
Flowable<Long> backpressureAware = Flowable.rangeLong(0, 10);
backpressureAware.subscribe(new Subscriber<Long>() {
    @Override
    public void onSubscribe(Subscription s) {
        // Request items at a controlled pace
        s.request(5); // Get 5 items; the other 5 wait until requested
    }
    @Override
    public void onNext(Long value) {
        System.out.println("Item: " + value);
        // Could request more here, or wait
    }
    @Override
    public void onError(Throwable e) {}
    @Override
    public void onComplete() {}
});
Non-Backpressure (Observable) - Producer doesn't wait
// Observable doesn't support backpressure natively, so convert it
// to a Flowable and handle overflow explicitly
Flowable<Long> fastFlowable = Observable
    .interval(1, TimeUnit.MILLISECONDS)
    .toFlowable(BackpressureStrategy.MISSING) // No strategy yet; the next operator supplies one
    .onBackpressureBuffer(1000); // Buffer up to 1000 items; signals an error on overflow
Backpressure Strategies
1. Buffer - Accumulate Items:
Flowable<Long> data = Flowable.interval(1, TimeUnit.MILLISECONDS);
// Buffer with fixed size
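data
.onBackpressureBuffer(100) // Buffer up to 100 items; signals an error if it overflows
.observeOn(Schedulers.single()) // Move the slow consumer off the timer thread; otherwise it delays the ticks
.subscribe(
item -> {
Thread.sleep(10); // Slow consumer
System.out.println(item);
},
error -> System.err.println("Buffer overflowed: " + error)
);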
// Buffer with custom action when full
data
.onBackpressureBuffer(
100,
() -> System.out.println("Buffer full"),
BackpressureOverflowStrategy.DROP_OLDEST
)
.subscribe(this::process);
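The buffer strategies can be modelled without RxJava as a bounded queue. A producer pushes into it regardless of demand, and a consumer pulls from it when ready. The sketch below is a hypothetical, single-threaded model of the two behaviours used above: failing when full (like `onBackpressureBuffer(100)`) and evicting the oldest entry (like `DROP_OLDEST`). `BoundedBuffer` and its methods are illustrative names, not RxJava's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a bounded backpressure buffer (not RxJava's implementation).
final class BoundedBuffer<T> {
    enum Overflow { ERROR, DROP_OLDEST }

    private final Deque<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final Overflow strategy;
    final List<T> dropped = new ArrayList<>();

    BoundedBuffer(int capacity, Overflow strategy) {
        this.capacity = capacity;
        this.strategy = strategy;
    }

    // Called by the producer for every item, whether or not the consumer is ready.
    void offer(T item) {
        if (queue.size() < capacity) {
            queue.addLast(item);
            return;
        }
        switch (strategy) {
            // RxJava would signal onError downstream; the model simply throws.
            case ERROR -> throw new IllegalStateException("Buffer is full");
            case DROP_OLDEST -> {
                dropped.add(queue.removeFirst()); // evict the oldest buffered item
                queue.addLast(item);              // keep the new one
            }
        }
    }

    // Called when the consumer requests n items; returns what can be delivered now.
    List<T> poll(int n) {
        List<T> out = new ArrayList<>();
        while (out.size() < n && !queue.isEmpty()) {
            out.add(queue.removeFirst());
        }
        return out;
    }
}
```

With capacity 3 and `DROP_OLDEST`, offering 1 through 5 leaves `[3, 4, 5]` buffered and records 1 and 2 as dropped. With `ERROR`, the first item that does not fit ends the stream.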
2. Drop - Discard Items:
// Drop newest - discard incoming items while the consumer has no outstanding requests
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureDrop() // Discard if consumer can't keep up
.subscribe(this::process);
// Drop with callback
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureDrop(dropped ->
logger.warn("Dropped item: " + dropped)
)
.subscribe(this::process);
// Drop oldest
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(100, () -> { }, BackpressureOverflowStrategy.DROP_OLDEST) // No-op overflow action
.subscribe(this::process);
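`onBackpressureDrop` needs no buffer at all. It only tracks how many items downstream has requested, and anything that arrives with no outstanding request is discarded. The following is a hypothetical single-threaded model of that rule; `DropOnOverflow` is an illustrative name, not an RxJava class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of onBackpressureDrop: no buffer, only a count of outstanding requests.
final class DropOnOverflow<T> {
    private long demand;                          // items requested but not yet delivered
    final List<T> delivered = new ArrayList<>();
    final List<T> dropped = new ArrayList<>();

    void request(long n) {
        demand += n;
    }

    // The producer pushes whether or not there is demand.
    void onNext(T item) {
        if (demand > 0) {
            demand--;
            delivered.add(item);
        } else {
            dropped.add(item);                    // nobody asked for it: discard
        }
    }
}
```

After `request(2)` and five pushed items, the first two are delivered and the last three are dropped. A later `request(1)` lets the next item through.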
3. Latest - Keep Only Most Recent:
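// Keep only the most recent undelivered item; older pending items are dropped
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureLatest()
.observeOn(Schedulers.single(), false, 1) // Slow consumer on its own thread, prefetching 1 item
.subscribe(item -> {
Thread.sleep(50); // Slow processing
System.out.println("Processing: " + item);
});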
// Example: UI updates
// If 5 click events happen but UI only renders once per 100ms,
// show the latest click, not all intermediate ones
uiClickStream
.throttleLatest(100, TimeUnit.MILLISECONDS)
.onBackpressureLatest() // throttleLatest is time-based and cannot slow down, so protect the UI after it
.subscribe(this::updateUI);
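`onBackpressureLatest` follows the same idea but adds a single slot that newer items overwrite. Here is a hypothetical model of that slot, replaying the click scenario above. `KeepLatest` is an illustrative name, not RxJava's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of onBackpressureLatest: at most one pending item, overwritten by newer ones.
final class KeepLatest<T> {
    private long demand;          // items requested but not yet delivered
    private T pending;            // the single slot; null means empty
    final List<T> delivered = new ArrayList<>();

    void onNext(T item) {
        if (demand > 0) {
            demand--;
            delivered.add(item);
        } else {
            pending = item;       // overwrite any older, undelivered item
        }
    }

    void request(long n) {
        demand += n;
        if (demand > 0 && pending != null) { // new demand first receives the most recent item
            demand--;
            delivered.add(pending);
            pending = null;
        }
    }
}
```

If the UI requests one item, receives `click-1`, and then clicks 2 and 3 arrive before the next request, only `click-3` is delivered next.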
Stream Manipulation Operators
Grouping and Windowing:
// GroupBy - Partition stream by key
Flowable<Event> events = /* stream of events */; // Event: placeholder type with a userId() accessor
events
.groupBy(event -> event.userId()) // Group by user
.flatMap(group -> group
.scan(0, (count, event) -> count + 1) // Running count per user
.skip(1) // Drop scan's initial seed (0)
.map(count -> group.getKey() + ": " + count)
)
.subscribe(System.out::println);
// Buffer - Collect into batches
Flowable.range(1, 100)
.buffer(10) // Emit each group of 10 items as a List
.subscribe(batch -> System.out.println("Batch: " + batch));
// Output:
// Batch: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// Batch: [11, 12, ..., 20]
// ...
// Window - Time-based grouping
Flowable.interval(1, TimeUnit.MILLISECONDS)
.window(100, TimeUnit.MILLISECONDS) // 100ms windows
.flatMapSingle(window -> window.count()) // Count per window (count() returns a Single)
.subscribe(windowCount -> System.out.println("Items in window: " + windowCount));
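The count-per-window result can be checked offline. Assign each timestamped item to the tumbling window `floor(t / size) * size` and count per window. `TumblingWindows.countPerWindow` is a hypothetical helper that works on recorded timestamps; it is not a reactive operator.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindows {
    // Counts items per tumbling window of sizeMillis, keyed by window start time.
    // Empty windows are simply absent here; a timer-driven window() would
    // typically still open them and report a count of 0.
    static Map<Long, Integer> countPerWindow(List<Long> timestampsMillis, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : timestampsMillis) {
            long windowStart = (t / sizeMillis) * sizeMillis;
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }
}
```

For items at 5, 50, 99, 100 and 250 ms with 100 ms windows, the counts are 3 for the window starting at 0, 1 for the window at 100, and 1 for the window at 200.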
Combining Streams:
// Merge - Combine multiple streams
Flowable<String> stream1 = Flowable.just("A", "B");
Flowable<String> stream2 = Flowable.just("C", "D");
Flowable.merge(stream1, stream2)
.subscribe(System.out::println);
// Output: A B C D (these sources are synchronous; with asynchronous sources, merge interleaves items as they arrive)
// Concat - Sequential combination
Flowable.concat(stream1, stream2)
.subscribe(System.out::println);
// Output: A B C D (in order)
// Zip - Combine corresponding items
Flowable<String> names = Flowable.just("Alice", "Bob");
Flowable<Integer> ages = Flowable.just(30, 25);
Flowable.zip(names, ages, (name, age) -> name + " is " + age)
.subscribe(System.out::println);
// Output:
// Alice is 30
// Bob is 25
// CombineLatest - Latest from each stream
Flowable<String> userStream = /* user updates */;
Flowable<String> sessionStream = /* session updates */;
Flowable.combineLatest(
userStream,
sessionStream,
(user, session) -> user + " in " + session
)
.subscribe(System.out::println);
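The difference between `zip` and `combineLatest` shows up when the two sources emit at different rates. The hypothetical model below replays one interleaved event sequence through both rules. `zip` pairs items by position. `combineLatest` pairs each new item with the other source's most recent one, once both sources have emitted. `CombineSemantics` and `Event` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class CombineSemantics {
    record Event(char source, String value) {}   // source is 'A' or 'B'

    // zip: pair the n-th item of A with the n-th item of B; unmatched items wait in a queue.
    static List<String> zip(List<Event> events) {
        Deque<String> a = new ArrayDeque<>(), b = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            (e.source() == 'A' ? a : b).addLast(e.value());
            if (!a.isEmpty() && !b.isEmpty()) {
                out.add(a.removeFirst() + "+" + b.removeFirst());
            }
        }
        return out;
    }

    // combineLatest: once both have emitted, every new item pairs with the other's latest value.
    static List<String> combineLatest(List<Event> events) {
        String latestA = null, latestB = null;
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.source() == 'A') latestA = e.value(); else latestB = e.value();
            if (latestA != null && latestB != null) {
                out.add(latestA + "+" + latestB);
            }
        }
        return out;
    }
}
```

For the arrival order a1, a2, b1, a3, b2, `zip` yields `a1+b1, a2+b2`. `combineLatest` yields `a2+b1, a3+b1, a3+b2`: a1 is never combined because B had not emitted yet.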
Time-Based Operators:
// Delay - Postpone emission
Flowable.just("Item")
.delay(2, TimeUnit.SECONDS)
.subscribe(System.out::println);
// Timeout - Error if no item within timeframe
Flowable<String> stream = /* may be slow */;
stream
.timeout(5, TimeUnit.SECONDS)
.subscribe(
System.out::println,
error -> System.err.println("Timed out: " + error)
);
// Throttle - Limit emission rate
Flowable.interval(1, TimeUnit.MILLISECONDS)
.throttleFirst(100, TimeUnit.MILLISECONDS) // First of each 100ms
.subscribe(System.out::println);
// Debounce - Wait for pause before emitting
userInputStream
.debounce(300, TimeUnit.MILLISECONDS) // Wait 300ms after last input
.subscribe(this::searchWithTerm);
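`throttleFirst` and `debounce` both thin out a bursty stream, but they keep different items. This hypothetical model replays timestamped keystrokes through both rules. It ignores exact-boundary timing, where the real operators depend on scheduler timing. `RateOperators` and `Timed` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

final class RateOperators {
    record Timed(long atMillis, String value) {}

    // throttleFirst: emit an item, then ignore everything for windowMillis after it.
    static List<String> throttleFirst(List<Timed> items, long windowMillis) {
        List<String> out = new ArrayList<>();
        long gateOpensAt = Long.MIN_VALUE;
        for (Timed t : items) {
            if (t.atMillis() >= gateOpensAt) {
                out.add(t.value());
                gateOpensAt = t.atMillis() + windowMillis;
            }
        }
        return out;
    }

    // debounce: emit an item only if no newer item follows within quietMillis;
    // the final item is emitted when the source completes.
    static List<String> debounce(List<Timed> items, long quietMillis) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            boolean isLast = i == items.size() - 1;
            if (isLast || items.get(i + 1).atMillis() - items.get(i).atMillis() >= quietMillis) {
                out.add(items.get(i).value());
            }
        }
        return out;
    }
}
```

Consider typing "s", "se", "sea" at 0/100/200 ms, then "sear", "searc", "search" at 700/750/800 ms. With a 300 ms window, `throttleFirst` keeps "s" and "sear", the start of each burst. `debounce` keeps "sea" and "search", the end of each burst, which is what a search box wants.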
Stateful Operators:
// Scan - Running accumulation
Flowable.range(1, 5)
.scan(0, (acc, value) -> acc + value)
.subscribe(System.out::println);
// Output: 0 1 3 6 10 15 (the seed 0 is emitted first)
// Reduce - Single final accumulation
Flowable.range(1, 5)
.reduce(0, (acc, value) -> acc + value)
.subscribe(sum -> System.out.println("Sum: " + sum));
// Output: Sum: 15
// Distinct - Only unique items
Flowable.just(1, 2, 2, 3, 3, 3, 4)
.distinct()
.subscribe(System.out::println);
// Output: 1 2 3 4
// DistinctUntilChanged - Unique consecutive items
Flowable.just(1, 1, 2, 2, 2, 3, 1, 1)
.distinctUntilChanged()
.subscribe(System.out::println);
// Output: 1 2 3 1
Advanced Stream Processing
Real-World Example: Log File Processing with Backpressure:
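import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogFileProcessor {
// SLF4J logger (assumes slf4j-api on the classpath)
private static final Logger logger = LoggerFactory.getLogger(LogFileProcessor.class);
public void processLogFile(Path logFile, int batchSize) {
// Read lines lazily so only requested lines are held in memory
// (Files.readAllLines would load the entire file up front)
Flowable.using(
() -> Files.lines(logFile), // Open the file on subscription
lines -> Flowable.fromStream(lines), // Emit lines on demand
Stream::close // Close the file on completion, error, or cancellation
)
.buffer(batchSize) // Process in batches
.flatMapSingle(
batch -> processBatch(batch).subscribeOn(Schedulers.io()), // Process each batch on an I/O thread
false, // Don't delay errors
4 // At most 4 batches in flight (illustrative limit), so reading stays bounded
)
.observeOn(Schedulers.single()) // Write results serially
.subscribe(
result -> writeResult(result),
error -> logger.error("Processing failed", error),
() -> logger.info("Completed processing")
);
}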
private Single<ProcessResult> processBatch(List<String> lines) {
return Single.create(emitter -> {
try {
List<LogEntry> entries = lines.stream()
.map(LogEntry::parse)
.collect(Collectors.toList());
ProcessResult result = new ProcessResult(
entries.size(),
entries.stream()
.filter(e -> e.isError())
.count()
);
emitter.onSuccess(result);
} catch (Exception e) {
emitter.onError(e);
}
});
}
private void writeResult(ProcessResult result) {
System.out.println("Processed " + result.totalLines() +
" lines with " + result.errorCount() + " errors");
}
record LogEntry(String timestamp, String level, String message) {
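static LogEntry parse(String line) {
// Hypothetical line format: "<timestamp> <LEVEL> <message>"
String[] parts = line.split(" ", 3);
if (parts.length < 3) {
return new LogEntry(null, null, line); // Unrecognized format: keep the raw line
}
return new LogEntry(parts[0], parts[1], parts[2]);
}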
boolean isError() {
return level != null && level.contains("ERROR");
}
}
record ProcessResult(long totalLines, long errorCount) {}
}
Rate Limiting with Backpressure:
public class RateLimiter {
// Allow 100 requests per second
public Flowable<String> limitRate(Flowable<String> requests) {
return requests
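.zipWith(
Flowable.interval(10, TimeUnit.MILLISECONDS)
// Drop ticks while no request is waiting, so an idle source doesn't end in
// MissingBackpressureException (zip may still hold some unused ticks, allowing short bursts)
.onBackpressureDrop(),
(request, tick) -> request
);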
}
// Usage
public void example() {
Flowable.range(1, 1000)
.map(n -> "Request-" + n)
.compose(this::limitRate)
.subscribe(request -> System.out.println("Processing: " + request));
}
}
Error Recovery with Backpressure:
public Flowable<String> resilientStream(
Callable<Flowable<String>> primarySource,
Callable<Flowable<String>> fallbackSource) {
return Flowable.defer(() -> primarySource.call())
.retry(3) // Resubscribe to the primary up to 3 more times
.onErrorResumeNext(error -> {
// Reached only after the primary has failed 4 times in total
logger.warn("Primary source failed, switching to fallback", error);
return Flowable.defer(() -> fallbackSource.call());
});
}
// Usage
resilientStream(
() -> fetchFromPrimaryDatabase(),
() -> fetchFromCache()
)
.onBackpressureLatest()
.subscribe(
data -> updateUI(data),
error -> showError(error)
);
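Operator order decides what `retry(3)` repeats. The hypothetical helper below models `primary.retry(3).onErrorResumeNext(fallback)`: the primary is attempted once plus three retries, and only then does the fallback take over. Plain `Supplier` calls stand in for subscriptions, and `RetryThenFallback` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class RetryThenFallback {
    // Tries primary once plus up to `retries` more times, then switches to fallback.
    // Models the ordering primary.retry(n).onErrorResumeNext(fallback), not RxJava's code.
    static <T> T call(Supplier<T> primary, Supplier<T> fallback, int retries, List<String> log) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return primary.get();
            } catch (RuntimeException e) {
                log.add("primary failed (attempt " + (attempt + 1) + ")");
            }
        }
        log.add("switching to fallback");
        return fallback.get();
    }
}
```

If `retry(3)` were placed after `onErrorResumeNext` instead, a failing fallback would restart the whole chain, primary included, which is rarely what is intended.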
Choosing Between Observable and Flowable
Use Observable when:
- The data source cannot be slowed down (UI events, timers)
- You're wrapping callback APIs
- You don't need explicit backpressure handling
- Simpler code is more important than perfect backpressure
Use Flowable when:
- Dealing with potentially large data volumes
- Source can produce data faster than consumer
- You need explicit backpressure control
- Integrating with reactive databases (R2DBC) or messaging
// Observable - fire-and-forget
Observable<UIEvent> clicks = uiClickStream;
// Flowable - controlled consumption
Flowable<DatabaseRecord> records = database.query(sql)
.onBackpressureBuffer(1000)
.doOnNext(record -> logger.debug("Fetched: " + record));
Best Practices
1. Always Specify Backpressure Strategy:
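// Good - explicit backpressure handling
observable
.toFlowable(BackpressureStrategy.LATEST) // Keep only the most recent item
.subscribe(this::handle);
// Avoid - toFlowable() has no default strategy, and MISSING only defers the
// problem: bounded operators such as observeOn signal an error on overflow
observable
.toFlowable(BackpressureStrategy.MISSING)
.observeOn(Schedulers.computation())
.subscribe(this::handle);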
2. Request Only What You Can Handle:
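// Item and processSlowly() are placeholders for your own type and logic
Subscriber<Item> subscriber = new Subscriber<Item>() {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(10); // Start with a small request
    }
    @Override
    public void onNext(Item item) {
        processSlowly(item);
        subscription.request(1); // Request one more after each item
    }
    @Override public void onError(Throwable e) { e.printStackTrace(); }
    @Override public void onComplete() { }
};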
3. Compose Operators Correctly:
// Good - backpressure is preserved through the pipeline
source
.buffer(100)
.map(batch -> processBatch(batch))
.onBackpressureBuffer(10) // Limit intermediate buffer
.subscribe(this::store);
// Avoid - losing backpressure
source
.toList() // Requests everything upfront and holds the whole stream in memory
.subscribe(this::process);
4. Test Backpressure:
@Test
public void testBackpressureHandling() {
    TestSubscriber<Integer> test = Flowable.range(1, 100)
        .test(10); // Request only 10 items
    test.assertValueCount(10);
    test.assertNoErrors();
    test.assertNotComplete(); // Remaining items wait for more demand
    test.requestMore(90);
    test.assertValueCount(100);
    test.assertComplete();
}