Now, we’re ready to implement the Server layer and lay the foundation for a service DSL. This will largely follow the pattern of the server we implemented in Chapter 6, Doing I/O Right, with Event Loops, but will be more oriented toward supporting typed messages and responses rather than free-form strings. However, we’ll try to isolate the server from some of the subtleties of the actual DSL to control the complexity and allow for alternative designs.
First, the core data structures we need to define are the Request and Response types—the core accommodation we need to make is to parameterize both over some Scala type T. By design, the actual server will only receive Request[String] and Response[String] (it will fall upon the Service layer to translate other types while still allowing them to set headers freely):
| case class Request[T](method:String, url:String, headers:Map[String,String], |
| body:T) |
| case class Response[T](code:Int, description:String, |
| headers:Map[String,String],body:T) |
The other critical data structure is the Route—a way to map incoming requests to a particular handler function according to some combination of request method and URL. We’ll also need a way to distinguish synchronous from asynchronous routes, so we know whether to send them back immediately or wait for a Future to complete:
| sealed trait Route { |
| val method:String |
| val path:String |
| } |
| case class SyncRoute(method:String, path:String, handler:Request[String] => |
| Response[String]) extends Route |
| case class AsyncRoute(method:String, path:String, handler:Request[String] => |
| Future[Response[String]]) extends Route |
Again, our Server layer doesn’t need to know what to do with the routes, just how to handle their results.
Now, we’re ready to write the server code itself, starting with the mostly routine setup and initialization:
| object Server extends Parsing { |
| import LibUVConstants._, LibUV._,HttpParser._ |
| implicit val ec = EventLoop |
| val loop = EventLoop.loop |
| var serial = 1L |
| override val requests = mutable.Map[Long,RequestState]() |
| var activeRequests = 0 |
| |
| val urlCB:HttpDataCB = new HttpDataCB { |
| def apply(p:Ptr[Parser],data:CString,len:Long):Int = onURL(p,data,len) |
| } |
| val onKeyCB:HttpDataCB = new HttpDataCB { |
| def apply(p:Ptr[Parser],data:CString,len:Long):Int = |
| onHeaderKey(p,data,len) |
| } |
| val onValueCB:HttpDataCB = new HttpDataCB { |
| def apply(p:Ptr[Parser],data:CString,len:Long):Int = |
| onHeaderValue(p,data,len) |
| } |
| val onBodyCB:HttpDataCB = new HttpDataCB { |
| def apply(p:Ptr[Parser],data:CString,len:Long):Int = onBody(p,data,len) |
| } |
| val completeCB:HttpCB = new HttpCB { |
| def apply(p:Ptr[Parser]):Int = onMessageComplete(p) |
| } |
| |
| val parserSettings = malloc(sizeof[ParserSettings]) |
| .asInstanceOf[Ptr[ParserSettings]] |
| http_parser_settings_init(parserSettings) |
| parserSettings._2 = urlCB |
| parserSettings._4 = onKeyCB |
| parserSettings._5 = onValueCB |
| parserSettings._7 = onBodyCB |
| parserSettings._8 = completeCB |
| |
| var router:Function1[Request[String],Route] = null |
Note that we mix in the Parsing trait here, and also declare the CFunctionPtr callbacks, because CFunctionPtrs cannot be declared in traits, only static objects.
We’ll also declare a router, the service-supplied function to resolve an incoming URL to its appropriate handler, which we’ll initialize shortly thereafter:
| def init(port:Int, f:Request[String] => Route):Unit = { |
| router = f |
| val addr = malloc(64) |
| check(uv_ip4_addr(c"0.0.0.0", 9999, addr),"uv_ip4_addr") |
| val server = malloc(uv_handle_size(UV_TCP_T)).asInstanceOf[TCPHandle] |
| check(uv_tcp_init(loop, server), "uv_tcp_init") |
| check(uv_tcp_bind(server, addr, 0), "uv_tcp_bind") |
| check(uv_listen(server, 4096, connectCB), "uv_listen") |
| this.activeRequests = 1 |
| println("running") |
| } |
Unlike our other asynchronous components, this one basically runs forever, so we don’t need to count individual connections. We can just set activeRequests to 1 for now, although this could be modified to allow for multiple services running on different ports.
The basic libuv callback functions are all similar to what we used before in Chapter 6. Some of them won’t be changed at all, but we’ll need to make a few adjustments to work with the HTTP parser. First, we’ll need to initialize everything when a new connection comes in:
| val connectCB = new ConnectionCB { |
| def apply(server:TCPHandle, status:Int):Unit = { |
| println(s"connection incoming with status $status") |
| val client = malloc(uv_handle_size(UV_TCP_T)).asInstanceOf[TCPHandle] |
| val id = serial |
| serial += 1 |
| |
| val state = malloc(sizeof[ConnectionState]) |
| .asInstanceOf[Ptr[ConnectionState]] |
| state._1 = serial |
| state._2 = client |
| http_parser_init(state.at3,HTTP_REQUEST) |
| (state.at3)._8 = state.asInstanceOf[Ptr[Byte]] |
| !(client.asInstanceOf[Ptr[Ptr[Byte]]]) = state.asInstanceOf[Ptr[Byte]] |
| |
| stdio.printf(c"initialized handle at %x, parser at %x\n", client, state) |
| |
| check(uv_tcp_init(loop, client), "uv_tcp_init (client)") |
| check(uv_accept(server, client), "uv_accept") |
| check(uv_read_start(client, allocCB, readCB), "uv_read_start") |
| } |
| } |
And then, whenever we receive data, we will pass it on to the parser with http_parser_execute():
| val readCB = new ReadCB { |
| def apply(handle:TCPHandle, size:CSize, buffer:Ptr[Buffer]):Unit = { |
| val state_ptr = handle.asInstanceOf[Ptr[Ptr[ConnectionState]]] |
| val parser = (!state_ptr).at3 |
| val message_id = (!state_ptr)._1 |
| println(s"conn $message_id: read message of size $size") |
| |
| if (size < 0) { |
| uv_close(handle, null) |
| stdlib.free(buffer._1) |
| } else { |
| http_parser_execute(parser,parserSettings,buffer._1,size) |
| stdlib.free(buffer._1) |
| } |
| } |
| } |
Writing data becomes a bit more complex because we have to handle a few new situations: when a request is complete, we invoke the router to determine how to handle the request, then we check whether the matched route is a SyncRoute or AsyncRoute. If it’s a SyncRoute, we can send the response immediately, but if it’s an AsyncRoute, we can get a Future[Response[String]] and send a response when it completes.
We’ll provide a helper method for each of the two basic cases. sendResponse will handle all of the complex string-manipulation logic in synchronous cases:
| def sendResponse(id:Long,client:TCPHandle, resp:Response[String]):Unit = { |
| var respString = s"HTTP/1.1 ${resp.code} ${resp.description}\r\n" |
| val headers = if (!resp.headers.contains("Content-Length")) { |
| resp.headers + ("Content-Length" -> resp.body.size) |
| } else { resp.headers } |
| |
| for ( (k,v) <- headers) { |
| respString += s"${k}: $v\r\n" |
| } |
| respString += s"\r\n${resp.body}" |
| |
| val buffer = malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]] |
| Zone { implicit z => |
| val temp_resp = toCString(respString) |
| val resp_len = strlen(temp_resp) + 1 |
| buffer._1 = malloc(resp_len) |
| buffer._2 = resp_len |
| strncpy(buffer._1, temp_resp, resp_len) |
| } |
| stdio.printf(c"response buffer:\n%s\n",buffer._1) |
| |
| val writeReq = malloc(uv_req_size(UV_WRITE_REQ_T)).asInstanceOf[WriteReq] |
| !writeReq = buffer.asInstanceOf[Ptr[Byte]] |
| check(uv_write(writeReq, client,buffer,1,writeCB),"uv_write") |
| } |
And sendResponseAsync simply wraps sendResponse within a Future.map handler:
| def sendResponseAsync(id:Long,client:TCPHandle, |
| resp:Future[Response[String]]):Unit = { |
| resp.map { r => |
| println("async?") |
| sendResponse(id,client,r) |
| } |
| } |
With those helpers in place, we can finally write the handleRequest method, which completes our interface with the parser. All we need to do is convert the parser’s RequestState into a Request[String], pass it to the router, and then decide how to handle the result:
| override def handleRequest(id:Long,client:TCPHandle, r:RequestState):Unit = { |
| println(s"got complete request $id: $r\n") |
| val request = Request(r.method,r.url,r.headerMap.toMap,r.body) |
| val route = router(request) |
| route match { |
| case SyncRoute(_,_,handler) => |
| val resp = handler(request) |
| println("sending sync response") |
| sendResponse(id,client,resp) |
| case AsyncRoute(_,_,handler) => |
| val resp = handler(request) |
| resp.map { r => |
| println("about to send async response") |
| sendResponse(id,client,r) |
| } |
| println("returning immediately, async handler invoked") |
| } |
| } |
The rest is up to the service layer, which we’re now prepared to implement!