Using EventStore Atom API from Clojure

I have been experimenting with event sourcing in Clojure and in Java, but previously I simply used an in-memory store. In this post I’ll describe how to use Greg Young’s EventStore from Clojure.

The EventStore is written in .NET and runs on Mono. However, I didn’t bother to get it running on my Mac, so I simply used my Windows gaming machine. You could of course run it in a virtual machine or on Linux. There are a couple of things to note when starting the EventStore:

  • Run as Administrator
  • Use a special flag to enable projections
  • Specify IP address to access from another machine, ie don’t bind to

This is how I started it: (replace with the actual ip address!)

EventStore.SingleNode.exe -i --db ./db --log ./logs --run-projections=ALL

I will be using AtomPub API of the EventStore. At first I was confused by the order of the events in the feed, as I expected something similar to what is proposed in REST in Practice but I got a reasonable explanation from the EventStore team. The feed contains the most recent events, which means that the events that was generated first are actually last in the collection. That is, the feed is in reverse order to the time when the events were generated. To rebuild the current state of an aggregate I need to first load the last page in the feed, then keep loading the previous page until all events have been loaded.

I’m using clj-http for HTTP calls and cheshire for JSON parsing. The code for this blog post is available as a branch of the original rock-paper-scissors-in-clojure repository. I called the branch atom since most code actually deals with the atom parsing and nothing specific to the EventStore.

I have created an example event stream after creating a game and both players making their move. Here is the full stream and here is a single create game event. It might be useful to open in another window as you follow along.

Parsing Atom feeds

A Clojure program is typically built from small functions. So, lets start by looking at a small helper function to find the link with a certain relation attribute, for example previous or last. This will be of course be used to parse the links section in the full stream.

(defn uri-for-relation [relation links]
  (:uri (first (filter #(= relation (:relation %)) links))))

Then we actually need to do some work. When loading the first page of the feed for a event stream there are three different cases:

  1. 1. The stream does not exist
  2. 2. The stream exists, but has only a single page
  3. 3. The stream exists and has multiple pages
(def empty-stream {:version (fn [] -1) :events []})

(defn load-events-from-feed [uri]
	(let [response (client/get uri {:as :json :throw-exceptions false})]
	  (if-not (= 200 (:status response))
	    empty-stream ; case 1
	    (let [body (:body response)
	          links (:links body)
	          last-link (uri-for-relation "last" links)
	          events (if last-link
	                       (load-events last-link) ; case 3
	                       (load-events-from-list response))] ; case 2
	      {:version (fn [] (dec (count events)))
	       :events events}))))

Notice that EventStore uses the convention that a non-existing stream has version -1.

Then we need to traverse all the previous pages:

(declare load-events)

(defn load-events-from-list [response]
  (let [body (:body response)
        links (:links body)
        event-uris (reverse (map :id (:entries body)))
        previous-uri (uri-for-relation "previous" links)]
    (lazy-cat (map load-event event-uris)
              (if previous-uri (lazy-seq (load-events previous-uri))))))

(defn load-events [uri]
  (load-events-from-list (client/get uri {:as :json})))

Notice that a lazy sequence of events are returned (map is also lazy!). This means that only the events that are requested from the stream actually get loaded. In this case it is perhaps not so useful since the first thing we are going to do is to fold over all events to get the current state of the aggregate.

One of the coolest features of the EventStore is the support for projections (see below for an example!). However, this requires that our events are serialized as JSON. It also requires that we specify the eventType and a unique eventId. Unfortunately, EventStore does not allow . in type names which means we have to replace them in the Java package name.

(defn new-uuid [] (.toString (java.util.UUID/randomUUID)))

(defn to-eventstore-format [event]
  {:eventId (new-uuid)
   :eventType (.replace (.getName (class event)) . _)
   :data event})

Lets try this in the REPL:

com.jayway.rps.domain=> (defrecord MoveDecidedEvent [game-id player move])

com.jayway.rps.domain=> (def someMove (->MoveDecidedEvent 123 "ply1" :rock))

com.jayway.rps.domain=> (to-eventstore-format someMove)
{:eventId "5aba78e3-f384-4905-832a-3ebed5bf34c5",
 :eventType "com_jayway_rps_domain_MoveDecidedEvent",
 :data #com.jayway.rps.domain.MoveDecidedEvent{:game-id 123,
                                               :player "ply1",
                                               :move :rock}}

com.jayway.rps.domain=> (println (json/generate-string *1 {:pretty true}))
  "eventId" : "5aba78e3-f384-4905-832a-3ebed5bf34c5",
  "eventType" : "com_jayway_rps_domain_MoveDecidedEvent",
  "data" : {

Looks good! To parse this we need some magic:

(defn construct-record [type string]
  (when-let [f (resolve (symbol (clojure.string/replace type #".(w+)$" "/map->$1")))]
    (f string)))

(defn load-event [uri]
  (let [response (client/get uri {:as :json})
        event-data (get-in response [:body :content :data] {})
        event-type (.replace (get-in response [:body :content :eventType]) _ .)]
    (if event-type
      (construct-record event-type event-data)

All that remains is to implement my own EventStore protocol which now is trivial:

(defn atom-event-store [uri]
  (letfn [(stream-uri [aggregate-id] (str uri "/streams/" aggregate-id))]
    (reify EventStore
      (retrieve-event-stream [this aggregate-id]
        (load-events-from-feed (stream-uri aggregate-id)))

        [this aggregate-id previous-event-stream events]
        (client/post (stream-uri aggregate-id)
                     {:body (json/generate-string (map to-eventstore-format events))
                      :content-type :json
                      :headers {"ES-ExpectedVersion" (str ((:version previous-event-stream)))}})))))

As you can see from the append-events function it is really simple to call a REST api using Clojure!


Finally I wanted to try the projection feature. Here is a projection that calculates the number of games currently in progress:

    $init: function () {
        return { inProgress: 0 }; // initial state

    com_jayway_rps_domain_GameCreatedEvent: function(s, e) {
        return s;

    com_jayway_rps_domain_GameWonEvent: function(s, e) {
        return s;

    com_jayway_rps_domain_GameTiedEvent: function(s, e) {
        return s;

Lets test it:

$ curl | jq .
  "inProgress": 2

Some things to note as projections are not yet so well documented:

  • I use the fromCategory feature (another example) which goes through all streams with the prefix “game-“.
  • I use a map to store the state. You can of course use any data structure you want. In this case a simple number would also work fine.


Changing how the events are stored will not at all affect the domain code or the framework. Instead this is nicely abstracted by the EventStore protocol and it is simple to implement this protocol with persistent storage. Using Greg Young’s EventStore has been very easy and I really like the support for projections!

This Post Has One Comment

  1. Philipp Meier

    Thanks for the nice article. Extra kudos if you’d used clojurescript for the projections, though :)

Leave a Reply