Implementing an ExecutionContext

Fortunately, libuv provides exactly what we need to implement this sort of ExecutionContext: the prepare handle, uv_prepare_t. Unlike the other handles we’ve seen, such as the Timer and Socket handles, which fire only when a specific event occurs, a prepare handle’s callback runs once on every iteration of the event loop, right before the loop polls for I/O, which is the point where it may sleep or wait for a long time. It takes only a few simple functions to set up:

LibUVFutures/simple_async/loop.scala
 type PrepareHandle = Ptr[Byte]
 type TimerHandle = Ptr[Byte]
 type PrepareCB = CFuncPtr1[PrepareHandle, Unit]
 type TimerCB = CFuncPtr1[TimerHandle, Unit]
 
 def uv_prepare_init(loop: Loop, handle: PrepareHandle): Int = extern
 def uv_prepare_start(handle: PrepareHandle, cb: PrepareCB): Int = extern
 // libuv's uv_prepare_stop returns an int status code; this binding discards it.
 def uv_prepare_stop(handle: PrepareHandle): Unit = extern

So if we want to use the PrepareHandle to implement an ExecutionContext, we just need to create a queue, and then write a PrepareCB function that pulls work from the queue every time it’s invoked, like this:

LibUVFutures/simple_async/loop.scala
 object EventLoop extends ExecutionContextExecutor {
   val loop = uv_default_loop()
   private val taskQueue = ListBuffer[Runnable]()
   private val handle = stdlib.malloc(uv_handle_size(UV_PREPARE_T))
   check(uv_prepare_init(loop, handle), "uv_prepare_init")
 
   // Invoked by libuv once per loop iteration while the prepare handle is active.
   val prepareCallback = new PrepareCB {
     def apply(handle: PrepareHandle): Unit = {
       // Drain the queue, including any tasks enqueued by the tasks we run.
       while (taskQueue.nonEmpty) {
         val runnable = taskQueue.remove(0)
         try {
           runnable.run()
         } catch {
           case t: Throwable => reportFailure(t)
         }
       }
       // No work left: stop the handle so that uv_run is free to return.
       if (taskQueue.isEmpty) {
         println("stopping dispatcher")
         uv_prepare_stop(handle)
       }
     }
   }
 
   def execute(runnable: Runnable): Unit = {
     taskQueue += runnable
     // Make sure the prepare handle is active whenever work is queued.
     check(uv_prepare_start(handle, prepareCallback), "uv_prepare_start")
   }
 
   def reportFailure(t: Throwable): Unit = {
     println(s"Future failed with Throwable $t:")
     t.printStackTrace()
   }
 
   def run(mode: Int = UV_RUN_DEFAULT): Unit = {
     var continue = 1
     while (continue != 0) {
       continue = uv_run(loop, mode)
       println(s"uv_run returned $continue")
     }
   }
 
   // Schedule run() on the built-in ExecutionContext so the loop gets started.
   private val bootstrapFuture = Future(run())(ExecutionContext.global)
 }

Overall, that’s fewer than 50 lines of code for a custom asynchronous scheduler! A few subtleties are worth calling out, though, because this code must do two things:

  1. Runs every future that it can.
  2. Completes and allows the program to exit when there’s no more work.

libuv itself helps us here: by default, uv_run returns once there are no more active handles or requests, which is exactly what we want. All we have to do is start and stop the prepare handle as needed. The trick is to call uv_prepare_start() from the execute() method, which ensures that the prepare handle is active, and that the loop therefore keeps running, whenever there is queued work. It does mean that we “start” the handle many times, but starting a handle that is already active is harmless.
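To see why this start-on-execute, stop-when-empty pattern satisfies both requirements, it can help to model it without libuv at all. The following is only a sketch under stated assumptions: ToyLoop, prepareActive, startCalls, and iterations are hypothetical names, a Boolean flag stands in for the prepare handle's active state, and run() imitates uv_run in its default mode by iterating only while that one handle is active.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContextExecutor

// Hypothetical, libuv-free model of EventLoop (an illustration, not the real thing).
class ToyLoop extends ExecutionContextExecutor {
  private val taskQueue = ListBuffer[Runnable]()
  private var prepareActive = false // models "the prepare handle is started"
  var startCalls = 0                // how often the modeled uv_prepare_start ran
  var iterations = 0                // how many loop iterations run() performed

  // Models uv_prepare_start: starting an already active handle changes nothing.
  private def prepareStart(): Unit = {
    startCalls += 1
    prepareActive = true
  }

  def execute(runnable: Runnable): Unit = {
    taskQueue += runnable
    prepareStart()
  }

  def reportFailure(t: Throwable): Unit = t.printStackTrace()

  // Models the prepare callback: drain the queue, then stop the handle.
  private def prepareCallback(): Unit = {
    while (taskQueue.nonEmpty) {
      val runnable = taskQueue.remove(0)
      try runnable.run()
      catch { case t: Throwable => reportFailure(t) }
    }
    prepareActive = false
  }

  // Models uv_run in default mode: keep iterating while something is active.
  def run(): Unit = {
    while (prepareActive) {
      iterations += 1
      prepareCallback()
    }
  }
}
```

Passing a ToyLoop as the ExecutionContext for a Future and a chained map, and then calling run(), executes both steps in order and then returns: the second step is enqueued while the first is being drained, so the while loop inside the callback picks it up before the handle is stopped.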

We also need to make sure that uv_run is actually called. In our server program we invoked it manually; here, bootstrapFuture instead hands run() to the built-in Scala Native ExecutionContext, which runs our event loop as soon as the main() function completes.

This is all much easier to follow if we run a very simple asynchronous program:

LibUVFutures/simple_async/main.scala
 def main(args: Array[String]): Unit = {
   println("hello")
   implicit val loop = EventLoop
   println("setting up futures")
   Future {
     println("Future 1!")
   }.map { _ =>
     println("Future 2!")
   }
   println("main about to return...")
 }

It produces output like this:

 hello
 uv_prepare_init returned 0
 setting up futures
 uv_prepare_start returned 0
 main about to return...
 Future 1!
 uv_prepare_start returned 0
 Future 2!
 uv_prepare_stop returned 0
 uv_run returned 0
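The "returned 0" lines for the uv_prepare_* calls are not printed by the EventLoop listing directly; they appear to come from the check helper that wraps each libuv call. That helper is not shown in this section, so the following is only a guess at its shape: the CheckSketch wrapper, the exact message format, and the choice to throw an exception are assumptions. The one thing it relies on from libuv is that error codes are negative.

```scala
object CheckSketch {
  // Hypothetical reconstruction of a check helper: log the status code,
  // fail on a negative value, and otherwise pass the value through.
  def check(result: Int, label: String): Int = {
    println(s"$label returned $result")
    if (result < 0) {
      throw new Exception(s"$label failed with code $result")
    }
    result
  }
}
```

A helper along these lines keeps the status checking out of the way at each call site while still making every libuv call visible in the program's output.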

We’ve now confirmed that our ExecutionContext works correctly, but we’re still missing one piece of the puzzle. So far, we have only created futures whose values can be computed right away, such as Future(0), and verified that we could transform them in the ways we would expect. How do we create a Future that we can return immediately when its value has not been computed yet, such as the response to a request that has not yet returned? For that, we’ll need Future’s lesser-known helper, Promise.
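As a brief preview, here is a minimal sketch of the idea using scala.concurrent.Promise from the standard library. PromiseSketch and pendingRequest are hypothetical names, and the returned completion function merely stands in for whatever callback eventually delivers the response.

```scala
import scala.concurrent.{Future, Promise}

object PromiseSketch {
  // Hypothetical stand-in for a request whose response arrives later.
  // Returns the not-yet-completed Future together with a function that
  // the response handler would call to supply the value.
  def pendingRequest(): (Future[String], String => Unit) = {
    val promise = Promise[String]()
    val complete: String => Unit = { response =>
      promise.success(response) // completes promise.future with this value
      ()
    }
    (promise.future, complete)
  }
}
```

The caller receives the Future right away and can attach map or other transformations to it; nothing attached to it runs until the completion function is called with a value.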