Creating your very own RFP-library

NOTE

What I will be showing in this post is not a fully-fledged RFP-library. There is quite a few shortcuts that have been made, heck the code will not even deal with exceptions. Rather than using this code for anything real, I’d recommend using it as a way to get an understanding of how things works in the background.

I would however recommend that you use a good active RFP-library such as RxJS or Bacon.js if possible. However, on the odd chance that the project that you work with will not accept that you use a decent library; I hope you will find enough inspiration in this post to make it possible to deal with event based code in some way that keeps you sane.

So what is this reactive stuff I have been hearing about?

Think functional code over asynchronous data streams. So what is a stream in this context? Well, let us define a stream as a set of data that has a beginning an end and might have some data in between.

| – 1 – 5 – – – 2 – |

In this flow the pipes represents the start and end and the numbers represents data over time.

Now we would like to filter this data and send a (semi)processed stream to some other service that doesn’t care where and how the data was created.
For example, if we wanted to take the above stream and get a stream that looks like the following

| – 1 – – – – – 2 – |

We might write code similar to

var filteredObservable=observable.filter(function(data){return data<5;})

Note that in most of the examples I will be using “fat arrows” from the ES6 syntax to make the code slimmer and more readable. The code above would look like this in ES6:

var filteredObservable=observable.filter(data=> data<4})

This code will work in your browsers console (as long as you use the latest and greatest). For chrome you need to go to chrome://flags and activate experimental javascripts and in IE you… well you need to run Windows 10 and go to about:flags in IE to activate experimental features. Or you could head to modern.ie and use the RemoteApp for IE.vnext. If you would like to be able to use ES6 code today without telling your users to activate experimental code you could use a transpiler such as traceur that compiles ES6 code to ES5.

The finish line

If we were to make an attempt on creating our own observable object that allows us to write the following code:

Observable.listenFor('click',document.body)
    .map(ev => ev.x) 
    .zip(Observable.interval(1000).map(_,i=>i),(d1,d2)=>d1 *d2)
    .filter((_,index)=>index%2===0)
    .take(2)
    .subscribe(d=>console.log(d))

We write two x positions of the users clicks with some slightly weird looking additions. I chose this odd looking example mainly because it allows me to write some of the functions that I usually use when using Rx.
With that said, let us create each part on their own and see if they fit together the way we want them to.

Observable.listenFor

For this we need to add the static method listenFor and the method subscribe (for validation) to our observable.

function Observable(subscribe) {
    this.subscribe = subscribe;
}

Observable.listenFor = function (event, element) {
    return new Observable(function (next) {
        function callback(ev) {
            next(ev);
        }

        element.addEventListener(event, callback);
        return  () => element.removeEventListener(event, callback);
    });
};

An observable is simply an object that has a method subscribe that takes a callback. In the creation method listenFor we define how the subscription is set up(addEventListener) and torn down(removeEventListener). As a return value we return a handle to whoever called subscribe so that they can tear down the subscription when they are done with it.

If we copy paste the above code into the console of your browser, then the following code will also work.

var d=Observable.listenFor('click',document.body)
    .subscribe(ev=>console.log(ev))

Clicking anywhere on the page now logs the clickevents. Sweet! Almost done, if we call the disposable handle we should stop seeing events in the console.

d()

yup, that works. Let’s add some of the methods in between listen and subscribe.

observable.map

For each x give me y. This should be easy enough to create. We do not want to modify the observable that we use this method on, instead we want to return a new observable. This allows us to reuse the previous observable without unexpected side effects.

Observable.prototype.map = function (mapper) {
    var that = this;
    return new Observable(
        function (onNext) {
            var index = 0;
            return that.subscribe(val=>onNext(mapper(val, index++)));
        });
};

Just as in the previous code we create a new observable with a subscribe method. What we need to do now however is to apply our logic in between the observer and the underlying observable. So we call the underlying observable subscribe method with our own onNext so we can remap whatever values we might receive before sending them on.

Apart from the data that the underlying observable sends us we also gives the mapper the index of the data.  In order to have each subscription get its correct state(the index) we need to scope this to the subscribe method as you can see in the example above.

With that implemented we can do one more step

var d=Observable.listenFor('click',document.body)
    .map(ev=>ev.x)
    .subscribe(val=>console.log(val))

Observable.interval

Time to create a new observable creator, this time instead of listening for events on DOM-elements we simply want it to trigger every x milliseconds. In other words, we want an observable over setInterval.

Observable.interval = function (interval) {
    return new Observable(function (onNext) {
        var handle = setInterval(onNext, interval);
        return ()=> clearInterval(handle);
    });
};

Not much to say about that, setInterval/clearInterval matches our requirements pretty darn well.

observable.filter

In this method our goal is to strip away some of the data from the underlying observable. We do that by simply not call the onNext method in our subscribe method.

Observable.prototype.filter = function (predicate) {
    var that = this;
    return new Observable(function (onNext) {
        return that.subscribe(function (val) {
            if (predicate(val))
                onNext(val);
        });
    });
};

Pretty similar to the other methods, moving on.

Usage

var d=Observable.listenFor('click',document.body)
    .map(ev=>ev.x)
    .filter(x=>x>window.width/2)
    .subscribe(val=>console.log(val))

observable.zip

This method is slightly more complex that the previous methods. We want to send data whenever we gotten data from both observables. A bit of queueing is in order.

Observable.prototype.zip = function (obs, joiner) {
    var sources = [this, obs];

    function enqueueFactory(queues, index, onNext) {
        return function (value) {
            queues[index].push(value);
            if (queues.every((q)=>q.length > 0))
                onNext(joiner.apply(undefined, queues.map(q=>q.shift())));
        };
    }

    return new Observable(function (onNext) {
        var queues = sources.map(()=>[]);

        return sources.map((o, i)=>o.subscribe(enqueueFactory(queues, i, onNext)))
            .reduce((d, s) =>function () { d(); s(); });
    });
};

Might look slightly overwhelming at first glance, but bear with me. What we are doing is simply adding one queue per observable(always two of them in this code) then when we enqueue one of the queues; we check if we have a value in all queues. When we got values in all queues we call joiner with the first value of each queue. The joiner line could be rewritten with ES6 syntax using the spread operator

onNext(joiner.apply(undefined,queues.map(q=>q.shift())));
onNext(joiner(...queues.map(q=>q.shift())));

The reason why I didn’t use this is due to the fact that the spread operator is yet to be(at the date of this post) implemented in chrome. The code above works fine in Firefox/IE11 though.

Apart from that little odd looking snippet there is also the joining of the dispose functions. For that we use a small chain(map -> reduce). The map gives us the disposable handle for each subscription, then the reduce method creates a sum  of those functions by returning a new function that disposes the previous sum and the current value.

Usage

var oneSecIntervalObs=Observable.interval(1000).map((_,i)=>i);
var d=Observable.listenFor('click',document.body)
    .map(ev=>ev.x)
    .zip(oneSecIntervalObs,(x,i)=>x*i)
    .filter(x=>x>window.width/2)
    .subscribe(val=>console.log(val))

observable.take

This method should result in an observable that automatically ends itself after the specified number of items. This sounds kind of trivial, but in reality this is a bit more complicated than you might think at first.

Without thinking to much on it you might end up with something like the following

Observable.prototype.take = function (numToTake) {
    var that = this;
    return new Observable(function (onNext) {
        var taken = 0;
        var d = that.subscribe(function (next) {
            if (taken++ < numToTake) {
                onNext(next);
                if (taken === numToTake) d();
            }
        });
        return d;
    });
};

In this code we have a critical flaw. What if the data comes synchronously? Then we would not have the disposable handle when we try to use it. This will simply not do and I have no intention of using setTimeout to deal with this. What about RxJs, how did they handle this?

Digging through the code we find that they use a couple of intermediate objects to deal with this. That is a principle that we can shamelessly steal. So we create a new observable

function AutoDisposingObservable(sub) {
    Observable.call(this, function (onNext) {
        var isDisposed = false;
        var dispose = function () {
            isDisposed = true;
        };

        function setDispose(d) {
            if (isDisposed)d();
            dispose = d;
            return d;
        }

        return setDispose(sub(function (value) {
            if (!isDisposed) {
                onNext(value);
            }
        }, ()=> dispose()));
    });
}
AutoDisposingObservable.prototype = Object.create(Observable.prototype);

Perhaps not my finest piece of code but it will do the trick for now. The AutoDisposer works in such a way that it will send a disposable handle to the creator that the creator can use to scrap the subscription no matter if the subscribe call has returned or not. When the subscribe method returns we check if the dispose method has been called, if it has we simply stop the underlying source. In other case we change so the caller will stop the underlying source directly when wanting to scrap the subscription.

The new take method now looks like this

Observable.prototype.take = function (numToTake) {
    var that = this;
    return new AutoDisposingObservable(function (onNext, onComplete) {
        var taken = 0;
        return that.subscribe(function (next) {
            if (taken++ < numToTake) {
                onNext(next);
                if (taken === numToTake) onComplete();
            }
        });
    });
};

And with that in place we can now write this

var oneSecIntervalObs=Observable.interval(1000).map((_,i)=>i);
var d=Observable.listenFor('click',document.body)
    .map(ev=>ev.x)
    .zip(oneSecIntervalObs,(x,i)=>x*i)
    .filter(x=>x>window.width/2)
    .take(2)
    .subscribe(val=>console.log(val))

And there you go! we have a fully functional example of what we set out to build.

End words

I hope that you have found this post informative and that you go away with some new found insights. I know I had quite a bit of fun spiking this code at least.

If you are intrigued and want to see more “under the hood” code, I would recommend that you take a peek into the RxJS code. It might look a bit daunting at first glance, but most of the code is pretty easy to read.

Code

github

Leave a Reply