10.2 Backpressure and Stream Operators

Backpressure is the mechanism by which a consumer tells a producer how much data it is ready to receive, so that a fast producer cannot overwhelm a slow consumer.

Understanding Backpressure

What is Backpressure?

// The producer generates data faster than the consumer can handle it.
// Without backpressure: unbounded memory growth, data loss, or blocking.

// With backpressure:
// 1. Consumer signals: "Send me 10 items"
// 2. Producer emits at most 10 items
// 3. Consumer processes them and requests more

public class BackpressureDemo {
    // Producer - ticks 1000 times per second. Flowable is required here:
    // Observable has no Subscriber or request(n). interval() is time-driven
    // and cannot pause, so onBackpressureDrop() discards unrequested ticks.
    Flowable<Long> fastProducer = Flowable
        .interval(1, TimeUnit.MILLISECONDS)
        .onBackpressureDrop();

    // Consumer - processes 100 items per second
    Subscriber<Long> slowConsumer = new Subscriber<Long>() {
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            // Request 10 items initially (backpressure signal)
            s.request(10);
        }

        @Override
        public void onNext(Long value) {
            try {
                Thread.sleep(10);  // Slow processing (100/sec)
            } catch (InterruptedException e) {
                // onNext cannot throw checked exceptions: stop consuming
                Thread.currentThread().interrupt();
                subscription.cancel();
                return;
            }
            processItem(value);
            // Request one more for each processed
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Error: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Stream complete");
        }
    };

    public void demonstrateBackpressure() {
        fastProducer.subscribe(slowConsumer);
        // The consumer never receives more items than it has requested;
        // ticks that arrive without outstanding demand are dropped
    }
}
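
The request/emit handshake can also be seen without any reactive library. The following is a minimal sketch, assuming Java 9+ and the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams types). DemandDemo, range, and run are hypothetical names chosen for illustration, and the publisher is single-threaded and synchronous, so it is not a substitute for a production source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class: a synchronous publisher that emits only on demand.
final class DemandDemo {

    /** Publishes 0..count-1, but never more items than the subscriber has requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;              // next value to emit
            private long demand = 0;           // requested but not yet delivered
            private boolean emitting = false;  // guards against re-entrant request()
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be > 0"));
                    return;
                }
                long sum = demand + n;
                demand = sum < 0 ? Long.MAX_VALUE : sum;  // cap on overflow
                if (emitting) return;  // the loop below picks up the new demand
                emitting = true;
                while (demand > 0 && next < count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with an initial request and, optionally, one more request per item. */
    static List<String> run(int count, int initialRequest, boolean requestMore) {
        List<String> log = new ArrayList<>();
        range(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(initialRequest);
            }

            @Override
            public void onNext(Integer item) {
                log.add("item " + item);
                if (requestMore) subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("complete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2, false)); // [item 0, item 1]
        System.out.println(run(5, 2, true));  // [item 0, item 1, item 2, item 3, item 4, complete]
    }
}
```

In the first call the subscriber asks for two items and never asks again, so the publisher stops after two and does not complete. In the second call each onNext requests one more item, which keeps the stream flowing to the end.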

Two Approaches to Handling Fast Producers:

  1. Backpressure-Aware (Flowable) - Producer respects consumer pace

    // Flowable.range emits only as many items as the subscriber requests.
    // (A timer source such as Flowable.interval cannot slow down; it signals
    // MissingBackpressureException when a tick arrives without demand.)
    Flowable<Integer> backpressureAware = Flowable.range(1, 10);
    
    backpressureAware.subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            // Request items at controlled pace
            s.request(5);  // Get 5 items
        }
    
        @Override
        public void onNext(Integer value) {
            System.out.println("Item: " + value);
            // Could request more, or wait
        }
    
        @Override
        public void onError(Throwable e) {
            System.err.println("Error: " + e.getMessage());
        }
    
        @Override
        public void onComplete() {}
    });
    
  2. Non-Backpressure (Observable) - Producer doesn't wait

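    // Observable has no backpressure: every item is pushed downstream.
    // To bound memory, convert to a Flowable and pick an overflow strategy
    Flowable<Long> boundedStream = Observable
        .interval(1, TimeUnit.MILLISECONDS)
        .toFlowable(BackpressureStrategy.MISSING)
        .onBackpressureBuffer(1000);  // Signals an error once 1000 items are pending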
    

Backpressure Strategies

1. Buffer - Accumulate Items:

Flowable<Long> data = Flowable.interval(1, TimeUnit.MILLISECONDS);

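// Buffer with fixed size
data
    .onBackpressureBuffer(100)   // Hold up to 100 undelivered items
    .observeOn(Schedulers.io())  // Consume on another thread so the timer keeps ticking
    .subscribe(
        item -> {
            Thread.sleep(10);  // Slow consumer
            System.out.println(item);
        },
        // When the buffer overflows, the stream terminates with an error
        error -> System.err.println("Buffer overflow: " + error)
    );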

// Buffer with a callback on overflow; the oldest buffered item is evicted
data
    .onBackpressureBuffer(
        100,
        () -> System.out.println("Buffer full"),
        BackpressureOverflowStrategy.DROP_OLDEST
    )
    .subscribe(this::process);

2. Drop - Discard Items:

// Drop newest: discard items that arrive while the consumer has no outstanding demand
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop()  // Discard if consumer can't keep up
    .subscribe(this::process);

// Drop with callback
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop(dropped -> 
        logger.warn("Dropped item: " + dropped)
    )
    .subscribe(this::process);

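// Drop oldest: keep the newest items, evicting the head of the buffer
// (this overload requires an overflow callback; a no-op is passed here)
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer(100, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST)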
    .subscribe(this::process);

3. Latest - Keep Only Most Recent:

// Always emit the latest item, drop old ones
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()
    .observeOn(Schedulers.io(), false, 1)  // Hop threads; prefetch 1 so stale items are not queued
    .subscribe(item -> {
        Thread.sleep(50);  // Slow processing
        System.out.println("Processing: " + item);
    });

// Example: UI updates
// If 5 click events happen but UI only renders once per 100ms,
// show the latest click, not all intermediate ones
uiClickStream
    .throttleLatest(100, TimeUnit.MILLISECONDS)  // At most one event per 100ms
    .onBackpressureLatest()                      // throttleLatest is time-driven, so guard it downstream
    .subscribe(this::updateUI);
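
To compare what each strategy leaves for the consumer, the following plain-Java sketch simulates a stalled consumer in front of a bounded buffer. It is an illustration only, not RxJava's implementation: OverflowPolicies, Policy, and simulate are hypothetical names, and the policy names are a conceptual model that does not map one-to-one onto RxJava's operators or onto BackpressureOverflowStrategy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of overflow policies; not RxJava's implementation.
final class OverflowPolicies {

    enum Policy { ERROR, DROP_NEWEST, DROP_OLDEST, LATEST }

    /**
     * Offers 0..produced-1 to a buffer of the given capacity while the consumer
     * is stalled, then returns what the consumer would receive once it resumes.
     * LATEST ignores the capacity and keeps a single slot.
     */
    static List<Integer> simulate(Policy policy, int capacity, int produced) {
        Deque<Integer> buffer = new ArrayDeque<>();
        for (int item = 0; item < produced; item++) {
            if (policy == Policy.LATEST) {
                buffer.clear();        // overwrite the single slot with the newest item
                buffer.addLast(item);
            } else if (buffer.size() < capacity) {
                buffer.addLast(item);  // room left: every policy just enqueues
            } else if (policy == Policy.DROP_NEWEST) {
                // buffer full: the incoming item is discarded
            } else if (policy == Policy.DROP_OLDEST) {
                buffer.removeFirst();  // evict the head to make room
                buffer.addLast(item);
            } else {
                throw new IllegalStateException("Buffer is full");
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(simulate(Policy.DROP_NEWEST, 3, 6)); // [0, 1, 2]
        System.out.println(simulate(Policy.DROP_OLDEST, 3, 6)); // [3, 4, 5]
        System.out.println(simulate(Policy.LATEST, 3, 6));      // [5]
    }
}
```

With six items produced into three slots, dropping the newest preserves the start of the stream, dropping the oldest preserves the end, and keeping only the latest preserves a single value. The ERROR policy throws instead, which corresponds to a bounded buffer that fails fast.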

Stream Manipulation Operators

Grouping and Windowing:

// GroupBy - Partition stream by key
// (UserEvent is a placeholder type with a userId() accessor)
Flowable<UserEvent> events = /* stream of events */;

events
    .groupBy(event -> event.userId())  // Group by user
    .flatMap(group -> group
        .scan(0, (count, event) -> count + 1)  // Running count per user
        .skip(1)                               // Skip the seed value 0
        .map(count -> group.getKey() + ": " + count)
    )
    .subscribe(System.out::println);

// Buffer - Collect into batches
Flowable.range(1, 100)
    .buffer(10)  // Emit 10 items at a time
    .subscribe(batch -> System.out.println("Batch: " + batch));
// Output:
// Batch: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// Batch: [11, 12, ..., 20]
// ...

// Window - Time-based grouping
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .window(100, TimeUnit.MILLISECONDS)        // 100ms windows
    .flatMapSingle(window -> window.count())   // count() returns a Single, so use flatMapSingle
    .subscribe(windowCount -> System.out.println("Items in window: " + windowCount));

Combining Streams:

// Merge - Combine multiple streams
Flowable<String> stream1 = Flowable.just("A", "B");
Flowable<String> stream2 = Flowable.just("C", "D");

Flowable.merge(stream1, stream2)
    .subscribe(System.out::println);
// Output: A B C D here, because both sources are synchronous;
// items from asynchronous sources may interleave

// Concat - Sequential combination
Flowable.concat(stream1, stream2)
    .subscribe(System.out::println);
// Output: A B C D (in order)

// Zip - Combine corresponding items
Flowable<String> names = Flowable.just("Alice", "Bob");
Flowable<Integer> ages = Flowable.just(30, 25);

Flowable.zip(names, ages, (name, age) -> name + " is " + age)
    .subscribe(System.out::println);
// Output:
// Alice is 30
// Bob is 25

// CombineLatest - Latest from each stream
Flowable<String> userStream = /* user updates */;
Flowable<String> sessionStream = /* session updates */;

Flowable.combineLatest(
    userStream,
    sessionStream,
    (user, session) -> user + " in " + session
)
    .subscribe(System.out::println);
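
The difference between zip and combineLatest is easiest to see on a fixed timeline. The sketch below replays a merged sequence of emissions from two sources in plain Java (Java 16+ for records). CombineDemo and Event are hypothetical names for this illustration; the code models the pairing rules only and ignores threading, completion, and backpressure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the pairing rules; not how RxJava implements them.
final class CombineDemo {

    /** One emission on a merged timeline: which source fired ('A' or 'B') and its value. */
    record Event(char source, String value) {}

    /** zip: pair the n-th item of A with the n-th item of B. */
    static List<String> zip(List<Event> timeline) {
        Deque<String> a = new ArrayDeque<>();
        Deque<String> b = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (Event e : timeline) {
            (e.source() == 'A' ? a : b).addLast(e.value());
            if (!a.isEmpty() && !b.isEmpty()) {
                out.add(a.removeFirst() + b.removeFirst());
            }
        }
        return out;
    }

    /** combineLatest: once both sources have emitted, emit on every new item. */
    static List<String> combineLatest(List<Event> timeline) {
        String lastA = null;
        String lastB = null;
        List<String> out = new ArrayList<>();
        for (Event e : timeline) {
            if (e.source() == 'A') {
                lastA = e.value();
            } else {
                lastB = e.value();
            }
            if (lastA != null && lastB != null) {
                out.add(lastA + lastB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> timeline = List.of(
            new Event('A', "A1"), new Event('A', "A2"), new Event('B', "B1"),
            new Event('A', "A3"), new Event('B', "B2"));
        System.out.println(zip(timeline));           // [A1B1, A2B2]
        System.out.println(combineLatest(timeline)); // [A2B1, A3B1, A3B2]
    }
}
```

zip holds A2 and A3 back until a matching B item arrives, while combineLatest never pairs A1 with anything because A2 has replaced it by the time B first emits.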

Time-Based Operators:

// Delay - Postpone emission
Flowable.just("Item")
    .delay(2, TimeUnit.SECONDS)
    .subscribe(System.out::println);

// Timeout - Error if no item within timeframe
Flowable<String> stream = /* may be slow */;
stream
    .timeout(5, TimeUnit.SECONDS)
    .subscribe(
        System.out::println,
        error -> System.err.println("Timed out: " + error)
    );

// Throttle - Limit emission rate
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .throttleFirst(100, TimeUnit.MILLISECONDS)  // First of each 100ms
    .subscribe(System.out::println);

// Debounce - Wait for pause before emitting
userInputStream
    .debounce(300, TimeUnit.MILLISECONDS)  // Wait 300ms after last input
    .subscribe(this::searchWithTerm);
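
Throttling and debouncing are often confused, so the following plain-Java sketch applies both rules to the same list of arrival times in milliseconds. TimingDemo and its methods are hypothetical names; the code is a simplified model that assumes sorted timestamps and treats the final item as emitted on completion, and it does not settle how the real operators behave exactly on a window boundary.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two timing rules; not RxJava's implementation.
final class TimingDemo {

    /** throttleFirst: emit an item, then ignore everything for the next windowMs. */
    static List<Long> throttleFirst(List<Long> timestamps, long windowMs) {
        List<Long> out = new ArrayList<>();
        long gateOpensAt = Long.MIN_VALUE;
        for (long t : timestamps) {
            if (t >= gateOpensAt) {
                out.add(t);
                gateOpensAt = t + windowMs;  // close the gate until the window elapses
            }
        }
        return out;
    }

    /** debounce: emit an item only if nothing else arrives within quietMs after it. */
    static List<Long> debounce(List<Long> timestamps, long quietMs) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < timestamps.size(); i++) {
            boolean last = i == timestamps.size() - 1;
            if (last || timestamps.get(i + 1) - timestamps.get(i) >= quietMs) {
                out.add(timestamps.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> arrivals = List.of(0L, 50L, 120L, 130L, 500L);
        System.out.println(throttleFirst(arrivals, 100)); // [0, 120, 500]
        System.out.println(debounce(arrivals, 100));      // [130, 500]
    }
}
```

throttleFirst reacts to the first item of each burst, which suits actions such as button clicks. debounce reacts to the last item of each burst, which suits search-as-you-type input.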

Stateful Operators:

// Scan - Running accumulation
Flowable.range(1, 5)
    .scan(0, (acc, value) -> acc + value)
    .subscribe(System.out::println);
// Output: 0 1 3 6 10 15 (the seed is emitted first)

// Reduce - Single final accumulation
Flowable.range(1, 5)
    .reduce(0, (acc, value) -> acc + value)
    .subscribe(sum -> System.out.println("Sum: " + sum));
// Output: Sum: 15

// Distinct - Only unique items
Flowable.just(1, 2, 2, 3, 3, 3, 4)
    .distinct()
    .subscribe(System.out::println);
// Output: 1 2 3 4

// DistinctUntilChanged - Unique consecutive items
Flowable.just(1, 1, 2, 2, 2, 3, 1, 1)
    .distinctUntilChanged()
    .subscribe(System.out::println);
// Output: 1 2 3 1

Advanced Stream Processing

Real-World Example: Log File Processing with Backpressure:

public class LogFileProcessor {
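    public void processLogFile(Path logFile, int batchSize) throws Exception {
        // Read lazily, one line per request, instead of loading the whole file
        Flowable.<String, BufferedReader>generate(
                () -> Files.newBufferedReader(logFile),
                (reader, emitter) -> {
                    String line = reader.readLine();
                    if (line != null) {
                        emitter.onNext(line);
                    } else {
                        emitter.onComplete();
                    }
                },
                BufferedReader::close)
            .buffer(batchSize)  // Process in batches
            .flatMapSingle(batch -> processBatch(batch)
                .subscribeOn(Schedulers.io()),  // Process each batch on an I/O thread
                false, 4)  // processBatch returns a Single; at most 4 batches in flight
            .observeOn(Schedulers.single())  // Write results serially
            .subscribe(
                result -> writeResult(result),
                error -> logger.error("Processing failed", error),
                () -> logger.info("Completed processing")
            );
    }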

    private Single<ProcessResult> processBatch(List<String> lines) {
        return Single.create(emitter -> {
            try {
                List<LogEntry> entries = lines.stream()
                    .map(LogEntry::parse)
                    .collect(Collectors.toList());

                ProcessResult result = new ProcessResult(
                    entries.size(),
                    entries.stream()
                        .filter(e -> e.isError())
                        .count()
                );

                emitter.onSuccess(result);
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }

    private void writeResult(ProcessResult result) {
        System.out.println("Processed " + result.totalLines() + 
            " lines with " + result.errorCount() + " errors");
    }

    record LogEntry(String timestamp, String level, String message) {
        static LogEntry parse(String line) {
            // Parse log line
            return new LogEntry(null, null, line);
        }
        boolean isError() {
            return level != null && level.contains("ERROR");
        }
    }

    record ProcessResult(long totalLines, long errorCount) {}
}

Rate Limiting with Backpressure:

public class RateLimiter {
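    // Allow at most 100 requests per second (one request per 10ms tick)
    public Flowable<String> limitRate(Flowable<String> requests) {
        return requests
            .zipWith(
                Flowable.interval(10, TimeUnit.MILLISECONDS)
                    .onBackpressureDrop(),  // Ticks cannot wait; drop them if downstream stalls
                (request, tick) -> request
            );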
    }

    // Usage
    public void example() {
        Flowable.range(1, 1000)
            .map(n -> "Request-" + n)
            .compose(this::limitRate)
            .subscribe(request -> System.out.println("Processing: " + request));
    }
}

Error Recovery with Backpressure:

public Flowable<String> resilientStream(
        Callable<Flowable<String>> primarySource,
        Callable<Flowable<String>> fallbackSource) {

    return Flowable.defer(() -> primarySource.call())
        .retry(3)  // Retry the primary source up to 3 times before falling back
        .onErrorResumeNext(error -> {
            logger.warn("Primary source failed, switching to fallback", error);
            return Flowable.defer(() -> fallbackSource.call());
        });
}

// Usage
resilientStream(
    () -> fetchFromPrimaryDatabase(),
    () -> fetchFromCache()
)
    .onBackpressureLatest()
    .subscribe(
        data -> updateUI(data),
        error -> showError(error)
    );

Choosing Between Observable and Flowable

Use Observable when:

  • Data source is not back-pressurable (UI events, timers)
  • You're wrapping callback APIs
  • You don't need explicit backpressure handling
  • Simpler code is more important than perfect backpressure

Use Flowable when:

  • Dealing with potentially large data volumes
  • Source can produce data faster than consumer
  • You need explicit backpressure control
  • Integrating with reactive databases (R2DBC) or messaging

// Observable - no backpressure; every event is pushed to the observer
Observable<UIEvent> clicks = uiClickStream;

// Flowable - controlled consumption
Flowable<DatabaseRecord> records = database.query(sql)
    .onBackpressureBuffer(1000)
    .doOnNext(record -> logger.debug("Fetched: " + record));

Best Practices

1. Always Specify Backpressure Strategy:

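// Good - explicit strategy chosen at the conversion point
observable
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribe(this::handle);

// Avoid - MISSING applies no strategy; any bounded-demand operator
// downstream (such as observeOn) can fail with MissingBackpressureException
observable.toFlowable(BackpressureStrategy.MISSING).subscribe(this::handle);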

2. Request Only What You Can Handle:

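// DefaultSubscriber exposes request(n) directly; Item is a placeholder type
source.subscribe(new DefaultSubscriber<Item>() {
    @Override
    protected void onStart() {
        request(10);  // Start with a small request
    }

    @Override
    public void onNext(Item item) {
        processSlowly(item);
        request(1);  // Request one more only after this item is done
    }

    @Override
    public void onError(Throwable e) {
        logger.error("Stream failed", e);
    }

    @Override
    public void onComplete() {}
});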

3. Compose Operators Correctly:

// Good - every stage has a bounded memory footprint
source
    .buffer(100)
    .map(batch -> processBatch(batch))
    .onBackpressureBuffer(10)  // At most 10 pending batches; overflow signals an error
    .subscribe(this::store);

// Avoid - losing backpressure
source
    .toList()  // Collects everything in memory before emitting - defeats backpressure
    .subscribe(this::process);

4. Test Backpressure:

@Test
public void testBackpressureHandling() {
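    TestSubscriber<Integer> test = Flowable.range(1, 100)
        .onBackpressureBuffer(128)  // Large enough to hold the 90 undelivered items
        .test(10);  // Request only 10 items

    test.assertValueCount(10);
    test.assertNoErrors();      // The buffer did not overflow
    test.assertNotComplete();   // More items pending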
}