As multicore processors become more and more standard, different models of concurrent programming have become more popular in Java. Although the core model of concurrency in Java is the Thread, multiple levels of abstraction have been built to enable simpler development.
Each of these models takes a different approach to protecting values from being modified by more than one thread at a time, as we will cover in this chapter.
Prominent Models for Concurrency
Synchronize and suffer (using the synchronized keyword in Java)
Futures and the ExecutorService
Software transactional memory (STM) (Clojure)
Actor-based model (Akka)
Reactive Streams (RxJava, Reactor, etc.)
Synchronize in Java
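The synchronized keyword marks a method or block so that only one thread at a time can execute it while holding the associated lock. It has several drawbacks:
No warnings or errors are given at compile time.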
Deadlocks can occur if you’re not careful.
It’s very difficult to make sure you’ve done everything right, and errors can occur randomly.
In conclusion, the synchronized keyword is too low level to use (just don’t use it!).
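To make the trade-off concrete, here is a minimal sketch of a counter guarded by synchronized. The class name SynchronizedCounterDemo and its methods are hypothetical, chosen only for illustration. If you delete the synchronized modifiers, the code still compiles without any warning, but increments from the two threads can be lost, which is exactly the kind of silent error described above.

```java
class SynchronizedCounterDemo {
    private int count = 0;

    // Only one thread at a time may run a synchronized method on a given instance.
    synchronized void increment() {
        count++;
    }

    synchronized int get() {
        return count;
    }

    // Hypothetical helper: two threads each increment the same counter perThread times.
    static int runTwoThreads(int perThread) {
        SynchronizedCounterDemo counter = new SynchronizedCounterDemo();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                counter.increment();
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runTwoThreads(100_000)); // prints 200000
    }
}
```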
Java Futures
You may have heard of the java.util.concurrent.Future interface in Java. Maybe you’ve even used it. This interface was added in Java 1.5, and it holds the result of an asynchronous computation. It contains methods to check whether the computation is complete or still in progress (isDone()), to cancel it (cancel()), and to block until the computation completes and retrieve its result (get(), with an optional timeout).
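As a rough illustration of these methods, the following sketch submits a task to an ExecutorService and reads the result through the returned Future. The class name FutureDemo and the method computeWithFuture are hypothetical; the task itself (multiplying two numbers) is a stand-in for real work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDemo {
    // Hypothetical helper: runs a small task on another thread and waits for its result.
    static int computeWithFuture() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 6 * 7;
            Future<Integer> future = executor.submit(task);

            // isDone() reports completion without blocking; it may be true or false here.
            System.out.println("done yet? " + future.isDone());

            // get(timeout, unit) blocks the calling thread for at most one second.
            return future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeWithFuture()); // prints 42
    }
}
```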
Drawbacks of the Future Interface
When using Java’s Future, we tend to loop on isDone(), which ties up the thread, or call get(), which blocks the thread completely.
ExecutorService#submit(Runnable) is used the most (it returns a Future whose get() method returns null).
Generally when “going asynchronous”, we don’t care about the result, or we want to do something with the result (thus we want something like a continuation).
We need a callback, which removes the need for polling (isDone) and blocking. (Guava’s ListenableFuture provides this.)
Asynchronous methods should always return void.
For these reasons, if you do any concurrent programming, you should use the CompletableFuture introduced in Java 8 (which is covered next), the Java 7 concurrency API (ForkJoinPool and ForkJoinTask), or another concurrency framework.
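The drawbacks listed above can be seen side by side in a small sketch. The class FutureDrawbacksDemo and its two methods are hypothetical names. The first method shows that a Future obtained from submit(Runnable) yields null from get() and forces the caller to block; the second shows the callback style that CompletableFuture makes possible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class FutureDrawbacksDemo {
    // Blocking style: the caller waits on get(), and the Runnable's Future carries no value.
    static Object blockingStyle() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Runnable task = () -> System.out.println("working");
            Future<?> future = executor.submit(task);
            return future.get(); // blocks until the task ends, then returns null
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Callback style: the consumer runs when the value is ready, with no polling.
    static String callbackStyle() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> "result")
                .thenAccept(seen::set);
        done.join(); // joined only so this demo can return the value it observed
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle()); // prints null (after "working")
        System.out.println(callbackStyle()); // prints result
    }
}
```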
CompletableFuture
CompletableFuture<T> implements the Future<T> interface as well as the CompletionStage<T> interface, which fills in many of the deficiencies of Future<T>. The CompletionStage methods, listed here, follow the functional style, allowing the developer to chain method calls rather than declaring a step-by-step process.
acceptEither(CompletionStage, Consumer): Executes the given consumer when either this stage (the current Future) or the given stage completes.
applyToEither(CompletionStage, Function): Similar to acceptEither but uses a Function to convert a value into another value.
exceptionally(Function): If the stage completes exceptionally, the given function is given the exception to process and return a value.
handle(BiFunction): Uses the given function to handle both the success and failure conditions and returns a value.
runAfterBoth(CompletionStage, Runnable): Runs the given Runnable after both this stage (the current Future) and the given stage complete.
runAfterEither(CompletionStage, Runnable): Similar to acceptEither except using a Runnable.
thenAccept(Consumer): Runs the given consumer after this stage (the current Future) completes normally. This is similar to “then” in Promise models of concurrency if you’re familiar with Promises.
thenAcceptBoth(CompletionStage, BiConsumer): Runs the given biconsumer with both outputs after both this stage (the current Future) and the given stage complete normally.
thenApply(Function): Transforms a value using the given function after the stage completes normally.
thenCombine(CompletionStage, BiFunction): Transforms two values using the given function after both stages complete normally.
thenRun(Runnable): Runs the given Runnable after this stage completes normally.
whenComplete(BiConsumer): Uses the given consumer to handle both the success and failure conditions.
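A few of the methods listed above can be chained as in the following sketch. The class name ChainingDemo, the method names, and the sample values are all hypothetical; the point is only to show thenCombine, thenApply, exceptionally, and handle working together.

```java
import java.util.concurrent.CompletableFuture;

class ChainingDemo {
    // Combines two independent stages, then transforms the combined value.
    static int combined() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 3);
        return price
                .thenCombine(quantity, (p, q) -> p * q) // 20 * 3 = 60
                .thenApply(total -> total + 5)          // 60 + 5 = 65
                .join();
    }

    // exceptionally(...) supplies a replacement value when the stage fails.
    static String recovered() {
        CompletableFuture<String> failing = CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        return failing.exceptionally(ex -> "fallback").join();
    }

    // handle(...) sees both outcomes: exactly one of value and ex is meaningful.
    static String handled(boolean fail) {
        return CompletableFuture.<String>supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "ok";
        }).handle((value, ex) -> ex == null ? value : "failed").join();
    }

    public static void main(String[] args) {
        System.out.println(combined());     // prints 65
        System.out.println(recovered());    // prints fallback
        System.out.println(handled(false)); // prints ok
        System.out.println(handled(true));  // prints failed
    }
}
```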
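Asynchronous versions of these methods are also available with “Async” added to the method name. The “Async” versions run the given function on the stage’s default asynchronous executor (ForkJoinPool.commonPool() for CompletableFuture) or on an Executor you pass in, instead of on the current Thread.
CompletableFuture also provides static methods for creating instances: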
CompletableFuture completedFuture(value): Returns a new CompletableFuture that is already completed with the given value.
CompletableFuture runAsync(Runnable): Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool().
CompletableFuture runAsync(Runnable, Executor): Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor after it runs the given action.
CompletableFuture supplyAsync(Supplier): Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
For more details, please see the CompletableFuture API documentation.
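The static factory methods listed above might be used as in this sketch. The class name FactoryDemo and the string values are hypothetical, and the two-thread pool is an arbitrary choice made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class FactoryDemo {
    static String demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Already completed: useful for cached or constant values.
            CompletableFuture<String> ready = CompletableFuture.completedFuture("cached");

            // Runs an action with no result on the executor we supply.
            AtomicBoolean ran = new AtomicBoolean(false);
            CompletableFuture<Void> sideEffect =
                    CompletableFuture.runAsync(() -> ran.set(true), executor);

            // Computes a value on ForkJoinPool.commonPool().
            CompletableFuture<String> supplied =
                    CompletableFuture.supplyAsync(() -> "computed");

            sideEffect.join(); // wait so that ran is reliably true below
            return ready.join() + "," + supplied.join() + "," + ran.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints cached,computed,true
    }
}
```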
STM in Clojure
Java doesn’t have great support for concurrency built-in. Other languages for the JVM (Java virtual machine), like Scala and Clojure, have been built from the ground up with concurrency in mind. However, we can use the concurrency models from Scala and Clojure directly from Java.
In Clojure's STM, shared state is held in a Ref, and you will get an error if you try to modify the Ref outside of a transaction. This makes concurrent programming easier because modifying data outside of a transaction (the STM counterpart of a synchronized block) is impossible.
Actors
The Scala-based actor framework Akka can also be used from Java. Akka is also used by the Play Framework. Akka is built around the concept of Actors. Actors receive and process messages sent to them (note that Akka's default delivery guarantee is at-most-once, so a message is never delivered twice but can be lost, for example across a network). They process each message one at a time so their state is shielded from the rest of the system.
An Actor conceptually runs in a dedicated thread, so it can only do one thing at a time. This makes concurrency much easier to implement. Messages are passed around to Actors and wait in a queue until the given Actor is ready to process them. A message can be any Serializable object.
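The idea of a queue of messages drained by a single thread can be sketched in plain Java. This is not Akka's API: the class CounterActor, its tell and stopAndGetTotal methods, and the STOP marker are all hypothetical, and a real actor framework adds supervision, addressing, and scheduling that this sketch leaves out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CounterActor {
    private static final Object STOP = new Object(); // hypothetical shutdown message
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::processMailbox);
    private int total = 0; // modified only by the actor's own thread

    CounterActor() {
        worker.start();
    }

    // Senders never touch the state; they only enqueue messages.
    void tell(Object message) {
        mailbox.add(message);
    }

    // Messages are taken one at a time, so no locking of total is needed.
    private void processMailbox() {
        try {
            while (true) {
                Object message = mailbox.take();
                if (message == STOP) {
                    return;
                }
                if (message instanceof Integer) {
                    total += (Integer) message;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int stopAndGetTotal() {
        mailbox.add(STOP);
        join(worker);
        return total; // safe to read: the worker thread has finished
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Two sender threads each send the message 1 to the same actor, perSender times.
    static int demo(int perSender) {
        CounterActor actor = new CounterActor();
        Runnable sender = () -> {
            for (int i = 0; i < perSender; i++) {
                actor.tell(1);
            }
        };
        Thread a = new Thread(sender);
        Thread b = new Thread(sender);
        a.start();
        b.start();
        join(a);
        join(b);
        return actor.stopAndGetTotal();
    }

    public static void main(String[] args) {
        System.out.println(demo(1000)); // prints 2000
    }
}
```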
Groovy GPars
It’s worth noting that the Actor and STM concurrency patterns are not limited to Scala and Clojure.
Groovy’s GPars library implements these patterns as well and is also usable from Java. It also has Domain Specific Languages (DSLs) that wrap the JSR-166 features of Java, such as the Fork-Join framework, making them easier to use.
You can use GPars to filter, map, and reduce a collection in the following way:
In this example, Student is a class with a graduationYear and gpa. This code finds the highest GPA for 2017. The static method GParsPool.withPool takes in a closure and augments any Collection with several methods (using Groovy’s category mechanism). The parallel method actually creates a ParallelArray (JSR-166) from the given Collection and uses it with a thin wrapper around it.
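For comparison, the same filter, map, and reduce steps can be written with plain Java parallel streams. This is not GPars and does not use ParallelArray; it is only an analogous sketch. The Student class here is a hypothetical stand-in with the two fields described above, and HighestGpaDemo and highestGpa are illustrative names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;

class HighestGpaDemo {
    // Hypothetical Student with the two properties mentioned in the text.
    static final class Student {
        final int graduationYear;
        final double gpa;

        Student(int graduationYear, double gpa) {
            this.graduationYear = graduationYear;
            this.gpa = gpa;
        }
    }

    // Filter by year, map to GPA, reduce to the maximum, all in parallel.
    static OptionalDouble highestGpa(List<Student> students, int year) {
        return students.parallelStream()
                .filter(s -> s.graduationYear == year)
                .mapToDouble(s -> s.gpa)
                .max();
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student(2017, 3.2),
                new Student(2017, 3.9),
                new Student(2016, 4.0));
        System.out.println(highestGpa(students, 2017).getAsDouble()); // prints 3.9
    }
}
```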
Reactive Streams
Reactive Streams provide an abstraction for highly concurrent, asynchronous applications with support for backpressure.
A publisher emits events at some rate.
A subscriber observes those events, possibly on a different thread, and does something with them.
Some frameworks use other words (such as Source and Sink) to mean the same thing as publisher and subscriber.
As we will see, many Reactive Streams frameworks allow interoperation with other existing models of concurrency, such as futures, to allow a smooth transition between the two.
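The publisher and subscriber roles can be sketched with the java.util.concurrent.Flow interfaces that ship with Java 9 and later, which mirror the Reactive Streams interfaces. This sketch assumes Java 9+ and uses the JDK's SubmissionPublisher rather than RxJava or Reactor; FlowDemo, CollectingSubscriber, and publishAndCollect are hypothetical names. Backpressure shows up in the request(1) calls: the subscriber asks for one item at a time.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FlowDemo {
    // Hypothetical subscriber that records every item it receives.
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // backpressure: ask for the first item only
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for one more
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns what the subscriber observed.
    static List<Integer> publishAndCollect(int count) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // closing the publisher signals onComplete once pending items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(5)); // prints [1, 2, 3, 4, 5]
    }
}
```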