Asynchronous curl

Despite its power, the libcurl code we’ve written so far still has some limitations. Most important, whenever we run doRequestSync, our program blocks until the HTTP request is completed, which prevents other work from happening in the meantime. This is just fine for a simple command-line program, but it prevents us from integrating it with our asynchronous server, which needs to ensure that the event loop never blocks. Likewise, if we want to provide an asynchronous API with futures, we’ll also need some way to ensure that our program can continue while requests are in flight.

For the remainder of this chapter, we’ll build a solution to both of these problems by fully integrating libcurl with the libuv event loop. Rather than driving libcurl with a blocking easy_perform call, we’ll instead use additional callbacks to allow libuv to determine when to transfer data and complete requests, much like it did with our delay function.

This deep integration of the two libraries isn’t trivial, but it’s a well-documented and well-supported path. libcurl, in particular, provides a variety of APIs for integrating with external event loops.

Integrating with libuv

libcurl’s extensive documentation contrasts the easy API we’ve looked at so far with the multi API. At its core, the multi API revolves around another opaque pointer—called a multi handle in C—as well as the curl_multi_add_handle function that associates an easy handle with a multi handle. We’ll model them in Scala like so, and call the multi handle MultiCurl for clarity:

LibUVFutures/curl_async/curl.scala
 type MultiCurl = Ptr[Byte]
 
 @name("curl_multi_init")
 def multi_init(): MultiCurl = extern
 
 @name("curl_multi_add_handle")
 def multi_add_handle(multi: MultiCurl, easy: Curl): Int = extern
 
 @name("curl_multi_setopt")
 def curl_multi_setopt(multi: MultiCurl, option: CInt,
     parameter: CVarArg): CInt = extern
 
 @name("curl_multi_setopt")
 def multi_setopt_ptr(multi: MultiCurl, option: CInt,
     parameter: Ptr[Byte]): CInt = extern
 
 @name("curl_multi_assign")
 def multi_assign(
     multi: MultiCurl,
     socket: Ptr[Byte],
     socket_data: Ptr[Byte]): Int = extern
 
 @name("curl_multi_socket_action")
 def multi_socket_action(
     multi: MultiCurl,
     socket: Ptr[Byte],
     events: Int,
     numhandles: Ptr[Int]): Int = extern
 
 @name("curl_multi_info_read")
 def multi_info_read(multi: MultiCurl,
     message: Ptr[Int]): Ptr[CurlMessage] = extern
 
 @name("curl_multi_perform")
 def multi_perform(multi: MultiCurl, numhandles: Ptr[Int]): Int = extern
 
 @name("curl_multi_cleanup")
 def multi_cleanup(multi: MultiCurl): Int = extern

Once we have a multi handle, there are many different ways libcurl allows us to use it to perform multiple requests at once. We can use techniques for blocking on all of the requests at once or for poll/select-style handling, either of which would be appropriate for smaller numbers of requests or if we weren’t interested in interleaving requests with other processes in a server program. But for truly fine-grained control, the multi API provides an additional set of callbacks on the multi handle itself, which can be used to control and coordinate interactions with an external event loop, such as libuv. Correct implementation of this API can be tricky, but with a little bit of planning and preparation, you’ll see that it doesn’t require that much more code than what we’ve already written.

On the libcurl side, we’ll only need a few new type signatures and functions:

LibUVFutures/curl_async/curl.scala
 val SOCKETFUNCTION = 20001
 type SocketCallback = CFuncPtr5[Curl, Ptr[Byte], CInt, Ptr[Byte], Ptr[Byte],
   CInt]
 val TIMERFUNCTION = 20004
 type TimerCallback = CFuncPtr3[MultiCurl, Long, Ptr[Byte], CInt]
 
 type CurlAction = CInt
 val POLL_NONE: CurlAction = 0
 val POLL_IN: CurlAction = 1
 val POLL_OUT: CurlAction = 2
 val POLL_INOUT: CurlAction = 3
 val POLL_REMOVE: CurlAction = 4
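To get a feel for how these curl poll flags will eventually map onto libuv's event mask, here's a small, self-contained sketch in plain Scala (UV_READABLE and UV_WRITABLE are 1 and 2 in libuv's headers; curlActionToUvEvents is a hypothetical name for illustration):

```scala
// libuv's poll event bits
val UV_READABLE = 1
val UV_WRITABLE = 2

// Translate a CurlAction (0-4, as defined above) into an optional libuv
// event mask; None means "stop polling this socket."
def curlActionToUvEvents(action: Int): Option[Int] = action match {
  case 0 /* POLL_NONE */   => None
  case 1 /* POLL_IN */     => Some(UV_READABLE)
  case 2 /* POLL_OUT */    => Some(UV_WRITABLE)
  case 3 /* POLL_INOUT */  => Some(UV_READABLE | UV_WRITABLE)
  case 4 /* POLL_REMOVE */ => None
}
```

Because both libraries happen to use 1 for "readable" and 2 for "writable," POLL_INOUT translates directly to the combined mask 3.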

Likewise, on the libuv side, we’ll need to use a new handle type, the poll handle, which can provide notifications of the readiness of an externally managed socket:

LibUVFutures/curl_async/loop.scala
 def uv_poll_init_socket(loop: Loop, handle: PollHandle, socket: Ptr[Byte]):
   Int = extern
 def uv_poll_start(handle: PollHandle, events: Int, cb: PollCB): Int = extern
 def uv_poll_stop(handle: PollHandle): Int = extern

Rather than try to explain each of these components on their own, we can get a better sense of the big picture by first considering how libcurl and libuv will interact:

  1. curl creates one or more sockets.

  2. curl notifies libuv that it has created new sockets.

  3. libuv starts watching the sockets.

  4. libuv sees that a socket is readable or writable and notifies curl.

  5. curl performs the appropriate transfer on the ready socket(s).

  6. curl checks to see if the request is complete.

  7. If the request is complete, curl completes the request and closes the socket.

  8. When there are no more requests in flight, the loop terminates.
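The handoff in the steps above can be sketched as a toy simulation in plain Scala. No real sockets, libcurl, or libuv are involved here; every name is hypothetical, and "transfers" are just counters:

```scala
import scala.collection.mutable

object LifecycleSketch {
  // A request needs some number of readiness "ticks" before it completes
  case class Request(id: Int, var chunksLeft: Int)

  val watched = mutable.Set[Int]()            // sockets "libuv" is polling
  val inFlight = mutable.Map[Int, Request]()  // socket -> active request

  // Steps 1-3: "curl" creates a socket and "libuv" starts watching it
  def startRequest(req: Request): Unit = {
    inFlight(req.id) = req
    watched += req.id
  }

  // Steps 4-7: "libuv" reports readiness; "curl" transfers a chunk and,
  // if the request is complete, closes the socket
  def socketReady(socket: Int): Unit =
    inFlight.get(socket).foreach { req =>
      req.chunksLeft -= 1
      if (req.chunksLeft <= 0) {
        inFlight -= socket
        watched -= socket
      }
    }

  // Step 8: the loop runs until nothing is in flight
  def run(): Int = {
    var ticks = 0
    while (inFlight.nonEmpty) {
      watched.toList.foreach(socketReady)
      ticks += 1
    }
    ticks
  }
}
```

The key property the simulation preserves is that the loop (not curl) drives all transfers: once a request is registered, progress happens only when a watched socket is reported ready.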

An interesting property of this algorithm is that the only time libcurl initiates an action is on request creation. All data transfer is initiated by libuv, which is watching the sockets libcurl creates. libcurl supports this workflow with the multi_socket_action function, which allows libuv code to briefly hand over the reins for curl’s own callbacks:

 @name("curl_multi_socket_action")
  def multi_socket_action(
  multi:MultiCurl,
  socket:Ptr[Byte],
  events:Int,
  numhandles:Ptr[Int]):Int = extern

multi_socket_action behaves differently based on the arguments it receives. If it’s called with a running socket in its socket argument, it will work only on that socket, whereas if it’s called with the constant SOCKET_TIMEOUT, it will check every socket for changes in state, including administrative functions such as timeouts. multi_socket_action also works closely with the two new callbacks that we’ll define on our MultiCurl handle: the SOCKETFUNCTION and TIMERFUNCTION callbacks. Together, these two callbacks give libcurl “feedback” into libuv’s control structures by creating and modifying handles on the event loop. In particular, our TIMERFUNCTION, which we’ll call set_timer, allows libcurl to create and adjust the delay on a libuv Timer handle, which will invoke multi_socket_action when it fires. Likewise, our SOCKETFUNCTION callback instructs libuv when to create a Poll handle for a new socket, and how to adjust or remove it as the transfer progresses.

All together, the whole network of callbacks looks like this:

images/multi_curl.png

While I was researching this pattern, however, I was still puzzled by one thing: there isn’t an obvious way to start the whole system! After a fair amount of experimentation, I discovered the answer: once the TIMERFUNCTION is set, it will be invoked not only during multi_socket_action but also during multi_add_handle! This makes bootstrapping much easier, but it does have some impact on the order in which we initialize things, so we’ll need to be careful.

With this overall design in place, we can start implementing the individual callbacks one at a time.

Implementing the Callbacks

Let’s start by implementing start_timer, which we’ll pass to libcurl as the TIMERFUNCTION, and which controls a libuv TimerHandle:

LibUVFutures/curl_async/curl.scala
 val startTimerCB = new CurlTimerCallback {
   def apply(curl: MultiCurl, timeout_ms: Long, data: Ptr[Byte]): Int = {
     println(s"start_timer called with timeout ${timeout_ms} ms")
     val time = if (timeout_ms < 1) {
       println("setting effective timeout to 1")
       1
     } else timeout_ms
     println("starting timer")
     check(uv_timer_start(timerHandle, timeoutCB, time, 0), "uv_timer_start")
     cleanup_requests()
     0
   }
 }
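The clamping logic in startTimerCB can be exercised on its own. A minimal sketch, with a hypothetical helper name (note that a fuller integration would treat libcurl's timeout of -1 as "cancel the timer," which this sketch, like the callback above, simplifies to an immediate 1 ms fire):

```scala
// uv_timer_start needs a positive delay for a one-shot timer, so any
// timeout under 1 ms (including 0 for "right away") is clamped up to 1 ms.
def effectiveTimeout(timeout_ms: Long): Long =
  if (timeout_ms < 1) 1L else timeout_ms
```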

Essentially, libcurl invokes set_timer in two different ways: to tell libuv to call multi_socket_action right away, in which case we substitute a timeout of 1 ms, or to schedule the call for some point in the future. When the timeout expires, libuv will invoke another callback, which is relatively simple because all it has to do is invoke multi_socket_action as instructed:

LibUVFutures/curl_async/curl.scala
 val timeoutCB = new TimerCB {
   def apply(handle: TimerHandle): Unit = {
     println("in timeout callback")
     val running_handles = stackalloc[Int]
     multi_socket_action(multi, int_to_ptr(-1), 0, running_handles)
     println(s"on_timer fired, ${!running_handles} sockets running")
   }
 }

In contrast, the SOCKETFUNCTION callback will be more complex, because it has to handle a variety of cases: creating a poll handle for a new socket, adjusting the events an existing handle watches, and stopping the poll when a socket is done.

The good news is that setting up the local state of the socket is mostly simple; we can use the same RequestData struct that we used for our blocking implementation. There’s one catch, though: we’ll need to use a helper function, multi_assign (our binding for curl_multi_assign), to associate that struct with a particular socket within the larger MultiCurl.

 @name("curl_multi_assign")
 def multi_assign(
  multi:MultiCurl,
  socket:Ptr[Byte],
  socket_data:Ptr[Byte]):Int = extern

Finally, we’ll also need to translate libcurl’s flags for readability and writability into the corresponding flags that libuv understands. The two libraries use very similar encodings, but the binary logic is tricky to get right. All together, the handling code in the SOCKETFUNCTION callback looks like this:

LibUVFutures/curl_async/curl.scala
 val socketCB = new CurlSocketCallback {
   def apply(curl: Curl, socket: Ptr[Byte], action: Int, data: Ptr[Byte],
             socket_data: Ptr[Byte]): Int = {
     println(s"socketCB called with action ${action}")
     val pollHandle = if (socket_data == null) {
       println(s"initializing handle for socket ${socket}")
       val buf = malloc(uv_handle_size(UV_POLL_T)).asInstanceOf[Ptr[Ptr[Byte]]]
       !buf = socket
       check(uv_poll_init_socket(loop, buf, socket), "uv_poll_init_socket")
       check(multi_assign(multi, socket, buf.asInstanceOf[Ptr[Byte]]),
         "multi_assign")
       buf
     } else {
       socket_data.asInstanceOf[Ptr[Ptr[Byte]]]
     }
 
     val events = action match {
       case POLL_NONE => None
       case POLL_IN => Some(UV_READABLE)
       case POLL_OUT => Some(UV_WRITABLE)
       case POLL_INOUT => Some(UV_READABLE | UV_WRITABLE)
       case POLL_REMOVE => None
     }
 
     events match {
       case Some(ev) =>
         println(s"starting poll with events $ev")
         uv_poll_start(pollHandle, ev, pollCB)
       case None =>
         println("stopping poll")
         uv_poll_stop(pollHandle)
         startTimerCB(multi, 1, null)
     }
     0
   }
 }

But the actual libuv PollHandle callback is, once again, refreshingly concise:

LibUVFutures/curl_async/curl.scala
 val pollCB = new PollCB {
   def apply(pollHandle: PollHandle, status: Int, events: Int): Unit = {
     println(s"""ready_for_curl fired with status ${status} and
       events ${events}""")
     val socket = !(pollHandle.asInstanceOf[Ptr[Ptr[Byte]]])
     val actions = (events & 1) | (events & 2)
     val running_handles = stackalloc[Int]
     val result = multi_socket_action(multi, socket, actions, running_handles)
     println("multi_socket_action", result)
   }
 }

And now that our poll callback is also calling back into multi_socket_action, we’ve completed the loop, and all of the control mechanisms are in place. Best of all, we don’t even need to write new implementations of on_header and on_body; we can use our previous ones unchanged, so this code is almost ready to run! We just need to implement methods to begin and complete requests asynchronously, corresponding to the ones we wrote before.

As with our delay() function, we’ll create a promise when each request starts and use a global Map[Long, Promise[ResponseState]] to convert request ids back to promises. Creating the promises is relatively clean:

LibUVFutures/curl_async/curl.scala
 def startRequest(method: Int, url: String, headers: Seq[String] = Seq.empty,
     body: String = ""): Future[ResponseState] = Zone { implicit z =>
   init()
   val curlHandle = easy_init()
   serial += 1
   val reqId = serial
   println(s"initializing handle $curlHandle for request $reqId")
   val req_id_ptr = malloc(sizeof[Long]).asInstanceOf[Ptr[Long]]
   !req_id_ptr = reqId
   requests(reqId) = ResponseState()
   val promise = Promise[ResponseState]()
   requestPromises(reqId) = promise
 
   method match {
     case GET =>
       check(curl_easy_setopt(curlHandle, URL, toCString(url)),
         "easy_setopt")
       check(curl_easy_setopt(curlHandle, WRITECALLBACK, func_to_ptr(dataCB)),
         "easy_setopt")
       check(curl_easy_setopt(curlHandle, WRITEDATA,
         req_id_ptr.asInstanceOf[Ptr[Byte]]), "easy_setopt")
       check(curl_easy_setopt(curlHandle, HEADERCALLBACK,
         func_to_ptr(headerCB)), "easy_setopt")
       check(curl_easy_setopt(curlHandle, HEADERDATA,
         req_id_ptr.asInstanceOf[Ptr[Byte]]), "easy_setopt")
       check(curl_easy_setopt(curlHandle, PRIVATEDATA,
         req_id_ptr.asInstanceOf[Ptr[Byte]]), "easy_setopt")
   }
   multi_add_handle(multi, curlHandle)
 
   println("request initialized")
   promise.future
 }
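The id-to-promise bookkeeping that startRequest performs can be sketched in plain Scala, independent of any native code. Registry and the one-field ResponseState here are hypothetical stand-ins for illustration; the real ResponseState also carries headers:

```scala
import scala.collection.mutable
import scala.concurrent.Promise

case class ResponseState(body: String = "")

object Registry {
  private var serial = 0L
  val requestPromises = mutable.Map[Long, Promise[ResponseState]]()

  // Called when a request starts: assign a fresh id and stash a promise
  def register(): (Long, Promise[ResponseState]) = {
    serial += 1
    val p = Promise[ResponseState]()
    requestPromises(serial) = p
    (serial, p)
  }

  // Called when curl reports the transfer done: look up, remove, complete
  def complete(reqId: Long, data: ResponseState): Unit =
    requestPromises.remove(reqId).foreach(_.success(data))
}
```

Removing the entry as we complete it keeps the map from growing without bound, which is exactly what the remove calls in cleanup_requests are for.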

Completing them is a little trickier. We don’t get a callback that fires every time a request is done; instead, libcurl puts messages in a queue that we can handle with multi_info_read. We’ll also need to use easy_getinfo with GET_PRIVATEDATA to get a pointer back to the response data we’ve been tracking all this time, which will allow us to construct a Response object and complete the Future successfully:

LibUVFutures/curl_async/curl.scala
 def cleanup_requests(): Unit = {
   val messages = stackalloc[Int]
   val privateDataPtr = stackalloc[Ptr[Long]]
   var message: Ptr[CurlMessage] = multi_info_read(multi, messages)
   while (message != null) {
     println(s"""Got a message ${message._1} from multi_info_read,
       ${!messages} left in queue""")
     val handle: Curl = message._2
     check(easy_getinfo(handle, GET_PRIVATEDATA,
       privateDataPtr.asInstanceOf[Ptr[Byte]]), "getinfo")
     val privateData = !privateDataPtr
     val reqId = !privateData
     val reqData = requests.remove(reqId).get
     val promise = Curl.requestPromises.remove(reqId).get
     promise.success(reqData)
     message = multi_info_read(multi, messages)
   }
   println("done handling messages")
 }
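The drain pattern in cleanup_requests (keep reading messages until the queue is exhausted, completing each request as it comes off) can be sketched in plain Scala, with a hypothetical readMessage standing in for multi_info_read's null-terminated stream of messages:

```scala
import scala.collection.mutable

// Drain a queue of completed request ids, invoking `complete` for each,
// and return how many messages were handled. None models the null that
// multi_info_read returns when the queue is empty.
def drain(queue: mutable.Queue[Long], complete: Long => Unit): Int = {
  def readMessage(): Option[Long] =
    if (queue.isEmpty) None else Some(queue.dequeue())

  var handled = 0
  var message = readMessage()
  while (message.isDefined) {
    complete(message.get)
    handled += 1
    message = readMessage()
  }
  handled
}
```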

With these methods in place, we can then adapt our command-line utility from before to make asynchronous requests, on a single libuv event loop:

LibUVFutures/curl_async/main.scala
 def main(args: Array[String]): Unit = {
   if (args.length == 0) {
     println("usage: ./curl-out https://www.example.com")
     ???
   }
 
   println("initializing loop")
   implicit val loop = EventLoop
   val resp = Zone { implicit z =>
     for (arg <- args) {
       val url = arg
       val resp = Curl.startRequest(GET, url)
 
       resp.onComplete {
         case Success(data) =>
           println(s"got response for ${arg} - length ${data.body.size}")
           println(s"headers:")
           for (h <- data.headers) {
             println(s"request header: $h")
           }
           println(s"body: ${data.body}")
         case Failure(f) =>
           println("request failed", f)
       }
     }
   }
 
   loop.run()
   println("done")
 }

When run, it should behave like this:

 $ ./target/scala-2.11/curl_async-out https://www.example.com
 initializing loop
 uv_prepare_init returned 0
 ...
 got back response for https://www.example.com - body of length 1270
 headers:
 request header: (X-Cache,HIT)
 request header: (Vary,Accept-Encoding)
 ...
 body: <!doctype html>
 <html>
 <head>
  <title>Example Domain</title>
 ...
 stopping dispatcher
 on_timer fired, 0 sockets running
 uv_run returned 0
 loop done, cleaning up
 cleaning up internals
 done handling messages
 done

This does essentially the same work as the blocking version, except it can perform all the requests at the same time. If we time it, we’ll see that it handles many concurrent requests without degrading performance, vastly improving on the speed of the blocking version.