In the POSIX model, we could treat pipes and files more or less interchangeably and use the same blocking I/O patterns, such as fgets and println, with either one. But in libuv, files are a totally different beast from anything else we’ve encountered. Linux’s readiness APIs can’t tell us when an ordinary disk file is ready (epoll refuses to register regular files at all). As a result, there’s no way to read from or write to a file without the risk of blocking.
libuv provides an alternative: a background thread pool for running UNIX-style system calls, where blocking calls can run without interfering with our event loop. These system calls are not modeled as streams, though. Each call is a one-off request, which means we’ll need to do a little extra work to create something that fits our Pipe interface.
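To make the thread-pool idea concrete, here is a toy model in plain JVM Scala. It is an illustration under stated assumptions, not libuv’s implementation. Blocking work runs on worker threads from a `java.util.concurrent` executor, while every completion callback runs on a single "loop" thread. The object and method names (`ToyThreadPoolLoop`, `submit`, `run`) are hypothetical.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Toy model (not libuv's implementation): blocking work runs on worker
// threads, but every completion callback runs on the single loop thread.
// Assumes `work` does not throw; a real pool would report errors as results.
object ToyThreadPoolLoop {
  private val pool = Executors.newFixedThreadPool(4)
  // Finished requests wait here until the loop thread runs their callbacks.
  private val completions = new LinkedBlockingQueue[() => Unit]()
  // Outstanding requests; touched only by the loop thread.
  private var pending = 0

  // Submit a one-off blocking call; `cb` later runs on the loop thread.
  def submit[A](work: () => A)(cb: A => Unit): Unit = {
    pending += 1
    pool.execute(new Runnable {
      def run(): Unit = {
        val result = work() // may block, but only this worker waits
        completions.put(() => cb(result))
      }
    })
  }

  // Run callbacks until no requests remain outstanding.
  def run(): Unit =
    while (pending > 0) {
      val callback = completions.take() // waits until some work finishes
      pending -= 1
      callback()
    }

  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.SECONDS)
  }
}
```

A callback is free to submit follow-up work, and the loop keeps running until that work completes as well. This is the same shape a file reader needs: each finished read schedules the next one.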
The tricky part is that there are quite a few of these calls, and they all share the same callback type, uv_fs_cb, which receives only the uv_fs_t request that originated it. Fortunately, we need only a few actual syscalls to implement streaming file I/O. All we need to do is open files, read from them, write to them, and close them:
| // FSReq is a uv_fs_t*, seen through its first field, the void* data pointer |
| type FSReq = Ptr[Ptr[Byte]] |
| type FSCB = CFuncPtr1[FSReq,Unit] |
| |
| def uv_fs_open(loop:Loop, req:FSReq, path:CString, flags:Int, mode:Int, |
| cb:FSCB):Int = extern |
| // an offset of -1 means "use (and advance) the current file position" |
| def uv_fs_read(loop:Loop, req:FSReq, fd:Int, bufs:Ptr[Buffer], numBufs:Int, |
| offset:Long, fsCB:FSCB):Int = extern |
| def uv_fs_write(loop:Loop, req:FSReq, fd:Int, bufs:Ptr[Buffer], numBufs:Int, |
| offset:Long, fsCB:FSCB):Int = extern |
| def uv_fs_close(loop:Loop, req:FSReq, fd:Int, fsCB:FSCB):Int = extern |
| // frees memory libuv allocated inside the request (not the request itself) |
| def uv_fs_req_cleanup(req:FSReq):Unit = extern |
| // ssize_t in C: bytes transferred, or a negative error code |
| def uv_fs_get_result(req:FSReq):Int = extern |
| def uv_fs_get_ptr(req:FSReq):Ptr[Byte] = extern |
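Because every operation reports back through the same callback type, the only place to keep per-file context is the request itself. In libuv, `data` is the first public field of every request type, including uv_fs_t. That is why FSReq can be declared as Ptr[Ptr[Byte]] and dereferenced with `!req`. The sketch below models this pattern in plain Scala. `FsReq`, `ReadState`, `complete`, and `advanceOffset` are hypothetical stand-ins, not libuv’s API.

```scala
// Plain-Scala model of the request/callback shape (not the real libuv API):
// every fs operation completes through one callback type that receives only
// the request, so per-file context must ride along in the request itself.
object FsRequestModel {
  final class FsReq {
    var data: Any = null  // plays the role of uv_fs_t's leading `void* data`
    var result: Long = 0L // plays the role of uv_fs_get_result(req)
  }
  type FsCb = FsReq => Unit

  // Per-file context, like FilePipeState (the buffer is omitted here).
  final class ReadState(val fd: Int, var offset: Long)

  // Hypothetical stand-in for libuv finishing a read: record, then call back.
  def complete(req: FsReq, bytesRead: Long, cb: FsCb): Unit = {
    req.result = bytesRead
    cb(req)
  }

  // A callback that recovers its context from req.data, as a read callback must.
  val advanceOffset: FsCb = req => {
    val state = req.data.asInstanceOf[ReadState]
    if (req.result > 0) state.offset += req.result
  }
}
```

The callback never needs a closure over the file it serves. Everything it needs comes back through the request, which is what lets one C function pointer serve every open file.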
As before, we’ll need a small case class to instantiate our actual Pipe object:
| case class FilePipe(serial:Long) extends Pipe[String,String] { |
| override def feed(input:String):Unit = { |
| for (h <- handlers) { |
| h.feed(input) |
| } |
| } |
| } |
But most of the implementation will be in the companion object. We’ll keep our global state there as before:
| object FilePipe { |
| import LibUV._, LibUVConstants._ |
| type FilePipeState = CStruct3[Int,Ptr[Buffer],Long] // fd, buffer, offset |
| |
| var active_streams:mutable.Set[Int] = mutable.Set() |
| var handlers = mutable.HashMap[Int,Pipe[String,String]]() |
| var serial = 0 |
And we’ll use the following block of code to open a file for reading (synchronously, with util.open), set up its per-file state, make an initial call to uv_fs_read, and return a source pipe for the read callback to feed:
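| def apply(path:CString):Pipe[String,String] = { |
| val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] |
| println("opening file") |
| // flags = 0 is O_RDONLY; this open is a plain blocking call, not uv_fs_open |
| val fd = util.open(path,0,0) |
| stdio.printf(c"open file at %s returned %d\n", path, fd) |
| if (fd < 0) { |
| throw new Exception(s"failed to open ${fromCString(path)}") |
| } |
| |
| // per-file state travels with the request: fd, buffer, next read offset |
| val state = stdlib.malloc(sizeof[FilePipeState]) |
| .asInstanceOf[Ptr[FilePipeState]] |
| val buf = stdlib.malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]] |
| buf._1 = stdlib.malloc(4096) |
| buf._2 = 4095 // read at most 4095 bytes, leaving room for a NUL terminator |
| state._1 = fd |
| state._2 = buf |
| state._3 = 0L |
| !req = state.asInstanceOf[Ptr[Byte]] // store the state in req->data |
| |
| println("about to read") |
| // offset -1: read from the current file position (the start of a fresh fd) |
| val r = uv_fs_read(EventLoop.loop,req,fd,buf,1,-1,readCB) |
| if (r < 0) { |
| throw new Exception(s"uv_fs_read failed with code $r") |
| } |
| println("read started") |
| val pipe = Pipe.source[String] |
| handlers(fd) = pipe |
| println("about to return") |
| active_streams += fd |
| pipe |
| } |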
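The buffer in apply is allocated at 4096 bytes but advertised to libuv as 4095. This guarantees that index `res` is always free for the terminating 0 the read callback writes before calling fromCString. The following JVM sketch models that arithmetic with a byte array. `NulTerminatedChunk`, `readInto`, and `fromCBytes` are hypothetical names, and the decoding only roughly mirrors fromCString.

```scala
import java.nio.charset.StandardCharsets

// Models the buffer sizing in FilePipe.apply with a JVM byte array:
// allocate 4096 bytes, but never read more than 4095, so index `n`
// is always free for the 0 terminator.
object NulTerminatedChunk {
  val Capacity = 4096
  val ReadLimit = Capacity - 1

  // Copy up to ReadLimit bytes of `source` from `offset` into `buf`,
  // write a 0 terminator, and return the byte count (like res).
  def readInto(buf: Array[Byte], source: Array[Byte], offset: Int): Int = {
    val n = math.max(0, math.min(ReadLimit, source.length - offset))
    if (n > 0) System.arraycopy(source, offset, buf, 0, n)
    buf(n) = 0 // in bounds: n <= Capacity - 1
    n
  }

  // Decode up to the first 0 byte, roughly as fromCString does.
  def fromCBytes(buf: Array[Byte]): String = {
    val end = buf.indexOf(0.toByte)
    new String(buf, 0, if (end < 0) buf.length else end, StandardCharsets.UTF_8)
  }
}
```

One consequence of reading fixed-size chunks is that boundaries fall at arbitrary byte positions. A line, or even a multi-byte UTF-8 character, can be split across two consecutive reads.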
So far, this has been pretty similar to other code we’ve written. The difference is that each call to uv_fs_read performs a single read and fires its callback exactly once. To read a file all the way to the end, the read callback itself must issue the next read request whenever more data is needed, advancing the offset by the number of bytes it just received:
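| // release what apply() allocated for this file, and close the descriptor |
| def cleanup(req:FSReq, state_ptr:Ptr[FilePipeState]):Unit = { |
| val buf = state_ptr._2 |
| scala.scalanative.posix.unistd.close(state_ptr._1) |
| stdlib.free(buf._1) |
| stdlib.free(buf.asInstanceOf[Ptr[Byte]]) |
| stdlib.free(state_ptr.asInstanceOf[Ptr[Byte]]) |
| stdlib.free(req.asInstanceOf[Ptr[Byte]]) |
| } |
| |
| val readCB:FSCB = new FSCB { |
| def apply(req:FSReq):Unit = { |
| println("read callback fired!") |
| // bytes read, 0 at end of file, or a negative libuv error code |
| val res = uv_fs_get_result(req) |
| println(s"got result: $res") |
| val state_ptr = (!req).asInstanceOf[Ptr[FilePipeState]] |
| println("inspecting state") |
| val fd = state_ptr._1 |
| val buf = state_ptr._2 |
| val offset = state_ptr._3 |
| printf("state: fd %d, offset %d\n", fd, offset.toInt) |
| |
| if (res > 0) { |
| println("producing string") |
| (buf._1)(res) = 0 // NUL-terminate; safe because we read at most 4095 bytes |
| val output = fromCString(buf._1) |
| val pipe = handlers(fd) |
| pipe.feed(output) |
| println("continuing") |
| // advance past the bytes just consumed, then reuse req for the next read |
| state_ptr._3 = state_ptr._3 + res |
| uv_fs_read(EventLoop.loop,req,fd,state_ptr._2,1,state_ptr._3,readCB) |
| } else if (res == 0) { |
| println("done") |
| val pipe = handlers(fd) |
| pipe.done() |
| active_streams -= fd |
| handlers -= fd |
| cleanup(req, state_ptr) |
| } else { |
| println(s"error: read returned $res") |
| // still complete the pipe so downstream stages see the end of input |
| handlers(fd).done() |
| active_streams -= fd |
| handlers -= fd |
| cleanup(req, state_ptr) |
| } |
| } |
| } |
| } // end of object FilePipe |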
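The key move in readCB is that the callback, not the caller, keeps the read going. The single-threaded JVM simulation below models the same control flow. A queue of pending completions stands in for the event loop, and an in-memory byte array stands in for the file. `ReissuingReader`, `readAsync`, `run`, and `readAll` are hypothetical names, and the small chunk size is chosen only for the demonstration.

```scala
import scala.collection.mutable

// Single-threaded simulation of the readCB pattern. Each read request
// completes exactly once, so the callback issues the next read itself,
// starting where the previous one ended, until a read returns 0 (EOF).
object ReissuingReader {
  // Completions waiting to run, standing in for libuv's event loop.
  private val loopQueue = mutable.Queue[() => Unit]()

  // Hypothetical stand-in for uv_fs_read with an explicit offset: schedules
  // a read of up to `len` bytes; the callback gets (bytesRead, bytes).
  def readAsync(file: Array[Byte], offset: Long, len: Int)(
      cb: (Int, Array[Byte]) => Unit): Unit =
    loopQueue.enqueue { () =>
      val start = math.min(offset, file.length.toLong).toInt
      val n = math.min(len, file.length - start)
      cb(n, java.util.Arrays.copyOfRange(file, start, start + n))
    }

  // Drain completions until nothing is pending, like uv_run.
  def run(): Unit = while (loopQueue.nonEmpty) loopQueue.dequeue()()

  // Read the whole "file", handing each chunk to onChunk, then call onDone.
  def readAll(file: Array[Byte], chunkSize: Int)(onChunk: String => Unit)(
      onDone: () => Unit): Unit = {
    def step(offset: Long): Unit =
      readAsync(file, offset, chunkSize) { (res, bytes) =>
        if (res > 0) {
          onChunk(new String(bytes, "UTF-8"))
          step(offset + res) // next request resumes right after this chunk
        } else {
          onDone() // res == 0: end of file
        }
      }
    step(0L)
  }
}
```

With a chunk size of 4 and the input "foo\n1\n2\n", the reader produces two chunks, "foo\n" and "1\n2\n". Reads return byte ranges, not lines, so a stage that parses each chunk as a single number only works when every read happens to line up with exactly one line.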
And with the Pipe trait fully implemented, we can now swap a file in for standard input easily:
| object FileInputPipeExample { |
| import LibUV._, LibUVConstants._ |
| import scala.util.Try |
| def main(args:Array[String]):Unit = { |
| val p = FilePipe(c"./data.txt") |
| .map { d => |
| println(s"consumed $d") |
| d |
| }.map { d => |
| val parsed = Try { |
| d.toInt |
| } |
| println(s"parsed: $parsed") |
| parsed |
| } |
| .addDestination(FileOutputPipe(c"./output.txt", false)) |
| uv_run(EventLoop.loop,UV_RUN_DEFAULT) |
| println("done") |
| } |
| } |
When you run this, you should see output like this:
| $ ./target/scala-2.11/file_input-out |
| uv_prepare_init returned 0 |
| uv_prepare_start returned 0 |
| read 4 bytes from pipe 0 |
| consumed foo |
| parsed: Failure(java.lang.NumberFormatException: foo) |
| error: java.lang.NumberFormatException: foo |
| read 2 bytes from pipe 0 |
| consumed 1 |
| parsed: Success(1) |
| saw number 1 |
| read 2 bytes from pipe 0 |
| consumed 2 |
| parsed: Success(2) |
| saw number 2 |
| read -4095 bytes from pipe 0 |
| size < 0, closing |
| stopping dispatcher |
| done |
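The output above shows why the parsing stage wraps toInt in Try. The bad chunk becomes a Failure value that flows down the pipeline alongside the Success values, instead of an exception propagating out of the read callback. Here is a plain-Scala sketch of that stage, with a List standing in for the pipe. `TryParseStage`, `parse`, and `describe` are hypothetical names. The call to trim is an assumption of this sketch; the example program calls toInt on the chunk as-is.

```scala
import scala.util.{Failure, Success, Try}

// Sketch of the parsing stage: a bad chunk becomes a Failure value that
// keeps flowing down the pipeline rather than an exception that escapes.
// trim is an assumption of this sketch (it drops a trailing newline).
object TryParseStage {
  def parse(chunk: String): Try[Int] = Try(chunk.trim.toInt)

  // A downstream stage can branch on the outcome without any try/catch.
  def describe(result: Try[Int]): String = result match {
    case Success(n) => s"saw number $n"
    case Failure(e) => s"error: ${e.getClass.getSimpleName}"
  }
}
```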
But what’s next? We could write many interesting transformation stages, but the most pressing need now is a similar way to handle output so that we don’t have to rely on the blocking println any longer.