Comparing Core Async and Rx by Example

Last week I saw a core async webinar where David Nolen of Cognitect presented the use of core async with its channels and go blocks in a frontend application using ClojureScript. While watching it I got flashbacks to a problem I faced a while ago when I created some exercises (Functional Reactive Programming using the RxJava library) for my company that I never really managed to solve exactly the way I wanted to. So I thought I’d give ClojureScript and core async (and then later also RxJS) a go to tackle the problem. For the curious the code for this blog can be found here.

The Problem

Given that we have two buttons, A and B, we want to print a message if a “secret combination” (ABBABA) is clicked within 5 seconds. To make things a bit harder the following must also be true:

  • The message should be printed immediately when the secret combination is clicked. We should not wait until the end of the 5 second interval before the message is printed.
  • As soon as the secret combination is fulfilled (within 5 seconds) the message should be displayed regardless of what has been clicked before. For example a combination like “BBABBABA” is ok as long as “ABBABA” is clicked within 5 seconds.

The core async way

I had some experience with Clojure and core async prior to the webinar but not extensively. Having seen David’s examples was enough to get me started on the problem. First off all I borrowed some of the utility functions from David’s presentation:

There’s also a webpage that contains some simple HTML:

With these functions we can start solving the problem. First we create two channels, one for each button:

What we do here is to create a channel for each button mapped to the CLICK event. These events will be written to the channel that we supply as the last argument. The channel has a buffer size of 1 (since we want each click to generate an event) and a transducer. Transducers will be a part of Clojure 1.7 and allows us to perform transformation logic on the data inside the channel. Since we’re only interested in which button that was clicked we use a transducer that simply maps each click to keyword :a or :b respectively.

Next we define the combination max time, the secret combination itself and a partial function called set-html!:

Having set-html! as a partial function with “ex1-card” as first argument allows us to avoid having to provide “ex1-card” for each call to the original set-html! function every time we use it.

Next we we enter our go block:

go is a Clojure macro that takes the body and enable usage of channels inside it. Under the hood the body turns into a state machine. When reaching a blocking channel operation the thread will be released and the state machine parked. This gives us the illusion of writing sequential code in an asynchronous environment.

Inside our go block we’ve defined a loop that maintains a vector of our correct clicks as well as a timeout channel. The timeout channel will produce an event every 5 seconds (as defined by combination-max-time). Next we define some other values:

We make use of Clojure’s deconstruction syntax to get the value and channel of either a, b or timeout (which ever produces an event first). For this we use the alts! function and pass in a vector of our channels. Clojure will “block” at this line (4) until data is available in any of these channels. Next we append the val received from the channel to our current list of correct clicks (correct-clicks) and assign it to clicks. Now we have everything we need to start making decisions:

The decisions are made inside a cond statement. First (line 12) we check if it was the timeout channel that produced the event and if so we use the set-html! function to write a message to the user that he should try again. We recur by resetting the correct-clicks vector and a new channel with 5 second timeout. On line 13 we check for our final result, is the secret-combination fulfilled? If so we notify the user his success and recur with the default values. Next (line 14) we allow for the case where the timer should be reset when the first correct button is clicked in our secret combination. This gives us 5 new seconds to finish the combination every time we press the correct button that starts the sequence. We also handle the intermediary case (line 15) when we’ve clicked the correct combination of buttons up to the given point. Here we just recur with the current set of clicks and with the current timeout. At last (line 17) we also handle the case when a button click doesn’t match the secret combination. If so we reset the correct-clicks to empty but retain the timeout. And that’s it! The full code is shown below:

The RxJS way

After I managed to solve the problem in ClojureScript I wanted to go back to see if I could find a solution with Rx. Instead of using RxJava I tried the RxJS library for javascript. The webpage looks identical to the previous example except for the div ids. I ported my initial (faulty) solution from RxJava to RxJS without much effort:

Similarly to what we did in the core async example I created one observable for each button and map each click to the button name. The button observables are then merged into a single observable sequence. Next I used the bufferWithTimeOrCount operator to buffer at most 6 button clicks (the length of the secret combination) for 5 seconds. The returned buffer is a “list” so we reduce it to a string which we compare with the secretCombination. The problem with this approach is that the timeout is not reset when we press the A button for the first time in the sequence. This can lead to a timeout (“Too slow or wrong!” being printed) even if we press the right combination if the combination is clicked between two time slots. After a lot of experimenting I turned to stackoverflow where I received help from André Staltz. He proposed the following solution which works fine:

Let’s focus on the evaluationStream first. Here we make use of the throttle operation. In Andrés words throttle waits for 5 seconds of event silence and emits the last event before that silence. In other words, it’s similar to delay, except it resets the inner timer when a new event is seen on the source Observable. We need to reset the scan’s concatenation (1), so we just map the same throttled Observable to “reset” flags (3), which the scan will interpret as clearing the accumulator (acc). scan is similar to reduce but it returns each intermediate result as opposed to the entire aggregation as a single element which is what we want in this case. At line (18) we make sure that we only return a string that has the same length as the secret combination and then we essentially just checks whether the clicked combination it’s equal to the secret combination or not (line 27).

Now that we have the evaluation stream we can attach two subscribers to it that checks for timeout (wrongStream) and a correct combination (correctStream). The reason why we need two different streams is that we want to print “Combination unlocked!” as soon as the combination is unlocked (without having to wait for a timeout).

Update 2014-09-18: A couple of people from the RxJS community helped out and improved the solution presented here. You can see their results and discussions here.

That’s it!

Summary

Solving the problem with core async as opposed to RxJS was a lot easier for me. I spent around half an hour or so getting everything working the way I wanted to using core async. I think I probably must have spent 4 hours or so on the Rx implementation before turning to stackoverflow for help. For this particular example I definitely felt that I needed to learn fewer operations and concepts to solve the problem using core async. I found this exercise to be very rewarding and it was also nice to be able to compare the two different styles in the end. If you’re interested in the code it’s available here.

6 Comments

  1. Niall

    Perhaps it comes down to a learning curve, and Rx definitely has that. But generally I find if the Rx code is getting as long, stateful (state accumulators) and if statement-ey as the above then there is probably a much simpler way. I had a look through the alternative implementations you linked to, and it had taken me a couple of minutes to come up with the Timestamped, buffered version.

    I think the difference in the approach (and difficulty) is that the above solution is trying to build up all the custom state inside its own values (eg what have they typed so far, does it match the sequence, flag the timeout with a special string value, etc. The much easier approach is to actually define what you’re trying to capture here – 6 letters typed within a potentially longer set of letters, that match a certain string, within 5 seconds. Straightaway from that, you’re trying to take 6 letter chunks out of a stream (sounds like buffer, you could scan it up yourself, but why bother), and you have a timeout, or you just compare the timestamp of the first or last letters.

    Let Rx do the state management – that’s what it’s good at :)

  2. Insurgent

    Have you tried the JS-CSP version of Core Async?

    https://github.com/ubolonton/js-csp

  3. Sorry for the late reply out of the blue, just needed to my thoughts on how to solve this down in writing.

    timestamps = buttonObservable.map { (button) -> (button, generateTimestamp()) }

    secretCodes = timestamps.slidingWindow(6).filter { (buttonSeq) -> buttonSeq.last.timestamp – buttonSeq.first.timestamp < 5s }

    Done in two lines. I know slidingWindow doesn’t exist in Rx and this requires the observable to be hot for the timestamps to be correct but you get the idea. Maybe I’m missing something?

Leave a Reply