Akka Streams implements the Reactive Streams standard within the larger Akka concurrency project.
Akka Streams is built on the philosophy of providing a minimal and consistent application programming interface (API) that is extremely compositional, meaning it is broken into pieces that can be combined in many ways.
Unlike RxJava and Reactor, the topology of streams (flows) in Akka Streams is immutable once they have been materialized. This means that you must explicitly convert a flow into a Reactive Streams interface to get a dynamic topology (as we’ll cover later on).
Although most familiar in Scala-based applications, Akka Streams has a Java-specific API, and the documentation lets you select Java or Scala as your target language with specific examples for each.
Akka Streams uses the concepts of Source and Sink to correspond roughly with Publisher and Subscriber of other Reactive Streams frameworks. It also has the concept of a Flow, which is roughly equivalent to a Processor, and of Graphs, which are like blueprints of Flows, Sinks, or Sources.
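To make the Publisher and Subscriber roles concrete, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces. It is plain Java, not Akka Streams code. ListPublisher and CollectingSubscriber are hypothetical names invented for this illustration, and the publisher is single-threaded and leaves out much of what the specification requires.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded publisher: emits list items only when demand exists.
final class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    ListPublisher(List<T> items) {
        this.items = List.copyOf(items);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Iterator<T> it = items.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand += n;
                if (emitting) return; // re-entrant call from onNext; the outer loop continues
                emitting = true;
                try {
                    // Emit only while the subscriber has outstanding demand.
                    while (!done && demand > 0 && it.hasNext()) {
                        demand--;
                        subscriber.onNext(it.next());
                    }
                } finally {
                    emitting = false;
                }
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Hypothetical subscriber that records what it receives; demand is driven from outside.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();
    Flow.Subscription subscription;
    Throwable error;
    boolean completed = false;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(T item) { received.add(item); }
    @Override public void onError(Throwable t) { error = t; }
    @Override public void onComplete() { completed = true; }
}
```

In this sketch nothing is emitted until the subscriber calls request, which is the same demand-driven idea that a Source and a Sink follow when an Akka Streams graph runs.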
Getting Started
1. We create the Akka ActorSystem to define the multithreaded environment for execution. The name “reactive-messages” is optional and gives the ActorSystem a logical name.
2. The execution environment (similar to Schedulers in RxJava) is known as a Materializer here. Unlike in RxJava, the developer controls concurrency by calling methods like async() and mapAsync(int,Function) on a Source or Flow.
3. We filter the stream so that only the error messages remain.
4. Although not necessary, we call toString on each message to illustrate using the map method.
5. Finally, we use runWith and pass in a Sink which prints out each error message.
Although here we are using the foreach Sink, any sink could be used, including user-defined sinks.
To avoid conceptual conflicts with the existing flatMap in Scala, Akka Streams uses flatMapConcat, flatMapMerge, and mapConcat. The mapConcat method expects Iterables returned from the function, not streams. The other two methods act as their names suggest, either merging streams or appending them sequentially.
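The difference between the three operators can be sketched in plain Java over lists. This is only a model of the semantics described above, not the Akka Streams API: FlatMapModels and its method names are hypothetical, and the round-robin interleaving used for the merge variant is an assumption chosen to make the output deterministic (a real merge does not promise any particular interleaving).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical list-based models of the three operators.
final class FlatMapModels {

    // mapConcat style: the function returns an Iterable, whose items are appended in order.
    static <T, R> List<R> mapConcat(List<T> input, Function<T, Iterable<R>> f) {
        List<R> out = new ArrayList<>();
        for (T t : input) {
            for (R r : f.apply(t)) {
                out.add(r);
            }
        }
        return out;
    }

    // flatMapConcat style: the function returns a sub-stream; each sub-stream is
    // fully consumed before the next one starts.
    static <T, R> List<R> flatMapConcat(List<T> input, Function<T, Stream<R>> f) {
        return input.stream().flatMap(f).collect(Collectors.toList());
    }

    // flatMapMerge style: sub-streams are consumed side by side. Round-robin is
    // just one possible interleaving, picked here for determinism.
    static <T, R> List<R> flatMapMergeRoundRobin(List<T> input, Function<T, Stream<R>> f) {
        List<Iterator<R>> subs = new ArrayList<>();
        for (T t : input) {
            subs.add(f.apply(t).iterator());
        }
        List<R> out = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Iterator<R> it : subs) {
                if (it.hasNext()) {
                    out.add(it.next());
                    progressed = true;
                }
            }
        }
        return out;
    }
}
```

With the inputs 1 and 2 and a function that produces "na" and "nb" for each n, the concat variants keep each input's results together, while the merge variant may interleave them.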
ActorMaterializer
The ActorMaterializer in Akka Streams is similar to Schedulers in the other two Reactive Streams implementations but not the same. Unlike Schedulers, there are not several predefined singletons to choose from; instead you should generally create one for your whole application and specify some general settings.
1. Create the ActorSystem.
2. Optionally create ActorMaterializerSettings. This allows you to configure internal settings used by Akka Streams to enhance performance for your particular project.
3. Set the maximum fixed buffer size to 100. Stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered, flatMapMerge, Source.actorRef, Source.queue, etc.) that request a lower buffer size will use this value as the initial fixed buffer size. The default is very large so that failures happen early rather than when scaling up. You might lower it if you want to limit memory use, for example.
4. Set the initial and maximum size of internal stream buffers. Here we set the initial value to 8 and maximum to 16, which are the defaults.
5. Finally, create the ActorMaterializer with the given settings and ActorSystem.
Sinks, Flows, and Graphs
Flow: A Flow has both an input and an output. You can define a Flow with only the type of the data that will be streamed, without the actual data. It is similar to org.reactivestreams.Processor, which is both a Publisher and a Subscriber.
Graph: A Graph can define any arbitrary branching and recombining of streams. A Graph is immutable, thread-safe, and reusable. A Graph that is self-contained (has no input or output) is a RunnableGraph and can be materialized.
Source: A Source has exactly one output. It is a source of data, similar to a Publisher, and can be created in many different ways.
Sink: As seen before, a Sink is the ending point of a stream. It represents what we do with the data. It has exactly one input.
First, we create a Flow of type “String”; this declares what type you are expecting. Second, we map each string into a ByteString. At this point, the type is now Flow<ByteString>. Lastly, we call toMat (which is short for toMaterialized) to write the result to a file using an existing Sink (FileIO is part of the Akka Streams Java DSL). We specify Keep.right() to keep the auxiliary information from toPath.
This method would take a List of Strings, create a Source from them, and then save them to a file using the Sink from the lineSink method.
1. Call builder.add with a Flow to get a FlowShape. Here we create an asynchronous Flow.
2. Create a new Sink<String> by calling our lineSink method.
3. Create a SinkShape<String> from that sink.
4. Link the output from the flowShape to the sinkShape.
5. Return a new SinkShape using the input from the flowShape. We have now created the Graph of a SinkShape that will save text lines to a file.
The preceding code would use the Graph we created to create a new Sink and run it with a Source created from the given List, thus saving the text, one line per element of the list.
Backpressure
This would buffer 100 elements, dropping the oldest (dropHead). You should pick whatever strategy best fits your problem space.
dropTail(): Drops the newest element from the buffer.
dropBuffer(): An aggressive strategy that drops the entire buffer once it is full.
dropNew(): Drops any new element that arrives while the buffer is full.
backpressure(): Causes a backpressure signal to be pushed upstream when the buffer is full. In other words, the amount requested from upstream falls to zero until the buffer is no longer full.
fail(): Fails the stream entirely when the buffer is full.
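The dropping and failing strategies above can be modeled with a small bounded buffer in plain Java. This is a sketch of the described behavior under stated assumptions, not Akka's implementation: BoundedBuffer and its Overflow enum are hypothetical names, and the backpressure strategy is left out because a synchronous buffer like this one has no upstream to slow down.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a fixed-size buffer with a configurable overflow policy.
final class BoundedBuffer<T> {
    enum Overflow { DROP_HEAD, DROP_TAIL, DROP_BUFFER, DROP_NEW, FAIL }

    private final Deque<T> elements = new ArrayDeque<>();
    private final int capacity;
    private final Overflow strategy;

    BoundedBuffer(int capacity, Overflow strategy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.strategy = strategy;
    }

    void offer(T element) {
        if (elements.size() < capacity) {
            elements.addLast(element);
            return;
        }
        switch (strategy) {
            case DROP_HEAD:   // discard the oldest buffered element
                elements.removeFirst();
                elements.addLast(element);
                break;
            case DROP_TAIL:   // discard the newest buffered element
                elements.removeLast();
                elements.addLast(element);
                break;
            case DROP_BUFFER: // discard everything buffered so far
                elements.clear();
                elements.addLast(element);
                break;
            case DROP_NEW:    // discard the incoming element
                break;
            case FAIL:        // signal an error instead of dropping
                throw new IllegalStateException("buffer overflow");
        }
    }

    // Snapshot of the buffer, oldest element first.
    List<T> contents() {
        return new ArrayList<>(elements);
    }
}
```

For example, offering the numbers 1 through 5 to a buffer of capacity 3 leaves 3, 4, 5 under DROP_HEAD but 1, 2, 3 under DROP_NEW, which is why the right choice depends on whether older or newer data matters more in your problem space.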
Interoperation with Reactive Streams API
Akka Streams’ immutable topology requirement can be surprising to people familiar with other Reactive Streams libraries.
In order to obtain a Publisher or Subscriber from an Akka Stream topology, a corresponding Sink.asPublisher or Source.asSubscriber element must be used.
Where broadcast behavior is needed for interoperation with other Reactive Streams implementations, the Sink must be created with Sink.asPublisher(AsPublisher.WITH_FANOUT), which enables fan-out support. If AsPublisher.WITHOUT_FANOUT is used instead, the resulting Publisher will allow only one Subscriber.
An Akka Streams Flow can also be converted to a Processor using Flow’s toProcessor() method; however, it is also limited to only one Subscriber.
To get around these limitations, and create dynamic stream handling within Akka Streams, you can use MergeHub, BroadcastHub, and PartitionHub.
MergeHub, BroadcastHub, and PartitionHub
A MergeHub allows any number of flows to go into a single Sink.
A BroadcastHub lets a dynamic set of consumers consume elements from a common producer.
A PartitionHub can be used to route elements from a common producer to a dynamic set of consumers. The selection of consumer is done with a function and each element can only be routed to one consumer.
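The difference between broadcasting and partitioning can be illustrated with a plain Java model. This is not the Akka hub API: HubModel, attach, broadcast, and partition are hypothetical names, and the partitioner shape (consumer count and element in, consumer index out) is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Hypothetical model of one producer feeding a set of consumers that can grow over time.
final class HubModel<T> {
    private final List<Consumer<T>> consumers = new ArrayList<>();

    // Consumers may be attached at any time, which is what makes the set dynamic.
    void attach(Consumer<T> consumer) {
        consumers.add(consumer);
    }

    // Broadcast style: every attached consumer sees every element.
    void broadcast(T element) {
        for (Consumer<T> consumer : consumers) {
            consumer.accept(element);
        }
    }

    // Partition style: a function picks exactly one consumer for each element.
    void partition(T element, BiFunction<Integer, T, Integer> partitioner) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers attached");
        }
        int index = partitioner.apply(consumers.size(), element);
        consumers.get(index).accept(element);
    }
}
```

A partitioner such as (size, elem) -> elem.length() % size would send each string to a single consumer chosen by its length, whereas broadcast would deliver it to all of them.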
1. A simple consumer that will print to the console.
2. Attach a MergeHub Source to the consumer. This will materialize to a corresponding Sink when run. The buffer size is used per producer.
3. Finally we must run and materialize the runnableGraph to get the Sink. This Sink can be materialized any number of times, and every element that enters it will be consumed by the “consumer” defined in step 1.
For more information about MergeHub, BroadcastHub, and PartitionHub, see the documentation.
Testing
TestKit: Has a method that is useful for shutting down the ActorSystem between tests
TestSink: Enables probing of an Akka Streams Source directly using a TestSubscriber.Probe<T> instance
TestSource: Enables probing of a Sink using a TestPublisher.Probe<String> instance
1. Create the TestSink instance.
2. Create the Source we want to test. In a real test, this would come from some part of your production code.
3. Run the Source with the TestSink and use the resulting TestSubscriber.Probe<T> instance to request one value and expect it to be “test”. Calling expectComplete() means we expect the Source to send the “on-complete” signal; if it does not, an AssertionError is thrown.
1. Get an instance of the Sink we want to test.
2. Create and materialize the TestSource with sinkUnderTest and keep both the materialized value and auxiliary value using Keep.both().
3. Get references to both the TestPublisher.Probe<String> and the CompletionStage (future) as we will use them later.
4. Call several methods on the probe to expect the Sink requested data, send some data, then call sendError on the probe with an Exception instance.
5. Convert the CompletionStage from the previous step to a CompletableFuture and call “get” with a timeout of two seconds (just in case the underlying future would never complete).
6. Finally, assert that the Exception was thrown and it has the message “boom!”.