Project Reactor is Spring’s implementation of Reactive Streams (in version 3 and beyond). It has two main publishers, Flux<T> and Mono<T>. It also uses Schedulers much like RxJava.
The Spring Framework has many integrations with Reactor that make it easier to use with other Spring projects, such as Spring Data and Spring Security.
Getting Started
Flux
Much like in RxJava, Reactor uses Schedulers to decide on what thread to run.
The preceding code would print out the numbers 1 through 100.
onErrorResume(Function) : Takes the exception and returns a different Publisher as a fallback or secondary stream.
onErrorMap(Function) : Takes the exception and allows you to modify it or return a completely new Exception if you prefer.
onErrorReturn(T) : Provides a default value to use when an error arises.
doOnError(Consumer<? super Throwable>) : Allows you to handle the error without effecting the underlying stream in any way.
Errors are always ending events for a Flux or Mono and should be handled by the Subscriber. However, many times, such as in the preceding example, an error is not possible and therefore does not need to be handled.
Mono
justOrEmpty(T) : Takes a nullable value and converts into a Mono. If null, the result is the same as Mono.empty().
justOrEmpty(Optional) : Takes an Optional and converts into a Mono directly.
The corresponding code can handle errors from a Mono in the same way as with a Flux (using onErrorResume, onErrorMap, or onErrorReturn).
Creating a Flux or Mono
You can create a Flux from fixed data (cold) or programmatically from dynamic data (hot).
- 1.
Create a Flux from a list of values.
- 2.
Create a Flux from an Iterable.
- 3.
Create a range from 1 to 64.
Here’s how to create a simple Mono:Mono<String> noData = Mono.empty(); //1Mono<String> data = Mono.just("foo"); //2 - 4.
Create an empty Mono.
- 5.
Create a Mono with one element.
You can programmatically create a hot or cold Flux using one of the generate, create, or push methods . If the data is of a continuous nature, such as user input, a WebSocket, or network packets, it would be considered hot.
- 1.
The constructor of AtomicLong is used as the supplier.
- 2.
After incrementing, supply the square of the number to the sink.
- 3.
When the number is 10, the complete() method is called, which calls onComplete to any subscriber, closing out the Flux.
The create method takes a Consumer<? super FluxSink<T>> that exposes a FluxSink<T> instance with next, error, and complete methods. This allows you to arbitrarily publish data onto a Flux in any way you see fit.
The preceding code would produce a Flux of the squares of the numbers from zero to ten.
This would print out just the values 1, 2, and 3.
Schedulers
The Schedulers class under the reactor.core.scheduler package provides many static methods for Schedulers that determine what Thread or Threads your code will run on.
Schedulers.immediate() : The current thread.
Schedulers.single() : A single, reusable thread. Note that this method reuses the same thread for all callers, until the Scheduler is disposed. If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.
Schedulers.newSingle() : Creates a new Thread each time it is called to be used by the underlying Flux.
Schedulers.elastic() : An elastic thread pool. It creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (default is 60 seconds) are disposed. This is a good choice for I/O blocking work for instance. Schedulers.elastic() is a handy way to give a blocking process its own thread, so that it does not tie up other resources.
Schedulers.parallel() : A fixed pool of workers. It creates as many workers as you have CPU cores.
Schedulers.fromExecutor(Executor) : Creates a Scheduler to use the given Executor, allowing you to use your extensive knowledge of Java’s Executors.
- 1.
First we use Flux.range to take the range from 1 to 64 and call flatMap (which takes a lambda expression that converts each value in the range into a new Reactor type, Mono in this case).
- 2.
Using Schedulers.newSingle(name), we create a new single thread for each value, and passing to subscribeOn will cause the mapping expression to be executed on that single thread. Keep in mind we are describing the execution of the Mono here, not the initial Flux.
- 3.
We provide exception handling code using doOnError just in case.
- 4.
Using doOnComplete we print out “Completed” when the whole execution is finished.
- 5.
Finally, we subscribe to the Flux (without this step, nothing would ever happen) and add the result to our list of squares.
The result of running this code will be that the squares List has the value of every square from 1 to 64.
Here we see once again how in Reactive Streams everything can become a stream, even a single value. By creating a Mono for each value in the range, we’re able to use Reactor to declare what kind of threading we want for every calculation. In this case, since we are using newSingle , all of the processing will be done in parallel with a new thread for all 64 values.
However, this is probably not the most efficient implementation since creating lots of Threads causes a lot of overhead. Instead, we should use Schedulers.parallel() so that the exact number of Threads your CPU can handle will be created. In this way, Reactor takes care of the details for you.
Pull Events
- 1.
Poll for events from the channel when requests are made with the given number. This “n” is the number of items requested.
- 2.
Call the channel’s cancel method when the Flux is cancelled.
- 3.
The channel.close method is given to onDispose to be invoked for complete, error, or cancel.
- 4.
Finally, register the sink’s “next” method as a listener to the channel.
Keep in mind that the Consumer passed to onRequest will not be called multiple times for no reason. It will be called with some number (e.g., 256) and then not called again until a significant number of items have been published to the Flux (i.e., sink.next called many times).
Reminder The code examples used in this book are available on GitHub .
Handling Backpressure
onBackpressureBuffer() : Buffers all items until they can be handled downstream.
onBackpressureBuffer(maxSize) : Buffers items up to the given count.
onBackpressureBuffer(maxSize, BufferOverflowStrategy) : Buffers items up to the given count and allows you to specify the strategy to use when and if the buffer is full. BufferOverflowStrategy is an enum that has three values: DROP_OLDEST, which drops the oldest items in the buffer, DROP_LATEST which drops the newer items, and ERROR which would terminate the stream with an error.
onBackpressureLatest() : Similar to keeping a buffer of only the last item added. If the downstream does not keep up with upstream, only the latest element will be given downstream.
onBackpressureError() : Ends the Flux with an error (calling the downstream Subscriber’s onError) with an IllegalStateException from Exceptions.failWithOverflow() if more items were produced upstream than requested downstream.
onBackpressureDrop() : Drops any items produced above what was requested. This would be useful, for example, in UI code to drop user input that can’t be handled immediately.
onBackpressureDrop(Consumer) : Drops any items produced above what was requested and calls the given Consumer for each dropped item.
With each of these methods, the strategy only applies when items are produced on the stream faster than they can be handled. If that’s not the case, for example, with a cold stream, no backpressure strategy is necessary.
Also keep in mind that Reactor is not magic, and some care should be taken when considering backpressure strategies.
Reactor has excellent online documentation for consideration.
Context
Since version 3.1.0, Reactor comes with an advanced feature that is somewhat comparable to ThreadLocal but applied to a Flux or a Mono instead of a Thread: the Context.
Reactor’s Context is much like an immutable Map or key/value store. It is stored transparently from the Subscriber upward through the Subscription. Context is Reactor specific and does not work with the other Reactive Streams implementations.
- 1.
Create a Flux of just one value.
- 2.
Use flatMap, access the Context, and use it to create a String value using the “pid” key. We use the static method on Mono, subscriberContext() to access the value from the Context by calling “getOrDefault” on it.
- 3.
This uses the StepVerifier (which we cover next) to verify that we get the expected value. The StepVerifier subscribes to the Flux after setting the Context using the “subscriberContext” method.
- 4.
Call “expectNext” with a value of “1 pid: 123” which is what we expect from setting the value 123 with the key of “pid” on the Context.
Context is useful for storing data that is peripheral to the Flux, but still important. For example, sometimes we have some identifier that represents the action or the user that initiated an action, and we want to include it in log outputs (like what MDC is used for in logback).
Testing
Automated testing is always a good idea, and it would be nice to have tools to directly test Reactive Streams. Luckily, Reactor comes with a few elements dedicated to testing which are gathered into their own artifact we included earlier: reactor-test.
Testing that a sequence follows a given scenario with StepVerifier
Producing data in order to test the behavior of operators (including your own operators) downstream with TestPublisher
StepVerifier
Reactor’s StepVerifier can be used to verify the behavior of a Reactor Publisher (Flux or Mono). StepVerifier is an interface used for testing that can be created using one of several static methods on StepVerifier itself.
- 1.
Create a Mono wrapping a RuntimeException imitating an actual error state.
- 2.
Create a StepVerifier wrapping that Mono.
- 3.
Declare that an onError event is expected and the Exception’s error message is “error”.
- 4.
Must call verify() at the end. This will throw an AssertionError if any expectations are not met.
- 1.
Create a Mono wrapping one value, “foo”.
- 2.
Create a StepVerifier wrapping that Mono.
- 3.
Expect onNext is called with “foo”.
- 4.
Call verifyComplete() has the same effect as verify() but also expects onComplete was called.
- 1.
Create a Flux of just three numbers.
- 2.
Create a StepVerifier wrapping that Flux.
- 3.
Call expectNext for each value expected.
- 4.
Call expectComplete to expect onComplete to be called.
- 5.
Finally, you must call verify() at the end. This variation of verify takes a Duration timeout value. Here it is 10 seconds. This can be useful to prevent the Test from hanging in cases where a Publisher might never call onComplete.
TestPublisher
The TestPublisher<T> class offers the ability to provide finely tuned data for test purposes. TestPublisher is a Reactive Streams Publisher<T> but can be converted to either a Flux or Mono using flux() or mono() methods.
next(T) and next(T, T…): Triggers 1−n onNext signals.
emit(T…): Does the same as next and also terminates with an onComplete signal.
complete(): Terminates with an onComplete signal.
error(Throwable): Terminates with an onError signal.
- 1.
Create the TestPublisher instance.
- 2.
Convert it to a Flux.
- 3.
Create a new List. For test purposes we will use this list to collect values from the publisher.
- 4.
Subscribe to the publisher using two lambda expressions for onNext and onError. This will add each value emitted from the publisher to the list.
- 5.
Emit the values “foo” and “bar” from the TestPublisher.
- 6.
Assert that two values were added to the list and they are what we expect.
Note that you must subscribe to the TestPublisher before emitting any values.