Streaming Pipe Input in libuv

We’ll start with our reliable favorite, the standard input pipe. libuv makes it much simpler to work with pipes than to work with files. True disk files don’t natively support the same poll-based I/O techniques that pipes and sockets do, but we’ll work around that a little later.

The PipeHandle provided by libuv, and its associated methods, aren’t especially novel at this point:

LibUVPipes/simple_pipe/loop.scala
 type PipeHandle = Ptr[Byte]

Apart from a few new functions for setting up the pipe, we’ll interact with pipe input much as we did with sockets: through the uv_read_start() function and its callbacks.

Before we start writing code, however, we should work out a clean, adaptable design for our data structure. Since Scala already has a standard library class called Stream, we’ll just call ours Pipe. If we were only concerned with standard input, we could use a very simple design indeed:

 trait StandardInput {
   def onInput[T](f: CString => T): Unit
 }

But this wouldn’t easily generalize to chains of processors, where values are transformed to different types. It’s also often useful for a pipe to “fan out” to multiple destinations. A more general representation, then, is a Pipe[I,O], which consumes values of type I and produces values of type O. More advanced streaming libraries, such as Akka Streams, additionally distinguish Sources, which only produce values, from Sinks, which only consume them.

This design has many benefits, but the complex trait mixtures and type parameters that result can be unwieldy to work with, and their implementation is outside the scope of this book. Instead, we can model a source like standard input as a Pipe[String,String] whose input is fed by libuv rather than by an upstream pipe. This doesn’t prevent every possible misuse, but it should be sufficient for our purposes.

Our Pipe trait has three responsibilities:

  1. It receives values of type I from upstream and transforms them to type O with a user-supplied function.

  2. It sends values of type O onward to any number of destinations or subscribers.

  3. It correctly cleans up streams when done, including closing open files.

We can model these responsibilities in the following trait signature, where the type parameters T and U play the roles of I and O:

LibUVPipes/simple_pipe/main.scala
 import scala.collection.mutable

 trait Pipe[T,U] {
   // Downstream pipes that receive every value of type U this pipe emits
   val handlers = mutable.Set[Pipe[U,_]]()

   def feed(input: T): Unit

   // Propagate the completion signal to every downstream pipe
   def done(): Unit = {
     for (h <- handlers) {
       h.done()
     }
   }

   def addDestination[V](dest: Pipe[U,V]): Pipe[U,V] = {
     handlers += dest
     dest
   }
   // ...
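Because handlers is a set, a single pipe can fan out to any number of destinations, and done() reaches all of them. The following JVM-only sketch (no libuv, so it runs with plain Scala) copies the trait above and adds two hypothetical helper classes, Broadcast and Collect, which are illustrative and not part of the book’s code:

```scala
import scala.collection.mutable

// JVM-runnable copy of the Pipe trait above (map and later additions omitted).
trait Pipe[T, U] {
  val handlers = mutable.Set[Pipe[U, _]]()
  def feed(input: T): Unit
  def done(): Unit = for (h <- handlers) h.done()
  def addDestination[V](dest: Pipe[U, V]): Pipe[U, V] = {
    handlers += dest
    dest
  }
}

// Hypothetical: forwards every input unchanged to all destinations.
class Broadcast[T] extends Pipe[T, T] {
  def feed(input: T): Unit = for (h <- handlers) h.feed(input)
}

// Hypothetical: records what it receives and whether done() reached it.
class Collect[T] extends Pipe[T, T] {
  val seen = mutable.ArrayBuffer[T]()
  var finished = false
  def feed(input: T): Unit = {
    seen += input
    for (h <- handlers) h.feed(input)
  }
  override def done(): Unit = {
    finished = true
    super.done()
  }
}

object FanOutDemo {
  // One source, two destinations: both see every value and the done() signal.
  def run(): (List[String], List[String], Boolean) = {
    val source = new Broadcast[String]
    val left = new Collect[String]
    val right = new Collect[String]
    source.addDestination(left)
    source.addDestination(right)
    source.feed("a")
    source.feed("b")
    source.done()
    (left.seen.toList, right.seen.toList, left.finished && right.finished)
  }
}
```

Note that the destinations are kept in local vals before being registered, since addDestination returns them typed only as Pipe[U,V].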

With the overall design in mind, this won’t be too hard to implement on top of libuv. In many ways, it’s closer to the underlying patterns of libuv’s API than our Future-based client in Chapter 7, Functions and Futures: Patterns for Distributed Services. Our first implementation, SyncPipe, applies its function to each input synchronously and feeds the result to every handler:

LibUVPipes/simple_pipe/main.scala
 case class SyncPipe[T,U](f: T => U) extends Pipe[T,U] {
   def feed(input: T): Unit = {
     val output = f(input)
     for (h <- handlers) {
       h.feed(output)
     }
   }
 }
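Since each stage only needs to agree with its neighbor on one type, stages of different types chain naturally, and a stage that buffers state can wait for done() before publishing anything. Here is a JVM-only sketch that copies Pipe and SyncPipe and adds a hypothetical SumSink (not from the book) at the end of a String to Int chain:

```scala
import scala.collection.mutable

// JVM-runnable copies of Pipe and SyncPipe from the listings above.
trait Pipe[T, U] {
  val handlers = mutable.Set[Pipe[U, _]]()
  def feed(input: T): Unit
  def done(): Unit = for (h <- handlers) h.done()
  def addDestination[V](dest: Pipe[U, V]): Pipe[U, V] = {
    handlers += dest
    dest
  }
}

case class SyncPipe[T, U](f: T => U) extends Pipe[T, U] {
  def feed(input: T): Unit = {
    val output = f(input)
    for (h <- handlers) h.feed(output)
  }
}

// Hypothetical sink: keeps a running total and publishes it only
// once the completion signal reaches it.
class SumSink extends Pipe[Int, Int] {
  private var total = 0
  var result: Option[Int] = None
  def feed(input: Int): Unit = total += input
  override def done(): Unit = {
    result = Some(total)
    super.done()
  }
}

object ChainDemo {
  // Returns the sink's result before and after done() is called on the head.
  def run(lines: Seq[String]): (Option[Int], Option[Int]) = {
    val head = SyncPipe[String, String](s => s.trim)
    val sink = new SumSink
    head
      .addDestination(SyncPipe[String, Int](s => s.toInt))
      .addDestination(sink)
    lines.foreach(head.feed)
    val before = sink.result
    head.done()
    (before, sink.result)
  }
}
```

Calling done() only on the head is enough, because the default done() in Pipe forwards the signal through every stage.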

As before, we’ll keep a global object that holds a bit of state and lets our callbacks tell different pipes apart. We’ll set up that state when we initialize a new pipe:

LibUVPipes/simple_pipe/main.scala
 object SyncPipe {
   import LibUV._, LibUVConstants._

   var active_streams: mutable.Set[Int] = mutable.Set()
   var handlers = mutable.HashMap[Int,SyncPipe[String,String]]()
   var serial = 0

   def apply(fd: Int): SyncPipe[String,String] = {
     val handle = stdlib.malloc(uv_handle_size(UV_PIPE_T))
     uv_pipe_init(EventLoop.loop,handle,0)
     // Store this pipe's serial number in the handle's first field
     // (its data pointer) so the callbacks can look the pipe up later
     val pipe_data = handle.asInstanceOf[Ptr[Int]]
     !pipe_data = serial
     active_streams += serial
     // The head of the chain: an identity pipe that readCB feeds
     val pipe = SyncPipe[String,String]{ s => s }
     handlers(serial) = pipe

     serial += 1
     uv_pipe_open(handle,fd)
     uv_read_start(handle,SyncPipe.allocCB,SyncPipe.readCB)
     pipe
   }
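The integer stored in the handle is what lets one global pair of callbacks serve many pipes: libuv passes the read callback only the handle, the read size, and the buffer, so the callback has to recover the right Scala object from the handle. That bookkeeping can be sketched on the JVM with a hypothetical PipeRegistry class that mirrors serial, active_streams, and handlers; the class and its method names are illustrative only:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the state kept in object SyncPipe.
// Each stream gets a serial number; a callback that only knows the
// number can then find the Scala object that should receive its data.
class PipeRegistry[P] {
  private var serial = 0
  val active = mutable.Set[Int]()
  private val byId = mutable.HashMap[Int, P]()

  // Assign the next serial number, like apply(fd) does.
  def register(p: P): Int = {
    val id = serial
    serial += 1
    active += id
    byId(id) = p
    id
  }

  // What readCB does with the number it reads back out of the handle.
  def lookup(id: Int): Option[P] = byId.get(id)

  // Forget a stream once it reports end of input.
  def close(id: Int): Option[P] = {
    active -= id
    byId.remove(id)
  }
}
```

Serial numbers are never reused in this sketch, so a stale id can never resolve to a newer pipe.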

We take a raw integer file descriptor here so that we can work easily with standard input (file descriptor 0), but we could extend this to handle named pipes as well. We also need the two callbacks that uv_read_start() expects: one to allocate a buffer for each read, and one to handle the data that arrives:

LibUVPipes/simple_pipe/main.scala
 val allocCB = new AllocCB {
   def apply(client: PipeHandle, size: CSize, buffer: Ptr[Buffer]): Unit = {
     // Hand libuv a fresh 4 KiB buffer; readCB is responsible for freeing it
     val buf = stdlib.malloc(4096)
     buffer._1 = buf
     buffer._2 = 4096
   }
 }

 val readCB = new ReadCB {
   def apply(handle: PipeHandle, size: CSize, buffer: Ptr[Buffer]): Unit = {
     val pipe_data = handle.asInstanceOf[Ptr[Int]]
     val pipe_id = !pipe_data
     println(s"read $size bytes from pipe $pipe_id")
     if (size < 0) {
       // A negative size is an error code, such as UV_EOF at end of input
       println("size < 0, closing")
       active_streams -= pipe_id
       // Tell every downstream pipe that this stream is finished
       handlers.remove(pipe_id).foreach { p => p.done() }
     } else {
       // Only the first `size` bytes are valid and they aren't
       // NUL-terminated, so copy them and terminate the copy ourselves
       val data_buffer = stdlib.malloc(size + 1)
       string.strncpy(data_buffer, buffer._1, size)
       !(data_buffer + size) = 0.toByte
       val data_string = fromCString(data_buffer)
       stdlib.free(data_buffer)
       val pipe_destination = handlers(pipe_id)
       pipe_destination.feed(data_string.trim())
     }
     // Release the buffer that allocCB gave to libuv
     stdlib.free(buffer._1)
   }
 }

Since readCB already detects when the input has finished, we don’t need to close the pipe manually; however, we do need to signal all downstream consumers that it’s time for them to complete, by calling done() on the pipe registered for that stream.
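The one delicate step in readCB is turning libuv’s size and buffer into a String: a negative size is an error code (UV_EOF is -4095), and only the first size bytes of the buffer are meaningful, with no terminating NUL guaranteed. A JVM-side sketch of that step, using a hypothetical decodeChunk helper and a plain byte array in place of the malloc’d buffer:

```scala
import java.nio.charset.StandardCharsets

object ChunkDecoder {
  // libuv's end-of-file error code, delivered as a negative read size.
  val UV_EOF: Long = -4095L

  // Hypothetical helper mirroring readCB's data path.
  // A negative nread is an error code, so there is nothing to decode.
  // Otherwise decode exactly nread bytes, ignoring whatever follows them
  // in the buffer, and trim whitespace such as the trailing newline.
  // (libuv may also report nread == 0, meaning nothing was read; this
  // sketch decodes that as an empty string.)
  def decodeChunk(buffer: Array[Byte], nread: Long): Option[String] =
    if (nread < 0) None
    else Some(new String(buffer, 0, nread.toInt, StandardCharsets.UTF_8).trim)
}
```

Passing an explicit length, rather than scanning for a NUL, is the JVM analogue of copying exactly size bytes and terminating the copy by hand.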

And SyncPipe, together with its companion object, is already a complete implementation of the Pipe trait! To write a meaningful program, though, we’ll need to transform these values. The most straightforward way to do this is with a map method that applies a function to each item passing through it. We can add map to the Pipe trait itself, where it creates a new SyncPipe from the function and registers it as a destination:

LibUVPipes/simple_pipe/main.scala
 def map[V](g: U => V): Pipe[U,V] = {
   val destination = SyncPipe(g)
   handlers += destination
   destination
 }
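To check the composition logic without libuv, here is a JVM-only sketch that copies Pipe, SyncPipe, and map, then feeds a few lines by hand instead of reading standard input. The MapDemo object and its log are illustrative, not from the book:

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// JVM-runnable copies of Pipe (with map) and SyncPipe.
trait Pipe[T, U] {
  val handlers = mutable.Set[Pipe[U, _]]()
  def feed(input: T): Unit
  def done(): Unit = for (h <- handlers) h.done()

  // Wrap g in a new SyncPipe and register it downstream of this pipe.
  def map[V](g: U => V): Pipe[U, V] = {
    val destination = SyncPipe(g)
    handlers += destination
    destination
  }
}

case class SyncPipe[T, U](f: T => U) extends Pipe[T, U] {
  def feed(input: T): Unit = {
    val output = f(input)
    for (h <- handlers) h.feed(output)
  }
}

object MapDemo {
  // Same shape as the Main program: parse each line, then report it.
  def run(lines: Seq[String]): List[String] = {
    val log = mutable.ListBuffer[String]()
    val source = SyncPipe[String, String](s => s)
    source
      .map(d => Try(d.toInt))
      .map {
        case Success(i) => log += s"saw number $i"
        case Failure(e) => log += s"error: $e"
      }
    lines.foreach(source.feed)
    log.toList
  }
}
```

Each map call returns the new downstream stage, so chained calls build a linear pipeline hanging off the source.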

With SyncPipe and map in hand, we can compose a useful program like this:

LibUVPipes/simple_pipe/main.scala
 import scala.util.{Failure, Success, Try}

 object Main {
   import LibUV._, LibUVConstants._
   def main(args: Array[String]): Unit = {
     println("hello!")
     // Read standard input (fd 0) and pass each line through three stages
     val p = SyncPipe(0)
     val q = p.map { d =>
       println(s"consumed $d")
       d
     }.map { d =>
       val parsed = Try {
         d.toInt
       }
       println(s"parsed: $parsed")
       parsed
     }.map {
       case Success(i) => println(s"saw number $i")
       case Failure(f) => println(s"error: $f")
     }
     uv_run(EventLoop.loop,UV_RUN_DEFAULT)
     println("done")
   }
 }

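And we can test it, too. In this session, foo, 1, and 2 are typed at the terminal; the final read of -4095 bytes is libuv’s UV_EOF code, reported once standard input is closed (for example, with Ctrl-D):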

 $ ./target/scala-2.11/simple_pipe-out
 hello!
 uv_prepare_init returned 0
 uv_prepare_start returned 0
 foo
 read 4 bytes from pipe 0
 consumed foo
 parsed: Failure(java.lang.NumberFormatException: foo)
 error: java.lang.NumberFormatException: foo
 1
 read 2 bytes from pipe 0
 consumed 1
 parsed: Success(1)
 saw number 1
 2
 read 2 bytes from pipe 0
 consumed 2
 parsed: Success(2)
 saw number 2
 read -4095 bytes from pipe 0
 size < 0, closing
 stopping dispatcher
 done

Looks good! Everything we do in the rest of this chapter will elaborate upon this single, elegant Pipe trait.