If you recall, the IBehavior protocol has a single function, called sample, whose docstring states: Turns this Behavior into an EventStream from the sampled values at the given interval.
To implement sample, we will first create a useful helper function that we will call from-interval:
(defn from-interval "Creates and returns a new event stream which emits values at the given interval. If no other arguments are given, the values start at 0 and increment by one at each delivery. If given seed and succ it emits seed and applies succ to seed to get the next value. It then applies succ to the previous result and so on." ([msecs] (from-interval msecs 0 inc)) ([msecs seed succ] (let [es (event-stream)] (go-loop [timeout-ch (timeout msecs) value seed] (when-not (completed? es) (<! timeout-ch)
(timeout msecs) (deliver es value) (recur (succ value)))) es)))
The docstring function should be clear enough at this stage, but we would like to ensure that we understand its behavior correctly by trying it at the REPL:
(def es1 (from-interval 500)) (def es1-token (subscribe es1 #(prn "Got: " %))) ;; "Got: " 0 ;; "Got: " 1 ;; "Got: " 2 ;; "Got: " 3 ;; ... (dispose es1-token)
As expected, es1 emits integers starting at zero at 500 millisecond intervals. By default, it would emit numbers indefinitely; therefore, we keep a reference to the token that's returned by calling subscribe.
This way, we can dispose of it whenever we're done, causing es-1 to complete and stop emitting items.
Next, we can finally implement the Behavior type:
(deftype Behavior [f] IBehavior (sample [_ interval] (from-interval interval (f) (fn [& args] (f)))) IDeref (#?(:clj deref :cljs -deref) [_] (f))) (defmacro behavior [& body] `(Behavior. #(do ~@body)))
A behavior is created by passing it a function. You can think of this function as a generator that's responsible for generating the next value in this event stream.
This generator function will be called whenever we (1) deref the Behavior or (2) at the interval given to sample.
The behavior macro is there for convenience and allows us to create a new Behavior without wrapping the body in a function ourselves:
(def time-behavior (behavior (System/nanoTime))) @time-behavior ;; 201003153977194 @time-behavior ;; 201005133457949
In the preceding example, we defined time-behavior as something that always contains the current system time. We can, then, turn this behavior into a stream of discrete events by using the sample function:
(def time-stream (sample time-behavior 1500)) (def token (subscribe time-stream #(prn "Time is " %))) ;; "Time is " 201668521217402 ;; "Time is " 201670030219351 ;; ... (dispose token)
Congratulations! We have a working, minimal CES framework using core.async!
We didn't prove that it works with ClojureScript, however, which was one of the main requirements early on. That's okay. We will be tackling that soon by developing a simple ClojureScript application that makes use of our new framework.
To do this, we need to deploy the framework to our local Maven repository. From the project root, type the following lein command:
$ lein install Created respondent/target/respondent-0.1.0-SNAPSHOT.jar Wrote respondent/pom.xml Installed jar and pom into local rep.