Just as we did in Chapter 5, Writing a Server the Old-Fashioned Way, we’re going to write a simple TCP echo server that accepts connections, reads strings from the connected socket, and writes back the same message in response. Although it’s about as simple as a network protocol gets, this scenario is enough for us to exercise all the essential API calls we need to make a libuv server work. And as you’ll see, there’s a close correspondence between these calls and those you learned when we created our blocking server.
To receive connections on a server, we have to initialize the socket, bind to a port, listen for incoming connections, and then accept each incoming connection before the connections are usable. We’ll do this in much the same way as we did before, but with a different set of API calls and the data structures that we’ll need to invoke them. Let’s look at all of these definitions at once:
| type Buffer = CStruct2[Ptr[Byte],CSize] |
| type TCPHandle = Ptr[Ptr[Byte]] |
| type ConnectionCB = CFuncPtr2[TCPHandle, Int, Unit] |
| type WriteReq = Ptr[Ptr[Byte]] |
| type ShutdownReq = Ptr[Ptr[Byte]] |
| |
| def uv_tcp_init(loop:Loop, tcp_handle:TCPHandle):Int = extern |
| def uv_tcp_bind(tcp_handle:TCPHandle, address:Ptr[Byte], flags:Int):Int = |
| extern |
| def uv_listen(stream_handle:TCPHandle, backlog:Int, |
| uv_connection_cb:ConnectionCB): Int = extern |
| def uv_accept(server:TCPHandle, client:TCPHandle): Int = extern |
| def uv_read_start(client:TCPHandle, allocCB:AllocCB, readCB:ReadCB): Int = |
| extern |
| def uv_write(writeReq:WriteReq, client:TCPHandle, bufs: Ptr[Buffer], |
| numBufs:Int, writeCB:WriteCB): Int = extern |
| def uv_shutdown(shutdownReq:ShutdownReq, client:TCPHandle, |
| shutdownCB:ShutdownCB): Int = extern |
| def uv_close(handle:TCPHandle, closeCB: CloseCB): Int = extern |
| |
| def uv_ip4_addr(address:CString, port:Int, out_addr:Ptr[Byte]):Int = extern |
| def uv_ip4_name(address:Ptr[Byte], s:CString, size:Int):Int = extern |
| |
| type AllocCB = CFuncPtr3[TCPHandle,CSize,Ptr[Buffer],Unit] |
| type ReadCB = CFuncPtr3[TCPHandle,CSSize,Ptr[Buffer],Unit] |
| type WriteCB = CFuncPtr2[WriteReq,Int,Unit] |
| type ShutdownCB = CFuncPtr2[ShutdownReq,Int,Unit] |
| type CloseCB = CFuncPtr1[TCPHandle,Unit] |
We’ll use the type TCPHandle to model a TCP handle in libuv, and we’ll use the same technique as we did with Timer to represent it in Scala Native as a Ptr[Ptr[Byte]], ignoring the trailing private fields and internal data structures.
Type Puns and Safety | |
---|---|
If we were to inspect the C header, we would see that many of these functions take a stream-type handle, rather than the more specific TCP handle. For example, uv_listen accepts a Stream handle, rather than a TCP instance, because it also serves other stream types (like named pipes), so Stream functions like a trait or superclass. However, our bindings don’t have to concern themselves with the distinction, since all handle types in this book will be modeled as opaque Ptr[Ptr[Byte]] aliases. Note, however, that this means the Scala compiler won’t catch certain errors. For example, if we pass a Loop instance to a function that expects a TCPHandle, it won’t complain, since both are aliases for Ptr[Ptr[Byte]]. |
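One way to recover some of that safety is to give each kind of handle its own wrapper type. The following is a minimal sketch in plain Scala (not Scala Native, so it runs anywhere); LoopRef, TcpRef, and describeTcp are hypothetical names, and a Long stands in for the raw Ptr[Ptr[Byte]] address. Whether such wrappers can be used directly in extern declarations is something you would need to check against your Scala Native version.

```scala
object HandleTypes {
  // Hypothetical wrappers: both hold the same raw representation,
  // but the compiler treats them as two unrelated types.
  final class LoopRef(val address: Long) extends AnyVal
  final class TcpRef(val address: Long) extends AnyVal

  // Stand-in for a binding that should only ever receive a TCP handle.
  def describeTcp(handle: TcpRef): String =
    s"tcp handle at ${handle.address}"

  def main(args: Array[String]): Unit = {
    val loop = new LoopRef(4096L)
    val tcp = new TcpRef(8192L)
    println(describeTcp(tcp))
    // describeTcp(loop) // rejected at compile time: LoopRef is not a TcpRef
  }
}
```

With plain type aliases, the commented-out call would compile; with distinct wrapper types, the mistake is caught before the program runs.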
As for the actual functions, they’re largely straightforward. uv_tcp_init initializes an allocated TCP handle and returns an error code as an Int. uv_tcp_bind binds the initialized handle to a TCP address, which is represented with the same Sockaddr structure we used back in Chapter 3, Writing a Simple HTTP Client; we can use the uv_ip4_addr helper function to populate it. Finally, after we call uv_listen, the event loop will call uv_connection_cb every time there’s an incoming connection. The ConnectionCB itself just gets a reference to the listening TCP socket and a status flag as an Int.
In other words, this is the same sequence of fundamental system calls that our blocking server made, but instead of issuing them ourselves, we’re setting up handler functions to perform these tasks for us. The complexity introduced by this pattern is that without a top-level outer function to handle a full connection lifecycle, initializing and releasing resources becomes much trickier. In general, we’ll want to model all of the state that we need to manage a client connection in a single struct, which we can store via the handle’s inner Ptr[Byte] contents. Later on, we’ll model more complex structures, but for now we can just store a simple buffer structure with counters for allocated and used data:
| type ClientState = CStruct3[Ptr[Byte],CSize,CSize] |
Now we’re ready to write a basic main() routine and top-level server logic. We’ll abstract over the actual ConnectionCB implementation for the moment, although we’ll be getting to that shortly. It’s just a few lines of code so far:
| type ClientState = CStruct3[Ptr[Byte],CSize,CSize] |
| |
| def main(args:Array[String]):Unit = { |
| println("hello!") |
| serve_tcp(c"0.0.0.0", 8080, 0, 100, connectionCB) |
| println("done?") |
| } |
| |
| val loop = uv_default_loop() |
| |
| def serve_tcp(address:CString, port:Int, flags:Int, backlog:Int, |
| callback:ConnectionCB):Unit = { |
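| // reserve room for a whole sockaddr_in (16 bytes on common platforms), |
| // not just a single Byte, since uv_ip4_addr fills in the full struct |
| val addr = stackalloc[Byte](16) |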
| val addr_convert = uv_ip4_addr(address, port, addr) |
| println(s"uv_ip4_addr returned $addr_convert") |
| val handle = malloc(uv_handle_size(UV_TCP_T)).asInstanceOf[TCPHandle] |
| check_error(uv_tcp_init(loop, handle), "uv_tcp_init(server)") |
| check_error(uv_tcp_bind(handle, addr, flags), "uv_tcp_bind") |
| check_error(uv_listen(handle, backlog, callback), "uv_tcp_listen") |
| uv_run(loop, UV_RUN_DEFAULT) |
| () |
| } |
Due to libuv’s highly consistent programming style, I find this approach much more readable and concise than the traditional UNIX equivalent we wrote before. Next, we have to implement our connection callback.
When we receive an incoming connection in libuv, we have a few additional problems to concern ourselves with. First, we have to accept the connection, which establishes another TCP handle, separate from the one we’re listening on, but attached to the same event loop. We’ll also need to initialize the ClientState struct at this time.
It’s also important that we eventually free the memory we allocate for the ClientState and other resources; however, we won’t be doing that until we’re ready to close the connection a little later on. Finally, we’ll need to tell libuv what to do when a request arrives. We’ll do that with two callbacks, allocCB and readCB, which we’ll be defining in detail in the next section.
As with the previous code sample, libuv’s consistent style means that all of this code is remarkably compact:
| val connectionCB = new ConnectionCB { |
| def apply(handle:TCPHandle, status:Int):Unit = { |
| println("received connection") |
| |
| // initialize the new client tcp handle and its state |
| val client = malloc(uv_handle_size(UV_TCP_T)).asInstanceOf[TCPHandle] |
| check_error(uv_tcp_init(loop,client), "uv_tcp_init(client)") |
| // allocates the ClientState and stores its address in the handle |
| initialize_client_state(client) |
| |
| // accept the incoming connection into the new handle |
| check_error(uv_accept(handle,client), "uv_accept") |
| // set up callbacks for incoming data |
| check_error(uv_read_start(client,allocCB,readCB), "uv_read_start") |
| } |
| } |
| |
| def initialize_client_state(client:TCPHandle):Ptr[ClientState] = { |
| val client_state_ptr = stdlib.malloc(sizeof[ClientState]) |
| .asInstanceOf[Ptr[ClientState]] |
| stdio.printf( |
| c"allocated data at %x; assigning into handle storage at %x\n", |
| client_state_ptr, client) |
| val client_state_data = stdlib.malloc(4096) |
| client_state_ptr._1 = client_state_data |
| client_state_ptr._2 = 4096 // total |
| client_state_ptr._3 = 0 // used |
| !client = client_state_ptr.asInstanceOf[Ptr[Byte]] |
| client_state_ptr |
| } |
uv_accept is mostly equivalent to the standard accept function we’ve already used; however, in this case we need to pass not one, but two TCP socket handles into it. And although the type signature indicates any Stream socket will do, we’ll only be using TCP. The first handle, server, is the one we created, but we’re also guaranteed that it will be passed in to our ConnectionCB as an argument (and we should always use the handle passed in to the callback, for safety). It’s our responsibility to allocate and initialize the handle for the new TCP connection in the ConnectionCB before calling uv_accept. Once we do so, the handle is associated with the event loop, but it won’t receive any data unless we call uv_read_start, which we must also do before our ConnectionCB returns.
Now that our connection is fully established, we can read and write data. When the TCP socket that underlies the TCP handle is ready for reading, the event loop will first invoke the AllocationCallback. In that callback, we’re responsible for giving the event loop a Buffer that it can safely read into. We haven’t seen Buffer before, but libuv uses it ubiquitously for passing binary data around; it’s just a simple struct containing two fields: a Ptr[Byte] and a length. In the second argument to the callback, a CSize, libuv passes in a suggested number of bytes to allocate, but we can basically disregard it. libuv has no idea how much data is going to come in at this point, so it asks for 65536, which isn’t always appropriate, especially for a line-by-line echo server.
To be more conservative, we can implement an AllocationCallback that gives back 4096 bytes instead. Since we indicate the size of the Buffer explicitly in the data structure, the event loop knows not to overflow, and it can always call the AllocationCallback again if it has more data to read. Let’s write it quickly:
| val allocCB = new AllocCB { |
| def apply(client:TCPHandle, size:CSize, buffer:Ptr[Buffer]):Unit = { |
| println("allocating 4096 bytes") |
| val buf = stdlib.malloc(4096) |
| buffer._1 = buf |
| buffer._2 = 4096 |
| } |
| } |
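To make the "call the AllocationCallback again" behavior concrete, here is a small plain-Scala analogy, not libuv itself: a ByteArrayInputStream plays the role of the socket, and each pass of the loop hands the reader a fresh fixed-size buffer, much as allocCB does. The object and method names are hypothetical.

```scala
object ChunkedRead {
  import java.io.ByteArrayInputStream

  // Drains `input` using a fresh buffer of `bufferSize` bytes per pass and
  // records how many bytes each pass delivered. No pass can deliver more
  // than the buffer holds; extra data simply waits for the next pass.
  def chunkSizes(input: Array[Byte], bufferSize: Int): List[Int] = {
    val stream = new ByteArrayInputStream(input)
    val sizes = List.newBuilder[Int]
    var n = stream.read(new Array[Byte](bufferSize))
    while (n > 0) {
      sizes += n
      n = stream.read(new Array[Byte](bufferSize))
    }
    sizes.result()
  }

  def main(args: Array[String]): Unit =
    println(chunkSizes(new Array[Byte](10000), 4096))
}
```

A 10000-byte payload read through 4096-byte buffers arrives as three chunks, which is why the read callback has to be prepared to run more than once per request.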
Once memory has been allocated, the event loop will immediately read data into the buffer and then call the ReadCallback.
In this program, and in most libuv programs, the ReadCallback is the single most complex function we’ll write. It has numerous responsibilities in a typical server program:
It determines whether the client has closed the connection or not.
It receives data in the form of Buffer and parses or copies it into connection state as needed.
It determines when and how to send a response back to the client.
Before returning, it frees the memory in the Buffer that we allocated in the AllocationCallback.
Each one of these is complex enough that we’ll want to break up our read callback into a top-level function with several helpers.
The most important detail to get right is handling a closed connection. When a client has terminated a connection unprompted, which is normal in a client-server architecture, the read callback will be invoked with a size argument that is less than 0 (UV_EOF for a clean close; other negative values indicate errors). However, because TCP connections are bidirectional, we can still send a “response” back; many protocols will even use this as a normal indicator of “the request is complete, please send your response.” We can take that approach for our echo server. Instead of echoing data back line-by-line, it will read until completion, then echo back everything it has read.
This is enough of a plan for us to fill in the top level of our read callback:
| val readCB = new ReadCB { |
| def apply(client:TCPHandle, size:CSSize, buffer:Ptr[Buffer]):Unit = { |
| println(s"read $size bytes") |
| val client_state_ptr = (!client).asInstanceOf[Ptr[ClientState]] |
| if (size < 0) { |
| send_response(client, client_state_ptr) |
| println("connection is closed, shutting down") |
| shutdown(client) |
| } else { |
| append_data(client_state_ptr, size, buffer) |
| } |
| // free the buffer handed out by allocCB on every path, including EOF |
| stdlib.free(buffer._1) |
| } |
| } |
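The size argument carries three different meanings, which a real server would usually want to tell apart. The sketch below, in plain Scala so it runs without libuv, classifies the value the way libuv documents it for read callbacks: positive means data, zero means nothing was available to read, UV_EOF means the peer finished sending, and any other negative value is an error. The ReadEvent type and classify function are hypothetical; the UV_EOF value of -4095 is the one the server log reports when the client disconnects.

```scala
object ReadStatus {
  // libuv's end-of-stream code.
  val UV_EOF: Long = -4095L

  sealed trait ReadEvent
  case object EndOfStream extends ReadEvent
  case object NothingToRead extends ReadEvent
  final case class ReadError(code: Long) extends ReadEvent
  final case class Data(length: Long) extends ReadEvent

  // Maps the raw size passed to a read callback onto a descriptive event.
  def classify(size: Long): ReadEvent =
    if (size == UV_EOF) EndOfStream
    else if (size < 0) ReadError(size)
    else if (size == 0) NothingToRead
    else Data(size)

  def main(args: Array[String]): Unit =
    List(4L, 0L, -4095L, -104L).foreach(s => println(s"$s -> ${classify(s)}"))
}
```

Our echo server treats every negative size as "the request is complete", which is adequate for a demonstration but would also send a response after a genuine read error.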
Now we just need to fill in the details. The first thing to get right is storing the data we receive, but that can be tricky. We have no guarantee that readCB will be called with a complete request, or even a complete line, nor do we know that it will be called exactly once. This can create a lot of complications for more complex parsers, but fortunately an echo server is simpler than that. We do, however, need to keep track of how many bytes we’ve already copied into the ClientState, and be sure to always append to the end, like so:
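| def append_data(state:Ptr[ClientState], size:CSSize, |
| buffer:Ptr[Buffer]):Unit = { |
| // never write past the allocation, and keep one byte for a trailing NUL |
| val available = state._2 - state._3 - 1 |
| val to_copy = if (size < available) size else available |
| val copy_position = state._1 + state._3 |
| // memcpy copies exactly to_copy bytes; strncpy would stop at a NUL byte |
| string.memcpy(copy_position, buffer._1, to_copy) |
| // be sure to update the length of the data since we have copied into it |
| state._3 = state._3 + to_copy |
| // keep the stored data NUL-terminated so it can be formatted with %s |
| !(state._1 + state._3) = 0 |
| stdio.printf(c"client %x: %d/%d bytes used\n", state, state._3, state._2) |
| } |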
Although this pattern is more or less straightforward, the complexity can grow rapidly for more full-featured applications. For example, imagine implementing the pattern of our blocking, fgets-based HTTP parser in this idiom. We would have to handle partial lines, receive multiple lines at once, and maintain huge amounts of dynamic state, all while working with a single data structure. Although it’s certainly doable, it can require discipline and planning.
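To give a feel for the bookkeeping involved, here is a minimal plain-Scala sketch of just one of those concerns: reassembling complete lines from chunks that arrive at arbitrary boundaries. LineBuffer is a hypothetical helper, not something from libuv or from this book's code, and it works on Strings rather than raw buffers to keep the idea visible.

```scala
object LineAssembly {
  // Accumulates raw chunks and hands back only the lines that are complete.
  final class LineBuffer {
    private val pending = new java.lang.StringBuilder

    // Appends a chunk and returns every full line now available, in order.
    // Text after the last newline stays buffered until a later chunk ends it.
    def feed(chunk: String): List[String] = {
      pending.append(chunk)
      var lines = List.empty[String]
      var newline = pending.indexOf("\n")
      while (newline >= 0) {
        lines = pending.substring(0, newline) :: lines
        pending.delete(0, newline + 1)
        newline = pending.indexOf("\n")
      }
      lines.reverse
    }

    // The partial line still waiting for its newline.
    def leftover: String = pending.toString
  }

  def main(args: Array[String]): Unit = {
    val buffer = new LineBuffer
    println(buffer.feed("GET / HT"))
    println(buffer.feed("TP/1.1\nHost: a\nUser"))
    println(buffer.leftover)
  }
}
```

A single call can yield zero lines, one line, or several, so whatever consumes the lines has to carry its own state between callbacks as well.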
Once we’ve read some data, and have decided, one way or another, that we have a complete request, it’s time for us to send the client a response. libuv provides us a few functions to help with this, but the pattern is a bit different from reads. Since incoming data is initiated by the client, our program doesn’t control when it happens—we just define a handler to invoke when it shows up. Outgoing writes are different. We can decide exactly what gets sent and choose when to initiate the write, but we cannot decide exactly when the client will choose to consume it. As a result, writes are tracked individually, rather than per-connection, which makes things a little more complex to implement, but also simplifies a lot of the harder resource-management problems.
The most important new data structure is the WriteReq, or write request, which looks like this:
| type WriteReq = Ptr[Ptr[Byte]] |
Again, it’s just another opaque pointer, and we’ll initialize it with the uv_req_size helper function. However, the WriteReq doesn’t actually contain the response data—that’s done with an array of the Buffer objects we’ve already used for receiving input. (We’ll only ever use a single buffer in this book, but it’s possible to have larger arrays, which is why uv_write takes a size argument.) I also generally find it convenient to store a pointer to the buffer in the contents of the WriteReq data pointer itself, which will make cleanup easier when we’re done, but it’s not required, and you could implement much more complex state for write requests if you wished.
Once we have initialized our request and buffer, we can generate our response. I’ve modeled this as a simple function that takes the ClientState and returns the response text, but this can, again, be as simple or complex as you wish. We then pass the populated buffer on to uv_write. uv_write also takes a callback, which will be invoked once the write is completed (that is, once libuv has finished writing the data to the socket, which doesn’t by itself guarantee that the client has read it). That’s when we’ll be ready to free the write request, buffer, and any other data we’ve allocated:
| def send_response(client:TCPHandle,state:Ptr[ClientState]):Unit = { |
| val resp = malloc(uv_req_size(UV_WRITE_REQ_T)).asInstanceOf[WriteReq] |
| val resp_buffer = malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]] |
| resp_buffer._1 = make_response(state) |
| resp_buffer._2 = string.strlen(resp_buffer._1) |
| !resp = resp_buffer.asInstanceOf[Ptr[Byte]] |
| check_error(uv_write(resp,client,resp_buffer,1,writeCB), "uv_write") |
| } |
| |
| def make_response(state:Ptr[ClientState]):CString = { |
| val response_format = c"received response:\n%s\n" |
| val response_data = malloc(string.strlen(response_format) + state._3) |
| stdio.sprintf(response_data, response_format, state._1) |
| response_data |
| } |
| |
| val writeCB = new WriteCB { |
| def apply(writeReq:WriteReq, status:Int):Unit = { |
| println("write completed") |
| val resp_buffer = (!writeReq).asInstanceOf[Ptr[Buffer]] |
| stdlib.free(resp_buffer._1) |
| stdlib.free(resp_buffer.asInstanceOf[Ptr[Byte]]) |
| stdlib.free(writeReq.asInstanceOf[Ptr[Byte]]) |
| } |
| } |
Here, our make_response function is about as simple as it can be: we’re using sprintf to create a string with a brief message, followed by the data we’ve consumed so far.
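Because sprintf does no bounds checking, it’s worth confirming that the allocation in make_response is large enough. The arithmetic can be sketched in plain Scala; this assumes ASCII data and that the stored data is NUL-terminated at exactly the number of used bytes. The object and function names are hypothetical.

```scala
object ResponseSizing {
  val format = "received response:\n%s\n"

  // Bytes a C buffer needs for sprintf(format, data): the literal text,
  // minus the two characters of "%s", plus the data, plus the trailing NUL.
  def requiredBytes(dataLength: Int): Int =
    format.length - 2 + dataLength + 1

  // What make_response requests: strlen(format) plus the used byte count.
  def allocatedBytes(dataLength: Int): Int =
    format.length + dataLength

  def main(args: Array[String]): Unit = {
    val data = "foo\nbar\n"
    println(s"required:  ${requiredBytes(data.length)}")
    println(s"allocated: ${allocatedBytes(data.length)}")
  }
}
```

The allocation always exceeds the requirement by one byte, so the sprintf call fits, but only as long as the %s argument really ends where the used-byte counter says it does.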
We’re almost ready to put all the pieces together! We’re just missing one last piece: closing a TCP connection, whether initiated by a client, or by our server. TCP sockets make this extra tricky because they are bidirectional, which allows a client to shut down its outgoing writes while still being able to receive a response. We didn’t have to deal with this nuance in our blocking HTTP server, since it used sscanf to find request boundaries within a stream of lines. But because our echo server will be reading until the client closes, we have to deal with it.
The good news is that libuv makes this pretty easy for us. It provides two different functions—uv_shutdown and uv_close—with slightly different use cases. uv_close immediately terminates a connection, without waiting for other operations to complete, and shuts down both the read and write side of a socket. uv_shutdown, in contrast, only closes the outgoing (write) side of our connection, and most important, it waits for all pending writes on the connection to complete before doing so.
This means we can invoke uv_shutdown from the read callback, right after we invoke uv_write, and be assured that the connection will stay open until the pending write has completed. uv_shutdown takes an opaque ShutdownReq, which works exactly like WriteReq, and it also lets us provide another callback, shutdownCB, which is a great place to actually call uv_close. Finally, the closeCB callback we provide to uv_close is where we’ll free the ClientState and related resources. Altogether, these functions look like so:
| def shutdown(client:TCPHandle):Unit = { |
| val shutdown_req = malloc(uv_req_size(UV_SHUTDOWN_REQ_T)) |
| .asInstanceOf[ShutdownReq] |
| !shutdown_req = client.asInstanceOf[Ptr[Byte]] |
| check_error(uv_shutdown(shutdown_req,client,shutdownCB), "uv_shutdown") |
| } |
| |
| val shutdownCB = new ShutdownCB { |
| def apply(shutdownReq:ShutdownReq, status:Int):Unit = { |
| println("all pending writes complete, closing TCP connection") |
| val client = (!shutdownReq).asInstanceOf[TCPHandle] |
| check_error(uv_close(client,closeCB),"uv_close") |
| stdlib.free(shutdownReq.asInstanceOf[Ptr[Byte]]) |
| } |
| } |
| |
| val closeCB = new CloseCB { |
| def apply(client:TCPHandle):Unit = { |
| println("closed client connection") |
| val client_state_ptr = (!client).asInstanceOf[Ptr[ClientState]] |
| stdlib.free(client_state_ptr._1) |
| stdlib.free(client_state_ptr.asInstanceOf[Ptr[Byte]]) |
| stdlib.free(client.asInstanceOf[Ptr[Byte]]) |
| } |
| } |
This nicely mirrors the way we allocated resources in our connectionCB handler. A general rule of thumb for memory management is to pair every malloc call with a free call, and although it can be a bit tricky to figure out exactly where to do it, it doesn’t have to complicate an asynchronous program greatly.
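One way to check that pairing on paper is to list each allocation by name and tick it off where it is released. The plain-Scala sketch below does this for the lifecycle of a single connection as this section intends it; the Ledger class and the allocation names are hypothetical labels, not part of libuv or of the server code.

```scala
object AllocationLedger {
  import scala.collection.mutable

  // Tracks named allocations so unmatched mallocs and double frees show up.
  final class Ledger {
    private val live = mutable.Set.empty[String]
    def allocate(name: String): Unit = live += name
    def release(name: String): Unit = {
      require(live.contains(name), s"double free or unknown allocation: $name")
      live -= name
    }
    def outstanding: Set[String] = live.toSet
  }

  // Replays one connection: connect, one read, EOF, write, shutdown, close.
  def simulateConnection(): Set[String] = {
    val ledger = new Ledger
    // connection callback
    List("client handle", "client state", "client data").foreach(ledger.allocate)
    // alloc callback followed by read callback, once for data and once for EOF
    for (_ <- 1 to 2) { ledger.allocate("read buffer"); ledger.release("read buffer") }
    // send_response and shutdown
    List("write request", "response buffer", "response data", "shutdown request")
      .foreach(ledger.allocate)
    // write callback
    List("response data", "response buffer", "write request").foreach(ledger.release)
    // shutdown callback
    ledger.release("shutdown request")
    // close callback
    List("client data", "client state", "client handle").foreach(ledger.release)
    ledger.outstanding
  }

  def main(args: Array[String]): Unit =
    println(s"still allocated: ${simulateConnection()}")
}
```

An empty result means every allocation has a matching release; anything left in the set points at the callback that forgot to free it.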
With this last bit of code in place, we’re ready to run our server! We’ll test it out much like we did with our previous TCP echo server, using netcat. First, we’ll boot up the server, and we should see output like this:
| $ ./target/scala-2.11/async_tcp-out |
| hello! |
| uv_ip4_addr returned 0 |
| uv_tcp_init(server) returned 0 |
| uv_tcp_bind returned 0 |
| uv_tcp_listen returned 0 |
Now, we can connect to it with netcat, and type in some text:
| $ nc localhost 8080 |
| foo |
| bar |
When we do this, we won’t see an immediate response because we still have our connection open. This means the server is still waiting for us to send more data. However, we can see in the server logs that it has consumed the data we sent so far:
| ... |
| received connection |
| uv_tcp_init(client) returned 0 |
| allocated data at 6ac02ce0; assigning into handle storage at 6ac02bd0 |
| uv_accept returned 0 |
| uv_read_start returned 0 |
| allocating 4096 bytes |
| read 4 bytes |
| client 6ac02ce0: 4/4096 bytes used |
| allocating 4096 bytes |
| read 4 bytes |
| client 6ac02ce0: 8/4096 bytes used |
To terminate the connection on the client side, type Ctrl + D. We’ll then see the client receive a response:
| ... |
| received response: |
| foo |
| bar |
| |
| $ |
And we can also see the server responding to the connection that just closed:
| allocating 4096 bytes |
| read -4095 bytes |
| uv_write returned 0 |
| connection is closed, shutting down |
| uv_shutdown returned 0 |
| write completed |
| all pending writes complete, closing TCP connection |
| uv_close returned 169822304: Unknown system error 169822304... |
| closed client connection |
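If you see an error like the uv_close error above, don’t panic! In libuv’s C API, uv_close returns void, so the Int that our binding reads back is just whatever happened to be left in the return register, and check_error misreads it as an error code. The message is harmless; declaring uv_close as returning Unit and calling it without check_error makes it go away.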
After all this, our server is still running and ready to accept more connections. I recommend testing it with multiple simultaneous or sequential connections and seeing how it behaves.
Now that we have a working echo server on a TCP socket, there’s one more step left: we still need to adapt our HTTP parsing code from Chapter 3, Writing a Simple HTTP Client, to work with our asynchronous server.