When choosing a library or framework for building a web application with Akka Streams, there are many options, including Play Framework, Apache Camel, and Akka HTTP. For this chapter, we’ll focus on Akka HTTP. The Akka HTTP server is implemented on top of Akka Streams and makes heavy use of it.
“Akka HTTP has been driven with a clear focus on providing tools for building integration layers rather than application cores. As such it regards itself as a suite of libraries rather than a framework.” (Akka HTTP Docs)
Akka HTTP takes an unopinionated approach and prefers to be seen as a set of libraries rather than a framework. Although this can make it more difficult to get started, it allows the developer more flexibility and a clear view of everything that’s happening. There’s no “magic” behind the scenes that makes it work.
Akka HTTP has support for the following:
HTTP: Akka HTTP implements HTTP/1.1 including persistent connections and client connection pooling.
HTTPS is supported through the facilities that Java provides.
WebSocket: Akka HTTP implements WebSocket on both the server side and the client side.
HTTP/2: Akka HTTP provides server-side HTTP/2 support.
Multipart: Akka HTTP has modeled multipart/* payloads. It provides streaming multipart parsers and renderers, e.g., for parsing file uploads, and provides a typed model to access details of such a payload.
Server-sent events (SSE): Supported through marshalling that will provide or consume an (Akka Stream based) stream of events.
JSON: Marshalling to and from JSON is supported out of the box for Jackson-based models in Java.
Gzip and Deflate Content-Encoding.
It also has a testing library to assist with testing.
For our example project, we’ll use Akka HTTP along with Akka Streams and WebSockets to create a real-time chat web server with a fake repository.
Getting Started
Although you can use SBT (Scala’s build tool), Maven, or many other build tools, here we’re using Gradle.
Start by creating a build file named “build.gradle” with the following contents:
apply plugin: 'java' //1
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'application'
group = 'com.github.adamldavis'
applicationName = 'akka-http-java' //2
version = '0.0.1-SNAPSHOT'
mainClassName = 'com.github.adamldavis.akkahttp.WebApp' //3
// requires Gradle 4.7+
sourceCompatibility = 1.10 //4
targetCompatibility = 1.10
repositories {
mavenCentral()
}
ext {
akkaHttpVersion = '10.1.5' //5
akkaVersion = '2.5.12'
}
dependencies {
compile "com.typesafe.akka:akka-http_2.12:$akkaHttpVersion" //6
compile "com.typesafe.akka:akka-http-jackson_2.12:$akkaHttpVersion"
compile "com.typesafe.akka:akka-stream_2.12:$akkaVersion"
testCompile "com.typesafe.akka:akka-http-testkit_2.12:$akkaHttpVersion"
testCompile "com.typesafe.akka:akka-stream-testkit_2.12:$akkaVersion"
testCompile 'junit:junit:4.12'
testCompile "org.assertj:assertj-core:3.11.1"
}
- 1. Apply the Java plugin (followed by the Eclipse, IDEA, and application plugins).
- 2. Set the application’s name.
- 3. Set the main class with the static void main method to run.
- 4. Set the Java version to 10.
- 5. Set variables for versions of Akka HTTP and Akka to use.
- 6. Specify all the dependencies necessary for this project, including the akka-http-testkit, akka-stream-testkit, junit, and assertj for tests.
Then create a class named WebApp and start with the following imports:
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.*;
import akka.http.javadsl.server.*;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
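import akka.util.ByteString;
import java.util.Random;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;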
Next, make the class extend AllDirectives to enable the Java DSL and add a main method like the following:
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("routes");//1
final Http http = Http.get(system); //2
final ActorMaterializer materializer =
ActorMaterializer.create(system);
var app = new WebApp(); //3
final Flow<HttpRequest, HttpResponse, NotUsed>
routeFlow = app.joinedRoutes()
.flow(system, materializer);
final CompletionStage<ServerBinding> binding =
http.bindAndHandle(routeFlow,
ConnectHttp.toHost("localhost", 5010),
materializer); //4
System.out.println("Server online at http://localhost:5010/\nUse Ctrl+C to stop");
// add shutdown Hook to terminate system:
Runtime.getRuntime().addShutdownHook(new Thread(() -> { //5
System.out.println("Shutting down...");
binding.thenCompose(ServerBinding::unbind)
.thenAccept(unbound -> system.terminate());
}));
}
- 1. Create the ActorSystem for this application.
- 2. Using that system, create an instance of Http, which is the Akka HTTP server.
- 3. In order to access all directives, we need an instance where the routes are defined.
- 4. Boot up the server, binding it to port 5010 on localhost and using the routeFlow defined in the preceding code.
- 5. Finally, we add a shutdown hook that unbinds the server and shuts down the ActorSystem.
To run the application, simply use the command “gradle run” at the command line.
Routes
Routes can be defined using the server DSL, with simple names like “route”, “path”, and “get”. The first path matched in your route will cause your handler for that route to be run. If no routes are matched, then a response with HTTP Status 404 (not found) will be returned by default.
For example, the following method defines a route that matches “/hello”:
private Route createHelloRoute() {
return route(
path("hello", () ->
get(() ->
complete(HttpEntities.create(
ContentTypes.TEXT_HTML_UTF8,
"<h1>Say hello to akka-http</h1>"))
)));
}
This route returns a simple HTML entity, as seen in the preceding code. We create the HttpEntity by calling HttpEntities.create with a ContentType and a String. The “complete” method signifies that the response is completed by the given parameter and is overloaded to take in many different values such as String, StatusCode, HttpEntity, or HttpResponse. It also has a variant with an additional parameter of type Iterable<HttpHeader> to specify the headers of the response. Here we are using the complete(HttpEntity) variant.
The HttpEntities.create method is also overloaded to take a String, ByteString, byte array, path, file, or an Akka Stream Source<ByteString, ?>.
We can test out the route by running our application and then using the “curl localhost:5010/hello” command. We should get the following output:
<h1>Say hello to akka-http</h1>
Routes can be combined into a single route using the overloaded “route” method, which allows routes to be composed. For example:
private Route joinedRoutes() {
return route(createHelloRoute(),
createRandomRoute(),
createWebsocketRoute());
}
Here we provide a route combining three routes we define.
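To make the first-match behavior concrete, here is a minimal plain-Java sketch of the idea behind composing routes. It does not use Akka HTTP at all: MiniRoute, firstMatch, and respond are hypothetical names invented for this illustration, and the real “route” method works on full request contexts rather than path strings.

```java
import java.util.Optional;

class RouteCompositionSketch {
    // Falls back to a 404-style answer when no route matched.
    static String respond(MiniRoute routes, String path) {
        return routes.handle(path).orElse("404 Not Found");
    }

    public static void main(String[] args) {
        MiniRoute joined = MiniRoute.firstMatch(
                MiniRoute.path("/hello", "hello page"),
                MiniRoute.path("/random", "random numbers"));
        System.out.println(respond(joined, "/hello"));
        System.out.println(respond(joined, "/missing"));
    }
}

// Hypothetical stand-in for a route: yields a response body if it handles the path.
interface MiniRoute {
    Optional<String> handle(String path);

    // Tries each route in order; the first one that matches wins.
    static MiniRoute firstMatch(MiniRoute... routes) {
        return requestPath -> {
            for (MiniRoute r : routes) {
                Optional<String> result = r.handle(requestPath);
                if (result.isPresent()) return result;
            }
            return Optional.empty();
        };
    }

    // Matches exactly one path and answers with a fixed body.
    static MiniRoute path(String expected, String body) {
        return p -> p.equals(expected) ? Optional.of(body) : Optional.empty();
    }
}
```

Because the routes are tried in order, a route listed earlier shadows any later route that would match the same path.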
Since Akka HTTP is built on top of Akka Streams, we can provide an infinite stream of bytes to any route. Akka HTTP relies on TCP flow control to propagate backpressure to the source, so the stream is served at constant memory use. The following method provides a stream of random numbers for requests on path “/random”:
private Route createRandomRoute() {
final Random rnd = new Random();
Source<Integer, NotUsed> numbers = //1
Source.fromIterator(() ->
Stream.generate(rnd::nextInt).iterator());
return route(
path("random", () ->
get(() ->
complete(
HttpEntities.create(
ContentTypes.TEXT_PLAIN_UTF8,
numbers.map(x ->
ByteString.fromString(x + "\n")))) //2
)));
}
- 1. Here we use Stream.generate to generate an infinite stream of random integers and then use Source.fromIterator to convert it into a Source.
- 2. Transform each number into a chunk of bytes using ByteString.
We can test this route using the command “curl --limit-rate 1k 127.0.0.1:5010/random” while the application is running (limits the download rate to 1 kilobyte/second).
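An infinite source is safe here because values are only produced when the consumer asks for them. The following plain-JDK sketch (no Akka involved; the class name and the counter are ours, added purely for illustration) suggests how an iterator obtained from Stream.generate does its work lazily:

```java
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazyGenerationSketch {
    // Pulls n values from an infinite iterator and reports how many
    // values the supplier was actually asked to generate.
    static int generatedAfterPulling(int n) {
        Random rnd = new Random();
        AtomicInteger generated = new AtomicInteger();
        Iterator<Integer> numbers = Stream.generate(() -> {
            generated.incrementAndGet(); // count every generated value
            return rnd.nextInt();
        }).iterator();
        for (int i = 0; i < n; i++) {
            numbers.next();
        }
        return generated.get();
    }

    public static void main(String[] args) {
        System.out.println("generated: " + generatedAfterPulling(3));
    }
}
```

The supplier runs only about as many times as values were pulled, which is what lets a slow client hold back an endless stream without the server buffering it in memory.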
WebSockets
Lastly, we can create a WebSocket handling route using “handleWebSocketMessages” like so:
public Route createWebsocketRoute() {
return path("greeter", () ->
handleWebSocketMessages(
WebSocketExample.greeter())
);
}
The “greeter” method in WebSocketExample defines a handler that treats incoming messages as a name and responds with a greeting to that name:
public static
Flow<Message, Message, NotUsed> greeter() {
return Flow.<Message>create()
.collect(new JavaPartialFunction<>() {
@Override
public Message apply(Message msg,
boolean isCheck) {
if (isCheck) {
if (msg.isText()) return null;
else throw noMatch();
} else {
return handleTextMessage(
msg.asTextMessage());
}
}});
}
public static TextMessage
handleTextMessage(TextMessage msg) {
if (msg.isStrict()) {
return TextMessage.create("Hello " +
msg.getStrictText());
} else {
return TextMessage.create(Source.single(
"Hello ").concat(msg.getStreamedText()));
}
}
The important thing to know about JavaPartialFunction is that it can be called multiple times with isCheck as true or false. If isCheck is true, it is simply checking whether your JavaPartialFunction handles the given message, which is why we “throw noMatch()” if the message is not of the TextMessage type (isText returns false).
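The isCheck protocol is easier to follow with a stripped-down model. The sketch below is a simplified stand-in written for this explanation, not Akka’s actual class: CheckedPartialFunction and its NoMatch exception are hypothetical, and we only assume that the real JavaPartialFunction follows the same general pattern of funnelling both the “is it defined?” check and the real call through one two-argument apply method.

```java
class PartialFunctionSketch {
    // Handles Strings only; anything else is reported as "no match".
    static final CheckedPartialFunction<Object, String> GREETER =
            new CheckedPartialFunction<>() {
                @Override
                String apply(Object input, boolean isCheck) {
                    if (isCheck) {
                        // During a check the return value is ignored.
                        if (input instanceof String) return null;
                        else throw noMatch();
                    }
                    return "Hello " + input;
                }
            };

    public static void main(String[] args) {
        System.out.println(GREETER.isDefinedAt("Bob"));
        System.out.println(GREETER.isDefinedAt(42));
        System.out.println(GREETER.apply("Bob"));
    }
}

// Hypothetical, simplified model of the isCheck protocol.
abstract class CheckedPartialFunction<A, B> {
    // Signals "input not handled"; created without a stack trace to stay cheap.
    static final class NoMatch extends RuntimeException {
        NoMatch() { super(null, null, false, false); }
    }

    protected static RuntimeException noMatch() { return new NoMatch(); }

    abstract B apply(A input, boolean isCheck);

    // The check path: call apply with isCheck = true and watch for NoMatch.
    final boolean isDefinedAt(A input) {
        try {
            apply(input, true);
            return true;
        } catch (NoMatch e) {
            return false;
        }
    }

    // The real call path: isCheck = false, so the result is used.
    final B apply(A input) { return apply(input, false); }
}
```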
Testing WebSockets is more complicated because of the complex WebSocket protocol. Next, we’ll build a chat application to demonstrate WebSockets in action.
Our Domain
For this example application, we’ll build a simple chat server. The core domain model is the ChatMessage as follows:
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class ChatMessage {
final String username;
final String message;
@JsonCreator
public ChatMessage(
@JsonProperty("username") String username,
@JsonProperty("message") String message) {
this.username = username;
this.message = message;
}
// toString, equals, and hashCode omitted for
// brevity
public String getUsername() { return username; }
public String getMessage() { return message; }
}
This ChatMessage object is immutable and simply holds the values of the username and message.
We’re going to use Jackson for converting to and from JSON, so we’ve got some annotations to allow this to happen.
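The listing above omits toString, equals, and hashCode. One possible way to write them is sketched below; this is our own guess at a reasonable implementation, not the book’s code. The Jackson annotations are left out so that the sketch compiles with the JDK alone, and the methods are null-safe on the assumption that a ChatMessage may be created with null fields.

```java
import java.util.Objects;

class ChatMessageSketch {
    public static void main(String[] args) {
        ChatMessage a = new ChatMessage("foo", "bar");
        ChatMessage b = new ChatMessage("foo", "bar");
        System.out.println(a.equals(b));
        System.out.println(a);
    }
}

// Same shape as the ChatMessage above, minus the Jackson annotations.
final class ChatMessage {
    final String username;
    final String message;

    ChatMessage(String username, String message) {
        this.username = username;
        this.message = message;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ChatMessage)) return false;
        ChatMessage other = (ChatMessage) o;
        // Objects.equals tolerates null fields.
        return Objects.equals(username, other.username)
                && Objects.equals(message, other.message);
    }

    @Override
    public int hashCode() {
        return Objects.hash(username, message);
    }

    @Override
    public String toString() {
        return "ChatMessage{username=" + username
                + ", message=" + message + "}";
    }
}
```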
Our Repository
For demo purposes, our repository won’t actually save but will merely imitate a long-running operation and print out the message that was saved. Its code is as follows:
import java.util.concurrent.*;
public class MessageRepository {
public CompletionStage<ChatMessage> save(
ChatMessage message) {
return CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); }
catch (InterruptedException e)
{ e.printStackTrace(); }
System.out.println("saving message: " + message);
return message; });
}
}
It uses Java’s CompletableFuture to perform an asynchronous action and sleeps for half a second within that action. In a real application, we’d want to save ChatMessages to some sort of database, a call that could block for some time.
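To see why returning a CompletionStage matters, consider starting several slow saves before waiting for any of them. The following is a small JDK-only sketch under our own assumptions: the save method here is a simplified stand-in that works on Strings rather than ChatMessages, and saveAll is a helper invented for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class ConcurrentSavesSketch {
    // Stand-in for MessageRepository.save: pretends to be slow,
    // then hands back what it was given.
    static CompletableFuture<String> save(String message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag
            }
            return message;
        });
    }

    // Starts every save first, then waits for the results in input order.
    static List<String> saveAll(List<String> messages) {
        List<CompletableFuture<String>> pending = messages.stream()
                .map(ConcurrentSavesSketch::save)
                .collect(Collectors.toList());
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(saveAll(List.of("a", "b", "c")));
    }
}
```

Since all the futures are created before the first join, the simulated delays can overlap instead of adding up, as far as the thread pool allows.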
ChatServer
The main entry point of the chat server will be the ChatServer class.
It starts with the following imports:
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.japi.JavaPartialFunction;
import akka.stream.*;
import akka.stream.javadsl.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.reactivestreams.Publisher;
import java.util.concurrent.*;
For brevity we’ll skip the fields since they can be derived from the constructor. The ChatServer constructor makes some very important initializations that we’ll use to propagate the ChatMessages between the clients:
public ChatServer(ActorSystem actorSystem) {
parallelism =
Runtime.getRuntime().availableProcessors(); //1
this.actorSystem = actorSystem;
materializer = ActorMaterializer.create(
actorSystem); //2
var asPublisher = Sink.<ChatMessage>asPublisher(
AsPublisher.WITH_FANOUT); //3
var publisherSinkPair =
asPublisher.preMaterialize(materializer);
publisher = publisherSinkPair.first();
sink = publisherSinkPair.second();
mergeHub = MergeHub.of(ChatMessage.class,
BUFFER_SIZE).to(sink); //4
mergeSink = mergeHub.run(materializer);
}
- 1. Here we initialize an int property, parallelism, using Java’s built-in Runtime class. We set it to the number of available processors so that our parallel processing can take advantage of every processor.
- 2. Create the ActorMaterializer.
- 3. For brevity we are using Java 10’s “var” here, as the full type is very long. The static method “asPublisher” on Sink creates a Sink that materializes into an org.reactivestreams.Publisher. With WITHOUT_FANOUT the Publisher would allow only one subscriber, so we use WITH_FANOUT to allow multiple. We must call preMaterialize to get access to the actual instances of Publisher and Sink.
- 4. Since we want multiple clients to push ChatMessages into one sink, we must use MergeHub. Much like the previous step, we must run the MergeHub with a materializer to gain access to the Sink instance.
MergeHub and Publisher
Although it may seem complex, all we’ve done here using MergeHub and asPublisher is allow for multiple Flows to use the same Sink that in turn pushes to an instance of Publisher.
In this way we can have every new WebSocket connection post into one Sink and subscribe to one central Publisher, as we will see next.
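As a loose analogy only, the “one central publisher, many subscribers” half of this design can be tried out with the JDK’s own java.util.concurrent.SubmissionPublisher. This sketch is not Akka code and makes no claim about how MergeHub or Sink.asPublisher work internally; the class and method names are ours.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class FanOutSketch {
    // Publishes every message to two subscribers ("clients") and
    // returns what each of them received.
    static List<List<String>> broadcast(List<String> messages) {
        List<String> clientA = new CopyOnWriteArrayList<>();
        List<String> clientB = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> doneA;
        CompletableFuture<Void> doneB;
        try (SubmissionPublisher<String> hub = new SubmissionPublisher<>()) {
            // Each consume call registers one more subscriber.
            doneA = hub.consume(clientA::add);
            doneB = hub.consume(clientB::add);
            // Every submitted item is delivered to all subscribers.
            messages.forEach(hub::submit);
        } // closing the publisher signals completion to the subscribers
        doneA.join();
        doneB.join();
        return List.of(clientA, clientB);
    }

    public static void main(String[] args) {
        System.out.println(broadcast(List.of("alice: hi", "bob: hello")));
    }
}
```

In the chat server, the MergeHub plays the role of the many senders feeding one hub, and Source.fromPublisher(publisher) plays the role of each subscriber.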
The WebSocket Flow
For our chat-server application, we need to create a main flow. We define it similarly to before (with the addition of a Graph) with the following code (some left out for brevity):
public Flow<Message, Message, NotUsed> flow() {
Flow<Message, ChatMessage, NotUsed> savingFlow =
Flow.<Message>create() //1
.buffer(BUFFER_SIZE, OverflowStrategy.backpressure())
.collect(new
JavaPartialFunction<Message,
CompletionStage<ChatMessage>>() {
@Override
public CompletionStage<ChatMessage>
apply(Message msg, boolean isCheck) {
if (msg.isText()) {
TextMessage textMessage = msg.asTextMessage();
return storeMessageFromContent(
CompletableFuture.completedFuture(
textMessage.getStrictText()));
} else if (isCheck)
throw noMatch();
return CompletableFuture.completedStage(
new ChatMessage(null, null));
}
})
.mapAsync(parallelism, stage -> stage) // 2
.filter(m -> m.username != null);
final Graph<FlowShape<Message,Message>, NotUsed>graph = //3
GraphDSL.create(builder -> {
final FlowShape<ChatMessage, Message>
toMessage = //4
builder.add(Flow.of(ChatMessage.class)
.map(jsonMapper::writeValueAsString)
.async()
.map(TextMessage::create));
Inlet<ChatMessage> sinkInlet =
builder.add(mergeSink).in(); //5
Outlet<ChatMessage> publisherOutput = builder
.add(Source.fromPublisher(publisher)).out();
FlowShape<Message, ChatMessage> saveFlow =
builder.add(savingFlow);
builder.from(saveFlow.out()).toInlet(sinkInlet);//6
builder.from(publisherOutput)
.toInlet(toMessage.in()); // 7
return new FlowShape<>(saveFlow.in(),
toMessage.out()); // 8
});
return Flow.fromGraph(graph);
}
- 1. Create the Flow. The type declaration states that the Flow takes in a Message, outputs a ChatMessage, and has no meaningful materialized value (NotUsed). We add a buffer of size BUFFER_SIZE, which could be as big as our system’s memory can handle. Within the JavaPartialFunction, we call storeMessageFromContent, which we will define later.
- 2. Unwrap the CompletionStage<ChatMessage> using mapAsync. This call allows up to parallelism saves to be in flight at the same time while still emitting the results in their original order.
- 3. Use the GraphDSL to create a FlowShape. This Graph will use the preceding savingFlow to save all ChatMessages and put them into the mergeSink, but use the output from the ChatServer’s Publisher so that every client gets every ChatMessage.
- 4. Create the toMessage FlowShape, which converts a ChatMessage to JSON and then wraps it in a TextMessage.
- 5. Create the “sinkInlet” by adding the mergeSink to the Graph’s builder. Also create “publisherOutput” and “saveFlow” in a similar way.
- 6. Connect the saveFlow’s output to the sinkInlet.
- 7. Connect the publisherOutput to the toMessage’s Inlet.
- 8. Define the FlowShape using the Inlet of saveFlow and the Outlet of the toMessage Flow.
The helper methods (and fields) such as “storeMessageFromContent” are defined as follows:
private Flow<String, ChatMessage, NotUsed> parseContent() { //1
return Flow.of(String.class)
.map(line -> jsonMapper.readValue(line,
ChatMessage.class));
}
private Sink<ChatMessage, CompletionStage<ChatMessage>> storeChatMessages() {
return Flow.of(ChatMessage.class)
.mapAsyncUnordered(parallelism,
messageRepository::save) //2
.toMat(Sink.last(), Keep.right()); //3
}
CompletionStage<ChatMessage> storeMessageFromContent(
CompletionStage<String> content) {
return Source.fromCompletionStage(content) //4
.via(parseContent())
.runWith(storeChatMessages(),
materializer) //5
.whenComplete((message, ex) -> { //6
if (message != null) System.out
.println("Saved message: "+message);
else { ex.printStackTrace(); }
});
}
final MessageRepository messageRepository =
new MessageRepository();
final ObjectMapper jsonMapper =
new ObjectMapper(); //7
- 1. The method parseContent returns a Flow that converts Strings to instances of ChatMessage using Jackson’s ObjectMapper, jsonMapper, which we define later.
- 2. The method storeChatMessages returns a Sink that uses mapAsyncUnordered and the save method on messageRepository (allowing saves to occur in parallel and in any order).
- 3. This line connects the Flow to Sink.last(), producing a Sink whose materialized value is a CompletionStage of the last element it receives. This works since it’s only given a single element.
- 4. The method storeMessageFromContent starts by creating a Source<String> from the given CompletionStage<String>.
- 5. Then, using via(Flow), it converts that String into a ChatMessage and runs the stream into the Sink returned by storeChatMessages.
- 6. Finally, it uses whenComplete to print out each message that was saved and handles any errors. Although here we just print the stack trace, in a production system, you should either use logging or something else to recover from errors.
- 7. Create a singleton MessageRepository and ObjectMapper for converting ChatMessages to and from JSON.
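The whenComplete callback used in step 6 receives two arguments: the result and the exception, only one of which describes the outcome. A minimal JDK-only sketch of that pattern follows; parseAndLog and the AtomicReference log are illustrative names of our own, with integer parsing standing in for JSON parsing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteSketch {
    // Parses an integer asynchronously and records the outcome in log.
    static CompletionStage<Integer> parseAndLog(
            String text, AtomicReference<String> log) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text))
                .whenComplete((value, ex) -> {
                    if (ex == null) {
                        log.set("parsed " + value);
                    } else {
                        // The exception may arrive wrapped in a
                        // CompletionException, so inspect getCause() too.
                        log.set("failed: " + ex);
                    }
                });
    }

    public static void main(String[] args) {
        AtomicReference<String> log = new AtomicReference<>();
        parseAndLog("42", log).toCompletableFuture().join();
        System.out.println(log.get());
    }
}
```

Testing ex for null, rather than testing the value, avoids misreading a legitimately null result as a failure.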
We also update the “createWebsocketRoute” method in WebApp to use our new Flow:
return path("chatws", () ->
handleWebSocketMessages(chatServer.flow())
);
The Web Client
For the end user to use our WebSocket, we’ve got to have some kind of front end. For this purpose, we create an “index.html” file under “src/main/resources/akkahttp” with the following content:
<!DOCTYPE html>
<html>
<head>
<title>Hello Akka HTTP!</title>
<script>
var webSocket =
new WebSocket("ws://localhost:5010/chatws"); //1
function submitChat() {
var msg = { // 2
username: document.getElementById("u").value,
message: document.getElementById("m").value
};
webSocket.send(JSON.stringify(msg)); //3
document.getElementById("m").value = ""; //4
}
webSocket.onmessage = function (event) { //5
console.log(event.data);
var content = document.getElementById("content");
content.innerHTML = content.innerHTML
+ '<br>' + event.data;
}
</script>
</head>
<body>
<form> <!--6-->
Username:<input type="text" id="u"
name="username"><br>
Message: <input type="text" id="m"
name="message"><br>
<input type="button" value="Submit"
onclick="submitChat()">
</form>
<div id="content"></div>
</body>
</html>
- 1. Create the WebSocket connection.
- 2. Within our “submitChat” function, construct an object named “msg” with a username and message.
- 3. Send the msg object as a JSON-formatted string.
- 4. Blank the message input element to communicate to the user that the message was sent and allow a new one to be entered.
- 5. Define the onmessage event handler of the WebSocket that will append chat messages to the page.
- 6. Finally, we create the form for the user’s input.
Although this is a very simple interface, it is merely to demonstrate the powerful back end. With this simple chat server, we could handle thousands of users at one time.
In a real application, you would improve the interface and add error handling and other features like search, chat rooms, and security.
We also need to update the route to serve this file. Update the createHelloRoute method with the following:
final Source<String,NotUsed> file =
Source.single("/akkahttp/index.html");
return route(
path("hello", () ->
get(() ->
complete(
HttpEntities
.create(ContentTypes.TEXT_HTML_UTF8,
file.map(f ->
WebApp.class.getResourceAsStream(f)) //1
.map(stream -> stream.readAllBytes()) //2
.map(bytes -> ByteString.fromArray(bytes))))//3
)));
- 1. Read the file from the classpath using getResourceAsStream.
- 2. Read all of the bytes from the file using Java’s InputStream’s readAllBytes method.
- 3. Convert the byte array into a ByteString for Akka HTTP.
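One detail worth keeping in mind is that getResourceAsStream returns null when the resource is missing, and the stream it returns should be closed after use. The following JDK-only sketch shows one way to handle both; readResource is a hypothetical helper of ours and is not part of the example project.

```java
import java.io.IOException;
import java.io.InputStream;

class ResourceReadSketch {
    // Reads a classpath resource fully, failing loudly if it is missing.
    static byte[] readResource(String path) throws IOException {
        // try-with-resources closes the stream; a null resource is skipped.
        try (InputStream in =
                     ResourceReadSketch.class.getResourceAsStream(path)) {
            if (in == null) {
                throw new IOException("resource not found: " + path);
            }
            return in.readAllBytes();
        }
    }

    public static void main(String[] args) {
        try {
            byte[] bytes = readResource("/akkahttp/index.html");
            System.out.println(bytes.length + " bytes");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```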
You can test out the application by running WebApp and visiting “http://localhost:5010/hello” in several browsers.
Testing
In addition to our standard Akka HTTP and Akka Streams imports, we add the following imports:
import akka.Done;
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import com.github.adamldavis.akkahttp.*;
import org.junit.*;
import java.util.*;
import java.util.concurrent.*;
import static org.assertj.core.api.Assertions.assertThat;
The core of our ChatServerTest class is the following setup and teardown:
ChatServer chatServer;
ActorSystem actorSystem;
ActorMaterializer materializer;
@Before
public void setup() {
actorSystem = ActorSystem.create("test-system"); //1
chatServer = new ChatServer(actorSystem);//2
materializer = ActorMaterializer.create(actorSystem);//3
}
@After
public void tearDown() {
TestKit.shutdownActorSystem(actorSystem);//4
}
- 1. Before each test, create the ActorSystem.
- 2. Create the ChatServer under test.
- 3. Create an ActorMaterializer that we will use for tests.
- 4. After each test, use the Akka TestKit to shut down the test ActorSystem.
Then we define a test that simply ensures that a ChatMessage gets copied to the Flow’s output as a TextMessage encoded in JSON:
@Test
public void flow_should_copy_messages() throws ExecutionException, InterruptedException {
final Collection<Message> list = new
ConcurrentLinkedDeque<>(); //1
Flow<Message, Message, NotUsed> flow = chatServer.flow(); //2
assertThat(flow).isNotNull();
List<Message> messages =
Arrays.asList(TextMessage.create(jsonMsg(0))); //3
Graph<SourceShape<Message>, ?> testSource =
Source.from(messages);
Graph<SinkShape<Message>, CompletionStage<Done>>
testSink = Sink.foreach(list::add); //4
CompletionStage<Done> results = flow.runWith(testSource,
testSink, materializer).second(); //5
try {
results.toCompletableFuture().get(2, TimeUnit.SECONDS); //6
} catch (TimeoutException te) {
System.out.println("caught expected: " +
te.getMessage());
}
Iterator<Message> iterator = list.iterator();
assertThat(list.size()).isEqualTo(1);
assertThat(iterator.next()
.asTextMessage().getStrictText())
.isEqualTo("{\"username\":\"foo\"," +
"\"message\":\"bar0\"}"); //7
}
static final String jsonMsg(int i) {
return "{\"username\": \"foo\", \"message\": \"bar"
+ i + "\"}";
}
- 1. Create a ConcurrentLinkedDeque (named list) to save the messages to avoid any multithreading issues (this might be overkill).
- 2. Call flow() to get the WebSocket Flow we want to test.
- 3. Create a single TextMessage with a JSON-encoded chat message. Although we just create one here, in other tests we could create many using Source.range and then map like the following: Source.range(1, 100).map(i -> TextMessage.create(jsonMsg(i))).
- 4. Create the testSink, which adds each message to our previously defined list.
- 5. Call flow.runWith with a source, sink, and materializer. This is where the Flow under test is initiated.
- 6. We must call toCompletableFuture().get on our CompletionStage with a timeout so that the test thread blocks until results have had time to arrive. Without the timeout, the call would wait forever since the underlying Publisher (backed by MergeHub and Sink.asPublisher) has no defined stopping point.
- 7. Assert that the output TextMessage is encoded to JSON as expected.
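The timeout pattern from step 6 can be tried in isolation. Here is a minimal JDK-only sketch, where finishedWithin is a helper name of our own and a CompletableFuture that nobody completes stands in for the never-ending stream:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutWaitSketch {
    // Waits up to millis for the stage; true if it finished in time.
    static boolean finishedWithin(CompletionStage<?> stage, long millis) {
        try {
            stage.toCompletableFuture().get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException expected) {
            return false; // still running, which is fine for endless streams
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that is never completed, like a stream with no end.
        System.out.println(finishedWithin(new CompletableFuture<String>(), 100));
        System.out.println(finishedWithin(
                CompletableFuture.completedFuture("done"), 100));
    }
}
```

Treating the timeout as an expected outcome keeps the test from hanging while still giving the stream time to deliver its messages.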
The full code on GitHub has many more tests, but this should give you a good idea of how to test an Akka HTTP-based project.