10.1 Reactive Programming Fundamentals

Reactive programming provides a declarative model for composing asynchronous, non-blocking event streams with explicit backpressure handling.

What is Reactive Programming?

The Reactive Paradigm:

// Traditional imperative - pull model
List<String> names = new ArrayList<>();
for (String line : Files.readAllLines(path)) {
    if (line.contains("@")) {
        names.add(line.substring(0, line.indexOf("@")));
    }
}
System.out.println(names);

// Reactive - push model with backpressure
// The source pushes items to subscribers as they become available
// Subscribers signal how much data they can accept (backpressure)
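
The push model can be sketched without any third-party library by using the JDK's own `java.util.concurrent.Flow` interfaces (Java 9+). The example below is a minimal sketch, not a production pipeline: `PushModelSketch` and `extractNames` are hypothetical names, and the final `await()` exists only so the method can return a result.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class PushModelSketch {

    // Hypothetical helper: pushes each line to a subscriber that requests one item at a time.
    static List<String> extractNames(List<String> lines) {
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);              // initial demand: one item
                }

                @Override
                public void onNext(String line) {
                    int at = line.indexOf('@');
                    if (at >= 0) {
                        names.add(line.substring(0, at));
                    }
                    subscription.request(1);   // signal readiness for the next item
                }

                @Override
                public void onError(Throwable t) {
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    finished.countDown();
                }
            });

            // The producer pushes; submit() blocks only when the subscriber's buffer is full.
            lines.forEach(publisher::submit);
        }   // close() delivers onComplete after the buffered items

        try {
            finished.await();                  // block only so the sketch can return a result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(extractNames(
            List.of("ann@example.com", "no address here", "bob@example.org")));
    }
}
```

The subscriber never receives more than it has requested, which is the same contract the RxJava and Reactive Streams types in this chapter rely on.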

Key Characteristics:

  1. Asynchronous: Results arrive later as signals instead of as return values
  2. Non-blocking: Threads are released during I/O rather than parked until it completes
  3. Event-driven: Data flows through a transformation pipeline as events occur
  4. Backpressure-aware: Consumers control the rate of consumption
  5. Composable: Streams are easy to combine and transform

Comparison with Other Models:

public class ConcurrencyModelComparison {

    // Blocking/Synchronous - threads block on I/O
    public String blockingFetch() throws Exception {
        String data = httpClient.get("https://api.example.com/data");  // Blocks!
        System.out.println(data);
        return data;
    }

    // Virtual Threads - still blocking but cheap threads
    public void virtualThreadApproach() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 10000; i++) {
                executor.execute(() -> {
                    try {
                        String data = httpClient.get("https://api.example.com/data");
                        System.out.println(data);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }

    // Reactive - non-blocking, callbacks on completion
    public void reactiveApproach() {
        httpClientReactive.get("https://api.example.com/data")
            .subscribe(
                data -> System.out.println(data),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Complete")
            );
        // Doesn't block - returns immediately
    }
}
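
The "returns immediately" behavior of the reactive variant can be observed with plain JDK classes. The following is a sketch under stated assumptions: `NonBlockingCallSketch`, `fetchAsync`, and the latch-based "slow call" are hypothetical stand-ins for a real non-blocking client, and `CompletableFuture` is used only because it ships with the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class NonBlockingCallSketch {

    // Hypothetical stand-in for a slow remote call: it cannot finish until `gate` opens.
    // (A real non-blocking client would not park a pool thread like this.)
    static CompletableFuture<String> fetchAsync(CountDownLatch gate) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "payload";
        });
    }

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        // Registering the callbacks does not wait for the result.
        CompletableFuture<Void> pipeline = fetchAsync(gate)
            .thenApply(String::toUpperCase)
            .thenAccept(data -> events.add("callback received " + data));

        events.add("caller continued");   // runs while the fetch is still pending
        gate.countDown();                 // let the simulated call finish
        pipeline.join();                  // wait only so the sketch can return its log
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());        // [caller continued, callback received PAYLOAD]
    }
}
```

The caller's line is always logged first because the callback cannot fire until the simulated call is released.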

The Reactive Manifesto

Four Core Principles:

  1. Responsive - React quickly to user inputs and events

    // Responsive reactive system
    Observable<UserAction> userActions = /* stream of clicks, input, etc */;
    
    userActions
        .debounce(300, TimeUnit.MILLISECONDS)  // React after 300ms pause
        .subscribe(action -> handleUserInput(action));
    
  2. Resilient - Stay responsive under failure

    // Resilience with fallback
    // (RxJava 2 signature; RxJava 3 renamed this overload to onErrorResumeWith)
    primaryDataStream
        .onErrorResumeNext(fallbackDataStream)
        .subscribe(data -> updateUI(data));
    
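  3. Elastic - Stay responsive under varying load; backpressure keeps a fast producer from overwhelming consumers

    // Bounded buffering between a fast producer and a slower consumer
    // (backpressure operators live on Flowable, not Observable, in RxJava 2 and 3)
    fastDataProducer
        .onBackpressureBuffer(1000)  // Buffer up to 1000 items; overflow signals an error
        .subscribe(subscriber);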
    
  4. Message-driven - Async communication with explicit backpressure

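    // Message passing with backpressure
    publisher.subscribe(new Subscriber<T>() {
        private Subscription subscription;
    
        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(10);  // Request 10 items (backpressure signal)
        }
    
        @Override
        public void onNext(T item) {
            processItem(item);
            subscription.request(1);  // Replace the consumed item, keeping demand steady
        }
    
        @Override
        public void onError(Throwable t) {
            // Terminal signal: no further items will arrive
            System.err.println("Stream failed: " + t);
        }
    
        @Override
        public void onComplete() {
            // Terminal signal: the stream finished normally
        }
    });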
    

Understanding Observable and Subscriber

Observable - The Data Source:

// Observable emits data to subscribers
// Can emit 0, 1, or many items
// Signals completion or error

Observable<String> dataStream = Observable.create(emitter -> {
    try {
        emitter.onNext("Item 1");
        emitter.onNext("Item 2");
        emitter.onNext("Item 3");
        emitter.onComplete();
    } catch (Exception e) {
        emitter.onError(e);
    }
});

Subscriber - The Observer:

// Subscriber receives items from observable
// Handles next, error, and complete signals

dataStream.subscribe(
    item -> System.out.println("Got: " + item),           // onNext
    error -> System.err.println("Error: " + error),       // onError
    () -> System.out.println("Stream complete")           // onComplete
);

// Or using Observer interface
dataStream.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("Subscribed");
    }

    @Override
    public void onNext(String value) {
        System.out.println("Received: " + value);
    }

    @Override
    public void onError(Throwable e) {
        System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Stream ended");
    }
});

Creating Observables

From Values:

// Single value
Observable<String> single = Observable.just("Hello");

// Multiple values
Observable<Integer> multiple = Observable.fromIterable(
    Arrays.asList(1, 2, 3, 4, 5)
);

// Array
Observable<String> fromArray = Observable.fromArray("a", "b", "c");

// Callable/Supplier
Observable<String> deferred = Observable.fromCallable(() -> 
    expensiveOperation()  // Called lazily when subscribed
);

From Events and Callbacks:

public class CallbackAdapter {
    // Wrap callback-based API in Observable
    public Observable<String> wrapCallback() {
        return Observable.create(emitter -> {
            legacyApi.fetchData(new DataCallback() {
                @Override
                public void onSuccess(String data) {
                    emitter.onNext(data);
                    emitter.onComplete();
                }

                @Override
                public void onFailure(Exception error) {
                    emitter.onError(error);
                }
            });
        });
    }
}

Lazy Evaluation:

// Observable doesn't do work until subscribed
Observable<String> lazy = Observable.create(emitter -> {
    System.out.println("Doing expensive work...");  // Only when subscribed
    emitter.onNext("Result");
    emitter.onComplete();
});

// Nothing printed yet - not subscribed
System.out.println("Created observable");

// Now it works
lazy.subscribe(result -> System.out.println(result));
// Output:
// Created observable
// Doing expensive work...
// Result

Transforming Data

Map - Transform Each Item:

Observable<Integer> numbers = Observable.fromIterable(
    Arrays.asList(1, 2, 3, 4, 5)
);

numbers
    .map(n -> n * n)  // Square each number
    .subscribe(result -> System.out.println(result));

// Output: 1 4 9 16 25

FlatMap - Async Transformation:

Observable<String> userIds = Observable.just("user1", "user2", "user3");

userIds
    .flatMap(userId -> fetchUserProfile(userId))  // Returns Observable; results may interleave (use concatMap to keep source order)
    .subscribe(
        profile -> System.out.println("Got profile: " + profile.name()),
        error -> System.err.println("Error: " + error)
    );

// fetchUserProfile returns Observable<UserProfile>
private Observable<UserProfile> fetchUserProfile(String userId) {
    return Observable.create(emitter -> {
        // Async fetch
        userService.getProfile(userId, profile -> {
            emitter.onNext(profile);
            emitter.onComplete();
        });
    });
}

Filter - Select Items:

Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6))
    .filter(n -> n % 2 == 0)  // Only even numbers
    .subscribe(result -> System.out.println(result));

// Output: 2 4 6

Scan - Running Accumulation:

Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
    .scan(0, (acc, value) -> acc + value)  // Running sum
    .subscribe(result -> System.out.println(result));

// Output: 0 1 3 6 10 15 (the seed value is emitted first)
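
Whether a running accumulation starts from a seed changes what it emits, and that is easy to miss. The plain-Java model below is a sketch of the emission order of the two forms, not RxJava's implementation: `ScanSketch` and both `scan` helpers are hypothetical, and they use one element type throughout, whereas the library's seeded form allows the accumulator type to differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {

    // Unseeded form: the first item is emitted as-is, then each running result.
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : items) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    // Seeded form: the seed itself is emitted first, then each running result.
    static <T> List<T> scan(T seed, List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = seed;
        out.add(acc);
        for (T item : items) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        System.out.println(scan(numbers, Integer::sum));     // [1, 3, 6, 10, 15]
        System.out.println(scan(0, numbers, Integer::sum));  // [0, 1, 3, 6, 10, 15]
    }
}
```

A seeded scan over n items therefore produces n + 1 values, and it produces the seed even when the source is empty.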

Error Handling

Basic Error Propagation:

Observable<String> errorStream = Observable.create(emitter -> {
    emitter.onNext("Item 1");
    emitter.onError(new RuntimeException("Something went wrong"));
});

errorStream.subscribe(
    item -> System.out.println("Got: " + item),
    error -> System.err.println("Error: " + error.getMessage()),
    () -> System.out.println("Complete")
);

// Output:
// Got: Item 1
// Error: Something went wrong

Recovery Strategies:

// OnErrorReturn - return default on error
Observable<String> withDefault = errorStream
    .onErrorReturn(error -> "Default value");

// OnErrorResumeNext - switch to another stream
// (RxJava 3 names this overload onErrorResumeWith)
Observable<String> withFallback = errorStream
    .onErrorResumeNext(fallbackStream);

// Retry - resubscribe on error
Observable<String> withRetry = errorStream
    .retry(3);  // Retry up to 3 times

// RetryWhen - retry with custom logic
Observable<String> retryWithBackoff = errorStream
    .retryWhen(errors -> errors
        .zipWith(Observable.range(1, 3), (error, index) -> index)
        .flatMap(retryCount -> Observable.timer(
            100L << (retryCount - 1),  // Exponential backoff: 100 ms, 200 ms, 400 ms
            TimeUnit.MILLISECONDS
        ))
    );
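
Exponential backoff doubles the wait after each failed attempt, usually up to a ceiling. The delay arithmetic can be checked in isolation with the sketch below; `BackoffSketch`, `delayFor`, and `scheduleMillis` are hypothetical helpers, and the 100 ms base and 1 s cap are illustrative values only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffSketch {

    // Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped at max.
    static Duration delayFor(int attempt, Duration base, Duration max) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        int doublings = Math.min(attempt - 1, 30);   // bound the shift so the multiplier cannot overflow
        Duration delay = base.multipliedBy(1L << doublings);
        return delay.compareTo(max) > 0 ? max : delay;
    }

    // The full schedule for a given number of retries, in milliseconds.
    static List<Long> scheduleMillis(int retries, Duration base, Duration max) {
        List<Long> schedule = new ArrayList<>();
        for (int attempt = 1; attempt <= retries; attempt++) {
            schedule.add(delayFor(attempt, base, max).toMillis());
        }
        return schedule;
    }

    public static void main(String[] args) {
        // Doubling from 100 ms, capped at 1 s: [100, 200, 400, 800, 1000]
        System.out.println(scheduleMillis(5, Duration.ofMillis(100), Duration.ofSeconds(1)));
    }
}
```

By contrast, multiplying the base by the attempt number (100, 200, 300, ...) gives linear backoff, which grows too slowly to relieve a struggling service.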

Threading and Schedulers

Default Threading:

// Operations run on subscribing thread by default
Observable.just("Hello")
    .map(s -> {
        System.out.println("Map on thread: " + Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .subscribe(result -> {
        System.out.println("Received on thread: " + Thread.currentThread().getName());
    });

// Both run on same thread (main)

Switching Threads with Schedulers:

// IO operations on thread pool
Observable.fromCallable(() -> {
    System.out.println("Fetching on: " + Thread.currentThread().getName());
    return expensiveNetworkCall();
})
    .subscribeOn(Schedulers.io())  // Perform on I/O thread pool
    // Receive on main thread (AndroidSchedulers is from RxAndroid; core RxJava has no main-thread scheduler)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        result -> {
            System.out.println("Got result on: " + Thread.currentThread().getName());
            updateUI(result);
        },
        error -> System.err.println("Error: " + error)
    );

// Output:
// Fetching on: RxCachedThreadScheduler-1
// Got result on: main
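
The idea of producing on one thread and consuming on another can be reproduced with JDK executors alone. This is a sketch of the thread hop, not of RxJava's schedulers: `ThreadHopSketch` and the thread names `io-worker` and `ui-thread` are hypothetical, and each single-thread executor merely plays the role of a scheduler.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    static List<String> run() {
        // Two named single-thread executors stand in for an I/O pool and a UI thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                // Upstream work runs on the "io" executor (the role subscribeOn plays).
                .supplyAsync(() -> {
                    log.add("produced on " + Thread.currentThread().getName());
                    return "result";
                }, io)
                // Downstream delivery hops to the "ui" executor (the role observeOn plays).
                .thenAcceptAsync(
                    value -> log.add("consumed on " + Thread.currentThread().getName()),
                    ui)
                .join();   // wait only so the sketch can return its log
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());   // [produced on io-worker, consumed on ui-thread]
    }
}
```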

Common Schedulers:

// I/O operations (network, disk)
.subscribeOn(Schedulers.io())

// CPU-intensive operations
.subscribeOn(Schedulers.computation())

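// Main/UI thread (AndroidSchedulers comes from RxAndroid, not core RxJava)
.observeOn(AndroidSchedulers.mainThread())

// Current thread, queued (replaces RxJava 1's Schedulers.immediate(), removed in 2.x)
.subscribeOn(Schedulers.trampoline())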

// Single background thread
.subscribeOn(Schedulers.single())

// Custom executor
.subscribeOn(Schedulers.from(executor))

Subscription Management

Disposing Subscriptions:

// Keep reference to disposable
Disposable subscription = observable
    .subscribe(
        item -> System.out.println(item),
        error -> System.err.println(error)
    );

// Later, dispose to stop receiving events
subscription.dispose();

// Check if disposed
if (subscription.isDisposed()) {
    System.out.println("Already stopped");
}

CompositeDisposable - Multiple Subscriptions:

public class MyComponent {
    private final CompositeDisposable subscriptions = new CompositeDisposable();

    public void subscribe() {
        // Subscribe to multiple streams
        subscriptions.add(
            userStream.subscribe(user -> updateUser(user))
        );

        subscriptions.add(
            messageStream.subscribe(msg -> showMessage(msg))
        );
    }

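    public void cleanup() {
        // Dispose all at once. A disposed container rejects later add() calls,
        // so use clear() instead if this component will subscribe again.
        subscriptions.dispose();
    }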
}

Best Practices

1. Always Handle Errors:

// Bad - missing error handler
observable.subscribe(item -> process(item));

// Good
observable.subscribe(
    item -> process(item),
    error -> logger.error("Stream failed", error)
);

2. Use subscribeOn/observeOn Wisely:

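// Good - I/O on thread pool, UI updates on main
// (AndroidSchedulers is from RxAndroid; core RxJava has no main-thread scheduler)
networkCall()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::updateUI, error -> logger.error("Network call failed", error));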

3. Dispose Resources:

public class Service implements AutoCloseable {
    private final CompositeDisposable disposables = new CompositeDisposable();

    public void start() {
        disposables.add(dataStream.subscribe(...));
    }

    @Override
    public void close() {
        disposables.dispose();
    }
}

// Usage
try (var service = new Service()) {
    service.start();
} // Auto-disposes

4. Test Observables:

@Test
public void testObservableTransformation() {
    TestObserver<Integer> test = Observable.fromIterable(
        Arrays.asList(1, 2, 3)
    )
        .map(n -> n * 2)
        .test();

    test.assertValues(2, 4, 6);
    test.assertComplete();
    test.assertNoErrors();
}

5. Avoid Blocking in Reactive Code:

// Bad - blocks the thread
observable.subscribe(item -> {
    Thread.sleep(1000);  // DON'T DO THIS!
    process(item);
});

// Good - use reactive delays
// (shifts every emission by 1 second on the computation scheduler; no thread is parked)
observable
    .delay(1, TimeUnit.SECONDS)
    .subscribe(this::process);