As you saw in the last two chapters, output in libuv works differently from input. Whereas input events originate outside our program, and can happen at any time and in any number, output always originates in our own code. Although this slightly complicates our implementation, the good news is that we won’t need to introduce any new library or system calls. But we will want to think hard about the performance consequences of our design.
Consider the cases where input and output run at different rates. If standard input produces data more slowly than standard output can consume it, there’s no problem; output is perfectly happy to run below peak capacity. But if standard input runs faster than standard output, we have a problem: because we allocate memory for each item we write, those write request and buffer objects will pile up, eventually exhausting our memory and taking down the program.
The preferred solution to this problem is backpressure—some way for a downstream processor, like standard output, to tell standard input to slow down until it can work through its backlog. (In some circles, the term backpressure can be used to refer to the backlog itself, rather than the mechanism for dealing with it. For clarity, I’m following the terminology of the Java/Scala Reactive Streams community[44] and will use backpressure to refer to the control mechanism that pushes back against excess throughput.)
One way to implement backpressure would be to add a method, such as demand(n:Int), to our Pipe trait, which a destination pipe would call on the pipe that feeds it to signal how many items it can accept. But that would substantially complicate our code, since the current design doesn’t require a pipe to know where its input comes from. A simpler design that could work well for a single-threaded system like this is to add an Int return value to feed(), or even a Boolean, to indicate our ability to consume more items. But even a minimal backpressure design raises further questions: we’d then need to implement the actual “slow-down” logic and decide whether to drop elements, buffer them, or somehow control the rate of data upstream.
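To make this concrete, here’s a minimal sketch of the feed()-return-value variant. The BackpressurePipe trait is hypothetical, for illustration only; nothing like it appears in our actual code:
| // hypothetical sketch: signaling backpressure via feed()'s return value |
| trait BackpressurePipe[T,U] { |
|   // false means "overloaded": the caller should drop items, buffer them, |
|   // or throttle its own upstream source |
|   def feed(input:T):Boolean |
|   def done():Unit |
| } |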
An alternative approach is to rely on the rate-control mechanisms provided by the operating system, which we’ve already used in other contexts. For example, in Chapter 6, Doing I/O Right, with Event Loops, the system’s TCP connection queue allowed us to buffer incoming connections in a backlog, which can smooth out bursts, and then dump excess connections when the queue is full. Likewise, for straightforward I/O cases, in which reading is usually faster than writing, the blocking I/O functions we’ve used since Chapter 1 provide a natural limit—they prevent us from handling a new line of input until the previous one is done.
Fortunately, libuv lets us exploit these patterns while maintaining a consistent, high-level API. Every uv_fs_* function can complete the underlying syscall synchronously on the main thread if we pass null for its callback argument. In other words, we’re saying, “don’t bother with the callback, just do this right away and return a value.” And with a careful design, we can create a Pipe implementation where this behavior is optional, letting us choose blocking or nonblocking characteristics as needed, without otherwise modifying our code.
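For example, the uv_fs_write binding we’ll use below can run either way. This is just a sketch; req and buf stand in for a request and buffer we’d have allocated beforehand:
| // async: returns immediately; the result arrives in writeCB later |
| uv_fs_write(EventLoop.loop, req, fd, buf, 1, offset, writeCB) |
| // sync: blocks until the write completes, then returns the result |
| val result = uv_fs_write(EventLoop.loop, req, fd, buf, 1, offset, null) |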
The implementation will follow the same pattern: a case class that overrides the key Pipe trait methods, and a companion object containing a helper to construct new pipes, as well as the callbacks. One thing that helps us here is that, unlike the file input pipe, we don’t have to chain together asynchronous calls; all output is initiated by a call to feed(), so we only need to handle the sync/async switch in that one spot. With this design, the case class will look something like this:
| case class FileOutputPipe(fd:Int, serial:Int, async:Boolean) |
|   extends Pipe[String,Unit] { |
|   import LibUV._, LibUVConstants._ |
|   import stdlib._, string._ |
|   var offset = 0L |
| |
|   // when async is false, a null callback makes uv_fs_write synchronous |
|   val writeCB = if (async) { FileOutputPipe.writeCB } else null |
| |
|   override def feed(input:String):Unit = { |
|     val output_size = input.size |
|     // allocate a libuv filesystem request for this write |
|     val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] |
| |
|     // copy the string into a C buffer that outlives this call |
|     val output_buffer = malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]] |
|     output_buffer._1 = malloc(output_size) |
|     Zone { implicit z => |
|       val output_string = toCString(input) |
|       strncpy(output_buffer._1, output_string, output_size) |
|     } |
|     output_buffer._2 = output_size |
|     // stash the buffer pointer in the request so writeCB can free it |
|     !req = output_buffer.asInstanceOf[Ptr[Byte]] |
| |
|     uv_fs_write(EventLoop.loop,req,fd,output_buffer,1,offset,writeCB) |
|     offset += output_size |
|   } |
| |
|   override def done():Unit = { |
|     val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] |
|     uv_fs_close(EventLoop.loop,req,fd,null) |
|     FileOutputPipe.active_streams -= serial |
|   } |
| } |
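This makes blocking versus nonblocking behavior a one-line decision at construction time: when async is false, writeCB is null, so the uv_fs_write call in feed() executes synchronously, and feed() doesn’t return until the write completes. That gives us exactly the natural rate-limiting of blocking I/O described earlier.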
The initialization code in the companion object is mostly standard; the only difference here is that we choose whether to set the async flag to true or false:
| object FileOutputPipe { |
|   import LibUV._, LibUVConstants._ |
|   import stdlib._ |
|   import scala.collection.mutable |
| |
|   var active_streams:mutable.Set[Int] = mutable.Set() |
|   var serial = 0 |
| |
|   def apply(path:CString, async:Boolean = true):FileOutputPipe = { |
|     active_streams += serial |
| |
|     stdio.printf(c"opening %s for writing..\n", path) |
|     val fd = util.open(path,O_RDWR + O_CREAT,default_permissions) |
|     println(s"got back fd: $fd") |
| |
|     val pipe = FileOutputPipe(fd,serial,async) |
|     serial += 1 |
|     println(s"initialized $pipe") |
|     pipe |
|   } |
And then the writeCB callback, which also lives in the companion object, is routine:
|   val writeCB = new FSCB { |
|     def apply(req:FSReq):Unit = { |
|       println("write completed") |
|       // recover the buffer pointer we stored in feed(), then free everything |
|       val resp_buffer = (!req).asInstanceOf[Ptr[Buffer]] |
|       stdlib.free(resp_buffer._1) |
|       stdlib.free(resp_buffer.asInstanceOf[Ptr[Byte]]) |
|       stdlib.free(req.asInstanceOf[Ptr[Byte]]) |
|     } |
|   } |
| } |
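One caveat: writeCB is the only place we free the request and buffer allocated in feed(). When async is false, the callback never runs, so a long-lived program using the synchronous path would want to free those allocations itself once uv_fs_write returns; for a short example like this one, the leak is harmless.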
That’s it! Now we can plug this FileOutputPipe in at the end of a pipeline by passing it to addDestination, like so:
| object FileOutputPipeExample { |
|   import LibUV._, LibUVConstants._ |
|   import scala.util.Try |
|   def main(args:Array[String]):Unit = { |
|     println("hello!") |
|     val p = SyncPipe(0) |
|     // alternatively, read from a file: val p = FilePipe(c"./data.txt") |
| |
|     val q = p.map { d => |
|       println(s"consumed $d") |
|       d |
|     }.map { d => |
|       val parsed = Try { |
|         d.toInt |
|       } |
|       println(s"parsed: $parsed") |
|       parsed.toString |
|     } |
|     .addDestination(FileOutputPipe(c"./output.txt", false)) |
|     uv_run(EventLoop.loop,UV_RUN_DEFAULT) |
|     println("done") |
|   } |
| } |
When run, this should produce about the same output as our previous program:
| $ ./target/scala-2.11/file_output-out |
| uv_prepare_init returned 0 |
| uv_prepare_start returned 0 |
| read 4 bytes from pipe 0 |
| consumed foo |
| parsed: Failure(java.lang.NumberFormatException: foo) |
| error: java.lang.NumberFormatException: foo |
| read 2 bytes from pipe 0 |
| consumed 1 |
| parsed: Success(1) |
| saw number 1 |
| read 2 bytes from pipe 0 |
| consumed 2 |
| parsed: Success(2) |
| saw number 2 |
| read -4095 bytes from pipe 0 |
| size < 0, closing |
| stopping dispatcher |
| done |
This looks good so far, but let’s look at the contents of the file we wrote:
| Failure(java.lang.NumberFormatException: foo) |
| Success(1) |
| Success(2) |
Looks good! With both input and output taken care of, we can move on to some trickier cases.