Implementing event streams

The event stream implementation, on the other hand, is the most complex in our framework. We'll tackle it gradually, implementing it and experimenting as we go.

First, let's look at our main constructor function, event-stream:

(defn event-stream 
  "Creates and returns a new event stream. You can optionally provide an existing 
  core.async channel as the source for the new stream" 
  ([] 
     (event-stream (chan))) 
  ([ch] 
     (let [multiple  (mult ch) 
           completed (atom false)] 
       (EventStream. ch multiple completed)))) 

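The docstring should be sufficient to understand the public API. What might not be clear, however, is what the constructor arguments mean. From left to right, the arguments to EventStream are the channel (ch), which is the core.async channel backing the stream; multiple, a mult of that channel, used to broadcast its values to any number of other channels; and completed, an atom holding a Boolean flag that records whether the stream has completed and will emit no further values.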

From the implementation, you can see that multiple is created from the channel backing the stream. A mult works like a broadcast: every value put on its source channel is copied to each channel tapped into it. Consider the following example:

  (def in (chan)) 
  (def multiple (mult in)) 
 
  (def out-1 (chan)) 
  (tap multiple out-1) 
 
  (def out-2 (chan)) 
  (tap multiple out-2) 
  (go (>! in "Single put!")) 
 
  (go (prn "Got from out-1 " (<! out-1))) 
  (go (prn "Got from out-2 " (<! out-2))) 

In the previous snippet, we created an input channel, in, and a mult of it called multiple. Then, we created two output channels, out-1 and out-2, and tapped each of them into multiple with a call to tap. This means that any value written to in is taken by multiple and written to every channel tapped into it, as the following output shows:

"Got from out-1 " "Single put!" 
"Got from out-2 " "Single put!" 
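
Because the go blocks above run asynchronously, the order of the two printed lines is not guaranteed. As a minimal sketch for checking the broadcast at the REPL, the same setup can use the blocking operations >!! and <!! to collect the values instead of printing them. The buffered output channels and the names m and received are choices made for this sketch only:

```clojure
(require '[clojure.core.async :refer [chan mult tap >!! <!!]])

(def in (chan))
(def m (mult in))

;; A buffer of 1 lets the mult hand each tap its copy without
;; waiting for a reader to arrive.
(def out-1 (chan 1))
(def out-2 (chan 1))
(tap m out-1)
(tap m out-2)

;; Blocks until the mult has taken the value from in.
(>!! in "Single put!")

;; Each tapped channel receives its own copy of the single put.
(def received [(<!! out-1) (<!! out-2)])
;; received => ["Single put!" "Single put!"]
```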

This will make understanding the EventStream implementation easier.

Next, let's take a look at what a minimal implementation of the EventStream looks like. Make sure that the implementation goes before the constructor function, as described earlier:

(declare event-stream) 
 
(deftype EventStream [channel multiple completed] 
  IEventStream 
  (map [_ f] 
    (let [out (map> f (chan))] 
      (tap multiple out) 
      (event-stream out))) 
 
  (deliver [_ value] 
    (if (= value ::complete) 
      (do (reset! completed true) 
          (go (>! channel value) 
              (close! channel))) 
      (go (>! channel value)))) 
 
 
  IObservable 
  (subscribe [this f] 
    (let [out (chan)] 
      (tap multiple out) 
      (go-loop [] 
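        (let [value (<! out)] 
          ;; some? rather than plain truthiness, so that a delivered false 
          ;; does not end the subscription; nil means out was closed 
          (when (and (some? value) (not= value ::complete)) 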
            (f value) 
            (recur)))) 
      (Token. out)))) 

For now, we have chosen to implement only the map and deliver functions from the IEventStream protocol. This allows us to deliver values to the stream, as well as to transform those values.

However, this would not be very useful if we could not retrieve the values that have been delivered. This is why we also implement the subscribe function from the IObservable protocol.

In a nutshell, map needs to take a value from the input stream, apply a function to it, and send it to the newly created stream. We do this by creating an output channel that taps into the current multiple. We then use this channel to back the new event stream.

The deliver function simply puts the value into the backing channel. If the value is the namespaced keyword ::complete, we also update the completed atom and close the backing channel once the value has been put. This ensures that the stream will not emit any other values.
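
Closing the channel is what enforces this. As a small sketch using only core.async (the channel ch here is a stand-in, not the stream's own backing channel), a put on a closed channel is rejected, while a value buffered before the close can still be taken before takes start returning nil:

```clojure
(require '[clojure.core.async :refer [chan close! >!! <!!]])

(def ch (chan 1))

(def accepted (>!! ch :last))     ; true: the value is buffered
(close! ch)
(def rejected (>!! ch :ignored))  ; false: the channel is closed

(def drained (<!! ch))            ; :last, buffered before the close
(def after   (<!! ch))            ; nil, closed and empty
```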

Finally, we have the subscribe function. Subscribers are notified through an output channel that is tapped into the backing multiple. We loop over this channel, calling the subscribing function for each value emitted, until the channel is closed or the ::complete keyword arrives.

We finish by returning a token, which will close the output channel once disposed of, causing the go-loop to stop.
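
To see why closing the tapped channel is enough to end a subscription, here is a sketch that uses bare core.async channels in place of EventStream and Token. The names m, seen, and done are hypothetical and exist only for this illustration, and close! on out stands in for what disposing of the token is described as doing. It assumes a core.async release recent enough to provide poll!:

```clojure
(require '[clojure.core.async
           :refer [chan mult tap close! go-loop >! <! >!! <!! poll!]])

(def in (chan))
(def m (mult in))
(def out (chan))
(tap m out)

;; Stand-in for the subscriber function: forward each value to seen.
(def seen (chan 10))

;; The same shape as the loop in subscribe; done closes when it exits.
(def done
  (go-loop []
    (when-some [value (<! out)]
      (>! seen value)
      (recur))))

(>!! in 1)
(def first-seen (<!! seen))   ; 1, so the loop is running

(close! out)                  ; "dispose" of the subscription
(def loop-result (<!! done))  ; nil: <! returned nil and the loop exited

(>!! in 2)                    ; nobody is subscribed any more
(def later (poll! seen))      ; nil: 2 never reached the subscriber
```

When the mult next tries to write to the closed channel, the put fails and the mult removes that tap, so a disposed subscription does not hold up later deliveries.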

Let's make sure that all of this makes sense by experimenting with a couple of examples in the REPL:

  (def es1 (event-stream)) 
  (subscribe es1 #(prn "first event stream emitted: " %)) 
  (deliver es1 10) 
  ;; "first event stream emitted: " 10 
 
 
  (def es2 (map es1 #(* 2 %))) 
  (subscribe es2 #(prn "second event stream emitted: " %)) 
 
  (deliver es1 20) 
  ;; "first event stream emitted: " 20 
  ;; "second event stream emitted: " 40 

Excellent! We have a minimal, working implementation of our IEventStream protocol!

The next function we'll implement is filter, and it is very similar to map:

  (filter [_ pred] 
    (let [out (filter> pred (chan))] 
      (tap multiple out) 
      (event-stream out))) 

The only difference is that we use the filter> function instead of map>. pred should be a predicate, that is, a function whose result is treated as true or false:

  (def es1 (event-stream)) 
  (def es2 (filter es1 even?)) 
  (subscribe es1 #(prn "first event stream emitted: " %)) 
  (subscribe es2 #(prn "second event stream emitted: " %)) 
 
  (deliver es1 2) 
  (deliver es1 3) 
  (deliver es1 4) 
 
  ;; "first event stream emitted: " 2 
  ;; "second event stream emitted: " 2 
  ;; "first event stream emitted: " 3 
  ;; "first event stream emitted: " 4 
  ;; "second event stream emitted: " 4 

As we can see, es2 emits a value only if that value is an even number.
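
Note that map> and filter> are marked as deprecated in later core.async releases, which recommend attaching a transducer to a channel instead. As a hedged sketch of that alternative, outside of the EventStream type and with the hypothetical names m and evens, a tapped channel can do the filtering itself. This assumes Clojure 1.7 or later and a core.async version with transducer support:

```clojure
(require '[clojure.core.async :refer [chan mult tap >!! <!!]])

(def in (chan))
(def m (mult in))

;; A channel with a transducer must be buffered. filter here is
;; clojure.core/filter, called with one argument to get a transducer.
(def evens (chan 10 (filter even?)))
(tap m evens)

(doseq [n [2 3 4]]
  (>!! in n))

;; 3 is dropped by the transducer; 2 and 4 pass through in order.
(def kept [(<!! evens) (<!! evens)])
;; kept => [2 4]
```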

If you are following along, typing the examples step by step, you will need to restart your REPL whenever we add new functions to any deftype definition. This is because deftype generates and compiles a Java class when evaluated. As such, simply reloading the namespace won't be enough.

Alternatively, you can use a tool such as tools.namespace, which addresses some of these REPL reloading limitations[3].

Moving down our list, we now have flatmap:

  (flatmap [_ f] 
    (let [es (event-stream) 
          out (chan)] 
      (tap multiple out) 
      (go-loop [] 
        (when-let [a (<! out)] 
          (let [mb (f a)] 
            (subscribe mb (fn [b] 
                            (deliver es b))) 
            (recur)))) 
      es)) 

We've encountered this operator before, when surveying Reactive Extensions.

The docstring we wrote for flatmap reads as follows:

"Takes a function f from values in s to a new EventStream.

Returns an EventStream containing values from all underlying streams combined."

This means flatmap needs to combine all the possible event streams into a single output event stream. As before, we tap a new channel into multiple, but this time we loop over the output channel, applying f to each value we take from it.

However, as we saw, f itself returns a new event stream, so we simply subscribe to it. Whenever the function registered in the subscription gets called, we deliver that value to the output event stream, effectively combining all streams into a single stream.

Consider the following example:

  (defn range-es [n] 
    (let [es (event-stream (chan n))] 
      (doseq [n (range n)] 
        (deliver es n)) 
      es)) 
 
  (def es1 (event-stream)) 
  (def es2 (flatmap es1 range-es)) 
  (subscribe es1 #(prn "first event stream emitted: " %)) 
  (subscribe es2 #(prn "second event stream emitted: " %)) 
 
  (deliver es1 2) 
  ;; "first event stream emitted: " 2 
  ;; "second event stream emitted: " 0 
  ;; "second event stream emitted: " 1 
 
  (deliver es1 3) 
  ;; "first event stream emitted: " 3 
  ;; "second event stream emitted: " 0 
  ;; "second event stream emitted: " 1 
  ;; "second event stream emitted: " 2 

We have a function, range-es, that receives a number, n, and returns an event stream that emits the numbers from 0 up to, but not including, n. As before, we have a starting stream, es1, and a transformed stream, es2, that's been created with flatmap.

We can see from the preceding output that the stream created by range-es gets flattened into es2, allowing us to receive all values by simply subscribing to it once.
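
If it helps, flatmap can be compared with clojure.core's mapcat on ordinary sequences: range plays the part of range-es, and a vector stands in for the values delivered to es1. This is only an analogy. With sequences the order of the results is fixed, which the asynchronous version does not promise in general:

```clojure
;; Each input produces a sequence, and the results are
;; concatenated into a single flat sequence.
(def flattened (mapcat range [2 3]))
;; flattened => (0 1 0 1 2)
```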

This leaves us with a single function from IEventStream to implement:

  (completed? [_] @completed) 

completed? simply returns the current value of the completed atom. We are now ready to implement behaviors.