Streams—Lazy Enumerables

In Elixir, the Enum module is greedy. This means that when you pass it a collection, it potentially consumes all the contents of that collection. It also means the result will typically be another collection. Look at the following pipeline:

enum/pipeline.exs
 [ 1, 2, 3, 4, 5 ]
 #=> [ 1, 2, 3, 4, 5 ]
 |> Enum.map(&(&1*&1))
 #=> [ 1, 4, 9, 16, 25 ]
 |> Enum.with_index
 #=> [ {1, 0}, {4, 1}, {9, 2}, {16, 3}, {25, 4} ]
 |> Enum.map(fn {value, index} -> value - index end)
 #=> [1, 3, 7, 13, 21]
 |> IO.inspect #=> [1, 3, 7, 13, 21]

The first map function takes the original list and creates a new list of its squares. with_index takes this list and returns a list of tuples. The next map then subtracts the index from the value, generating a list that gets passed to IO.inspect.

So, this pipeline generates four lists on its way to outputting the final result.

Let’s look at something different. Here’s some code that reads lines from a file and returns the longest:

enum/longest_line.exs
 IO.puts File.read!("/usr/share/dict/words")
  |> String.split
  |> Enum.max_by(&String.length/1)

In this case, we read the whole dictionary into memory (on my machine that’s 2.4MB), then split it into a list of words (236,000 of them) before processing it to find the longest (which happens to be formaldehydesulphoxylate).

In both of these examples, our code is suboptimal because each call to Enum is self-contained. Each call takes a collection and returns a collection.

What we really want is to process the elements in the collection as we need them. We don’t need to store intermediate results as full collections; we just need to pass the current element from function to function. And that’s what streams do.

A Stream Is a Composable Enumerator

Here’s a simple example of creating a Stream:

 iex> s = Stream.map [1, 3, 5, 7], &(&1 + 1)
 #Stream<[enum: [1, 3, 5, 7], funs: [#Function<46.3851/1 in Stream.map/2>] ]>

If we’d called Enum.map, we’d have seen the result [2,4,6,8] come back immediately. Instead we get back a stream value that contains a specification of what we intended.

How do we get the stream to start giving us results? Treat it as a collection and pass it to a function in the Enum module:

 iex> s = Stream.map [1, 3, 5, 7], &(&1 + 1)
 #Stream<[enum: [1, 3, 5, 7], funs: [#Function<46.3851/1 in Stream.map/2>] ]>
 iex> Enum.to_list s
 [2, 4, 6, 8]

Because streams are themselves enumerable, you can also pass a stream to a stream function. For this reason, we say that streams are composable.

 iex> squares = Stream.map [1, 2, 3, 4], &(&1*&1)
 #Stream<[enum: [1, 2, 3, 4],
  funs: [#Function<32.133702391/1 in Stream.map/2>] ]>

 iex> plus_ones = Stream.map squares, &(&1+1)
 #Stream<[enum: [1, 2, 3, 4],
  funs: [#Function<32.133702391/1 in Stream.map/2>,
  #Function<32.133702391/1 in Stream.map/2>] ]>

 iex> odds = Stream.filter plus_ones, fn x -> rem(x,2) == 1 end
 #Stream<[enum: [1, 2, 3, 4],
  funs: [#Function<26.133702391/1 in Stream.filter/2>,
  #Function<32.133702391/1 in Stream.map/2>,
  #Function<32.133702391/1 in Stream.map/2>] ]>

 iex> Enum.to_list odds
 [5, 17]

Of course, in real life we’d have written this as

enum/stream1.exs
 [1,2,3,4]
 |> Stream.map(&(&1*&1))
 |> Stream.map(&(&1+1))
 |> Stream.filter(fn x -> rem(x,2) == 1 end)
 |> Enum.to_list

Note that we’re never creating intermediate lists—we’re just passing successive elements of each of the collections to the next in the chain. The Stream values shown in the previous IEx session give a hint of how this works—chained streams are represented as a list of functions, each of which is applied in turn to each element of the stream as it is processed.

Streams aren’t only for lists. More and more Elixir modules now support streams. For example, here’s our longest-word code written using streams:

enum/stream2.exs
 IO.puts File.open!("/usr/share/dict/words")
  |> IO.stream(:line)
  |> Enum.max_by(&String.length/1)

The magic here is the call to IO.stream, which converts an IO device (in this case the open file) into a stream that serves one line at a time. In fact, this is such a useful concept that there’s a shortcut:

enum/stream3.exs
 IO.puts File.stream!("/usr/share/dict/words") |> Enum.max_by(&String.length/1)

The good news is that there is no intermediate storage. The bad news is that it runs about twice as slowly as the previous version. However, consider the case where we were reading data from a remote server or from an external sensor (maybe temperature readings). Successive lines might arrive slowly, and they might go on forever. With the Enum implementation we'd have to wait for all the lines to arrive before we started processing. With streams we can process them as they arrive.
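
Since the dictionary file may not exist on every system, here is a self-contained sketch of the same lazy pattern. The path and sample words are invented for illustration; the point is that each line flows through the pipeline one at a time rather than as a whole list:

```elixir
# Hypothetical sample file, created just for this illustration.
path = "/tmp/words_sample.txt"
File.write!(path, "cat\nhippopotamus\nox\n")

longest =
  path
  |> File.stream!()                 # lazily yields one line at a time
  |> Stream.map(&String.trim/1)     # strip the trailing newline, lazily
  |> Enum.max_by(&String.length/1)  # only now are lines actually read

IO.puts(longest)  # hippopotamus
```

Nothing happens until Enum.max_by starts demanding values; at that point lines are read, trimmed, and compared one by one.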

Infinite Streams

Because streams are lazy, there’s no need for the whole collection to be available up front. For example, if I write

 iex> Enum.map(1..10_000_000, &(&1+1)) |> Enum.take(5)
 [2, 3, 4, 5, 6]

it takes about 8 seconds before I see the result. Elixir is creating a 10-million-element list, then taking the first five elements from it. If instead I write

 iex> Stream.map(1..10_000_000, &(&1+1)) |> Enum.take(5)
 [2, 3, 4, 5, 6]

the result comes back instantaneously. The take call just needs five values, which it gets from the stream. Once it has them, there’s no more processing.

In these examples the stream is bounded, but it can equally well go on forever. To do that, we’ll need to create streams based on functions.

Creating Your Own Streams

Streams are implemented solely in Elixir libraries—there is no specific runtime support. However, this doesn’t mean you want to drop down to the very lowest level and create your own streamable types. The actual implementation is complex (in the same way that string theory and dating rituals are complex). Instead, you probably want to use some helpful wrapper functions to do the heavy lifting. There are a number of these, including cycle, repeatedly, iterate, unfold, and resource. (If you needed proof that the internal implementation is tricky, consider the fact that these last two names give you almost no hint of their power.)

Let’s start with the three simplest: cycle, repeatedly, and iterate.

Stream.cycle

Stream.cycle takes an enumerable and returns an infinite stream containing that enumerable’s elements. When it gets to the end, it repeats from the beginning, indefinitely. Here’s an example that generates the rows in an HTML table with alternating green and white classes:

 iex> Stream.cycle(~w{ green white }) |>
 ...> Stream.zip(1..5) |>
 ...> Enum.map(fn {class, value} ->
 ...>   "<tr class='#{class}'><td>#{value}</td></tr>\n" end) |>
 ...> IO.puts
 <tr class='green'><td>1</td></tr>
 <tr class='white'><td>2</td></tr>
 <tr class='green'><td>3</td></tr>
 <tr class='white'><td>4</td></tr>
 <tr class='green'><td>5</td></tr>
 :ok
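
Stripped of the HTML generation, the cycling behavior is easy to see in miniature. Here the take call bounds the otherwise infinite stream:

```elixir
# cycle repeats its enumerable's elements indefinitely;
# take demands exactly seven of them.
Stream.cycle([1, 2, 3])
|> Enum.take(7)
|> IO.inspect()  # [1, 2, 3, 1, 2, 3, 1]
```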

Stream.repeatedly

Stream.repeatedly takes a function and invokes it each time a new value is wanted.

 iex> Stream.repeatedly(fn -> true end) |> Enum.take(3)
 [true, true, true]
 iex> Stream.repeatedly(&:random.uniform/0) |> Enum.take(3)
 [0.7230402056221108, 0.94581636451987, 0.5014907142064751]

Stream.iterate

Stream.iterate(start_value, next_fun) generates an infinite stream. The first value is start_value. The next value is generated by applying next_fun to this value. This continues for as long as the stream is being used, with each value being the result of applying next_fun to the previous value.

Here are some examples:

 iex> Stream.iterate(0, &(&1+1)) |> Enum.take(5)
 [0, 1, 2, 3, 4]
 iex> Stream.iterate(2, &(&1*&1)) |> Enum.take(5)
 [2, 4, 16, 256, 65536]
 iex> Stream.iterate([], &[&1]) |> Enum.take(5)
 [[], [[]], [[[]]], [[[[]]]], [[[[[]]]]]]

Stream.unfold

Now we can get a little more adventurous. Stream.unfold is related to iterate, but you can be more explicit both about the values output to the stream and about the values passed to the next iteration. You supply an initial value and a function. The function uses the argument to create two values, returned as a tuple. The first is the value to be returned by this iteration of the stream, and the second is the value to be passed to the function on the next iteration of the stream. If the function returns nil, the stream terminates.

This sounds abstract, but unfold is quite useful—it is a general way of creating a potentially infinite stream of values where each value is some function of the previous state.

The key is the generating function. Its general form is

 fn state -> { stream_value, new_state } end

For example, here’s a stream of Fibonacci numbers:

 iex> Stream.unfold({0,1}, fn {f1,f2} -> {f1, {f2, f1+f2}} end) |> Enum.take(15)
 [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377]

Here the state is a tuple containing the current and the next number in the sequence. We seed it with the initial state of {0, 1}. The value each iteration of the stream returns is the first of the state values. The new state moves one down the sequence, so an initial state of {f1,f2} becomes a new state of {f2,f1+f2}.
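
The nil termination is worth seeing on its own. Here's a minimal sketch that unfolds a countdown and stops once the state reaches zero, so the stream is finite even without a take:

```elixir
countdown =
  Stream.unfold(5, fn
    0 -> nil           # returning nil terminates the stream
    n -> {n, n - 1}    # emit n; pass n - 1 as the next state
  end)

IO.inspect Enum.to_list(countdown)  # [5, 4, 3, 2, 1]
```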

Stream.resource

At this point you might be wondering how streams can interact with external resources. We’ve already seen how you can turn a file’s contents into a stream of lines, but how could you implement this yourself? You’d need to open the file when the stream first starts, return successive lines, and then close the file at the end. Or maybe you want to turn a database result-set cursor into a stream of values. You’d have to execute the query when the stream starts, return each row as stream values, and close the query at the end. And that’s where Stream.resource comes in.

Stream.resource builds upon Stream.unfold. It makes two changes.

The first argument to unfold is the initial value to be passed to the iteration function. But if that value is a resource, we don’t want to open it until the stream starts delivering values, and that might not happen until long after we create the stream. To get around this, resource takes not a value, but a function that returns the value. That’s the first change.

Second, when the stream is done with the resource, we may need to close it. That’s what the third argument to Stream.resource does—it takes the final accumulator value and does whatever is needed to deallocate the resource.

Here’s an example from the library documentation:

 Stream.resource(fn -> File.open!("sample") end,
                 fn file ->
                   case IO.read(file, :line) do
                     data when is_binary(data) -> {[data], file}
                     _ -> {:halt, file}
                   end
                 end,
                 fn file -> File.close(file) end)

The first function opens the file when the stream becomes active, and passes it to the second function. This reads the file, line by line, returning either a line and the file as a tuple, or a :halt tuple at the end of the file. The third function closes the file.

Let’s finish with a different kind of resource: time. We’ll implement a timer that counts down the number of seconds until the start of the next minute. It uses a stream resource to do this. The allocation function returns the number of seconds left until the next minute starts. It does this each time the stream is evaluated, so we’ll get a countdown that varies depending on when it is called.

The iteration function looks at the time left. If zero, it returns {:halt, 0}; otherwise it sleeps for a second and returns the current countdown as a string, along with the decremented counter.

In this case there’s no resource deallocation, so the third function does nothing.

Here’s the code:

enum/countdown.exs
 defmodule Countdown do

   def sleep(seconds) do
     receive do
     after seconds*1000 -> nil
     end
   end

   def say(text) do
     spawn fn -> :os.cmd('say #{text}') end
   end

   def timer do
     Stream.resource(
       fn ->           # the number of seconds to the start of the next minute
         {_h, _m, s} = :erlang.time
         60 - s - 1
       end,

       fn              # wait for the next second, then return its countdown
         0 ->
           {:halt, 0}

         count ->
           sleep(1)
           { [inspect(count)], count - 1 }
       end,

       fn _ -> nil end # nothing to deallocate
     )
   end
 end

(The eagle-eyed among you will have noticed a function called say in the Countdown module. This executes the shell command say, which, on OS X, speaks its argument. You could substitute espeak on Linux and ptts on Windows.)

Let’s play with the code.

 $ iex countdown.exs
 iex> counter = Countdown.timer
 #Function<17.133702391/2 in Stream.resource/3>

 iex> printer = counter |> Stream.each(&IO.puts/1)
 #Stream<[enum: #Function<17.133702391/2 in Stream.resource/3>,
  funs: [#Function<0.133702391/1 in Stream.each/2>] ]>

 iex> speaker = printer |> Stream.each(&Countdown.say/1)
 #Stream<[enum: #Function<17.133702391/2 in Stream.resource/3>,
  funs: [#Function<0.133702391/1 in Stream.each/2>,
  #Function<0.133702391/1 in Stream.each/2>] ]>

So far, we’ve built a stream that creates time events, prints the countdown value, and speaks it. But there’s been no output, as we haven’t yet asked the stream for any values. Let’s do that now:

 iex> speaker |> Enum.take(5)
 37   ** numbers are output once
 36   ** per second. Even cooler, the
 35   ** computer says
 34   ** "thirty seven", "thirty six"…
 33
 ["37", "36", "35", "34", "33"]

Cool—we must have started it around 22 seconds into a minute, so the countdown starts at 37. Let’s use the same stream again, a few seconds later:

 iex> speaker |> Enum.take(3)
 29
 28
 27
 ["29", "28", "27"]

Wait some more seconds, and this time let it run to the top of the minute:

 iex> speaker |> Enum.to_list
 6
 5
 4
 3
 2
 1
 ["6", "5", "4", "3", "2", "1"]

This is clearly not great code, as it fails to correct the sleep time for any delays introduced by our code. But it illustrates a very cool point. Lazy streams let you deal with resources that are asynchronous to your code, and the fact that they are initialized every time they are used means they’re effectively side effect–free. Every time we pipe our stream to an Enum function, we get a fresh set of values, computed at that time.

Streams in Practice

In the same way that functional programming requires you to look at problems in a new way, streams ask you to look at iteration and collections afresh. Not every situation where you’re iterating requires a stream. But consider using a stream when you want to defer processing until you need the data, and when you need to deal with large numbers of things without necessarily generating them all at once.
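
As a parting sketch of that mindset, here's a pipeline that plucks the first five even squares from a conceptually unbounded sequence. A greedy Enum version of the same pipeline could never finish, because it would try to build the whole list first:

```elixir
Stream.iterate(1, &(&1 + 1))          # 1, 2, 3, ... forever
|> Stream.map(&(&1 * &1))             # square each value, lazily
|> Stream.filter(&(rem(&1, 2) == 0))  # keep only the even squares
|> Enum.take(5)                       # demand exactly five results
|> IO.inspect()                       # [4, 16, 36, 64, 100]
```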