Implementing Futures

In Scala, most asynchronous functions take an implicit parameter called ExecutionContext. ExecutionContext is a trait implemented by anything that can “run” futures.

Regular JVM Scala provides a global ExecutionContext backed by a thread pool, but there are many other implementations available, both in the standard library as well as in libraries like Akka[38] and Finagle,[39] and as you’ll see shortly, you can even write your own. So how does it work?

Every asynchronous action we can define on a future needs an ExecutionContext as an implicit parameter. For example, when we invoke future.map(action)(ec), we are, in effect, registering that ec should perform action when future is complete; however, almost all of this logic is in the implementation of Future itself. The signature of the ExecutionContext, in comparison, is trivial:

LibUVFutures/ec.scala
 trait​ ExecutionContext {
 
 /** Runs a block of code on this execution context. */
 def​ execute(runnable​:​ ​Runnable​)​:​ ​Unit
 
 /** Reports that an asynchronous computation failed. */
 def​ reportFailure(t​:​ ​Throwable​)​:​ ​Unit
 
 }

This minimal interface is the key to Scala’s flexible, modular concurrency support, and we’ll make use of it to provide futures from our event loop. Although most Scala ExecutionContexts use some kind of thread pool for background processing, this isn’t required; in fact, Scala Native has a built-in single-threaded ExecutionContext implementation that we can use for a model. Its major limitation is that it only starts processing futures once the main function of a program completes. This makes it unsuitable for working with libuv, but if we take a look at the code, there’s a lot we can adapt:

LibUVFutures/ec.scala
 object​ ExecutionContext {
 def​ global​:​ ​ExecutionContextExecutor​ = QueueExecutionContext
 
 private​ ​object​ QueueExecutionContext ​extends​ ExecutionContextExecutor {
 def​ execute(runnable​:​ ​Runnable​)​:​ ​Unit​ = queue += runnable
 def​ reportFailure(t​:​ ​Throwable​)​:​ ​Unit​ = t.printStackTrace()
  }
 
 private​ ​val​ queue​:​ ​ListBuffer​[​Runnable​] ​=​ ​new​ ListBuffer
 
 private​ ​def​ loop()​:​ ​Unit​ = {
 while​ (queue.nonEmpty) {
 val​ runnable ​=​ queue.remove(0)
 try​ {
  runnable.run()
  } ​catch​ {
 case​ t​:​ ​Throwable​ =>
  QueueExecutionContext.reportFailure(t)
  }
  }
  }
 }

Essentially, this code keeps a queue containing Runnables for later execution. While the loop is processing, it continuously pulls items from the head of the queue until the queue is exhausted and there’s no more work to perform. This is definitely a pattern we can adapt to our codebase; instead of running at the very end of our program, we’ll just need some way to pull work from our queue as soon as it’s available, but without interrupting or blocking during other operations.