Stream Processors

With input and output taken care of, we can fill in the gaps with more stream processors. The SyncPipe we already wrote is pretty generic and modular, and in theory, the signature I => O can do a huge range of tasks, but it’s less ergonomic for functions that produce lists, options, or futures, all of which are better handled by specialized implementations. We can also work with custom stateful components and look at a few ways to get values out of them.

One of the most useful would be a component to take a value of type I, apply a function that produces an Option[O], and only pass the inner O on to destination pipes if the Option isn’t None. We can implement it with just a few lines of code, like this:

LibUVPipes/file_pipe/main.scala
case class OptionPipe[T,U](f: T => Option[U]) extends Pipe[T,U] {
  override def feed(input: T): Unit = {
    val output = f(input)
    for (h <- handlers;
         o <- output) {
      h.feed(o)
    }
  }
}
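To see the behavior in isolation, here’s a standalone sketch. The stripped-down Pipe trait and the CollectSink helper below are hypothetical stand-ins, written only so the example runs on its own; the OptionPipe body matches the listing above.

```scala
import scala.util.Try

// Minimal stand-in for the chapter's Pipe trait (a sketch, not the real one)
trait Pipe[T, U] {
  var handlers: Seq[Pipe[U, _]] = Seq()
  def addDestination[V](dest: Pipe[U, V]): Pipe[U, V] = {
    handlers = handlers :+ dest
    dest
  }
  def feed(input: T): Unit
  def done(): Unit = for (h <- handlers) h.done()
}

case class OptionPipe[T, U](f: T => Option[U]) extends Pipe[T, U] {
  override def feed(input: T): Unit = {
    val output = f(input)
    for (h <- handlers; o <- output) h.feed(o)
  }
}

// Hypothetical sink that collects everything it is fed, for inspection
case class CollectSink[T]() extends Pipe[T, Nothing] {
  var items: Seq[T] = Seq()
  override def feed(input: T): Unit = items = items :+ input
}

object OptionPipeDemo {
  // A pass-through source we can feed by hand
  val source = new Pipe[String, String] {
    override def feed(input: String): Unit = for (h <- handlers) h.feed(input)
  }
  val sink = CollectSink[Int]()
  source
    .addDestination(OptionPipe[String, Int](s => Try(s.toInt).toOption))
    .addDestination(sink)
  Seq("1", "two", "3").foreach(source.feed)

  def main(args: Array[String]): Unit =
    println(sink.items) // the unparseable "two" was silently dropped
}
```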

And although we could use addDestination to add it to a chain of pipes, it’s more convenient if we add a method to the Pipe trait itself to construct it for us:

LibUVPipes/file_pipe/main.scala
def mapOption[V](g: U => Option[V]): Pipe[U,V] = {
  addDestination(OptionPipe(g))
}

While we’re at it, we can construct a .map method to do the same thing with our SyncPipe:

LibUVPipes/file_pipe_out/main.scala
def map[V](g: U => V): Pipe[U,V] = {
  addDestination(SyncPipe(g))
}

We can even construct a .filter method, which takes any true/false function with the type signature I => Boolean and removes from the stream the elements for which it returns false:

LibUVPipes/examples.scala
def filter(f: U => Boolean): Pipe[U,U] = {
  mapOption { t =>
    f(t) match {
      case true => Some(t)
      case false => None
    }
  }
}

This technique allows us to chain simple functions, one after the other, without accumulating increasingly complex types as we proceed, as in this example:

LibUVPipes/examples.scala
SyncPipe(0)
  .map { d =>
    println(s"consumed $d")
    d
  }.map { d =>
    Try {
      d.toInt
    }
  }.filter {
    case Success(i) =>
      println(s"saw number $i")
      true
    case Failure(f) =>
      println(s"error: $f")
      false
  }
// ...

But that’s just the start.

MapConcat

Another extremely useful family of functions are those that take a value of type I and output a Seq[O]—for example, methods like String.split()—and output each item to the destination individually. Not only does this handle functions that transform one input into many outputs but, critically, it can also transform one value into nothing at all—meaning it can provide filtering functionality as well.

Much like with Option, an empty Seq() produces no output:

LibUVPipes/file_pipe/main.scala
case class ConcatPipe[T,U](f: T => Seq[U]) extends Pipe[T,U] {
  override def feed(input: T): Unit = {
    val output = f(input)
    for (h <- handlers;
         o <- output) {
      h.feed(o)
    }
  }
}

And again, we wrap it in the Pipe trait for convenience:

LibUVPipes/file_pipe/main.scala
def mapConcat[V](g: U => Seq[V]): Pipe[U,V] = {
  addDestination(ConcatPipe(g))
}

This allows us to chain together many more kinds of processes without accumulating deeply nested sequence types. Even if we’ve repeatedly split large buffers of text into smaller pieces, we can still process individual chunks one at a time downstream.
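As a quick, self-contained illustration—again using a minimal stand-in for the Pipe trait, plus a hypothetical CollectSink for inspection—splitting buffers into lines and lines into words keeps the element type flat:

```scala
// Minimal stand-in for the chapter's Pipe trait (a sketch, not the real one)
trait Pipe[T, U] {
  var handlers: Seq[Pipe[U, _]] = Seq()
  def addDestination[V](dest: Pipe[U, V]): Pipe[U, V] = {
    handlers = handlers :+ dest
    dest
  }
  def mapConcat[V](g: U => Seq[V]): Pipe[U, V] = addDestination(ConcatPipe(g))
  def feed(input: T): Unit
}

case class ConcatPipe[T, U](f: T => Seq[U]) extends Pipe[T, U] {
  override def feed(input: T): Unit = {
    val output = f(input)
    for (h <- handlers; o <- output) h.feed(o)
  }
}

// Hypothetical sink that collects everything it is fed, for inspection
case class CollectSink[T]() extends Pipe[T, Nothing] {
  var items: Seq[T] = Seq()
  override def feed(input: T): Unit = items = items :+ input
}

object ConcatDemo {
  val source = new Pipe[String, String] {
    override def feed(input: String): Unit = for (h <- handlers) h.feed(input)
  }
  val sink = CollectSink[String]()
  source
    .mapConcat(_.split("\n").toSeq) // buffers -> lines
    .mapConcat(_.split(" ").toSeq)  // lines -> words
    .addDestination(sink)
  source.feed("good morning\nhello world")

  def main(args: Array[String]): Unit =
    println(sink.items) // individual words, no nested Seq[Seq[...]]
}
```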

Asynchronous Transformations

So far, everything we’ve added to the chain has been a synchronous transformation. But what if we try to add asynchronous functions, of the signature I => Future[O], to our chain? It turns out that they aren’t much harder to implement than the handlers for Option and Seq:

LibUVPipes/file_pipe/main.scala
case class AsyncPipe[T,U](f: T => Future[U])
    (implicit ec: ExecutionContext) extends Pipe[T,U] {

  override def feed(input: T): Unit = {
    f(input).map { o =>
      for (h <- handlers) {
        h.feed(o)
      }
    }
  }
}

And we can wrap it as .mapAsync for all pipe instances:

LibUVPipes/file_pipe/main.scala
def mapAsync[V](g: U => Future[V])(implicit ec: ExecutionContext): Pipe[U,V] = {
  addDestination(AsyncPipe(g))
}

This code comes with the same concerns and caveats about backpressure that our output pipes have; without any kind of throttling mechanism, we could easily overwhelm our system with a fast, unbounded input. But for limited inputs, like command-line input, this is incredibly useful. It would let us, for example, add HTTP client requests to a pipeline: supposing we have a Pipe[String,String] that contains URLs to fetch, we could then just do this:

LibUVPipes/examples.scala
val p: Pipe[String,String] = ???
p.mapAsync { url =>
  Curl.get(url)
}.map { response =>
  println(s"got back result: $response")
}
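For a feel of the mechanics in isolation, here’s a sketch that drives AsyncPipe with a plain Scala Future on the JVM’s global ExecutionContext rather than the libuv event loop; the minimal Pipe trait and the Promise-based sink are stand-ins invented just for this example:

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Minimal stand-in for the chapter's Pipe trait (a sketch, not the real one)
trait Pipe[T, U] {
  var handlers: Seq[Pipe[U, _]] = Seq()
  def addDestination[V](dest: Pipe[U, V]): Pipe[U, V] = {
    handlers = handlers :+ dest
    dest
  }
  def feed(input: T): Unit
}

case class AsyncPipe[T, U](f: T => Future[U])
    (implicit ec: ExecutionContext) extends Pipe[T, U] {
  override def feed(input: T): Unit = {
    // When the Future completes, fan the result out to all handlers
    f(input).map { o =>
      for (h <- handlers) h.feed(o)
    }
  }
}

object AsyncDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global
  val finished = Promise[Int]()
  val pipe = AsyncPipe[Int, Int](i => Future(i * 2))
  pipe.addDestination(new Pipe[Int, Nothing] {
    override def feed(input: Int): Unit = finished.success(input)
  })
  pipe.feed(21)
  // Block until the asynchronous result arrives (demo only; the real
  // pipeline would stay on the event loop instead)
  val result: Int = Await.result(finished.future, 5.seconds)

  def main(args: Array[String]): Unit = println(result)
}
```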

Stateful Processors

Finally, it’s time to add state to our streaming processors. We’ll work through a few different use cases and implementation techniques before we arrive at the most general and powerful cases.

To start, it would be nice to have a simple counter. Suppose we want to count the number of lines in a file. If we already have a Pipe[String] containing each line, we could do something like this:

LibUVPipes/examples.scala
val p: Pipe[String,String] = ???
var counter = 0
p.map { i =>
  counter += 1
  i
}
// ...
uv_run(EventLoop.loop,UV_RUN_DEFAULT)
println(s"saw $counter elements")

This is reasonably idiomatic Scala, and easy enough to follow, but in most Scala streaming libraries, it’s strongly discouraged or forbidden. Due to some particularities of the JVM’s memory model, sharing mutable state across threads can result in a wide variety of unintended behaviors. However, in our single-threaded, event-loop runtime, this is perfectly safe, just like using a var in a for loop! But if we want to, we can still package this up as a self-contained Pipe class:

LibUVPipes/file_pipe/main.scala
case class CounterSink[T]() extends Pipe[T,Nothing] {
  var counter = 0
  override def feed(input: T) = {
    counter += 1
  }
}

We can then instantiate it, and extract the value at the end, like so:

LibUVPipes/examples.scala
val p: Pipe[String,String] = ???
val c = CounterSink[String]()
p.addDestination(c)
uv_run(EventLoop.loop,UV_RUN_DEFAULT)
println(s"saw ${c.counter} elements")

With that basic pattern in place, we can treat some more complex cases. Perhaps the most relevant one is tokenization. This is what we would use to take the buffer-sized strings that we get from our STDIN pipe, and transform them into proper line-terminated strings. One could make a rough attempt at implementing it like so:

LibUVPipes/examples.scala
val p: Pipe[String,String] = ???
p.mapConcat { content =>
  content.split("\n")
}.mapConcat { line =>
  line.split(" ")
}.map { word =>
  println(s"saw word: ${word}")
}
uv_run(EventLoop.loop,UV_RUN_DEFAULT)

But this has a serious flaw. This code will badly malfunction if a buffer boundary doesn’t fall precisely on a line-ending character, which is going to be the case almost all of the time. To implement this properly, we need to maintain a buffer containing any “leftover” string content so that we can wait for the next line-ending character to come through. We also need special handling of the end of a stream so that we remember to emit any remaining contents of the buffer before we signal done() to downstream consumers.

LibUVPipes/file_pipe/main.scala
case class Tokenizer(separator: String) extends Pipe[String,String] {
  var buffer = ""

  def scan(input: String): Seq[String] = {
    println(s"scanning: '$input'")
    buffer = buffer + input
    var o: Seq[String] = Seq()
    while (buffer.contains(separator)) {
      val space_position = buffer.indexOf(separator)
      val word = buffer.substring(0,space_position)
      o = o :+ word
      buffer = buffer.substring(space_position + 1)
    }
    o
  }

  override def feed(input: String): Unit = {
    for (h <- handlers;
         word <- scan(input)) {
      h.feed(word)
    }
  }

  override def done(): Unit = {
    println(s"done! current buffer: $buffer")
    for (h <- handlers) {
      h.feed(buffer)
      h.done()
    }
  }
}
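Here’s a standalone check of that buffer handling, using the usual minimal stand-ins for Pipe and a hypothetical CollectSink, with the Tokenizer’s debug printlns dropped: even though the chunk boundaries fall mid-word, complete lines come out, and done() flushes the remainder. (As in the listing above, the `+ 1` assumes a single-character separator.)

```scala
// Minimal stand-in for the chapter's Pipe trait (a sketch, not the real one)
trait Pipe[T, U] {
  var handlers: Seq[Pipe[U, _]] = Seq()
  def addDestination[V](dest: Pipe[U, V]): Pipe[U, V] = {
    handlers = handlers :+ dest
    dest
  }
  def feed(input: T): Unit
  def done(): Unit = for (h <- handlers) h.done()
}

case class Tokenizer(separator: String) extends Pipe[String, String] {
  var buffer = ""
  def scan(input: String): Seq[String] = {
    buffer = buffer + input
    var o: Seq[String] = Seq()
    while (buffer.contains(separator)) {
      val position = buffer.indexOf(separator)
      o = o :+ buffer.substring(0, position)
      buffer = buffer.substring(position + 1) // assumes 1-char separator
    }
    o
  }
  override def feed(input: String): Unit =
    for (h <- handlers; word <- scan(input)) h.feed(word)
  override def done(): Unit =
    for (h <- handlers) { h.feed(buffer); h.done() }
}

// Hypothetical sink that collects everything it is fed, for inspection
case class CollectSink[T]() extends Pipe[T, Nothing] {
  var items: Seq[T] = Seq()
  override def feed(input: T): Unit = items = items :+ input
}

object TokenizerDemo {
  val tokenizer = Tokenizer("\n")
  val sink = CollectSink[String]()
  tokenizer.addDestination(sink)
  // Chunk boundaries deliberately fall in the middle of words
  Seq("hel", "lo\nwor", "ld").foreach(tokenizer.feed)
  tokenizer.done() // flushes the leftover "world"

  def main(args: Array[String]): Unit = println(sink.items)
}
```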

We could even generalize this further in a more functional style, by accepting a fold function and an initial value in the style of Scala’s powerful foldLeft method:

LibUVPipes/file_pipe/main.scala
case class FoldPipe[I,O](init: O)(f: (O,I) => O) extends Pipe[I,O] {
  var accum = init

  override def feed(input: I): Unit = {
    accum = f(accum,input)
    for (h <- handlers) {
      h.feed(accum)
    }
  }

  override def done(): Unit = {
    for (h <- handlers) {
      h.done()
    }
  }
}
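A quick standalone sketch of FoldPipe as a running sum, with the usual minimal Pipe stand-in and a hypothetical CollectSink. Note that FoldPipe emits the accumulator after every element, not just once at the end:

```scala
// Minimal stand-in for the chapter's Pipe trait (a sketch, not the real one)
trait Pipe[T, U] {
  var handlers: Seq[Pipe[U, _]] = Seq()
  def addDestination[V](dest: Pipe[U, V]): Pipe[U, V] = {
    handlers = handlers :+ dest
    dest
  }
  def feed(input: T): Unit
  def done(): Unit = for (h <- handlers) h.done()
}

case class FoldPipe[I, O](init: O)(f: (O, I) => O) extends Pipe[I, O] {
  var accum = init
  override def feed(input: I): Unit = {
    accum = f(accum, input)
    for (h <- handlers) h.feed(accum) // emit every intermediate value
  }
}

// Hypothetical sink that collects everything it is fed, for inspection
case class CollectSink[T]() extends Pipe[T, Nothing] {
  var items: Seq[T] = Seq()
  override def feed(input: T): Unit = items = items :+ input
}

object FoldDemo {
  val source = new Pipe[Int, Int] {
    override def feed(input: Int): Unit = for (h <- handlers) h.feed(input)
  }
  val sink = CollectSink[Int]()
  source
    .addDestination(FoldPipe(0)((total: Int, i: Int) => total + i))
    .addDestination(sink)
  Seq(1, 2, 3, 4).foreach(source.feed)

  def main(args: Array[String]): Unit =
    println(sink.items) // running totals, one per input
}
```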

In some ways, however, the more direct implementation style may be preferable; the subtleties of how to handle state, and what to do with accumulated state in done(), tend to either resist easy generalization, or produce a large number of slight variations with confusing names. So for now, we’ll stick with our tokenizers.

You may also have observed how similar the tokenization-buffer problem is to the problem of HTTP requests that took up more than one buffer in our async HTTP server, back in Chapter 6, Doing I/O Right, with Event Loops. Indeed, treating an HTTP request as a pipe-like stream object will point the way forward as we come closer to the end of this book.