© Adam L. Davis 2019
Adam L. Davis, Reactive Streams in Java, https://doi.org/10.1007/978-1-4842-4176-9_6

6. Akka Streams

Adam L. Davis, Oviedo, FL, USA
 

Akka Streams implements the Reactive Streams standard within the larger Akka concurrency project.

Akka Streams is built on the philosophy of providing a minimal and consistent application programming interface (API) that is extremely compositional, meaning it is broken into pieces that can be combined in many ways.

Unlike in RxJava and Reactor, the topology of streams (flows) in Akka Streams is immutable once they have been materialized. This means that you must explicitly convert a flow into a Reactive Streams interface to get a dynamic topology (as we’ll cover later on).

Although most familiar in Scala-based applications, Akka Streams has a Java-specific API, and the documentation lets you select Java or Scala as your target language with specific examples for each.

Akka Streams uses the concepts of Source and Sink to correspond roughly with the Publisher and Subscriber of other Reactive Streams frameworks. It also has the concept of Flow, which is roughly equivalent to a Processor, and of Graphs, which are like blueprints built from Flows, Sinks, or Sources.

Getting Started

If you have a Maven build, add the following to your pom file:
<dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-stream_2.12</artifactId>
        <version>2.5.16</version>
</dependency>
<dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-stream-testkit_2.12</artifactId>
        <version>2.5.16</version>
        <scope>test</scope>
</dependency>
For Gradle builds, add the following to your Gradle build file’s dependencies:
compile 'com.typesafe.akka:akka-stream_2.12:2.5.16'
testCompile 'com.typesafe.akka:akka-stream-testkit_2.12:2.5.16'
Use the following imports:
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.*;
import akka.stream.javadsl.*;
In this example, we take a stream of messages and extract all messages that begin with “Error”:
final ActorSystem system = ActorSystem.create(
"reactive-messages"); //1
final Materializer mat = ActorMaterializer.create(system); //2
Source<String, NotUsed> messages = Source
.single("Error: test message");
final Source<String, NotUsed> errors =
        messages.filter(m -> m.startsWith("Error")) //3
        .map(m -> m.toString()); //4
errors.runWith(Sink.foreach(System.out::println), mat); //5
  1. We create the Akka ActorSystem to define the multithreaded environment for execution. We provide the name “reactive-messages”, which is optional and gives the ActorSystem a logical name.

  2. The execution environment (similar to Schedulers in RxJava) is known here as a Materializer. Unlike in RxJava, the developer controls concurrency by calling methods like async() and mapAsync(int, Function) on a Source or Flow.

  3. We keep only the error messages and discard everything else.

  4. Although not necessary, we call toString on each message to illustrate using the map method.

  5. Finally, we use runWith and pass in a Sink that prints out each error message.

Although here we are using the foreach Sink, any sink could be used, including user-defined sinks.
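
The concurrency controls mentioned in step 2 can be sketched as follows. This is a minimal example, assuming akka-stream 2.5.x on the classpath; the class ConcurrencyExample and its slowLength helper are hypothetical. Here async() inserts an asynchronous boundary, so the stages before and after it may run on different actors, and mapAsync with a parallelism of 4 keeps up to four CompletionStages in flight while still emitting results in upstream order.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class ConcurrencyExample {
  // Hypothetical "slow" operation that completes on another thread.
  static CompletionStage<Integer> slowLength(String s) {
    return CompletableFuture.supplyAsync(s::length);
  }

  static List<Integer> run() {
    final ActorSystem system = ActorSystem.create("concurrency-example");
    try {
      final Materializer mat = ActorMaterializer.create(system);
      final Source<String, NotUsed> words =
          Source.from(Arrays.asList(" a", "bb ", "ccc"));
      return words
          .map(String::trim)   // runs before the asynchronous boundary
          .async()             // stages above and below may run on different actors
          .mapAsync(4, ConcurrencyExample::slowLength) // up to 4 futures in flight
          .runWith(Sink.seq(), mat)
          .toCompletableFuture()
          .join();             // mapAsync keeps upstream order: [1, 2, 3]
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

If the order of results does not matter, mapAsyncUnordered can emit each result as soon as its CompletionStage completes.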

To avoid conceptual conflicts with the existing flatMap in Scala, Akka Streams uses flatMapConcat, flatMapMerge, and mapConcat instead. The mapConcat method expects the function to return an Iterable, not a stream. The other two expect the function to return a stream (a Source) and act as their names suggest: flatMapConcat appends the resulting streams one after another, while flatMapMerge merges them.
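
A small sketch contrasting the three operators, under the same assumptions (akka-stream 2.5.x; the class name is hypothetical). Each operator turns 1, 2, 3 into n followed by n * 10. mapConcat and flatMapConcat preserve that order, while flatMapMerge with a breadth of 2 may interleave elements from the inner Sources, so only its contents, not its order, are predictable.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;

class FlatteningExample {
  static List<List<Integer>> run() {
    final ActorSystem system = ActorSystem.create("flattening-example");
    try {
      final Materializer mat = ActorMaterializer.create(system);
      final Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 2, 3));

      // mapConcat: the function returns an Iterable whose elements are emitted in order.
      final List<Integer> viaMapConcat = numbers
          .mapConcat(n -> Arrays.asList(n, n * 10))
          .runWith(Sink.seq(), mat).toCompletableFuture().join();

      // flatMapConcat: the function returns a Source; each inner Source is
      // fully consumed before the next one starts.
      final List<Integer> viaFlatMapConcat = numbers
          .flatMapConcat(n -> Source.from(Arrays.asList(n, n * 10)))
          .runWith(Sink.seq(), mat).toCompletableFuture().join();

      // flatMapMerge: up to 2 inner Sources are consumed at once,
      // so their elements may interleave.
      final List<Integer> viaFlatMapMerge = numbers
          .flatMapMerge(2, n -> Source.from(Arrays.asList(n, n * 10)))
          .runWith(Sink.seq(), mat).toCompletableFuture().join();

      return Arrays.asList(viaMapConcat, viaFlatMapConcat, viaFlatMapMerge);
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```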

ActorMaterializer

The ActorMaterializer in Akka Streams is similar to Schedulers in the other two Reactive Streams implementations but not the same. Unlike Schedulers, there are not several predefined singletons to choose from; instead you should generally create one for your whole application and specify some general settings.

An ActorMaterializer is created in the following way:
public static Materializer createMaterializer() {
  final ActorSystem system = ActorSystem.create(); //1
  ActorMaterializerSettings settings =
      ActorMaterializerSettings.create(system) //2
          .withMaxFixedBufferSize(100) //3
          .withInputBuffer(8, 16); //4
  return ActorMaterializer.create(settings, system); //5
}
  1. Create the ActorSystem.

  2. Optionally, create ActorMaterializerSettings. These let you configure internal settings used by Akka Streams to tune performance for your particular project.

  3. Set the maximum fixed buffer size to 100. Stream elements that have explicit buffers (like mapAsync, mapAsyncUnordered, flatMapMerge, Source.actorRef, Source.queue, etc.) preallocate a fixed-size buffer when the stream is materialized if the buffer size they request is below this value. The default is very large so that insufficient memory causes failures early, rather than later when the system scales up. You might change it if you want to use a smaller amount of memory, for example.

  4. Set the initial and maximum sizes of internal stream buffers. Here we set the initial size to 8 and the maximum to 16; the defaults in Akka’s reference configuration are 4 and 16.

  5. Finally, create the ActorMaterializer with the given settings and ActorSystem.
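
In practice, it helps to keep a reference to the ActorSystem so it can be terminated when the application shuts down. The following sketch, assuming akka-stream 2.5.x, is a hypothetical variant of createMaterializer that takes the ActorSystem as a parameter and then reuses the resulting materializer to run a stream. Sink.fold sums the elements of Source.range(1, 10), which includes both ends of the range.

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.ActorMaterializerSettings;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

class MaterializerExample {
  // Hypothetical variant of createMaterializer that receives the ActorSystem,
  // so the caller keeps a reference it can terminate later.
  static Materializer createMaterializer(ActorSystem system) {
    ActorMaterializerSettings settings =
        ActorMaterializerSettings.create(system)
            .withMaxFixedBufferSize(100)
            .withInputBuffer(8, 16);
    return ActorMaterializer.create(settings, system);
  }

  static int sumOneToTen() {
    final ActorSystem system = ActorSystem.create("materializer-example");
    try {
      // One materializer, reused for every stream the application runs.
      final Materializer mat = createMaterializer(system);
      return Source.range(1, 10)
          .runWith(Sink.<Integer, Integer>fold(0, (acc, n) -> acc + n), mat)
          .toCompletableFuture()
          .join();
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) {
    System.out.println(sumOneToTen()); // 55
  }
}
```

Passing the system in is a design choice, not a requirement of the API; the original createMaterializer works too, but it leaves the caller without a direct reference to the ActorSystem it created.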

Sinks, Flows, and Graphs

One of the interesting things about Akka Streams is that every part can be defined independently, is immutable, and can be reused. For this purpose, Akka Streams has the concepts of Flow, Graph, Source, and Sink.
  • Flow: A Flow has both an input and an output. So, you can define a Flow with only the type of the data that will be streamed, without the actual data. It is similar to org.reactivestreams.Processor which is both a Publisher and a Subscriber.

  • Graph: A Graph can define any arbitrary branching and recombining of streams. A Graph is immutable, thread-safe, and reusable. A Graph that is self-contained (has no input or output) is a RunnableGraph and can be materialized.

  • Source: A Source has exactly one output. It is a source of data, similar to a Publisher, and can be created in many different ways.

  • Sink: As seen before, a Sink is the ending point of a stream. It represents what we do with the data. It has exactly one input.
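
To make these building blocks concrete, here is a minimal sketch (assuming akka-stream 2.5.x; all names are hypothetical) that defines a Source, a Flow, and a Sink separately and only then connects them into a RunnableGraph. Because each piece is an immutable blueprint, the same RunnableGraph can be run twice, and each run materializes a fresh stream with its own result.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;

class BlueprintExample {
  // Each piece is an immutable, reusable blueprint; nothing runs yet.
  static final Source<Integer, NotUsed> NUMBERS = Source.range(1, 5);
  static final Flow<Integer, Integer, NotUsed> SQUARE =
      Flow.of(Integer.class).map(n -> n * n);
  static final Sink<Integer, CompletionStage<Integer>> SUM =
      Sink.<Integer, Integer>fold(0, (acc, n) -> acc + n);

  static List<Integer> runTwice() {
    final ActorSystem system = ActorSystem.create("blueprint-example");
    try {
      final Materializer mat = ActorMaterializer.create(system);
      // Source + Flow + Sink = RunnableGraph; Keep.right() keeps the Sink's
      // materialized value (the CompletionStage holding the sum).
      final RunnableGraph<CompletionStage<Integer>> graph =
          NUMBERS.via(SQUARE).toMat(SUM, Keep.right());
      // The same blueprint is materialized twice, producing two independent runs.
      final int first = graph.run(mat).toCompletableFuture().join();
      final int second = graph.run(mat).toCompletableFuture().join();
      return Arrays.asList(first, second);
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) {
    System.out.println(runTwice()); // [55, 55]
  }
}
```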

Using Flow, you can define a Sink separately from defining any sources. For example, this sink would save to a file:
import akka.util.ByteString;
import java.nio.file.Paths;
import java.util.concurrent.CompletionStage;
public Sink<String, CompletionStage<IOResult>> lineSink(
        String filename) {
  return Flow.of(String.class)
    .map(s -> ByteString.fromString(s + "\n"))
    .toMat(FileIO.toPath(Paths.get(filename)),
         Keep.right());
}

First, we create a Flow of type String; this declares what type of element you are expecting. Second, we map each string into a ByteString. At this point, the type is Flow<String, ByteString, NotUsed>. Lastly, we call toMat (the variant of to that lets us choose which materialized value to keep) to write the result to a file using an existing Sink (FileIO is part of the Akka Streams Java DSL). We specify Keep.right() to keep the materialized value of FileIO.toPath, a CompletionStage<IOResult> that completes when writing finishes, rather than the Flow’s NotUsed.

The Sink, once defined, can be used multiple times. Notice that defining the Sink does not perform any action: nothing is saved until we materialize it with some Source. For example:
public void saveTextFile(List<String> text) {
        Sink<String, CompletionStage<IOResult>> sink =
                lineSink("testfile.txt");
        Source.from(text).runWith(sink, materializer);
}

This method would take a List of Strings, create a Source from them, and then save them to a file using the Sink from the lineSink method.
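
Because lineSink keeps the materialized CompletionStage<IOResult>, a caller can wait for the write to finish before using the file. The sketch below assumes akka-stream 2.5.x; the class, the temporary-file handling, and the Path-based lineSink overload are hypothetical. The wasSuccessful() check covers Akka versions that report write errors through the IOResult status rather than by failing the CompletionStage.

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.Materializer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletionStage;

class LineSinkExample {
  // Same idea as lineSink above, but taking a Path instead of a file name.
  static Sink<String, CompletionStage<IOResult>> lineSink(Path path) {
    return Flow.of(String.class)
        .map(s -> ByteString.fromString(s + "\n"))
        .toMat(FileIO.toPath(path), Keep.right());
  }

  // Writes the lines to a temporary file, waits for the IOResult, then reads the file back.
  static List<String> writeAndReadBack(List<String> lines) {
    final ActorSystem system = ActorSystem.create("line-sink-example");
    try {
      final Path file = Files.createTempFile("akka-lines", ".txt");
      file.toFile().deleteOnExit();
      final Materializer mat = ActorMaterializer.create(system);
      final IOResult result = Source.from(lines)
          .runWith(lineSink(file), mat)
          .toCompletableFuture()
          .join(); // completes once the FileIO sink has finished writing
      if (!result.wasSuccessful()) {
        throw new IllegalStateException("write failed", result.getError());
      }
      System.out.println("bytes written: " + result.getCount());
      return Files.readAllLines(file);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      system.terminate();
    }
  }
}
```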

Graphs can be created using the GraphDSL. For example, using the previously defined methods, we can create a Graph of a SinkShape like so:
public Graph<SinkShape<String>, NotUsed> createFileSinkGraph() {
  return GraphDSL.create(builder -> {
    FlowShape<String, String> flowShape = builder
        .add(Flow.of(String.class).async()); //1
    var sink = lineSink("testfile.txt"); //2
    var sinkShape = builder.add(sink); //3
    builder.from(flowShape.out()).to(sinkShape); //4
    return new SinkShape<>(flowShape.in()); //5
  });
}
  1. Call builder.add with a Flow to get a FlowShape. Here we create an asynchronous Flow.

  2. Create a new Sink<String> by calling our lineSink method.

  3. Create a SinkShape<String> from that sink.

  4. Link the output from the flowShape to the sinkShape.

  5. Return a new SinkShape using the input from the flowShape. We have now created the Graph of a SinkShape that will save text lines to a file.
We can use this graph by calling Sink.fromGraph to create a Sink:
public void saveTextFileUsingGraph(List<String> text) {
  Sink.fromGraph(createFileSinkGraph())
    .runWith(Source.from(text), materializer);
}

The preceding code would use the Graph we created to create a new Sink and run it with a Source created from the given List, thus saving the text, one line per element of the list.
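
The Graph above is linear, but the GraphDSL also handles the branching described earlier. This sketch, assuming akka-stream 2.5.x with hypothetical class and method names, uses Broadcast to send every number from 1 to 6 down two branches, filters each branch differently, and keeps both Sinks’ materialized values with Keep.both(). Each call to builder.from(bcast) connects the next unused output port of the Broadcast.

```java
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.ClosedShape;
import akka.stream.Materializer;
import akka.stream.Outlet;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.List;
import java.util.concurrent.CompletionStage;

class BroadcastExample {
  static Pair<List<Integer>, List<Integer>> evensAndOdds() {
    final ActorSystem system = ActorSystem.create("broadcast-example");
    try {
      final Materializer mat = ActorMaterializer.create(system);
      final Sink<Integer, CompletionStage<List<Integer>>> evenSink = Sink.seq();
      final Sink<Integer, CompletionStage<List<Integer>>> oddSink = Sink.seq();

      final RunnableGraph<Pair<CompletionStage<List<Integer>>, CompletionStage<List<Integer>>>> graph =
          RunnableGraph.<Pair<CompletionStage<List<Integer>>, CompletionStage<List<Integer>>>>fromGraph(
              GraphDSL.create(evenSink, oddSink, Keep.both(), (builder, evens, odds) -> {
                final Outlet<Integer> numbers = builder.add(Source.range(1, 6)).out();
                final UniformFanOutShape<Integer, Integer> bcast =
                    builder.add(Broadcast.create(2));
                builder.from(numbers).viaFanOut(bcast);
                // First free Broadcast output: keep even numbers.
                builder.from(bcast)
                    .via(builder.add(Flow.of(Integer.class).filter(n -> n % 2 == 0)))
                    .to(evens);
                // Second free Broadcast output: keep odd numbers.
                builder.from(bcast)
                    .via(builder.add(Flow.of(Integer.class).filter(n -> n % 2 != 0)))
                    .to(odds);
                return ClosedShape.getInstance(); // no open inputs or outputs left
              }));

      final Pair<CompletionStage<List<Integer>>, CompletionStage<List<Integer>>> results =
          graph.run(mat);
      return Pair.create(
          results.first().toCompletableFuture().join(),
          results.second().toCompletableFuture().join());
    } finally {
      system.terminate();
    }
  }
}
```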

Backpressure

Backpressure strategies can be defined on the stream to describe what to do when too many elements are produced. For example, we can buffer our messages stream:
Source<String, NotUsed> buffered = messages
        .buffer(100, OverflowStrategy.dropHead());

This buffers up to 100 elements and, when the buffer is full, drops the oldest element (dropHead) to make room for each new one. You should pick whichever strategy best fits your problem space.

Other options include:
  • dropTail(): Drops the newest element in the buffer to make room for the incoming one.

  • dropBuffer(): An aggressive strategy that drops the entire buffer when it is full and a new element arrives.

  • dropNew(): Drops any new elements when the buffer is full.

  • backpressure(): Signals backpressure upstream when the buffer is full. In other words, the amount requested from upstream falls to zero until the buffer is no longer full.

  • fail(): Fails the stream entirely when the buffer is full.
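
The drop and fail strategies differ only in what happens when a new element meets a full buffer. The following plain-Java model (no Akka required) makes that difference visible; it is a simplified, single-threaded illustration with hypothetical names, not Akka’s implementation. backpressure() has no counterpart here, because it stops the producer from sending in the first place rather than choosing an element to drop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a bounded buffer receiving elements one at a time.
// NOT Akka's implementation; it only mirrors the documented drop/fail behavior.
class OverflowModel {
  enum Strategy { DROP_HEAD, DROP_TAIL, DROP_NEW, DROP_BUFFER, FAIL }

  static List<Integer> fill(int capacity, Strategy strategy, int... incoming) {
    Deque<Integer> buffer = new ArrayDeque<>();
    for (int element : incoming) {
      if (buffer.size() < capacity) {
        buffer.addLast(element);
        continue;
      }
      switch (strategy) {
        case DROP_HEAD:   buffer.removeFirst(); buffer.addLast(element); break; // evict oldest
        case DROP_TAIL:   buffer.removeLast();  buffer.addLast(element); break; // evict newest buffered
        case DROP_NEW:    break;                                              // discard the incoming element
        case DROP_BUFFER: buffer.clear();       buffer.addLast(element); break; // discard everything buffered
        case FAIL:        throw new IllegalStateException("buffer overflow at element " + element);
      }
    }
    return new ArrayList<>(buffer);
  }

  public static void main(String[] args) {
    for (Strategy s : new Strategy[] {
        Strategy.DROP_HEAD, Strategy.DROP_TAIL, Strategy.DROP_NEW, Strategy.DROP_BUFFER}) {
      System.out.println(s + " -> " + fill(3, s, 1, 2, 3, 4, 5));
    }
  }
}
```

With a capacity of 3 and the elements 1 to 5, the model keeps [3, 4, 5] for dropHead, [1, 2, 5] for dropTail, [1, 2, 3] for dropNew, and [4, 5] for dropBuffer.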

Interoperation with Reactive Streams API

Because Akka Streams requires an immutable topology, its approach to interoperation can be surprising to people familiar with other Reactive Streams libraries.

To obtain a Publisher or Subscriber from an Akka Streams topology, you must use the corresponding Sink.asPublisher or Source.asSubscriber element.

Where broadcast behavior is needed for interoperation with other Reactive Streams implementations, create the Sink with Sink.asPublisher(AsPublisher.WITH_FANOUT), which enables fan-out support. If AsPublisher.WITHOUT_FANOUT is used instead, the resulting Publisher allows only one Subscriber.

An Akka Streams Flow can also be converted to a Processor using Flow’s toProcessor() method; however, it is also limited to only one Subscriber.
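
A minimal sketch of these conversions, assuming akka-stream 2.5.x (the class name is hypothetical): a Source is exposed as a Publisher with Sink.asPublisher, a Flow is exposed as a Processor with toProcessor(), the two are wired together using only the org.reactivestreams interfaces, and the result is read back with Source.fromPublisher. Each Publisher here is subscribed exactly once, which respects the single-Subscriber limits described above.

```java
import akka.actor.ActorSystem;
import akka.stream.*;
import akka.stream.javadsl.*;
import java.util.List;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;

class InteropExample {
  static List<Integer> roundTrip() {
    final ActorSystem system = ActorSystem.create("interop-example");
    try {
      final Materializer mat = ActorMaterializer.create(system);

      // Expose an Akka Source as a plain Reactive Streams Publisher (one Subscriber only).
      final Publisher<Integer> publisher =
          Source.range(1, 3).runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), mat);

      // Expose a Flow as a Processor; running the RunnableGraph yields the Processor.
      final Processor<Integer, Integer> doubler =
          Flow.of(Integer.class).map(n -> n * 2).toProcessor().run(mat);

      // Wire them together using only org.reactivestreams types.
      publisher.subscribe(doubler);

      // Read the Processor's output back into Akka Streams as a Source.
      return Source.fromPublisher(doubler)
          .runWith(Sink.seq(), mat)
          .toCompletableFuture()
          .join(); // [2, 4, 6]
    } finally {
      system.terminate();
    }
  }
}
```

Any other Reactive Streams implementation could take the place of either side of the publisher.subscribe(doubler) call, since only the standard interfaces are involved there.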

To get around these limitations, and create dynamic stream handling within Akka Streams, you can use MergeHub, BroadcastHub, and PartitionHub.

MergeHub, BroadcastHub, and PartitionHub

For dynamically defined flows of data that need to have multiple consumers or multiple producers of data, Akka Streams has the following classes:
  • A MergeHub allows any number of flows to go into a single Sink.

  • A BroadcastHub can be used to consume elements from a common producer by a dynamic set of consumers.

  • A PartitionHub can be used to route elements from a common producer to a dynamic set of consumers. The selection of consumer is done with a function and each element can only be routed to one consumer.

For example, here is a simple use case of a MergeHub:
Sink<String, CompletionStage<Done>> consumer =
        Sink.foreach(System.out::println); //1
int bufferSize = 8;
RunnableGraph<Sink<String, NotUsed>> runnableGraph =
    MergeHub.of(String.class, bufferSize)
        .to(consumer); //2
Sink<String, NotUsed> toConsumer =
    runnableGraph.run(materializer); //3
  1. A simple consumer that will print to the console.

  2. Attach a MergeHub Source to the consumer. This will materialize to a corresponding Sink when run. The buffer size is used per producer.

  3. Finally, we must run and materialize the runnableGraph to get the Sink. This Sink can be materialized any number of times, and every element that enters it will be consumed by the “consumer” defined in step 1.

For more information about MergeHub, BroadcastHub, and PartitionHub, see the Akka Streams documentation.
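
The MergeHub pattern can be exercised with several producers. The sketch below assumes akka-stream 2.5.x and uses hypothetical names; it replaces the printing consumer with take(2) followed by Sink.seq() so that the stream actually completes, since a MergeHub on its own keeps waiting for new producers. Elements from independent producers can arrive in either order, so the result is sorted before it is returned.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.MergeHub;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;

class MergeHubExample {
  static List<String> mergeTwoProducers() {
    final ActorSystem system = ActorSystem.create("merge-hub-example");
    try {
      final Materializer mat = ActorMaterializer.create(system);
      // take(2) lets the consumer complete after two elements, so Sink.seq() can finish.
      final RunnableGraph<Pair<Sink<String, NotUsed>, CompletionStage<List<String>>>> graph =
          MergeHub.of(String.class, 8)
              .take(2)
              .toMat(Sink.<String>seq(), Keep.both());
      final Pair<Sink<String, NotUsed>, CompletionStage<List<String>>> materialized =
          graph.run(mat);
      final Sink<String, NotUsed> toConsumer = materialized.first();

      // Two independent producers attach to the same materialized Sink.
      Source.single("from producer A").runWith(toConsumer, mat);
      Source.single("from producer B").runWith(toConsumer, mat);

      // Arrival order between producers is not deterministic; sort for a stable result.
      final List<String> received =
          new ArrayList<>(materialized.second().toCompletableFuture().join());
      Collections.sort(received);
      return received;
    } finally {
      system.terminate();
    }
  }
}
```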

Testing

Akka Streams includes a testkit to assist in creating tests around your application. It includes the following:
  • TestKit: Has a static method, shutdownActorSystem, that is useful for shutting down the ActorSystem after each test

  • TestSink: Enables probing of an Akka Streams Source directly using a TestSubscriber.Probe<T> instance

  • TestSource: Enables probing of a Sink using a TestPublisher.Probe<T> instance

Add the following imports to the test class:
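import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.*;
import akka.stream.testkit.*;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.TestSource;
import akka.testkit.javadsl.TestKit;
import org.junit.*;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;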
Define our setup and tearDown methods:
ActorSystem system;
ActorMaterializer materializer;
@Before
public void setup() {
  system = ActorSystem.create();
  materializer = ActorMaterializer.create(system);
}
@After
public void tearDown() {
  TestKit.shutdownActorSystem(system);
}
Now we can write tests using TestSink to probe any Source. For example:
@Test
public void test_a_source() {
  Sink<Object, TestSubscriber.Probe<Object>> sink =
        TestSink.probe(system); //1
  Source<Object, NotUsed> sourceUnderTest =
        Source.single("test"); //2
  sourceUnderTest.runWith(sink, materializer) //3
        .request(1)
        .expectNext("test")
        .expectComplete();
}
  1. Create the TestSink instance.

  2. Create the Source we want to test. In a real test, this would come from some part of your production code.

  3. Run the Source with the TestSink and use the resulting TestSubscriber.Probe<T> instance to request one value and expect it to be “test”. Calling expectComplete() means we expect the Source to send the “on-complete” signal; if it does not, an AssertionError is thrown.
We can also test Sinks using the TestSource.probe(ActorSystem) method as follows:
@Test
public void test_a_sink() throws Exception {
  Sink<String, CompletionStage<List<String>>>
          sinkUnderTest = Sink.seq(); //1
  final Pair<TestPublisher.Probe<String>,
   CompletionStage<List<String>>> stagePair =
     TestSource.<String>probe(system)
          .toMat(sinkUnderTest, Keep.both()) //2
          .run(materializer);
  final TestPublisher.Probe<String> probe =
          stagePair.first(); //3
  final CompletionStage<List<String>> future =
          stagePair.second();
  probe.expectRequest(); //4
  probe.sendNext("test");
  probe.sendError(new Exception("boom!"));
  try {
    future.toCompletableFuture().get(2, TimeUnit.SECONDS); //5
    fail("expected the stream to fail");
  } catch (ExecutionException ee) {
    final Throwable exception = ee.getCause();
    assertEquals("boom!", exception.getMessage()); //6
  }
}
  1. Get an instance of the Sink we want to test.

  2. Create and materialize the TestSource with sinkUnderTest, keeping both materialized values (the TestSource’s probe and the sink’s CompletionStage) using Keep.both().

  3. Get references to both the TestPublisher.Probe<String> and the CompletionStage (future), as we will use them later.

  4. Call several methods on the probe: expect that the Sink requested data, send some data, and then call sendError on the probe with an Exception instance.

  5. Convert the CompletionStage from step 3 to a CompletableFuture and call get with a timeout of two seconds (in case the underlying future never completes).

  6. Finally, assert that the Exception was thrown and that it has the message “boom!”.
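
TestSource and TestSink can also be combined to test a Flow from both ends. The sketch below assumes akka-stream-testkit 2.5.x; it is written as a plain static method rather than a JUnit @Test so that it runs on its own, and the class and the DOUBLER Flow are hypothetical. The probes throw an AssertionError if an expectation is not met.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.TestSource;
import akka.testkit.javadsl.TestKit;

class FlowProbeExample {
  // Hypothetical Flow under test; in a real project it would come from production code.
  static final Flow<Integer, Integer, NotUsed> DOUBLER =
      Flow.of(Integer.class).map(n -> n * 2);

  // Returns normally if the Flow behaves as expected; the probes throw AssertionError otherwise.
  static void verifyDoubler() {
    final ActorSystem system = ActorSystem.create("flow-probe-example");
    try {
      final Materializer mat = ActorMaterializer.create(system);
      final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> probes =
          TestSource.<Integer>probe(system)
              .via(DOUBLER)
              .toMat(TestSink.<Integer>probe(system), Keep.both())
              .run(mat);
      final TestPublisher.Probe<Integer> upstream = probes.first();
      final TestSubscriber.Probe<Integer> downstream = probes.second();

      downstream.request(2);    // demand from the consuming side
      upstream.sendNext(1);     // sendNext waits for upstream demand if necessary
      upstream.sendNext(2);
      downstream.expectNext(2);
      downstream.expectNext(4);
      upstream.sendComplete();  // completion must propagate through the Flow
      downstream.expectComplete();
    } finally {
      TestKit.shutdownActorSystem(system);
    }
  }
}
```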