Unfortunately, we can’t simply reuse our previous parsing methods. Most important, we can’t do blocking socket I/O as we did in Chapter 3, Writing a Simple HTTP Client, so we’ll need to adapt all the I/O patterns to libuv.
We’ll also want to think hard about memory allocation. With the low-overhead, asynchronous architecture we’ve used in this chapter, our server could potentially be handling thousands of requests per second. At that scale, the overhead of garbage collection, or even manual memory allocation, can have a serious impact on performance.
Before we get into the guts of the implementation, though, let’s take a moment to work through the design of an API that we could present to an end user. Since we’ve already written an HTTP server once before, we can tweak our design based on what we’ve learned so far. With a design in hand, we can then consider several different strategies for implementation and evaluate the trade-offs of each one.
Back in Chapter 5, Writing a Server the Old-Fashioned Way, we defined an HTTP Request and Response with the following case classes:
| case class HttpRequest( |
|   method:String, |
|   uri:String, |
|   headers:collection.Map[String, String], |
|   body:String) |
| |
| case class HttpResponse( |
|   code:Int, |
|   headers:collection.Map[String, String], |
|   body:String) |
We can build on these by defining the fundamental type signatures of an HTTP server in terms of these case classes. The most important signature is this:
| type RequestHandler = Function1[HttpRequest,HttpResponse] |
This RequestHandler function can actually define lots of different parts of a server:
It naturally defines the handling for any particular kind of request.
It defines the router, which determines which handler to invoke based on the method, uri, headers, and body of the request.
It defines, in some sense, the server itself, which may just be a trivial wrapper around a router method at this point.
Based on these insights, we could define a simple public API that looks like this:
| object HttpServer { |
| def serve_http(port:Int, router:RequestHandler):Unit |
| } |
With an API like this, we could write a clean, simple main function like this:
| def main(args:Array[String]):Unit = { |
| serve_http(8080, request => |
| HttpResponse(200, Map("Content-Length" -> "12"),"hello world\n")) |
| } |
Later on, we’ll work on the design of more sophisticated and performant routing logic, but a great benefit of this design is that we can serve a single-route request handler for now and plug in other designs as we please.
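For example, a slightly richer router is still just a plain RequestHandler. This hypothetical sketch dispatches on the method and URI of the request and falls back to a 404; demo_router and its routes are illustrative, not part of the server itself:

| val demo_router:RequestHandler = request => |
|   (request.method, request.uri) match { |
|     case ("GET", "/") => |
|       HttpResponse(200, Map("Content-Length" -> "12"), "hello world\n") |
|     case ("GET", "/ping") => |
|       HttpResponse(200, Map("Content-Length" -> "5"), "pong\n") |
|     case _ => |
|       HttpResponse(404, Map("Content-Length" -> "10"), "not found\n") |
|   } |

Because serve_http accepts any RequestHandler, we could swap this in for the single-route handler without touching the server internals. Next, we’ll proceed with implementing this API from the outside in, reusing as much of our previous work as possible.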
To implement this API, we’ll need to solve three basic design problems:
How to parse requests that arrive asynchronously, in arbitrary chunks, rather than line by line.
How to connect the user-supplied RequestHandler function to libuv’s static C callbacks.
How to minimize memory allocation and copying while handling requests at high throughput.
All of these will make the code a bit more complicated than the blocking, fork-based HTTP server we wrote before. In particular, since we had a dedicated process for each connection, we could use fgets to read exactly one line at a time from a TCP socket, which made parsing the HTTP headers much easier.
In contrast, libuv will simply give us a chunk of whatever bytes are available, up to the maximum size of the buffer. This opens up a wide range of problematic scenarios, some more likely than others:
A single on_read call is very likely to receive multiple lines of input at once.
Depending on the size of the request and the buffer, on_read may or may not receive the whole request in a single call.
If the request doesn’t fit in a single buffer, it most likely doesn’t break on a line boundary.
If the client is especially slow in sending the request, on_read may receive a partial request that’s smaller than the buffer.
If a request is incomplete, the end of the received data may fall in the headers or the body of the request.
Further, all of these scenarios are complicated by the imperative, stateful nature of the HTTP protocol, which changes its behavior depending on which headers are read. In short, asynchronous parsing is a hard problem, and not one we should underestimate. There are good C libraries we could bring in for this; there’s even Node.js’s own http-parser C library, which has a close affinity for libuv and would be a great option for a server like this.
But we do already have our own parsing code to adapt, and we also have our Gatling stress-testing scenario from before. So, instead of embarking upon a large engineering project and greatly enlarging the scope of this book, I’ve opted to adapt our existing code in a more straightforward way, while making a few optimistic assumptions. In particular, I found that with reasonably large buffers and small requests, libuv is unlikely to deliver a partial request for processing. And once we’ve implemented this, we can run our stress test and see how well our assumptions hold up.
There’s one more catch, though. We also have to figure out a strategy to connect our user-supplied RequestHandler function with libuv. This is tricky, because Scala Native only allows us to generate a CFunctionPtr for static functions that are fully known at compile time. This means we can’t directly convert the router argument into something that we can use as a libuv read handler. Instead, we’ll use mutable object state to hold on to our router function, and then bring it back in for our onRead handler a little later on.
Following this pattern, serve_http will look like so:
| var router:RequestHandler = (_ => ???) |
| |
| def serve_http(port:Int, handler:RequestHandler):Unit = { |
|   println(s"about to serve on port ${port}") |
|   this.router = handler // stash the handler where the static callbacks can reach it |
|   serve_tcp(c"0.0.0.0", port, 0, 4096, connectionCB) |
| } |
Here, we’re writing a thin wrapper around our pre-existing serve_tcp and on_connect functions. The next modification we’ll make is in our on_read function. In some ways, it’ll be simpler than before; since we don’t allow client requests to span multiple calls to on_read, we no longer need to maintain a ClientState data structure, for example. If we abstract out the details of parsing requests, the code is straightforward:
| val readCB = new ReadCB { |
|   def apply(client:TCPHandle, size:CSSize, buffer:Ptr[Buffer]):Unit = { |
|     if (size < 0) { |
|       // a negative size signals EOF or a read error - close the connection |
|       shutdown(client) |
|     } else { |
|       try { |
|         val parsed_request = HTTP.parseRequest(buffer._1, size) |
|         val response = router(parsed_request) |
|         send_response(client, response) |
|         shutdown(client) |
|       } catch { |
|         case e:Throwable => |
|           println(s"error during parsing: ${e}") |
|           shutdown(client) |
|       } |
|     } |
|   } |
| } |
With these relatively thin wrappers in place, we’re ready to begin writing our parsing code.
Assuming we’ll receive a full request in a single on_read call, we can express our parsing algorithm, in pseudocode, like this:
Scan the first line of the request as an HTTP request line and extract the method and URI.
If the request line is malformed, raise an exception and kill the connection.
Scan additional lines as headers, validating them but not extracting them.
If a header is malformed, the request is likely incomplete, and we can raise an exception.
When we encounter an empty line, we have reached the end of the headers, and should mark that point.
If there is content after the end of the headers, it constitutes the request body.
The trick with this is to minimize the amount of garbage allocation we do in the process, as well as the copying of intermediate values. If we’re going to populate a Map of our headers, we won’t be able to go for a completely zero-copy implementation style, but even a few considerate gestures will save us a lot of memory churn. In particular, since this is a single-threaded application, we can statically allocate a few buffers for temporary storage of request header lines and their components, and thus cut down on memory allocation during request handling.
For example, the first thing we’ll have to do is parse the HTTP request line. We’ll malloc buffers outside of the function as top-level fields of the enclosing object. We’ll also use a scanf modifier * in our pattern to indicate noncapturing matches—in this case, we don’t need to grab the HTTP protocol version—as well as the pseudo-pattern %n, which lets us capture and return the number of bytes read at the end, like this:
| val method_buffer = malloc(16) // enough for any standard HTTP method |
| val uri_buffer = malloc(4096) |
| |
| def scan_request_line(line:CString):(String,String,Int) = { |
|   val line_len = stackalloc[Int] |
|   val scan_result = stdio.sscanf(line, c"%s %s %*s\r\n%n", method_buffer, |
|     uri_buffer, line_len) |
|   if (scan_result == 2) { |
|     // %n doesn't count toward sscanf's return value, so a result of 2 |
|     // means both the method and the URI matched |
|     (fromCString(method_buffer), fromCString(uri_buffer), !line_len) |
|   } else { |
|     throw new Exception("bad request line") |
|   } |
| } |
This method will then return the HTTP method and request URI, as well as the length, in bytes, of the line it read, so that the enclosing parseRequest function knows where to start scanning next.
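For instance, given a typical request line, we’d expect a hypothetical sanity check like this to pass; note that the %n count includes the trailing \r\n, which is consumed by the whitespace in the pattern:

| // hypothetical check: 26 bytes consumed, including the trailing \r\n |
| val (method, uri, len) = scan_request_line(c"GET /index.html HTTP/1.1\r\n") |
| assert(method == "GET" && uri == "/index.html" && len == 26) |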
Next, we need to scan headers. These are a little trickier, since whitespace can appear in a few different places, including within the value portion of the line, as in Accept-Encoding: gzip, deflate, br. To handle this, we’ll use more constrained character-class patterns than %s, such as %[^\r\n], which matches anything but newlines, or %[^\r\n:], which additionally stops at a colon. And since we don’t want to allocate storage for headers at all, we’ll use the %n pseudo-pattern even more liberally, which lets us locate the start and end of the key and value portions of the line without copying them to temporary storage. The resulting pattern string is a bit of a handful: c"%*[^\r\n:]%n: %n%*[^\r\n]%n%*[\r\n]%n". We’ll also pass in pointers from the outer calling function, since it will need to inspect some of the offsets to determine whether we’re at the final header of the request.
All this logic makes things a bit more complicated, but it’s not too bad:
| def scan_header_line(line:CString, |
|     out_map:mutable.Map[String,String], key_end:Ptr[Int], |
|     value_start:Ptr[Int], value_end:Ptr[Int], |
|     line_len:Ptr[Int]):Int = { |
|   !line_len = -1 // sentinel: stays -1 unless the whole pattern matched |
|   val scan_result = stdio.sscanf(line, |
|     c"%*[^\r\n:]%n: %n%*[^\r\n]%n%*[\r\n]%n", |
|     key_end, value_start, value_end, line_len) |
|   if (!line_len != -1) { |
|     // null-terminate the key and value in place, so fromCString can read |
|     // them straight out of the request buffer |
|     val start_of_key = line |
|     val end_of_key = line + !key_end |
|     !end_of_key = 0 |
|     val start_of_value = line + !value_start |
|     val end_of_value = line + !value_end |
|     !end_of_value = 0 |
|     val key = fromCString(start_of_key) |
|     val value = fromCString(start_of_value) |
|     out_map(key) = value |
|     !line_len |
|   } else { |
|     throw new Exception("bad header line") |
|   } |
| } |
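Before we stitch these together, it helps to trace the pattern by hand on a sample header line. These offsets are worked out from standard scanf semantics, and they prefigure the arithmetic we’re about to rely on:

| // "Accept: text/html\r\n" (another header follows): |
| //   key_end = 6, value_start = 8, value_end = 17, line_len = 19 |
| //   line_len - value_end == 2: a single \r\n, so expect more headers |
| // "Accept: text/html\r\n\r\n" (the final header): |
| //   key_end = 6, value_start = 8, value_end = 17, line_len = 21 |
| //   line_len - value_end == 4: \r\n\r\n, the end of the headers |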
Now we can stitch these two functions together into a function that will parse and validate requests for us.
Because both scan functions give us back the number of bytes read, we can use that value to maintain a running count of how many total bytes we’ve consumed so far. We’ll also inspect the trailing whitespace at the end of each header line: if there are 2 trailing bytes (a single \r\n), we should expect another header, whereas if there are 4 (\r\n\r\n), we know this header is the last one. Finally, once we’re past the headers, whatever remains is the request body, which we’ll read out with fromCString as well.
The whole function works like this:
| val line_buffer = malloc(1024) |
| |
| def parseRequest(req:CString, size: Long):HttpRequest = { |
|   req(size) = 0 // ensure null termination |
|   var req_position = req |
|   val line_len = stackalloc[Int] |
|   val key_end = stackalloc[Int] |
|   val value_start = stackalloc[Int] |
|   val value_end = stackalloc[Int] |
|   val headers = mutable.Map[String,String]() |
| |
|   val (method,uri,request_len) = scan_request_line(req) |
| |
|   var bytes_read = request_len |
|   while (bytes_read < size) { |
|     req_position = req + bytes_read |
|     val parse_header_result = scan_header_line(req_position, headers, |
|       key_end, value_start, value_end, line_len) |
|     if (parse_header_result < 0) { |
|       throw new Exception("HEADERS INCOMPLETE") |
|     } else if (!line_len - !value_end == 2) { |
|       // a single \r\n: more headers follow |
|       bytes_read += parse_header_result |
|     } else if (!line_len - !value_end == 4) { |
|       // a \r\n\r\n sequence: this was the last header, and anything |
|       // past it is the request body |
|       bytes_read += parse_header_result |
|       val body = fromCString(req + bytes_read) |
|       return HttpRequest(method,uri,headers,body) |
|     } else { |
|       throw new Exception("malformed header!") |
|     } |
|   } |
|   throw new Exception(s"bad scan, exceeded $size bytes") |
| } |
With HTTP parsing taken care of, we’ve implemented all of the components needed, and our server is done! When we compile and run it, we should see output like this:
| $ ./target/scala-2.11/async_http-out |
| about to serve on port 8080 |
If we then navigate to http://localhost:8080/ in a web browser, we should promptly see the hello world response.
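We can also exercise it with curl from another terminal. Assuming send_response writes a standard HTTP/1.1 status line, the exchange should look roughly like this:

| $ curl -i http://localhost:8080/ |
| HTTP/1.1 200 OK |
| Content-Length: 12 |
| |
| hello world |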
But we’re just getting started. Now that we’ve done the work to create a nonblocking HTTP server, let’s compare its performance to the blocking server we created in Chapter 5, Writing a Server the Old-Fashioned Way. As a refresher, we used Gatling to measure the response time at the 50th and 99th percentile, as well as the overall throughput and error rate at different levels of concurrency, starting from 10 users and going up to 2000. Our fork-based server performed best with about 250 users, handling about 500 requests per second.
Let’s run the exact same script on our new asynchronous server, like so:
| $ export GATLING_URL=http://localhost:8080 GATLING_USERS=10 |
| $ export GATLING_REQUESTS=50 GATLING_RAMP_TIME=0 |
| $ gatling.sh |
By running that same command with different user counts, we can tabulate results and compare the performance of the two servers:
| # of users | request count | 50th %ile (ms) | 99th %ile (ms) | req/second | error rate |
|         10 |           500 |              1 |             18 |        500 |         0% |
|         25 |          1250 |              1 |             44 |        625 |         0% |
|         50 |          2500 |              2 |             32 |       1250 |         0% |
|         75 |          3750 |              2 |             51 |       1875 |         0% |
|        100 |          5000 |              3 |             62 |       2500 |         0% |
|        150 |          7500 |              4 |            136 |       2500 |         0% |
|        200 |         10000 |              5 |            147 |       3333 |         0% |
|        250 |         12500 |              4 |            151 |       4166 |       0.1% |
|        300 |         15000 |              5 |            245 |       3000 |         3% |
|        350 |         17500 |              8 |            300 |       2916 |         5% |
|        400 |         20000 |              8 |            258 |       4000 |         6% |
|        450 |         22500 |              6 |            221 |       4500 |         4% |
|        500 |         25000 |             11 |            326 |       3571 |         7% |
|        750 |         37500 |             16 |            335 |       3750 |        12% |
|       1000 |         50000 |             12 |            326 |       2380 |        14% |
|       1500 |         75000 |              8 |            301 |       1744 |        15% |
|       2000 |        100000 |              8 |            437 |       3255 |        14% |
The performance of the asynchronous server is dramatically improved! Median response times are 30 to 50 times faster across the board; 99th percentile response times and overall throughput are as much as 10 times better than the blocking implementation; and error rates at the highest levels of load drop from 52% to 14% at 2000 users. Further, we can also see that although the local Gatling test is excellent at finding the peak throughput of our system, it’s unrealistically harsh as a simulation of actual users. We’d expect response times and server load to be at least an order of magnitude lower in a real-world cloud deployment, which suggests that our server could handle 10,000 or more users in many scenarios.