With input and output taken care of, we can fill in the gaps with more stream processors. The MapPipe we already wrote is pretty generic and modular, and in theory a function of signature I => O can perform a huge range of tasks, but it’s less ergonomic for functions that produce lists, options, or futures, all of which are better handled by specialized implementations. We can also work with custom stateful components and look at a few ways to get values out of them.
One of the most useful would be a component to take a value of type I, apply a function that produces an Option[O], and only pass the inner O on to destination pipes if the Option isn’t None. We can implement it with just a few lines of code, like this:
| case class OptionPipe[T,U](f:T => Option[U]) extends Pipe[T,U] { |
| override def feed(input:T):Unit = { |
| val output = f(input) |
| for (h <- handlers; |
| o <- output) { |
| h.feed(o) |
| } |
| } |
| } |
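The filtering behavior comes entirely from the nested for-comprehension: iterating over an Option yields its value if present, and nothing at all if it is None. Here is a quick standalone sketch of that semantics in plain Scala, with a list of strings standing in for the handlers:

```scala
import scala.collection.mutable.ArrayBuffer

// The nested for-comprehension at the heart of OptionPipe.feed, in
// isolation: a None output contributes no iterations, so nothing is
// delivered to any handler.
val handlers = Seq("h1", "h2")
val fed = ArrayBuffer[(String, Int)]()

def feed(output: Option[Int]): Unit =
  for (h <- handlers; o <- output) fed += ((h, o))

feed(Some(1)) // both handlers receive 1
feed(None)    // the None is silently dropped
```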
And although we could use addDestination to add it to a chain of pipes, it’s more convenient if we add a method to the Pipe trait itself to construct it for us:
| def mapOption[V](g:U => Option[V]):Pipe[U,V] = { |
| addDestination(OptionPipe(g)) |
| } |
While we’re at it, we can construct a .map method to do the same thing with our MapPipe:
| def map[V](g:U => V):Pipe[U,V] = { |
| addDestination(MapPipe(g)) |
| } |
We can even construct a .filter method, which takes any predicate with the type signature U => Boolean and removes elements from the stream for which it returns false:
| def filter(f:U => Boolean):Pipe[U,U] = { |
| mapOption { u => |
| if (f(u)) Some(u) else None |
| } |
| } |
This technique allows us to chain simple functions, one after the other, without accumulating increasingly complex types as we proceed, as in this example:
| SyncPipe(0) |
| .map { d => |
| println(s"consumed $d") |
| d |
| }.map { d => |
| Try { |
| d.toInt |
| } |
| }.filter { |
| case Success(i) => |
| println(s"saw number $i") |
| true |
| case Failure(f) => |
| println(s"error: $f") |
| false |
| } |
| // ... |
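To see the whole thing end to end without the event loop, here is a minimal, self-contained sketch. MiniPipe is a hypothetical stand-in for our Pipe trait, and ListSource replaces SyncPipe(0) with a fixed list of inputs; neither is the book’s actual code:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// A minimal, single-threaded stand-in for the Pipe trait, just enough
// to exercise map, mapOption, and filter.
trait MiniPipe[T, U] {
  val handlers = ArrayBuffer[MiniPipe[U, _]]()
  def feed(input: T): Unit
  def addDestination[V](dest: MiniPipe[U, V]): MiniPipe[U, V] = {
    handlers += dest
    dest
  }
  def map[V](g: U => V): MiniPipe[U, V] =
    addDestination(new MiniPipe[U, V] {
      def feed(input: U): Unit = for (h <- handlers) h.feed(g(input))
    })
  def mapOption[V](g: U => Option[V]): MiniPipe[U, V] =
    addDestination(new MiniPipe[U, V] {
      def feed(input: U): Unit = {
        val out = g(input)
        for (h <- handlers; o <- out) h.feed(o)
      }
    })
  def filter(f: U => Boolean): MiniPipe[U, U] =
    mapOption(u => if (f(u)) Some(u) else None)
}

// A source that feeds a fixed list of strings, standing in for SyncPipe(0).
class ListSource extends MiniPipe[Unit, String] {
  def feed(input: Unit): Unit = ()
  def run(items: Seq[String]): Unit = for (i <- items; h <- handlers) h.feed(i)
}

val results = ArrayBuffer[Int]()
val src = new ListSource
src.map(d => Try(d.toInt))
  .filter(_.isSuccess)
  .map { t => results += t.get; t }
src.run(Seq("1", "oops", "3"))
// only the parseable inputs survive the filter: results holds 1 and 3
```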
But that’s just the start.
Another extremely useful family of functions is those that take a value of type I and produce a Seq[O] (think of methods like String.split()), feeding each item to the destination individually. Not only does this handle functions that transform one input into many outputs; crucially, it can also transform one value into nothing at all, which means it can provide filtering functionality as well.
Much like with Option, an empty Seq() produces no output:
| case class ConcatPipe[T,U](f:T => Seq[U]) extends Pipe[T,U] { |
| override def feed(input:T):Unit = { |
| val output = f(input) |
| for (h <- handlers; |
| o <- output) { |
| h.feed(o) |
| } |
| } |
| } |
And again, we wrap it in the Pipe trait for convenience:
| def mapConcat[V](g:U => Seq[V]):Pipe[U,V] = { |
| addDestination(ConcatPipe(g)) |
| } |
This allows us to chain together many more kinds of processes without accumulating deeply nested sequence types. Even if we’ve repeatedly split large buffers of text into smaller pieces, we can still process individual chunks one at a time downstream.
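The one-to-many and one-to-nothing cases both fall out of iterating over the returned Seq. Here is a standalone sketch of that inner loop (the fanOut helper is hypothetical, standing in for ConcatPipe.feed):

```scala
import scala.collection.mutable.ArrayBuffer

// The inner loop of ConcatPipe.feed, in isolation: every element of
// f(input) is delivered; an empty Seq yields no iterations at all.
def fanOut[T, U](f: T => Seq[U], input: T)(deliver: U => Unit): Unit =
  for (o <- f(input)) deliver(o)

val seen = ArrayBuffer[String]()
val splitWords = (s: String) => s.split(" ").toSeq.filter(_.nonEmpty)

fanOut(splitWords, "to be or")(seen += _) // three outputs from one input
fanOut(splitWords, "")(seen += _)         // empty Seq: no output, no error
```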
So far, everything we’ve added to the chain has been a synchronous transformation. But what if we try to add asynchronous functions, of the signature I => Future[O], to our chain? It turns out that they aren’t much harder to implement than the handlers for Option and Seq:
| case class AsyncPipe[T,U](f:T => Future[U]) |
| (implicit ec:ExecutionContext) extends Pipe[T,U] { |
| |
| override def feed(input:T):Unit = { |
| f(input).map { o => |
| for (h <- handlers) { |
| h.feed(o) |
| } |
| } |
| } |
| } |
And we can wrap it as .mapAsync for all pipe instances:
| def mapAsync[V](g:U => Future[V])(implicit ec:ExecutionContext):Pipe[U,V] = { |
| addDestination(AsyncPipe(g)) |
| } |
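Outside the event loop, the same completion-driven fan-out can be sketched with the standard library’s Future alone. The feed function here is a hypothetical standalone helper, not the Pipe method, and note one difference from the book’s runtime: the global ExecutionContext is multithreaded, so the shared buffer needs synchronization:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// The core of AsyncPipe.feed, in isolation: when the Future completes,
// fan the result out. Unlike a single-threaded libuv loop, the global
// ExecutionContext runs callbacks on a thread pool, hence synchronized.
val results = ArrayBuffer[Int]()
def feed(input: Int, f: Int => Future[Int]): Future[Unit] =
  f(input).map { o => results.synchronized { results += o } }

val pending = Seq(feed(3, i => Future(i * 10)), feed(4, i => Future(i * 10)))
Await.result(Future.sequence(pending), 10.seconds)
// results contains 30 and 40, though completion order may vary
```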
This code comes with the same concerns and caveats about backpressure that our output pipes have; without any kind of throttling mechanism, we could easily overwhelm our system with a fast, unbounded input. But for limited inputs, like command-line input, this is incredibly useful. It would let us, for example, add HTTP client requests to a pipeline; supposing we have a Pipe[String,String] that contains URLs to fetch, we could then just do this:
| val p:Pipe[String,String] = ??? |
| p.mapAsync { url => |
| Curl.get(url) |
| }.map { response => |
| println(s"got back result: $response") |
| } |
Finally, it’s time to add state to our streaming processors. We’ll work through a few different use cases and implementation techniques before we arrive at the most general and powerful cases.
To start, it would be nice to have a simple counter. Suppose we want to count the number of lines in a file. If we already have a Pipe[String] containing each line, we could do something like this:
| val p:Pipe[String,String] = ??? |
| var counter = 0 |
| p.map { i => |
| counter += 1 |
| i |
| } |
| // ... |
| uv_run(EventLoop.loop,UV_RUN_DEFAULT) |
| println(s"saw $counter elements") |
This is reasonably idiomatic Scala, and easy enough to follow, but in most Scala streaming libraries, it’s strongly discouraged or forbidden. Due to some particularities of the JVM’s memory model, sharing mutable state across threads can result in a wide variety of unintended behaviors. However, in our single-threaded, event-loop runtime, this is perfectly safe, just like using a var in a for loop! But if we want to, we can still package this up as a self-contained Pipe class:
| case class CounterSink[T]() extends Pipe[T,Nothing] { |
| var counter = 0 |
| override def feed(input:T) = { |
| counter += 1 |
| } |
| } |
We can then instantiate it, and extract the value at the end, like so:
| val p:Pipe[String,String] = ??? |
| val c = p.addDestination(CounterSink[String]()) |
| uv_run(EventLoop.loop,UV_RUN_DEFAULT) |
| println(s"saw ${c.counter} elements") |
With that basic pattern in place, we can tackle some more complex cases. Perhaps the most relevant one is tokenization. This is what we would use to take the buffer-sized strings that we get from our STDIN pipe and transform them into proper line-terminated strings. One could make a rough attempt at implementing it like so:
| val p:Pipe[String,String] = ??? |
| p.mapConcat { content => |
| content.split("\n") |
| }.mapConcat { line => |
| line.split(" ") |
| }.map { word => |
| println(s"saw word: ${word}") |
| } |
| uv_run(EventLoop.loop,UV_RUN_DEFAULT) |
But this has a serious flaw. This code will badly malfunction if a buffer boundary doesn’t fall precisely on a line-ending character, which is going to be the case almost all of the time. To implement this properly, we need to maintain a buffer containing any “leftover” string content so that we can wait for the next line-ending character to come through. We also need special handling of the end of a stream so that we remember to emit any remaining contents of the buffer before we signal done() to downstream consumers.
| case class Tokenizer(separator:String) extends Pipe[String,String] { |
| var buffer = "" |
| |
| def scan(input:String):Seq[String] = { |
| println(s"scanning: '$input'") |
| buffer = buffer + input |
| var o:Seq[String] = Seq() |
| while (buffer.contains(separator)) { |
| val space_position = buffer.indexOf(separator) |
| val word = buffer.substring(0,space_position) |
| o = o :+ word |
| buffer = buffer.substring(space_position + 1) |
| } |
| o |
| } |
| override def feed(input:String):Unit = { |
| for (h <- handlers; |
| word <- scan(input)) { |
| h.feed(word) |
| } |
| } |
| override def done():Unit = { |
| println(s"done! current buffer: $buffer") |
| for (h <- handlers) { |
| if (buffer.nonEmpty) { h.feed(buffer) } |
| h.done() |
| } |
| } |
| } |
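The buffer handling is the part worth exercising in isolation. Here is the scan loop extracted into a standalone class (a sketch, without the Pipe plumbing), using separator.length rather than a hardcoded 1 so it would also work for multi-character separators like "\r\n", and fed two fragments whose boundary falls mid-word:

```scala
// The Tokenizer's buffering logic on its own: append input to the
// buffer, then emit every complete separator-terminated token.
class Scanner(separator: String) {
  var buffer = ""
  def scan(input: String): Seq[String] = {
    buffer = buffer + input
    var out = Seq.empty[String]
    while (buffer.contains(separator)) {
      val pos = buffer.indexOf(separator)
      out = out :+ buffer.substring(0, pos)
      buffer = buffer.substring(pos + separator.length)
    }
    out
  }
}

// "world" is split across two reads: nothing is emitted for it until
// its terminating newline arrives in the second fragment.
val s = new Scanner("\n")
val first = s.scan("hello\nwor") // emits only "hello"; "wor" stays buffered
val second = s.scan("ld\n")      // completes and emits "world"
```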
We could even generalize this further in a more functional style, by accepting a fold function and an initial value in the style of Scala’s powerful foldLeft method:
| case class FoldPipe[I,O](init:O)(f:(O,I) => O) extends Pipe[I,O] { |
| var accum = init |
| |
| override def feed(input:I):Unit = { |
| accum = f(accum,input) |
| for (h <- handlers) { |
| h.feed(accum) |
| } |
| } |
| |
| override def done():Unit = { |
| for (h <- handlers) { |
| h.done() |
| } |
| } |
| } |
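With f = _ + _ and init = 0, FoldPipe behaves as a running sum, emitting the updated accumulator after every input. A minimal sketch of just that state update, with a buffer standing in for the downstream handlers:

```scala
import scala.collection.mutable.ArrayBuffer

// FoldPipe's state update in isolation: each input folds into the
// accumulator, and the running value is what gets emitted downstream.
var accum = 0
val emitted = ArrayBuffer[Int]() // stand-in for feeding handlers

def feed(input: Int): Unit = {
  accum = accum + input // f(accum, input) with f = _ + _
  emitted += accum
}

Seq(1, 2, 3).foreach(feed)
// emitted is a running sum: 1, 3, 6 - one output per input
```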
In some ways, however, the more direct implementation style may be preferable; the subtleties of how to handle state, and what to do with accumulated state in done(), tend to either resist easy generalization, or produce a large number of slight variations with confusing names. So for now, we’ll stick with our tokenizers.
You may also have observed how similar the tokenization-buffer problem is to the problem of HTTP requests that took up more than one buffer in our async HTTP server, back in Chapter 6, Doing I/O Right, with Event Loops. Indeed, treating an HTTP request as a pipe-like stream object will point a way forward as we come closer to the end of this book.