Despite its power, the libcurl code we’ve written so far still has some limitations. Most importantly, whenever we run doRequestSync, our program blocks until the HTTP request completes, so no other work can happen in the meantime. This is fine for a simple command-line program, but it rules out integration with our asynchronous server, which must ensure that the event loop never blocks. Likewise, if we want to provide an asynchronous API with futures, we’ll need some way for our program to keep running while requests are in flight.
For the remainder of this chapter, we’ll build a solution to both of these problems by fully integrating libcurl with the libuv event loop. Rather than driving libcurl with a blocking easy_perform call, we’ll instead use additional callbacks to allow libuv to determine when to transfer data and complete requests, much like it did with our delay function.
This deep integration of the two libraries isn’t trivial, but it’s a well-documented and well-supported path. libcurl, in particular, provides a variety of API hooks for integrating with external event loops.
libcurl’s extensive documentation contrasts the easy API we’ve looked at so far with the multi API. At its core, the multi API revolves around another opaque pointer (called a multi handle in C) as well as the curl_multi_add_handle function, which associates an easy handle with a multi handle. We’ll model them in Scala like so, and call the multi handle MultiCurl for clarity:
| type MultiCurl = Ptr[Byte] |
| |
| @name("curl_multi_init") |
| def multi_init():MultiCurl = extern |
| |
| @name("curl_multi_add_handle") |
| def multi_add_handle(multi:MultiCurl, easy:Curl):Int = extern |
| |
| @name("curl_multi_setopt") |
| def curl_multi_setopt(multi:MultiCurl, option:CInt, |
| parameter:CVarArg): CInt = extern |
| |
| @name("curl_multi_setopt") |
| def multi_setopt_ptr(multi:MultiCurl, option:CInt, |
| parameter:Ptr[Byte]): CInt = extern |
| |
| @name("curl_multi_assign") |
| def multi_assign( |
| multi:MultiCurl, |
| socket:Ptr[Byte], |
| socket_data:Ptr[Byte]):Int = extern |
| |
| @name("curl_multi_socket_action") |
| def multi_socket_action( |
| multi:MultiCurl, |
| socket:Ptr[Byte], |
| events:Int, |
| numhandles:Ptr[Int]):Int = extern |
| |
| @name("curl_multi_info_read") |
| def multi_info_read(multi:MultiCurl, |
| message:Ptr[Int]): Ptr[CurlMessage] = extern |
| |
| @name("curl_multi_perform") |
| def multi_perform(multi:MultiCurl, numhandles:Ptr[Int]):Int = extern |
| |
| @name("curl_multi_cleanup") |
| def multi_cleanup(multi:MultiCurl):Int = extern |
Once we have a multi handle, there are many different ways libcurl allows us to use it to perform multiple requests at once. We can use techniques for blocking on all of the requests at once or for poll/select-style handling, either of which would be appropriate for smaller numbers of requests or if we weren’t interested in interleaving requests with other processes in a server program. But for truly fine-grained control, the multi API provides an additional set of callbacks on the multi handle itself, which can be used to control and coordinate interactions with an external event loop, such as libuv. Correct implementation of this API can be tricky, but with a little bit of planning and preparation, you’ll see that it doesn’t require that much more code than what we’ve already written.
On the libcurl side, we’ll only need a few new type signatures and functions:
| val SOCKETFUNCTION = 20001 |
| type SocketCallback = CFuncPtr5[Curl, Ptr[Byte], CInt, Ptr[Byte], Ptr[Byte], |
| CInt] |
| val TIMERFUNCTION = 20004 |
| type TimerCallback = CFuncPtr3[MultiCurl, Long, Ptr[Byte], CInt] |
| |
| type CurlAction = CInt |
| val POLL_NONE:CurlAction = 0 |
| val POLL_IN:CurlAction = 1 |
| val POLL_OUT:CurlAction = 2 |
| val POLL_INOUT:CurlAction = 3 |
| val POLL_REMOVE:CurlAction = 4 |
Likewise, on the libuv side, we’ll need to use a new handle type, the poll handle, which can provide notifications of the readiness of an externally managed socket:
| def uv_poll_init_socket(loop:Loop, handle:PollHandle, socket:Ptr[Byte]): |
| Int = extern |
| def uv_poll_start(handle:PollHandle, events:Int, cb: PollCB):Int = extern |
| def uv_poll_stop(handle:PollHandle):Int = extern |
Rather than try to explain each of these components on their own, we can get a better sense of the big picture by first considering how libcurl and libuv will interact:
1. curl creates one or more sockets.
2. curl notifies libuv that it has created new sockets.
3. libuv starts watching the sockets.
4. libuv sees that a socket is readable or writable and notifies curl.
5. curl performs the appropriate transfer on the ready socket(s).
6. curl checks to see if the request is complete.
7. If the request is complete, curl completes the request and closes the socket.
8. When there are no more requests in flight, the loop terminates.
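To make the division of labor concrete, here is a minimal sketch of the same cycle in plain Scala, with no real sockets and no C bindings. FakeCurl, socketAction, and the work map are hypothetical stand-ins invented for this illustration; they are not part of libcurl or libuv, and the round-robin queue only approximates how a real event loop reports readiness.

```scala
import scala.collection.mutable

object LoopSketch {
  // Hypothetical stand-in for libcurl: `work` maps a socket id to the number
  // of readiness events that socket needs before its request is complete.
  final class FakeCurl(work: Map[Int, Int]) {
    private val remaining = mutable.Map(work.toSeq: _*)
    val log = mutable.ArrayBuffer.empty[String]

    // curl creates its sockets and reports each one to the watcher.
    def start(watch: Int => Unit): Unit =
      remaining.keys.toSeq.sorted.foreach { s =>
        log += s"created $s"
        watch(s)
      }

    // curl transfers on one ready socket; returns true once its request is done.
    def socketAction(socket: Int): Boolean = {
      remaining(socket) -= 1
      log += s"transfer on $socket"
      val done = remaining(socket) == 0
      if (done) {
        remaining.remove(socket)
        log += s"completed $socket"
      }
      done
    }
  }

  // Hypothetical stand-in for the event loop: it never transfers data itself,
  // it only tells curl which socket is ready.
  def run(work: Map[Int, Int]): Seq[String] = {
    require(work.values.forall(_ > 0), "each socket needs at least one event")
    val curl = new FakeCurl(work)
    val watched = mutable.Queue.empty[Int]
    curl.start(s => watched.enqueue(s))
    // The loop terminates when there is nothing left to watch.
    while (watched.nonEmpty) {
      val socket = watched.dequeue()
      if (!curl.socketAction(socket)) watched.enqueue(socket)
    }
    curl.log.toSeq
  }
}
```

Calling LoopSketch.run(Map(1 -> 2, 2 -> 1)) returns a log in which socket 2 completes before socket 1, even though socket 1 was created first. The stand-in for curl only acts when the loop hands it a ready socket.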
An interesting property of this algorithm is that the only time libcurl initiates an action is on request creation. All data transfer is initiated by libuv, which is watching the sockets libcurl creates. libcurl supports this workflow with the multi_socket_action function, which allows libuv code to briefly hand over the reins for curl’s own callbacks:
| @name("curl_multi_socket_action") |
| def multi_socket_action( |
| multi:MultiCurl, |
| socket:Ptr[Byte], |
| events:Int, |
| numhandles:Ptr[Int]):Int = extern |
multi_socket_action behaves differently depending on the arguments it receives. If it’s called with a running socket in its socket argument, it works only on that socket, whereas if it’s called with the constant SOCKET_TIMEOUT, it checks every socket for changes in state and performs administrative work such as handling timeouts. multi_socket_action also works closely with the two new callbacks that we’ll define on our MultiCurl handle: the SOCKETFUNCTION and TIMERFUNCTION callbacks. Together, these two callbacks give libcurl “feedback” into libuv’s control structures by creating and modifying handles on the event loop. In particular, our TIMERFUNCTION, which we’ll call start_timer, allows libcurl to create and adjust the delay on a libuv Timer handle, which will invoke multi_socket_action when it fires. Likewise, our SOCKETFUNCTION callback tells libuv when to create a Poll handle for a new socket and which events to watch for on it.
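All together, the callbacks form a cycle: libcurl invokes the TIMERFUNCTION and SOCKETFUNCTION callbacks from inside multi_socket_action; those callbacks create or adjust a libuv Timer handle and Poll handles; and when a timer fires or a socket becomes ready, libuv’s own callbacks call multi_socket_action again.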
While I was researching this pattern, however, I was still puzzled by one thing: there isn’t an obvious way to start the whole system! After a fair amount of experimentation, I discovered the answer: once the TIMERFUNCTION is set, it will be invoked not only during multi_socket_action but also during multi_add_handle! This makes bootstrapping much easier, but it does have some impact on the order in which we initialize things, so we’ll need to be careful.
With this overall design in place, we can start implementing the individual callbacks one at a time.
Let’s start by implementing start_timer, which will be passed to libcurl as the TIMERFUNCTION and which controls a libuv TimerHandle:
| val startTimerCB = new TimerCallback { |
| def apply(curl:MultiCurl, timeout_ms:Long, data:Ptr[Byte]):Int = { |
| println(s"start_timer called with timeout ${timeout_ms} ms") |
| val time = if (timeout_ms < 1) { |
| println("setting effective timeout to 1") |
| 1 |
| } else timeout_ms |
| println("starting timer") |
| check(uv_timer_start(timerHandle, timeoutCB, time, 0), "uv_timer_start") |
| cleanup_requests() |
| 0 |
| } |
| } |
Essentially, libcurl uses start_timer in two different ways: to tell libuv to call multi_socket_action either right away, in which case we’ll pass on a timeout of 1 ms, or at some point in the future. When the timeout expires, libuv will invoke another callback, which is relatively simple because all it has to do is invoke multi_socket_action as instructed:
| val timeoutCB = new TimerCB { |
| def apply(handle:TimerHandle):Unit = { |
| println("in timeout callback") |
| val running_handles = stackalloc[Int] |
| multi_socket_action(multi,int_to_ptr(-1),0,running_handles) |
| println(s"on_timer fired, ${!running_handles} sockets running") |
| } |
| } |
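One detail is worth spelling out. libcurl’s documentation for the timer callback reserves a timeout of -1 to mean “delete the timer,” whereas the start_timer shown above turns every value below 1 into a 1 ms timer. The sketch below contrasts the two policies as pure functions. TimerRequest, chapterPolicy, and stopAwarePolicy are hypothetical names introduced only for this comparison, and the stop-aware variant is an alternative to consider, not the code this chapter uses.

```scala
object TimerSketch {
  // Hypothetical description of what we would ask the libuv timer to do.
  sealed trait TimerRequest
  case object StopTimer extends TimerRequest
  final case class StartTimer(delayMs: Long) extends TimerRequest

  // The policy used by start_timer in this chapter:
  // anything below 1 ms is rounded up to a 1 ms timer.
  def chapterPolicy(timeoutMs: Long): TimerRequest =
    StartTimer(math.max(1L, timeoutMs))

  // Alternative policy (an assumption, not the chapter's code):
  // treat a negative timeout as a request to stop the timer.
  def stopAwarePolicy(timeoutMs: Long): TimerRequest =
    if (timeoutMs < 0) StopTimer
    else StartTimer(math.max(1L, timeoutMs))
}
```

With the chapter’s policy, a request to delete the timer costs one extra wakeup of multi_socket_action, which is harmless for a small client. A stop-aware version would presumably map StopTimer to uv_timer_stop instead.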
In contrast, the SOCKETFUNCTION callback will be more complex because it has to handle a variety of different cases:
A totally new socket has been created, and needs to be registered with libuv.
A socket has entered writable state, and we can start sending data to the remote server.
The request has been sent, and the socket has switched to readable state, awaiting a response.
The request has been completed, and the poll handle can be deactivated.
The good news is that setting up the local state of the socket is mostly simple; we can use the same RequestData struct that we used for our blocking implementation. There’s one catch, though. We’ll need to use a helper function called multi_assign to associate per-socket data with a particular socket within the larger MultiCurl.
| @name("curl_multi_assign") |
| def multi_assign( |
| multi:MultiCurl, |
| socket:Ptr[Byte], |
| socket_data:Ptr[Byte]):Int = extern |
Finally, we’ll also need to translate libcurl’s flags for readability and writability into the corresponding flags that libuv understands. The two libraries use very similar encodings, but the binary logic is tricky to get right. All together, the handling code in the SOCKETFUNCTION callback looks like this:
| val socketCB = new SocketCallback { |
| def apply(curl:Curl, socket:Ptr[Byte], action:Int, data:Ptr[Byte], |
| socket_data:Ptr[Byte]):Int = { |
| println(s"socketCB called with action ${action}") |
| val pollHandle = if (socket_data == null) { |
| println(s"initializing handle for socket ${socket}") |
| val buf = malloc(uv_handle_size(UV_POLL_T)).asInstanceOf[Ptr[Ptr[Byte]]] |
| !buf = socket |
| check(uv_poll_init_socket(loop, buf, socket), "uv_poll_init_socket") |
| check(multi_assign(multi, socket, buf.asInstanceOf[Ptr[Byte]]), |
| "multi_assign") |
| buf |
| } else { |
| socket_data.asInstanceOf[Ptr[Ptr[Byte]]] |
| } |
| |
| val events = action match { |
| case POLL_NONE => None |
| case POLL_IN => Some(UV_READABLE) |
| case POLL_OUT => Some(UV_WRITABLE) |
| case POLL_INOUT => Some(UV_READABLE | UV_WRITABLE) |
| case POLL_REMOVE => None |
| } |
| |
| events match { |
| case Some(ev) => |
| println(s"starting poll with events $ev") |
| uv_poll_start(pollHandle, ev, pollCB) |
| case None => |
| println("stopping poll") |
| uv_poll_stop(pollHandle) |
| startTimerCB(multi, 1, null) |
| } |
| 0 |
| } |
| } |
But the actual libuv PollHandle callback is, once again, refreshingly concise:
| val pollCB = new PollCB { |
| def apply(pollHandle:PollHandle, status:Int, events:Int):Unit = { |
| println(s"""ready_for_curl fired with status ${status} and |
| events ${events}""") |
| val socket = !(pollHandle.asInstanceOf[Ptr[Ptr[Byte]]]) |
| val actions = (events & 1) | (events & 2) |
| val running_handles = stackalloc[Int] |
| val result = multi_socket_action(multi, socket, actions, running_handles) |
| println(s"multi_socket_action returned $result") |
| } |
| } |
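Because the flag translation runs in both directions (curl actions to libuv events in the socket callback, and libuv events back to curl select bits in the poll callback), it can help to pull both into pure functions that can be checked without a running event loop. The sketch below is one way to do that. FlagSketch, toUvEvents, and toCurlSelect are hypothetical names, and reporting an error bit when the poll status is negative is an addition that the pollCB above does not make.

```scala
object FlagSketch {
  // libcurl actions passed to the SOCKETFUNCTION callback (values from above)
  val POLL_NONE = 0
  val POLL_IN = 1
  val POLL_OUT = 2
  val POLL_INOUT = 3
  val POLL_REMOVE = 4

  // libuv poll events
  val UV_READABLE = 1
  val UV_WRITABLE = 2

  // Select bits accepted by curl_multi_socket_action
  val CSELECT_IN = 1
  val CSELECT_OUT = 2
  val CSELECT_ERR = 4

  // Events libuv should watch for a given curl action; None means "stop polling".
  def toUvEvents(action: Int): Option[Int] = action match {
    case POLL_IN    => Some(UV_READABLE)
    case POLL_OUT   => Some(UV_WRITABLE)
    case POLL_INOUT => Some(UV_READABLE | UV_WRITABLE)
    case _          => None // POLL_NONE, POLL_REMOVE, or any unexpected value
  }

  // Select bits to hand back to curl from a libuv poll callback.
  def toCurlSelect(status: Int, events: Int): Int = {
    val in  = if ((events & UV_READABLE) != 0) CSELECT_IN else 0
    val out = if ((events & UV_WRITABLE) != 0) CSELECT_OUT else 0
    val err = if (status < 0) CSELECT_ERR else 0 // assumption: surface poll errors
    in | out | err
  }
}
```

The catch-all case means an unexpected action value stops polling rather than throwing a MatchError inside a C callback, where an exception would be difficult to handle sensibly.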
And now that our poll callback is also calling back into multi_socket_action, we’ve completed the loop, and all of the control mechanisms are in place. Best of all, we don’t even need to write new implementations for on_header and on_body; we can use our previous ones unchanged, so this code is almost ready to run! We just need to implement asynchronous counterparts to the methods we wrote before: startRequest, which begins a request, and cleanup_requests, which completes it.
As with our delay() function, we’ll create a promise when each request starts and use a global Map[Long,Promise[ResponseState]] to convert request IDs back to promises. Creating the promises is relatively clean:
| def startRequest(method:Int, url:String,headers:Seq[String] = |
| Seq.empty,body:String = ""):Future[ResponseState] = Zone { implicit z => |
| init() |
| val curlHandle = easy_init() |
| serial += 1 |
| val reqId = serial |
| println(s"initializing handle $curlHandle for request $reqId") |
| val req_id_ptr = malloc(sizeof[Long]).asInstanceOf[Ptr[Long]] |
| !req_id_ptr = reqId |
| requests(reqId) = ResponseState() |
| val promise = Promise[ResponseState]() |
| requestPromises(reqId) = promise |
| |
| method match { |
| case GET => |
| check(curl_easy_setopt(curlHandle, URL, toCString(url)), |
| "easy_setopt") |
| check(curl_easy_setopt(curlHandle, WRITECALLBACK, func_to_ptr(dataCB)), |
| "easy_setopt") |
| check(curl_easy_setopt(curlHandle, WRITEDATA, |
| req_id_ptr.asInstanceOf[Ptr[Byte]]), "easy_setopt") |
| check(curl_easy_setopt(curlHandle, HEADERCALLBACK, |
| func_to_ptr(headerCB)), "easy_setopt") |
| check(curl_easy_setopt(curlHandle, HEADERDATA, |
| req_id_ptr.asInstanceOf[Ptr[Byte]]), "easy_setopt") |
| check(curl_easy_setopt(curlHandle, PRIVATEDATA, |
| req_id_ptr.asInstanceOf[Ptr[Byte]]), "easy_setopt") |
| } |
| multi_add_handle(multi, curlHandle) |
| |
| println("request initialized") |
| promise.future |
| } |
Completing them is a little trickier. We don’t get a callback that fires every time a request is done; instead, libcurl puts messages in a queue that we can drain with multi_info_read. We’ll also need to use easy_getinfo with GET_PRIVATEDATA to recover the request ID we attached to the easy handle, which lets us look up the response data we’ve been tracking all this time and complete the Future successfully:
| def cleanup_requests():Unit = { |
| val messages = stackalloc[Int] |
| val privateDataPtr= stackalloc[Ptr[Long]] |
| var message:Ptr[CurlMessage] = multi_info_read(multi,messages) |
| while (message != null) { |
| println(s"""Got a message ${message._1} from multi_info_read, |
| ${!messages} left in queue""") |
| val handle:Curl = message._2 |
| check(easy_getinfo(handle, GET_PRIVATEDATA, |
| privateDataPtr.asInstanceOf[Ptr[Byte]]),"getinfo") |
| val privateData = !privateDataPtr |
| val reqId = !privateData |
| val reqData = requests.remove(reqId).get |
| val promise = Curl.requestPromises.remove(reqId).get |
| promise.success(reqData) |
| message = multi_info_read(multi,messages) |
| } |
| println("done handling messages") |
| } |
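The bookkeeping behind startRequest and cleanup_requests can be exercised on its own, without libcurl. The sketch below is a minimal version of the same register-then-complete pattern. PromiseRegistry, register, and complete are hypothetical names, and the response is simplified to a String rather than the chapter’s ResponseState.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

object PromiseRegistry {
  private var serial = 0L
  private val pending = mutable.Map.empty[Long, Promise[String]]

  // Called when a request starts: allocate an id and remember its promise.
  def register(): (Long, Future[String]) = {
    serial += 1
    val promise = Promise[String]()
    pending(serial) = promise
    (serial, promise.future)
  }

  // Called when a request finishes: look the promise up by id and fulfil it.
  // Returns false instead of throwing if the id is unknown or already completed.
  def complete(id: Long, body: String): Boolean =
    pending.remove(id) match {
      case Some(promise) =>
        promise.success(body)
        true
      case None =>
        false
    }
}
```

Unlike cleanup_requests, which calls .get on the result of remove and would throw on a missing entry, this version reports an unknown ID to the caller. Either choice is reasonable; the point is that every ID is removed from the map exactly once.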
With these methods in place, we can then adapt our command-line utility from before to make asynchronous requests, on a single libuv event loop:
| def main(args:Array[String]):Unit = { |
| if (args.length == 0) { |
| println("usage: ./curl-out https://www.example.com") |
| ??? |
| } |
| |
| println("initializing loop") |
| implicit val loop = EventLoop |
| val resp = Zone { implicit z => |
| for (arg <- args) { |
| val url = arg |
| val resp = Curl.startRequest(GET,url) |
| |
| resp.onComplete { |
| case Success(data) => |
| println(s"got response for ${arg} - length ${data.body.size}") |
| println(s"headers:") |
| for (h <- data.headers) { |
| println(s"request header: $h") |
| } |
| println(s"body: ${data.body}") |
| case Failure(f) => |
| println(s"request failed: $f") |
| } |
| } |
| } |
| |
| loop.run() |
| println("done") |
| } |
When run, it should behave like this:
| $ ./target/scala-2.11/curl_async-out https://www.example.com |
| initializing loop |
| uv_prepare_init returned 0 |
| ... |
| got back response for https://www.example.com - body of length 1270 |
| headers: |
| request header: (X-Cache,HIT) |
| request header: (Vary,Accept-Encoding) |
| ... |
| body: <!doctype html> |
| <html> |
| <head> |
| <title>Example Domain</title> |
| ... |
| stopping dispatcher |
| on_timer fired, 0 sockets running |
| uv_run returned 0 |
| loop done, cleaning up |
| cleaning up internals |
| done handling messages |
| done |
This does essentially the same work as the blocking version, except that it can perform all of the requests at the same time. If we time it, we should see that adding more URLs increases the total run time far less than it does for the blocking version, because the transfers overlap instead of running one after another.
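As a back-of-the-envelope model of why this helps, suppose each request is dominated by network latency. Run one after another, the latencies add up; run concurrently, the total is roughly the slowest single request. The sketch below encodes that idealized arithmetic. TimingSketch and the latencies used with it are hypothetical, and the model ignores bandwidth limits, DNS lookups, and event-loop overhead, so it is not a measurement.

```scala
object TimingSketch {
  // Blocking version: each request waits for the previous one to finish.
  def sequentialMs(latencies: Seq[Long]): Long = latencies.sum

  // Idealized async version: all requests are in flight at once,
  // so the total is bounded by the slowest one.
  def overlappedMs(latencies: Seq[Long]): Long =
    if (latencies.isEmpty) 0L else latencies.max
}
```

For three hypothetical requests taking 120, 80, and 200 ms, the model gives 400 ms sequentially and 200 ms overlapped. Real numbers will differ, but the gap should widen as more requests are added.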