Fortunately, libuv provides us with exactly what we need to implement this sort of ExecutionContext, in the form of the prepare_t handle. Unlike the other handles we’ve seen, such as the Timer and Socket handles, the prepare handle fires its callback on every iteration of the event loop, immediately before the loop polls for I/O and before it blocks waiting for new events. It takes just a few simple functions to set up:
| type PrepareHandle = Ptr[Byte] |
| type TimerHandle = Ptr[Byte] |
| type PrepareCB = CFuncPtr1[PrepareHandle, Unit] |
| type TimerCB = CFuncPtr1[TimerHandle,Unit] |
| |
| def uv_prepare_init(loop:Loop, handle:PrepareHandle):Int = extern |
| def uv_prepare_start(handle:PrepareHandle, cb: PrepareCB):Int = extern |
| def uv_prepare_stop(handle:PrepareHandle):Int = extern |
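Before wiring these bindings into an ExecutionContext, it may help to see them in isolation. The following is a minimal sketch, not part of the EventLoop object itself, that reuses the check helper, uv_default_loop, uv_handle_size, UV_PREPARE_T, and uv_run bindings that appear in the code below; it starts a prepare handle whose callback fires once and then stops itself, letting uv_run return:
| val loop = uv_default_loop() |
| val handle = stdlib.malloc(uv_handle_size(UV_PREPARE_T)) |
| check(uv_prepare_init(loop, handle), "uv_prepare_init") |
| |
| val once = new PrepareCB { |
|   def apply(handle:PrepareHandle):Unit = { |
|     println("prepare callback fired") // runs right before the loop polls for I/O |
|     uv_prepare_stop(handle)           // stop the handle so the loop has nothing left to do |
|   } |
| } |
| |
| check(uv_prepare_start(handle, once), "uv_prepare_start") |
| uv_run(loop, UV_RUN_DEFAULT) // returns once no active handles remain |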
So if we want to use the PrepareHandle to implement an ExecutionContext, we just need to create a queue, and then write a PrepareCB function that pulls work from the queue every time it’s invoked, like this:
| object EventLoop extends ExecutionContextExecutor { |
|   val loop = uv_default_loop() |
|   private val taskQueue = ListBuffer[Runnable]() |
|   // allocate and initialize the prepare handle once, when the object is created |
|   private val handle = stdlib.malloc(uv_handle_size(UV_PREPARE_T)) |
|   check(uv_prepare_init(loop, handle), "uv_prepare_init") |
| |
|   val prepareCallback = new PrepareCB { |
|     def apply(handle:PrepareHandle):Unit = { |
|       // drain the queue; tasks may enqueue more tasks while they run |
|       while (taskQueue.nonEmpty) { |
|         val runnable = taskQueue.remove(0) |
|         try { |
|           runnable.run() |
|         } catch { |
|           case t: Throwable => reportFailure(t) |
|         } |
|       } |
|       if (taskQueue.isEmpty) { |
|         println("stopping dispatcher") |
|         check(uv_prepare_stop(handle), "uv_prepare_stop") |
|       } |
|     } |
|   } |
| |
|   def execute(runnable: Runnable): Unit = { |
|     taskQueue += runnable |
|     // starting an already-started handle is harmless, so this keeps the loop alive |
|     check(uv_prepare_start(handle, prepareCallback), "uv_prepare_start") |
|   } |
| |
|   def reportFailure(t: Throwable): Unit = { |
|     println(s"Future failed with Throwable $t:") |
|     t.printStackTrace() |
|   } |
| |
|   def run(mode:Int = UV_RUN_DEFAULT):Unit = { |
|     var continue = 1 |
|     while (continue != 0) { |
|       continue = uv_run(loop, mode) |
|       println(s"uv_run returned $continue") |
|     } |
|   } |
| |
|   // schedule run() on the built-in global context so the loop starts once main() returns |
|   private val bootstrapFuture = Future(run())(ExecutionContext.global) |
| } |
Overall, that’s less than 50 lines of code for a custom asynchronous scheduler! However, a few subtleties are worth calling attention to, because it’s very important that this code keeps the event loop running whenever there is work left to do, and lets it shut down cleanly once every task has completed.
We’re assisted in this by libuv itself: the default behavior of uv_run is to return once there are no more active handles and requests, which is exactly what we want. All we have to do is stop and start the prepare handle at the right times. The trick we use here is to call uv_prepare_start() from the execute() method, which guarantees the prepare handle is active whenever there is work queued; calling uv_prepare_start() on a handle that is already started is harmless, so it doesn’t matter how many times we “start” it.
We also need to make sure the loop actually runs in the first place via uv_run; in our server program we invoked it manually, but here we use bootstrapFuture to ask the built-in Scala Native ExecutionContext to run our event loop as soon as the main() function returns.
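If we didn’t want to rely on the global context to bootstrap the loop, a minimal alternative sketch (assuming we removed the bootstrapFuture line) would be to invoke run() ourselves at the end of main(), just as the server program did:
| def main(args:Array[String]):Unit = { |
|   implicit val loop = EventLoop |
|   // ... create Futures as usual ... |
|   EventLoop.run() // blocks here until every handle and queued task is done |
| } |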
This is all much easier to follow if we run a very simple asynchronous program:
| def main(args:Array[String]):Unit = { |
|   println("hello") |
|   implicit val loop = EventLoop |
|   println("setting up futures") |
|   Future { |
|     println("Future 1!") |
|   }.map { _ => |
|     println("Future 2!") |
|   } |
|   println("main about to return...") |
| } |
It produces output like this:
| hello |
| uv_prepare_init returned 0 |
| setting up futures |
| uv_prepare_start returned 0 |
| main about to return... |
| Future 1! |
| uv_prepare_start returned 0 |
| Future 2! |
| stopping dispatcher |
| uv_prepare_stop returned 0 |
| uv_run returned 0 |
Now we’ve verified that our ExecutionContext is working correctly; however, we’re still missing one piece of the puzzle. In the last example, we used Future.apply to create a Future from a block of code and confirmed that we could transform it in the ways we would expect. But how do we create a Future that we can return immediately, before its value has been computed, such as the result of a request that hasn’t yet come back? For that, we’ll need to use Future’s lesser-known helper, Promise.
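As a preview, here is a minimal sketch of the Promise pattern in plain Scala, independent of libuv; the pendingRequest helper is hypothetical, but it shows how a Future can be handed out immediately and completed later, which is exactly what an in-flight request needs:
| import scala.concurrent.{Future, Promise} |
| |
| // hypothetical helper: hands out a Future now, to be completed later |
| def pendingRequest():(Promise[String], Future[String]) = { |
|   val promise = Promise[String]() |
|   (promise, promise.future) |
| } |
| |
| val (promise, future) = pendingRequest() |
| future.foreach(body => println(s"got response: $body"))(EventLoop) |
| promise.success("simulated response") // later, e.g., from an I/O callback |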