Moving from Server to Service

Now, we’re ready to implement the Server layer and lay the foundation for a service DSL. This will largely follow the pattern of the server we implemented in Chapter 6, Doing I/O Right, with Event Loops, but will be more oriented toward supporting typed messages and responses rather than free-form strings. However, we’ll try to isolate the server from some of the subtleties of the actual DSL to control the complexity and allow for alternative designs.

Basic Definitions

First, the core data structures we need to define are the Request and Response types—the core accommodation we need to make is to parameterize both over some Scala type T. By design, the actual server will only receive Request[String] and Response[String] (it will fall upon the Service layer to translate other types while still allowing them to set headers freely):

LibUVService/server.scala
 case​ ​class​ Request[​T​](method​:​​String​, url​:​​String​, headers​:​​Map​[​String​,​String​],
  body​:​​T​)
 case​ ​class​ Response[​T​](code​:​​Int​, description​:​​String​,
  headers​:​​Map​[​String​,​String​],body​:​​T​)

The other critical data structure is the Route—a way to map incoming requests to a particular handler function according to some combination of request method and URL. We’ll also need a way to distinguish synchronous from asynchronous routes, so we know whether to send them back immediately or wait for a Future to complete:

LibUVService/server.scala
 sealed​ ​trait​ Route {
 val​ method​:​​String
 val​ path​:​​String
 }
 case​ ​class​ SyncRoute(method​:​​String​, path​:​​String​, handler​:​​Request​[​String​] ​=>
  Response[​String​]) ​extends​ Route
 case​ ​class​ AsyncRoute(method​:​​String​, path​:​​String​, handler​:​​Request​[​String​] ​=>
  Future[​Response​[​String​]]) ​extends​ Route

Again, our Server layer doesn’t need to know what to do with the routes, just how to handle their results.

Now, we’re ready to write the server code itself, starting with the mostly routine setup and initialization:

LibUVService/server.scala
 object​ Server ​extends​ Parsing {
 import​ ​LibUVConstants._​, LibUV.​_​,HttpParser.​_
 implicit​ ​val​ ec ​=​ EventLoop
 val​ loop ​=​ EventLoop.loop
 var​ serial ​=​ 1L
 override​ ​val​ requests ​=​ mutable.Map[​Long​,​RequestState​]()
 var​ activeRequests ​=​ 0
 
 val​ urlCB​:​​HttpDataCB​ = ​new​ HttpDataCB {
 def​ apply(p​:​​Ptr​[​Parser​],data​:​​CString​,len​:​​Long​)​:​​Int​ = onURL(p,data,len)
  }
 val​ onKeyCB​:​​HttpDataCB​ = ​new​ HttpDataCB {
 def​ apply(p​:​​Ptr​[​Parser​],data​:​​CString​,len​:​​Long​)​:​​Int​ =
  onHeaderKey(p,data,len)
  }
 val​ onValueCB​:​​HttpDataCB​ = ​new​ HttpDataCB {
 def​ apply(p​:​​Ptr​[​Parser​],data​:​​CString​,len​:​​Long​)​:​​Int​ =
  onHeaderValue(p,data,len)
  }
 val​ onBodyCB​:​​HttpDataCB​ = ​new​ HttpDataCB {
 def​ apply(p​:​​Ptr​[​Parser​],data​:​​CString​,len​:​​Long​)​:​​Int​ = onBody(p,data,len)
  }
 val​ completeCB​:​​HttpCB​ = ​new​ HttpCB {
 def​ apply(p​:​​Ptr​[​Parser​])​:​​Int​ = onMessageComplete(p)
  }
 
 val​ parserSettings ​=​ malloc(sizeof[​ParserSettings​])
  .asInstanceOf[​Ptr​[​ParserSettings​]]
  http_parser_settings_init(parserSettings)
  parserSettings._2 ​=​ urlCB
  parserSettings._4 ​=​ onKeyCB
  parserSettings._5 ​=​ onValueCB
  parserSettings._7 ​=​ onBodyCB
  parserSettings._8 ​=​ completeCB
 
 var​ router​:​​Function1​[​Request​[​String​],​Route​] ​=​ ​null

Note that we mix in the Parsing trait here, and also declare the CFunctionPtr callbacks, because CFunctionPtrs cannot be declared in traits, only static objects.

We’ll also declare a router, the service-supplied function to resolve an incoming URL to its appropriate handler, which we’ll initialize shortly thereafter:

LibUVService/server.scala
 def​ init(port​:​​Int​, f​:​​Request​[​String​] ​=>​ Route)​:​​Unit​ = {
  router ​=​ f
 val​ addr ​=​ malloc(64)
  check(uv_ip4_addr(c​"0.0.0.0"​, 9999, addr),​"uv_ip4_addr"​)
 val​ server ​=​ malloc(uv_handle_size(UV_TCP_T)).asInstanceOf[​TCPHandle​]
  check(uv_tcp_init(loop, server), ​"uv_tcp_init"​)
  check(uv_tcp_bind(server, addr, 0), ​"uv_tcp_bind"​)
  check(uv_listen(server, 4096, connectCB), ​"uv_listen"​)
 this​.activeRequests ​=​ 1
  println(​"running"​)
 }

Unlike our other asynchronous components, this one basically runs forever, so we don’t need to count individual connections. We can just set activeRequests to 1 for now, although this could be modified to allow for multiple services running on different ports.

Handling I/O

The basic libuv callback functions are all similar to what we used before in Chapter 6. Some of them won’t be changed at all, but we’ll need to make a few adjustments to work with the HTTP parser. First, we’ll need to initialize everything when a new connection comes in:

LibUVService/server.scala
 val​ connectCB ​=​ ​new​ ConnectionCB {
 def​ apply(server​:​​TCPHandle​, status​:​​Int​)​:​​Unit​ = {
  println(s​"connection incoming with status $status"​)
 val​ client ​=​ malloc(uv_handle_size(UV_TCP_T)).asInstanceOf[​TCPHandle​]
 val​ id ​=​ serial
  serial += 1
 
 val​ state ​=​ malloc(sizeof[​ConnectionState​])
  .asInstanceOf[​Ptr​[​ConnectionState​]]
  state._1 ​=​ serial
  state._2 ​=​ client
  http_parser_init(state.at3,HTTP_REQUEST)
  (state.at3)._8 ​=​ state.asInstanceOf[​Ptr​[​Byte​]]
  !(client.asInstanceOf[​Ptr​[​Ptr​[​Byte​]]]) ​=​ state.asInstanceOf[​Ptr​[​Byte​]]
 
  stdio.printf(c​"initialized handle at %x, parser at %x\n"​, client, state)
 
  check(uv_tcp_init(loop, client), ​"uv_tcp_init (client)"​)
  check(uv_accept(server, client), ​"uv_accept"​)
  check(uv_read_start(client, allocCB, readCB), ​"uv_read_start"​)
  }
 }

And then, whenever we receive data, we will pass it on to the parser with http_parser_execute():

LibUVService/server.scala
 val​ readCB ​=​ ​new​ ReadCB {
 def​ apply(handle​:​​TCPHandle​, size​:​​CSize​, buffer​:​​Ptr​[​Buffer​])​:​​Unit​ = {
 val​ state_ptr ​=​ handle.asInstanceOf[​Ptr​[​Ptr​[​ConnectionState​]]]
 val​ parser ​=​ (!state_ptr).at3
 val​ message_id ​=​ (!state_ptr)._1
  println(s​"conn $message_id: read message of size $size"​)
 
 if​ (size < 0) {
  uv_close(handle, ​null​)
  stdlib.free(buffer._1)
  } ​else​ {
  http_parser_execute(parser,parserSettings,buffer._1,size)
  stdlib.free(buffer._1)
  }
  }
 }

Writing data becomes a bit more complex because we have to handle a few new situations: when a request is complete, we invoke the router to determine how to handle the request, then we check whether the matched route is a SyncRoute or AsyncRoute. If it’s a SyncRoute, we can send the response immediately, but if it’s an AsyncRoute, we can get a Future[Response[String]] and send a response when it completes.

We’ll provide a helper method for each of the two basic cases. sendResponse will handle all of the complex string-manipulation logic in synchronous cases:

LibUVService/server.scala
 def​ sendResponse(id​:​​Long​,client​:​​TCPHandle​, resp​:​​Response​[​String​])​:​​Unit​ = {
 var​ respString ​=​ s​"HTTP/1.1 ${resp.code} ${resp.description}\r\n"
 val​ headers ​=​ ​if​ (!resp.headers.contains(​"Content-Length"​)) {
  resp.headers + (​"Content-Length"​ -> resp.body.size)
  } ​else​ { resp.headers }
 
 for​ ( (k,v) ​<-​ headers) {
  respString += s​"${k}: $v\r\n"
  }
  respString += s​"\r\n${resp.body}"
 
 val​ buffer ​=​ malloc(sizeof[​Buffer​]).asInstanceOf[​Ptr​[​Buffer​]]
  Zone { ​implicit​ z ​=>
 val​ temp_resp ​=​ toCString(respString)
 val​ resp_len ​=​ strlen(temp_resp) + 1
  buffer._1 ​=​ malloc(resp_len)
  buffer._2 ​=​ resp_len
  strncpy(buffer._1, temp_resp, resp_len)
  }
  stdio.printf(c​"response buffer:\n%s\n"​,buffer._1)
 
 val​ writeReq ​=​ malloc(uv_req_size(UV_WRITE_REQ_T)).asInstanceOf[​WriteReq​]
  !writeReq ​=​ buffer.asInstanceOf[​Ptr​[​Byte​]]
  check(uv_write(writeReq, client,buffer,1,writeCB),​"uv_write"​)
 }

And sendResponseAsync simply wraps sendResponse within a Future.map handler:

LibUVService/server.scala
 def​ sendResponseAsync(id​:​​Long​,client​:​​TCPHandle​,
  resp​:​​Future​[​Response​[​String​]])​:​​Unit​ = {
  resp.map { r ​=>
  println(​"async?"​)
  sendResponse(id,client,r)
  }
 }

With those helpers in place, we can finally write the handleRequest method, which completes our interface with the parser. All we need to do is convert the parser’s RequestState into a Request[String], pass it to the router, and then decide how to handle the result:

LibUVService/server.scala
 override​ ​def​ handleRequest(id​:​​Long​,client​:​​TCPHandle​, r​:​​RequestState​)​:​​Unit​ = {
  println(s​"got complete request $id: $r\n"​)
 val​ request ​=​ Request(r.method,r.url,r.headerMap.toMap,r.body)
 val​ route ​=​ router(request)
  route ​match​ {
 case​ SyncRoute(​_​,​_​,handler) ​=>
 val​ resp ​=​ handler(request)
  println(​"sending sync response"​)
  sendResponse(id,client,resp)
 case​ AsyncRoute(​_​,​_​,handler) ​=>
 val​ resp ​=​ handler(request)
  resp.map { r ​=>
  println(​"about to send async response"​)
  sendResponse(id,client,r)
  }
  println(​"returning immediately, async handler invoked"​)
  }
 }

The rest is up to the service layer, which we’re now prepared to implement!