Streaming File Input in libuv

In the POSIX model, we could treat pipes and files more or less interchangeably and use the same blocking I/O patterns, such as fgets and println, with either one. But in libuv, files are a totally different beast from anything else we’ve encountered. Because ordinary disk files are not pollable for readiness in Linux, there’s no way to read from or write to a file without the risk of blocking.

libuv provides an alternative: a background thread pool for running UNIX-style system calls, where blocking calls can run without interfering with our event loop. These system calls are not modeled as streams, though. Each call is a one-off, which means we’ll need to do a little extra work to create something that fits our Pipe interface.
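The split between a worker pool that absorbs blocking calls and a single loop thread that runs callbacks can be modeled in a few lines of plain JVM Scala. This is a toy model under our own assumptions, not libuv code: ThreadPoolModel, submit, and run are hypothetical names, and a blocking queue stands in for libuv’s internal completion signaling.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}

// A toy model of libuv's design, not libuv itself: blocking work runs on a
// worker pool, while callbacks run only on the thread that calls run().
object ThreadPoolModel {
  private val pool = Executors.newFixedThreadPool(4)
  // Completed work whose callbacks are waiting for the loop thread.
  private val completions = new LinkedBlockingQueue[() => Unit]()
  // Outstanding requests; touched only on the loop thread.
  private var pending = 0

  // Offload a blocking call; `cb` later runs on the loop thread with its result.
  def submit[A](blocking: () => A)(cb: A => Unit): Unit = {
    pending += 1
    pool.execute(new Runnable {
      def run(): Unit = {
        val result = blocking() // may block without stalling the loop
        completions.put(() => cb(result))
      }
    })
  }

  // Loosely like uv_run with UV_RUN_DEFAULT: run callbacks while work is pending.
  def run(): Unit =
    while (pending > 0) {
      val cb = completions.take() // waits for the next finished request
      pending -= 1
      cb()
    }

  def shutdown(): Unit = pool.shutdown()
}
```

Because callbacks run only inside run(), on the thread that called it, they never race with one another, which is the property our single-threaded pipes rely on.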

The tricky part is that there are quite a few of these calls, and they all share a single callback type, uv_fs_cb, whose only argument is the uv_fs_t request that originated the call. However, we’ll need only a few actual syscalls to implement streaming file I/O. All we need to do is open files, read from them, write to them, and close them:

LibUVPipes/file_pipe/loop.scala
 // A uv_fs_t begins with a `void* data` field, so we treat a request as a
 // pointer to a pointer and keep our own state in that first slot.
 type FSReq = Ptr[Ptr[Byte]]
 // Every filesystem callback receives only the originating request.
 type FSCB = CFuncPtr1[FSReq, Unit]

 def uv_fs_open(loop: Loop, req: FSReq, path: CString, flags: Int, mode: Int,
   cb: FSCB): Int = extern
 def uv_fs_read(loop: Loop, req: FSReq, fd: Int, bufs: Ptr[Buffer], numBufs: Int,
   offset: Long, fsCB: FSCB): Int = extern
 def uv_fs_write(loop: Loop, req: FSReq, fd: Int, bufs: Ptr[Buffer], numBufs: Int,
   offset: Long, fsCB: FSCB): Int = extern
 def uv_fs_close(loop: Loop, req: FSReq, fd: Int, fsCB: FSCB): Int = extern
 // Frees any memory libuv allocated internally for a finished request.
 def uv_fs_req_cleanup(req: FSReq): Unit = extern
 def uv_fs_get_result(req: FSReq): Int = extern
 def uv_fs_get_ptr(req: FSReq): Ptr[Byte] = extern
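Because every callback receives nothing but the request, any per-call state has to travel inside the request, and the callback must cast it back to the type it expects. The sketch below models that pattern in plain Scala; Req, ReadState, onRead, and complete are hypothetical names, and the data field stands in for the void* data slot that FSReq exposes through !req.

```scala
// A toy model of the request pattern, not the libuv API.
final class Req {
  var data: Any = null  // stands in for the request's void* data field
  var result: Long = 0L // stands in for uv_fs_get_result(req)
}

object SharedCallbackModel {
  // One callback shape for every filesystem operation, like uv_fs_cb.
  type FsCb = Req => Unit

  // Per-file state that the caller stores in req.data before issuing a read.
  final case class ReadState(fd: Int, var offset: Long)

  // The callback knows only the request, so it casts data back to its state.
  val onRead: FsCb = req => {
    val state = req.data.asInstanceOf[ReadState]
    if (req.result > 0) state.offset += req.result
  }

  // Simulates libuv completing a request with `n` bytes and invoking cb.
  def complete(req: Req, n: Long, cb: FsCb): Unit = {
    req.result = n
    cb(req)
  }
}
```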

As before, we’ll need a small case class to instantiate our actual Pipe object:

LibUVPipes/file_pipe_out/main.scala
 case class FilePipe(serial: Long) extends Pipe[String, String] {
   override def feed(input: String): Unit = {
     for (h <- handlers) {
       h.feed(input)
     }
   }
 }

But most of the implementation will be in the companion object. We’ll keep our global state there as before:

LibUVPipes/file_pipe_out/main.scala
 object FilePipe {
   import LibUV._, LibUVConstants._
   type FilePipeState = CStruct3[Int, Ptr[Buffer], Long] // fd, buffer, offset

   // File descriptors that still have a read in progress.
   var active_streams: mutable.Set[Int] = mutable.Set()
   // The downstream pipe for each open file descriptor.
   var handlers = mutable.HashMap[Int, Pipe[String, String]]()
   var serial = 0

And we’ll use the following block of code to open a file for reading, create a source pipe for it, and make an initial call to uv_fs_read:

LibUVPipes/file_pipe_out/main.scala
   def apply(path: CString): Pipe[String, String] = {
     val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq]
     println("opening file")
     // flags = 0 is O_RDONLY; this open is an ordinary synchronous call.
     val fd = util.open(path, 0, 0)
     stdio.printf(c"open file at %s returned %d\n", path, fd)

     // Per-file state: descriptor, read buffer, and bytes consumed so far.
     val state = stdlib.malloc(sizeof[FilePipeState])
       .asInstanceOf[Ptr[FilePipeState]]
     val buf = stdlib.malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]]
     buf._1 = stdlib.malloc(4096)
     buf._2 = 4095 // keep one byte free for the NUL that readCB appends
     state._1 = fd
     state._2 = buf
     state._3 = 0L
     // Store the state pointer in the request's data field for readCB.
     !req = state.asInstanceOf[Ptr[Byte]]

     println("about to read")
     // An offset of -1 tells libuv to read from the current file position.
     uv_fs_read(EventLoop.loop, req, fd, buf, 1, -1, readCB)
     println("read started")
     val pipe = Pipe.source[String]
     handlers(fd) = pipe
     println("about to return")
     active_streams += fd
     pipe
   }
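The 4096/4095 split in apply guarantees that readCB can always write a terminating NUL right after the bytes it received before calling fromCString. Here is a plain-Scala model of that invariant, with a byte array standing in for the malloc’d buffer; NulTerminatedBuffer and fillAndDecode are hypothetical names, not part of the project.

```scala
import java.nio.charset.StandardCharsets

// Models the buffer setup in apply: 4096 bytes are allocated but only 4095 are
// offered to each read, so index `n` is always in bounds for the terminator.
object NulTerminatedBuffer {
  val Allocated = 4096
  val Capacity = Allocated - 1 // the value stored in buf._2

  // Copy up to Capacity bytes of `chunk` into `buf`, terminate at n, then decode
  // everything before the first zero byte, which is what fromCString reads.
  def fillAndDecode(buf: Array[Byte], chunk: Array[Byte]): (Int, String) = {
    val n = math.min(chunk.length, Capacity)
    System.arraycopy(chunk, 0, buf, 0, n)
    buf(n) = 0 // never out of bounds, because n <= Allocated - 1
    val end = buf.indexOf(0.toByte)
    (n, new String(buf, 0, end, StandardCharsets.UTF_8))
  }
}
```

One consequence of decoding up to the first NUL: a chunk that itself contains a zero byte is cut off at that byte, in this model and in readCB alike.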

So far, this has been pretty similar to other code we’ve written. The difference, however, is that each call to uv_fs_read performs a single read and invokes its callback exactly once. As a result, if we want to read a file until we’re done, the corresponding handler function has to keep issuing read requests whenever we need more data:

LibUVPipes/file_pipe_out/main.scala
   val readCB: FSCB = new FSCB {
     def apply(req: FSReq): Unit = {
       println("read callback fired!")
       // Bytes read, 0 at end of file, or a negative libuv error code.
       val res = uv_fs_get_result(req)
       println(s"got result: $res")
       // Recover the state that apply stored in the request's data field.
       val state_ptr = (!req).asInstanceOf[Ptr[FilePipeState]]
       println("inspecting state")
       val fd = state_ptr._1
       val buf = state_ptr._2
       val offset = state_ptr._3
       printf("state: fd %d, offset %d\n", fd, offset.toInt)

       if (res > 0) {
         println("producing string")
         (buf._1)(res) = 0 // NUL-terminate; res <= 4095, so this is in bounds
         val output = fromCString(buf._1)
         val pipe = handlers(fd)
         pipe.feed(output)
         println("continuing")
         // Advance the offset and reuse the same request for the next read.
         state_ptr._3 = state_ptr._3 + res
         uv_fs_read(EventLoop.loop, req, fd, state_ptr._2, 1, state_ptr._3, readCB)
       } else if (res == 0) {
         println("done")
         val pipe = handlers(fd)
         pipe.done()
         active_streams -= fd
       } else {
         println("error")
         active_streams -= fd
       }
     }
   }
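To see the re-issue pattern without libuv, here is a single-threaded model in plain Scala. ReadLoopModel, fsRead, streamFile, and run are hypothetical stand-ins: fsRead queues a completion instead of calling uv_fs_read, and the model always passes an explicit offset starting at 0, whereas apply passes -1 for its first read. The error branch is omitted because the model’s reads cannot fail.

```scala
import scala.collection.mutable

// A single-threaded model of readCB's control flow; no libuv is involved.
object ReadLoopModel {
  // Pending completions; draining this queue stands in for uv_run.
  private val queue = mutable.Queue[() => Unit]()

  // Stand-in for uv_fs_read: queues a read of up to `capacity` bytes at
  // `offset`; the callback later gets the byte count (0 at EOF) and the bytes.
  def fsRead(file: Array[Byte], offset: Long, capacity: Int)(cb: (Int, Array[Byte]) => Unit): Unit =
    queue.enqueue(() => {
      val start = math.min(offset, file.length.toLong).toInt
      val bytes = file.slice(start, start + capacity)
      cb(bytes.length, bytes)
    })

  // Mirrors readCB: on res > 0 feed the chunk, advance the offset, and issue
  // the next read from inside the callback; on res == 0 signal completion.
  def streamFile(file: Array[Byte], capacity: Int)(feed: String => Unit, done: () => Unit): Unit = {
    def onRead(offset: Long, res: Int, bytes: Array[Byte]): Unit =
      if (res > 0) {
        feed(new String(bytes, "UTF-8"))
        val next = offset + res
        fsRead(file, next, capacity)((r, b) => onRead(next, r, b))
      } else done()
    fsRead(file, 0L, capacity)((r, b) => onRead(0L, r, b))
  }

  // Run completions until no request is outstanding.
  def run(): Unit = while (queue.nonEmpty) queue.dequeue()()
}
```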

And with the Pipe trait fully implemented, we can now swap a file in for standard input easily:

LibUVPipes/examples.scala
 object FileInputPipeExample {
   import LibUV._, LibUVConstants._
   import scala.util.Try

   def main(args: Array[String]): Unit = {
     val p = FilePipe(c"./data.txt")
       .map { d =>
         println(s"consumed $d")
         d
       }.map { d =>
         val parsed = Try {
           d.toInt
         }
         println(s"parsed: $parsed")
         parsed
       }
       .addDestination(FileOutputPipe(c"./output.txt", false))
     uv_run(EventLoop.loop, UV_RUN_DEFAULT)
     println("done")
   }
 }
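Each .map stage simply pushes every value it receives on to the next stage. A hypothetical MiniPipe (not the book’s Pipe trait, and with no libuv involved) shows that flow with the same Try-based parsing, fed one line at a time:

```scala
import scala.collection.mutable

// A hypothetical, minimal push-based pipe: not the book's Pipe trait.
class MiniPipe[T] {
  private val consumers = mutable.ArrayBuffer[T => Unit]()
  private val doneHandlers = mutable.ArrayBuffer[() => Unit]()

  // Push one value to every consumer, as a source does for each chunk.
  def feed(t: T): Unit = consumers.foreach(c => c(t))

  // Signal end of input downstream, as readCB does when res == 0.
  def done(): Unit = doneHandlers.foreach(h => h())

  // Create a downstream stage that transforms each value with f.
  def map[U](f: T => U): MiniPipe[U] = {
    val next = new MiniPipe[U]
    consumers += (t => next.feed(f(t)))
    doneHandlers += (() => next.done())
    next
  }

  // Terminal hooks for observing values and end of input.
  def foreach(f: T => Unit): Unit = consumers += f
  def onDone(h: () => Unit): Unit = doneHandlers += h
}
```

Note that FilePipe itself delivers whatever each read returns, up to 4095 bytes at a time, rather than one line per value.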

When you run this, you should see output like this:

 $ ./target/scala-2.11/file_input-out
 uv_prepare_init returned 0
 uv_prepare_start returned 0
 read 4 bytes from pipe 0
 consumed foo
 parsed: Failure(java.lang.NumberFormatException: foo)
 error: java.lang.NumberFormatException: foo
 read 2 bytes from pipe 0
 consumed 1
 parsed: Success(1)
 saw number 1
 read 2 bytes from pipe 0
 consumed 2
 parsed: Success(2)
 saw number 2
 read -4095 bytes from pipe 0
 size < 0, closing
 stopping dispatcher
 done

But what’s next? We could write many interesting transformation stages, but the most pressing need now is a similar way to handle output so that we don’t have to rely on the blocking println any longer.