10.1 Reactive Programming Fundamentals
Reactive programming provides a declarative model for composing asynchronous, non-blocking event streams with explicit backpressure handling.
What is Reactive Programming?
The Reactive Paradigm:
// Traditional imperative - pull model
List<String> names = new ArrayList<>();
for (String line : Files.readAllLines(path)) {
if (line.contains("@")) {
names.add(line.substring(0, line.indexOf("@")));
}
}
System.out.println(names);
// Reactive - push model with backpressure
// Data pushed to subscribers at their pace
// Implicit or explicit backpressure
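The reactive half of the comparison above is given only as comments. As a rough sketch of what a push model looks like, the following uses the JDK's own `java.util.concurrent.Flow` and `SubmissionPublisher` rather than RxJava. The class name `PushModelSketch` and the method `localParts` are hypothetical, introduced only for this illustration. The subscriber asks for one line at a time, which is the backpressure signal.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical illustration: not part of the original example.
class PushModelSketch {

    // Pushes each line to a subscriber and collects the text before '@'.
    static List<String> localParts(List<String> lines) {
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // backpressure: ask for one line
                }

                @Override
                public void onNext(String line) {
                    int at = line.indexOf('@');
                    if (at >= 0) {
                        names.add(line.substring(0, at));
                    }
                    subscription.request(1); // ready for the next line
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            // The producer pushes; submit blocks only if the subscriber's buffer fills.
            lines.forEach(publisher::submit);
        } // close() signals onComplete once buffered items are delivered

        try {
            done.await(); // wait here only so the sketch can return a result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(localParts(
                List.of("alice@example.com", "no address", "bob@example.org")));
    }
}
```

The final `await` exists only so the method can return a list; a fully reactive caller would react in `onComplete` instead of waiting.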
Key Characteristics:
- Asynchronous: Results arrive later as signals instead of being returned to a waiting caller
- Non-blocking: Threads are released during I/O rather than parked until it completes
- Event-driven: Data flows through a pipeline of transformations as events occur
- Backpressure-aware: Consumers control consumption rate
- Composable: Easy to combine and transform streams
Comparison with Other Models:
public class ConcurrencyModelComparison {
// Blocking/Synchronous - threads block on I/O
public String blockingFetch() throws Exception {
String data = httpClient.get("https://api.example.com/data"); // Blocks!
System.out.println(data);
return data;
}
// Virtual Threads - still blocking but cheap threads
public void virtualThreadApproach() throws Exception {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10000; i++) {
executor.execute(() -> {
try {
String data = httpClient.get("https://api.example.com/data");
System.out.println(data);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
// Reactive - non-blocking, callbacks on completion
public void reactiveApproach() {
httpClientReactive.get("https://api.example.com/data")
.subscribe(
data -> System.out.println(data),
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
// Doesn't block - returns immediately
}
}
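The "returns immediately" behaviour of the reactive approach can be tried without any reactive library. This is a minimal sketch using the JDK's `CompletableFuture`, which models a single asynchronous value rather than a stream; `NonBlockingSketch` and `fetchAsync` are hypothetical names, and the supplier stands in for the HTTP call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration: a single-value analogue of the reactive call above.
class NonBlockingSketch {

    // Starts the call on another thread and returns at once with a future result.
    static CompletableFuture<String> fetchAsync(Supplier<String> slowCall) {
        return CompletableFuture
                .supplyAsync(slowCall)            // runs off the caller's thread
                .thenApply(String::toUpperCase);  // transformation attached as a callback
    }

    public static void main(String[] args) {
        fetchAsync(() -> "data")
                .thenAccept(System.out::println)  // callback on completion
                .join();                          // only so the demo waits before exiting
    }
}
```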
The Reactive Manifesto
Four Core Principles:
Responsive - React quickly to user inputs and events
// Responsive reactive system
Observable<UserAction> userActions = /* stream of clicks, input, etc */;
userActions
    .debounce(300, TimeUnit.MILLISECONDS) // React after a 300 ms pause
    .subscribe(action -> handleUserInput(action));
Resilient - Stay responsive under failure
// Resilience with fallback
primaryDataStream
    .onErrorResumeNext(fallbackDataStream)
    .subscribe(data -> updateUI(data));
Elastic - Scale with load using backpressure
// Automatic backpressure handling
// (in RxJava 2 and later this operator lives on Flowable; Observable has no backpressure)
fastDataProducer
    .onBackpressureBuffer(1000) // Buffer up to 1000 items
    .subscribe(subscriber);
Message-driven - Async communication with explicit backpressure
// Message passing with backpressure
publisher.subscribe(new Subscriber<T>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(10); // Request 10 items (backpressure signal)
    }

    @Override
    public void onNext(T item) {
        processItem(item);
        subscription.request(1); // Request next item
    }

    @Override
    public void onError(Throwable t) {
        System.err.println("Error: " + t); // Terminal signal: no more items follow
    }

    @Override
    public void onComplete() {
        System.out.println("Complete"); // Terminal signal: stream finished normally
    }
});
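To make the `request(n)` signal concrete, here is a small sketch of the publisher side, written against the JDK's `java.util.concurrent.Flow` interfaces (which mirror the Reactive Streams ones). `DemandSketch`, `range`, and `signals` are hypothetical names. The publisher is deliberately single-threaded and unsynchronized, so it is an illustration of demand accounting, not a compliant production publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration of demand-driven emission.
class DemandSketch {

    // Publishes 1..count, emitting only as many items as have been requested.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand += n;
                if (emitting) {
                    return; // re-entrant request from onNext: the loop below picks it up
                }
                emitting = true;
                while (demand > 0 && next <= count && !done) {
                    demand--;
                    subscriber.onNext(Integer.valueOf(next++));
                }
                emitting = false;
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with a fixed initial demand and records every signal received.
    static List<String> signals(Flow.Publisher<Integer> source, long initialDemand) {
        List<String> seen = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(initialDemand); }
            @Override public void onNext(Integer item) { seen.add("next:" + item); }
            @Override public void onError(Throwable t) { seen.add("error"); }
            @Override public void onComplete() { seen.add("complete"); }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(signals(range(5), 2));  // demand of 2 against 5 available items
        System.out.println(signals(range(3), 10)); // demand exceeds supply
    }
}
```

With a demand of 2 against five available items, the subscriber sees two items and no completion signal: the remaining items wait until more are requested.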
Understanding Observable and Subscriber
Observable - The Data Source:
// Observable emits data to subscribers
// Can emit 0, 1, or many items
// Signals completion or error
Observable<String> dataStream = Observable.create(emitter -> {
try {
emitter.onNext("Item 1");
emitter.onNext("Item 2");
emitter.onNext("Item 3");
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
Subscriber - The Observer:
// Subscriber receives items from observable
// Handles next, error, and complete signals
dataStream.subscribe(
item -> System.out.println("Got: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Stream complete") // onComplete
);
// Or using Observer interface
dataStream.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(String value) {
System.out.println("Received: " + value);
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Stream ended");
}
});
Creating Observables
From Values:
// Single value
Observable<String> single = Observable.just("Hello");
// Multiple values
Observable<Integer> multiple = Observable.fromIterable(
Arrays.asList(1, 2, 3, 4, 5)
);
// Array
Observable<String> fromArray = Observable.fromArray("a", "b", "c");
// Callable/Supplier
Observable<String> deferred = Observable.fromCallable(() ->
expensiveOperation() // Called lazily when subscribed
);
From Events and Callbacks:
public class CallbackAdapter {
// Wrap callback-based API in Observable
public Observable<String> wrapCallback() {
return Observable.create(emitter -> {
legacyApi.fetchData(new DataCallback() {
@Override
public void onSuccess(String data) {
emitter.onNext(data);
emitter.onComplete();
}
@Override
public void onFailure(Exception error) {
emitter.onError(error);
}
});
});
}
}
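The same bridging idea can be exercised with only the JDK. The sketch below adapts a callback-style API into a `CompletableFuture`, a single-value stand-in for the Observable above. `CallbackBridgeSketch` and `wrap` are hypothetical names, and the shape of `DataCallback` is assumed from the snippet above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical illustration: callback-to-future bridge.
class CallbackBridgeSketch {

    // Stand-in for the DataCallback used above; its exact shape is assumed.
    interface DataCallback {
        void onSuccess(String data);
        void onFailure(Exception error);
    }

    // Adapts a callback-style fetch into a future that completes at most once.
    static CompletableFuture<String> wrap(Consumer<DataCallback> legacyFetch) {
        CompletableFuture<String> result = new CompletableFuture<>();
        legacyFetch.accept(new DataCallback() {
            @Override
            public void onSuccess(String data) {
                result.complete(data);
            }

            @Override
            public void onFailure(Exception error) {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // A fake legacy API that answers immediately.
        System.out.println(wrap(callback -> callback.onSuccess("payload")).join());
    }
}
```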
Lazy Evaluation:
// Observable doesn't do work until subscribed
Observable<String> lazy = Observable.create(emitter -> {
System.out.println("Doing expensive work..."); // Only when subscribed
emitter.onNext("Result");
emitter.onComplete();
});
// Nothing printed yet - not subscribed
System.out.println("Created observable");
// Now it works
lazy.subscribe(result -> System.out.println(result));
// Output:
// Created observable
// Doing expensive work...
// Result
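A consequence of this laziness is that a source built with `Observable.create` runs its body again for every subscriber. The JDK-only sketch below imitates that behaviour with a stored `Supplier`; `ColdSourceSketch` and `runsAfter` are hypothetical names used only to count executions.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration of a lazy ("cold") source.
class ColdSourceSketch<T> {
    private final Supplier<T> work;

    ColdSourceSketch(Supplier<T> work) {
        this.work = work; // stored, not executed
    }

    // The work runs here, once per subscriber.
    void subscribe(Consumer<T> onNext) {
        onNext.accept(work.get());
    }

    // Returns how many times the work has run after the given number of subscriptions.
    static int runsAfter(int subscriptions) {
        AtomicInteger runs = new AtomicInteger();
        ColdSourceSketch<String> source = new ColdSourceSketch<>(() -> {
            runs.incrementAndGet();
            return "Result";
        });
        for (int i = 0; i < subscriptions; i++) {
            source.subscribe(result -> { });
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runsAfter(0)); // no subscribers
        System.out.println(runsAfter(2)); // two subscribers
    }
}
```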
Transforming Data
Map - Transform Each Item:
Observable<Integer> numbers = Observable.fromIterable(
Arrays.asList(1, 2, 3, 4, 5)
);
numbers
.map(n -> n * n) // Square each number
.subscribe(result -> System.out.println(result));
// Output: 1 4 9 16 25
FlatMap - Async Transformation:
Observable<String> userIds = Observable.just("user1", "user2", "user3");
userIds
.flatMap(userId -> fetchUserProfile(userId)) // Returns Observable; results may arrive out of order (concatMap preserves order)
.subscribe(
profile -> System.out.println("Got profile: " + profile.name()),
error -> System.err.println("Error: " + error)
);
// fetchUserProfile returns Observable<UserProfile>
private Observable<UserProfile> fetchUserProfile(String userId) {
return Observable.create(emitter -> {
// Async fetch
userService.getProfile(userId, profile -> {
emitter.onNext(profile);
emitter.onComplete();
});
});
}
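The fan-out that flatMap performs (one asynchronous call per input, results merged into one output) can be approximated with plain `CompletableFuture`s. This is a sketch under assumptions: `FlatMapSketch`, `fetchProfile`, and `fetchAll` are hypothetical, and the "profile" is just a string. Unlike flatMap, this version collects results in input order.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration: one async call per id, gathered into one list.
class FlatMapSketch {

    // Stand-in for an asynchronous profile lookup.
    static CompletableFuture<String> fetchProfile(String userId) {
        return CompletableFuture.supplyAsync(() -> "profile-of-" + userId);
    }

    static List<String> fetchAll(List<String> userIds) {
        // Start every lookup first so they can run concurrently...
        List<CompletableFuture<String>> inFlight = userIds.stream()
                .map(FlatMapSketch::fetchProfile)
                .toList();
        // ...then gather the results in input order.
        return inFlight.stream()
                .map(CompletableFuture::join)
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("user1", "user2", "user3")));
    }
}
```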
Filter - Select Items:
Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6))
.filter(n -> n % 2 == 0) // Only even numbers
.subscribe(result -> System.out.println(result));
// Output: 2 4 6
Scan - Running Accumulation:
Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
.scan(0, (acc, value) -> acc + value) // Running sum
.subscribe(result -> System.out.println(result));
// Output: 0 1 3 6 10 15 (the seed is emitted first)
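When a seed is supplied, scan emits the seed itself before any accumulated value, so five inputs yield six outputs. A plain-Java sketch of that accumulation over a list makes the arithmetic easy to check; `ScanSketch` is a hypothetical helper, not an RxJava class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration of seeded scan semantics on a list.
class ScanSketch {

    // Returns the seed followed by every intermediate accumulation.
    static <T, R> List<R> scan(List<T> items, R seed, BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = seed;
        out.add(acc); // the seed is emitted first
        for (T item : items) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(scan(List.of(1, 2, 3, 4, 5), 0, Integer::sum));
        // prints [0, 1, 3, 6, 10, 15]
    }
}
```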
Error Handling
Basic Error Propagation:
Observable<String> errorStream = Observable.create(emitter -> {
emitter.onNext("Item 1");
emitter.onError(new RuntimeException("Something went wrong"));
});
errorStream.subscribe(
item -> System.out.println("Got: " + item),
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Complete")
);
// Output:
// Got: Item 1
// Error: Something went wrong
Recovery Strategies:
// OnErrorReturn - return default on error
Observable<String> withDefault = errorStream
.onErrorReturn(error -> "Default value");
// OnErrorResumeNext - switch to another stream (RxJava 3 names this overload onErrorResumeWith)
Observable<String> withFallback = errorStream
.onErrorResumeNext(fallbackStream);
// Retry - resubscribe on error
Observable<String> withRetry = errorStream
.retry(3); // Retry up to 3 times
// RetryWhen - retry with custom logic
Observable<String> retryWithBackoff = errorStream
.retryWhen(errors -> errors
.zipWith(Observable.range(1, 3), (error, index) -> index)
.flatMap(retryCount -> Observable.timer(
100 * retryCount, // Linear backoff: 100 ms, 200 ms, 300 ms
TimeUnit.MILLISECONDS
))
);
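The difference between linear and exponential backoff is only in how the delay is computed from the attempt number. The sketch below shows both policies with a simple retry loop. It is plain blocking Java, meant to illustrate the delay arithmetic, and is not how retryWhen schedules its timers; `RetrySketch` and its methods are hypothetical names.

```java
import java.util.function.IntToLongFunction;
import java.util.function.Supplier;

// Hypothetical illustration of retry delay policies (blocking, for clarity only).
class RetrySketch {

    static long linearDelayMs(int attempt) {
        return 100L * attempt;          // 100, 200, 300, ...
    }

    static long exponentialDelayMs(int attempt) {
        return 100L << (attempt - 1);   // 100, 200, 400, ...
    }

    // Runs the task, retrying up to maxRetries times with the given delay policy.
    static <T> T retry(Supplier<T> task, int maxRetries, IntToLongFunction delayMs) {
        for (int attempt = 1; ; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (attempt > maxRetries) {
                    throw e; // retries exhausted: propagate the last failure
                }
                sleep(delayMs.applyAsLong(attempt));
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("flaky");
            }
            return "ok";
        }, 3, RetrySketch::exponentialDelayMs);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```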
Threading and Schedulers
Default Threading:
// Operations run on the subscribing thread by default
Observable.just("Hello")
.map(s -> {
System.out.println("Map on thread: " + Thread.currentThread().getName());
return s.toUpperCase();
})
.subscribe(result -> {
System.out.println("Received on thread: " + Thread.currentThread().getName());
});
// Both run on same thread (main)
Switching Threads with Schedulers:
// IO operations on thread pool
Observable.fromCallable(() -> {
System.out.println("Fetching on: " + Thread.currentThread().getName());
return expensiveNetworkCall();
})
.subscribeOn(Schedulers.io()) // Perform on I/O thread pool
.observeOn(AndroidSchedulers.mainThread()) // Receive on main thread (from RxAndroid; core RxJava has no main-thread scheduler)
.subscribe(
result -> {
System.out.println("Got result on: " + Thread.currentThread().getName());
updateUI(result);
},
error -> System.err.println("Error: " + error)
);
// Output:
// Fetching on: RxCachedThreadScheduler-1
// Got result on: main
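The producer-on-one-pool, consumer-on-another pattern can be observed with JDK executors alone. In this sketch `supplyAsync` plays the role of subscribeOn and `thenApplyAsync` the role of observeOn; the thread names and the class `ThreadHopSketch` are made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of hopping between executors.
class ThreadHopSketch {

    // Returns [thread that produced the value, thread that consumed it].
    static List<String> run() {
        ExecutorService io = Executors.newCachedThreadPool(r -> new Thread(r, "io-worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            return CompletableFuture
                    // Produce on the I/O pool (analogous to subscribeOn).
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Consume on the single "UI" thread (analogous to observeOn).
                    .thenApplyAsync(
                            producer -> List.of(producer, Thread.currentThread().getName()), ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [io-worker, ui-thread]
    }
}
```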
Common Schedulers:
// I/O operations (network, disk)
.subscribeOn(Schedulers.io())
// CPU-intensive operations
.subscribeOn(Schedulers.computation())
// Main/UI thread (Android, via RxAndroid)
.observeOn(AndroidSchedulers.mainThread())
// Current thread, queued (Schedulers.immediate() exists only in RxJava 1.x)
.subscribeOn(Schedulers.trampoline())
// Single background thread
.subscribeOn(Schedulers.single())
// Custom executor
.subscribeOn(Schedulers.from(executor))
Subscription Management
Disposing Subscriptions:
// Keep reference to disposable
Disposable subscription = observable
.subscribe(
item -> System.out.println(item),
error -> System.err.println(error)
);
// Later, dispose to stop receiving events
subscription.dispose();
// Check if disposed
if (subscription.isDisposed()) {
System.out.println("Already stopped");
}
CompositeDisposable - Multiple Subscriptions:
public class MyComponent {
private final CompositeDisposable subscriptions = new CompositeDisposable();
public void subscribe() {
// Subscribe to multiple streams
subscriptions.add(
userStream.subscribe(user -> updateUser(user))
);
subscriptions.add(
messageStream.subscribe(msg -> showMessage(msg))
);
}
public void cleanup() {
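// Dispose all at once; after dispose() the container disposes anything added later
// (call clear() instead if the component will subscribe again)
subscriptions.dispose();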
}
}
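The bookkeeping behind a composite container is small enough to sketch in plain Java. `CancellationBag` below is a hypothetical, simplified stand-in that tracks cancel actions as `Runnable`s. It mirrors one notable CompositeDisposable behaviour: once disposed, anything added later is cancelled immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a composite of cancellable subscriptions.
class CancellationBag {
    private final List<Runnable> cancels = new ArrayList<>();
    private boolean disposed;

    // Registers a cancel action, or runs it at once if the bag is already disposed.
    synchronized void add(Runnable cancel) {
        if (disposed) {
            cancel.run();
        } else {
            cancels.add(cancel);
        }
    }

    // Cancels everything registered so far; later calls are no-ops.
    synchronized void dispose() {
        if (disposed) {
            return;
        }
        disposed = true;
        cancels.forEach(Runnable::run);
        cancels.clear();
    }

    synchronized boolean isDisposed() {
        return disposed;
    }

    public static void main(String[] args) {
        CancellationBag bag = new CancellationBag();
        bag.add(() -> System.out.println("user stream cancelled"));
        bag.add(() -> System.out.println("message stream cancelled"));
        bag.dispose();
    }
}
```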
Best Practices
1. Always Handle Errors:
// Bad - missing error handler (an error surfaces as OnErrorNotImplementedException)
observable.subscribe(item -> process(item));
// Good
observable.subscribe(
item -> process(item),
error -> logger.error("Stream failed", error)
);
2. Use subscribeOn/observeOn Wisely:
// Good - I/O on thread pool, UI updates on main
networkCall()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // RxAndroid; core RxJava has no main-thread scheduler
.subscribe(this::updateUI);
3. Dispose Resources:
public class Service implements AutoCloseable {
private final CompositeDisposable disposables = new CompositeDisposable();
public void start() {
disposables.add(dataStream.subscribe(...));
}
@Override
public void close() {
disposables.dispose();
}
}
// Usage
try (var service = new Service()) {
service.start();
} // Auto-disposes
4. Test Observables:
@Test
public void testObservableTransformation() {
TestObserver<Integer> test = Observable.fromIterable(
Arrays.asList(1, 2, 3)
)
.map(n -> n * 2)
.test();
test.assertValues(2, 4, 6);
test.assertComplete();
test.assertNoErrors();
}
5. Avoid Blocking in Reactive Code:
// Bad - blocks the thread
observable.subscribe(item -> {
Thread.sleep(1000); // DON'T DO THIS!
process(item);
});
// Good - use reactive delays (each emission is shifted by 1 s on a scheduler; no thread sleeps)
observable
.delay(1, TimeUnit.SECONDS)
.subscribe(this::process);
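The point of a reactive delay is that a timer, not a sleeping thread, holds the item until it is due. A JDK-only analogue is `CompletableFuture.delayedExecutor`, sketched below for a single value; `DelaySketch` and `delayed` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: deliver a value later without sleeping on the caller's thread.
class DelaySketch {

    // Returns immediately; the future completes with the item after the delay.
    static CompletableFuture<String> delayed(String item, long delayMs) {
        Executor later = CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> item, later);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending =
                delayed("processed", 200).thenAccept(System.out::println);
        System.out.println("caller is free to do other work");
        pending.join(); // only so the demo waits before exiting
    }
}
```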