Event Sourcing in Clojure

In this blog post I implement the same rock-paper-scissors domain using what I hope is idiomatic Clojure. By using immutable data structures I show that snapshots of aggregate state are really nothing special and that snapshotting should be implemented as a cache. All code is available on GitHub.

Domain code

Let’s start with the commands and events:
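A minimal sketch of what such commands and events could look like as plain records. CreateGameCommand and DecideMoveCommand (taking an id, a player and a move) and GameWonEvent are named elsewhere in this post and its comments; the remaining record names and all field names are assumptions made for illustration.

```clojure
;; Commands: requests that the aggregate is allowed to reject.
(defrecord CreateGameCommand [aggregate-id player move])
(defrecord DecideMoveCommand [aggregate-id player move])

;; Events: facts that have already happened.
;; Only GameWonEvent is named in the post; the others are hypothetical.
(defrecord GameCreatedEvent [game-id creator])
(defrecord MoveDecidedEvent [game-id player move])
(defrecord GameWonEvent [game-id winner loser])
(defrecord GameTiedEvent [game-id])

;; Records are immutable values with a positional constructor
;; and keyword access to their fields.
(def create-game (->CreateGameCommand 1 :p1 :rock))
(:move create-game) ;; => :rock
```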

I hope you can already see why I like Clojure better than Java!

To implement the actual game logic I decided to use a multimethod. There are of course other and more concise ways to do this, but I find this to be self-explanatory:
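One way such a multimethod could be written is sketched below. It assumes the function is called compare-moves (the name used in the comments), that moves are the keywords :rock, :paper and :scissors, and that the result is reported from the first player's point of view.

```clojure
;; Dispatch on the pair of moves, so every outcome is spelled out as its own method.
(defmulti compare-moves (fn [a b] [a b]))

(defmethod compare-moves [:rock :rock] [a b] :tie)
(defmethod compare-moves [:rock :paper] [a b] :loss)
(defmethod compare-moves [:rock :scissors] [a b] :victory)
(defmethod compare-moves [:paper :rock] [a b] :victory)
(defmethod compare-moves [:paper :paper] [a b] :tie)
(defmethod compare-moves [:paper :scissors] [a b] :loss)
(defmethod compare-moves [:scissors :rock] [a b] :loss)
(defmethod compare-moves [:scissors :paper] [a b] :victory)
(defmethod compare-moves [:scissors :scissors] [a b] :tie)

(compare-moves :rock :scissors) ;; => :victory
```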

In the Java code I used an Aggregate object that contained all command handlers and encapsulated the state. In an object-oriented language that is probably an idiomatic way of solving the problem. However, in Clojure we don’t mutate state. Therefore, the idiomatic approach in Clojure is to separate the behavior and the state. I decided to use a protocol for handling commands and to store the state in a simple map.
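A minimal sketch of that separation, assuming a protocol called CommandHandler with a single function perform (both names are assumptions), hypothetical event records, and a state map that carries a :state key once the game exists:

```clojure
(defrecord CreateGameCommand [aggregate-id player move])
(defrecord GameCreatedEvent [game-id creator])        ; hypothetical event
(defrecord MoveDecidedEvent [game-id player move])    ; hypothetical event

;; Behavior lives in the protocol; the aggregate state is passed in as a plain map.
(defprotocol CommandHandler
  (perform [command state] "Validates the command against state and returns new events."))

(extend-protocol CommandHandler
  CreateGameCommand
  (perform [command state]
    ;; A game that already has a :state must not be created again.
    (when (:state state)
      (throw (IllegalStateException. "Game already created")))
    [(->GameCreatedEvent (:aggregate-id command) (:player command))
     (->MoveDecidedEvent (:aggregate-id command) (:player command) (:move command))]))
```

Note that perform never changes anything: it only returns the events that should be stored.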

I hope that you find this code quite easy to read, even though we have separated the state and behavior!

To show you some more cool Clojure tricks (and to keep the code simpler!) I decided to use a multimethod for event handling. Notice that I dispatch depending on the type of the second argument, whereas in OOP you dispatch on the type of the first argument. (I don’t actually recommend doing this, but it keeps the apply-events function below slightly more concise and also shows a use case for multimethods.) Notice that the apply-event method returns the new state:
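A sketch of dispatching on the type of the second argument. The event records and the keys stored in the state map are assumptions; only the name apply-event comes from the text.

```clojure
(defrecord GameCreatedEvent [game-id creator])      ; hypothetical event
(defrecord MoveDecidedEvent [game-id player move])  ; hypothetical event

;; The dispatch function ignores the state and looks only at the event's class.
(defmulti apply-event (fn [state event] (class event)))

(defmethod apply-event GameCreatedEvent [state event]
  (assoc state :state :started :creator (:creator event)))

(defmethod apply-event MoveDecidedEvent [state event]
  (assoc state :move (:move event)))

;; Each method returns a new state map; the old one is left untouched.
(apply-event {} (->GameCreatedEvent 1 :p1))
;; => {:state :started, :creator :p1}
```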

Ok, that was all the code needed to implement the domain. I think this is very elegant!

Infrastructure code

I need some infrastructure code to make this work. First a small helper function to apply several events. Since I defined the apply-event function like I did above, applying all events is a simple left fold:
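As a sketch, the fold is a single call to reduce. The two event records here are assumptions, included only so that the example runs on its own.

```clojure
(defrecord GameCreatedEvent [game-id creator])    ; hypothetical event
(defrecord GameWonEvent [game-id winner loser])

(defmulti apply-event (fn [state event] (class event)))

(defmethod apply-event GameCreatedEvent [state event]
  (assoc state :state :started :creator (:creator event)))

(defmethod apply-event GameWonEvent [state event]
  (assoc state :state :completed :winner (:winner event)))

;; Because apply-event takes [state event] and returns the new state,
;; it can be passed to reduce directly.
(defn apply-events [state events]
  (reduce apply-event state events))

(apply-events {} [(->GameCreatedEvent 1 :p1) (->GameWonEvent 1 :p2 :p1)])
;; => {:state :completed, :creator :p1, :winner :p2}
```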

The EventStore has almost the same signature as in the Java example. In Clojure I think it makes even more sense to group retrieve-event-stream and append-events together in the same protocol, as the EventStream is just an immutable data structure containing the version and events. While writing this code I also realized that the version information is not strictly necessary, but I still believe it is useful and left it in there for now.
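A sketch of such a protocol. The function names and argument lists follow the ones used in the comments below; representing the EventStream as a record with version and transactions fields is an assumption.

```clojure
(defprotocol EventStore
  (retrieve-event-stream [this aggregate-id]
    "Returns the event stream stored for aggregate-id.")
  (append-events [this aggregate-id previous-event-stream events]
    "Appends events, given the stream that was current when the command was handled."))

;; The event stream is an immutable value: a version plus the events,
;; grouped by the append (transaction) that stored them.
(defrecord EventStream [version transactions])

(def empty-stream (->EventStream 0 []))
```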

Clojure really pushes you toward immutable code, so whenever you implement something that mutates you automatically think a lot harder than you usually do in Java. While implementing the in-memory version of the EventStore I realized that the Java version was seriously flawed: it was not thread safe. While figuring out how to solve this I still decided to use Java’s ConcurrentHashMap instead of Clojure’s own constructs (such as refs and atoms), because I could not figure out how to use them without creating a bottleneck that forces execution to be serial. Not that it really matters, since you probably only use the in-memory version for testing. Let me know if you find a more idiomatic Clojure solution!
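A sketch of an in-memory store built on ConcurrentHashMap, relying on the atomic putIfAbsent and replace operations discussed in the comments. The constructor name, the map-based stream representation and the exception message are assumptions.

```clojure
(import '(java.util ConcurrentModificationException)
        '(java.util.concurrent ConcurrentHashMap))

(defprotocol EventStore
  (retrieve-event-stream [this aggregate-id])
  (append-events [this aggregate-id previous-es events]))

(def empty-stream {:version 0 :transactions []})

(defn new-in-memory-event-store []
  (let [^ConcurrentHashMap streams (ConcurrentHashMap.)]
    (reify EventStore
      (retrieve-event-stream [_ aggregate-id]
        (or (.get streams aggregate-id) empty-stream))
      (append-events [_ aggregate-id previous-es events]
        (let [next-es (-> previous-es
                          (update-in [:version] inc)
                          (update-in [:transactions] conj events))
              ;; Both calls check and modify in one atomic step:
              ;; putIfAbsent succeeds only if no stream exists yet,
              ;; replace only if the stored stream still equals previous-es.
              stored? (if (zero? (:version previous-es))
                        (nil? (.putIfAbsent streams aggregate-id next-es))
                        (.replace streams aggregate-id previous-es next-es))]
          (when-not stored?
            (throw (ConcurrentModificationException.
                    "Event stream was modified by another thread")))
          next-es)))))
```

Different aggregates never block each other here; only two writers racing on the same aggregate can conflict, and the loser gets an exception instead of silently overwriting events.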

The application service ties everything together by receiving the command, loading all events, replaying to current state, executing the command and storing the generated events. In Clojure this is just a small function:
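The steps above could be sketched roughly as follows. The handle-command name and its [command event-store] argument order match the calls shown in the comments; the protocol names, the single command and event, and the deliberately naive single-threaded store are assumptions added to keep the example self-contained.

```clojure
(defprotocol EventStore
  (retrieve-event-stream [this aggregate-id])
  (append-events [this aggregate-id previous-es events]))

(defprotocol CommandHandler
  (perform [command state]))

(defmulti apply-event (fn [state event] (class event)))

(defn apply-events [state events]
  (reduce apply-event state events))

(defn handle-command [command event-store]
  (let [aggregate-id  (:aggregate-id command)
        event-stream  (retrieve-event-stream event-store aggregate-id)   ; load
        old-events    (apply concat (:transactions event-stream))
        current-state (apply-events {} old-events)                       ; replay
        new-events    (perform command current-state)]                   ; execute
    (append-events event-store aggregate-id event-stream new-events)))   ; store

;; --- hypothetical domain and store, only here to make the sketch runnable ---
(defrecord CreateGameCommand [aggregate-id player move])
(defrecord GameCreatedEvent [game-id creator])

(defmethod apply-event GameCreatedEvent [state event]
  (assoc state :state :started :creator (:creator event)))

(extend-protocol CommandHandler
  CreateGameCommand
  (perform [command state]
    (when (:state state)
      (throw (IllegalStateException. "Game already created")))
    [(->GameCreatedEvent (:aggregate-id command) (:player command))]))

(defn naive-store []
  ;; Not safe for concurrent writers; a stand-in for a real EventStore.
  (let [streams (atom {})]
    (reify EventStore
      (retrieve-event-stream [_ id]
        (get @streams id {:version 0 :transactions []}))
      (append-events [_ id previous-es events]
        (swap! streams assoc id (-> previous-es
                                    (update-in [:version] inc)
                                    (update-in [:transactions] conj events)))
        nil))))
```

Calling (handle-command (->CreateGameCommand :g1 :p1 :rock) store) twice against the same store stores one transaction and then throws, because the replayed state shows that the game already exists.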

Test

Ok, let’s try to perform some commands:

Great! It seems to work!

Conclusion

So what can we learn from all this? First of all, by using immutable datastructures for commands and events it is trivial to send these as messages. More importantly, by representing the aggregate state using an immutable datastructure we could easily cache this. Aggregate state caching is sometimes called snapshotting. I think this code also shows that this cache (snapshot) should not be part of the EventStore as it is something the application service is responsible for. For example the application service could cache a snapshot together with a version. When applying events it should only apply events belonging to newer versions.
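A sketch of that idea: the application service keeps a snapshot of the form {:version n :state s} and folds in only the transactions appended after version n. The function name current-state is hypothetical, and the sketch assumes the stream version is incremented by one per appended transaction, so that version n means the first n transactions are already reflected in the snapshot.

```clojure
(defn current-state
  "Brings snapshot (or nil) up to date with event-stream using apply-event.
  Returns a new snapshot that can be cached for the next command."
  [apply-event snapshot event-stream]
  (let [{:keys [version state]} (or snapshot {:version 0 :state {}})
        ;; Skip the transactions the snapshot has already seen.
        newer-events (apply concat (drop version (:transactions event-stream)))]
    {:version (:version event-stream)
     :state   (reduce apply-event state newer-events)}))

;; Toy apply-event that just records which events were applied.
(defn record-event [state event]
  (update-in state [:applied] (fnil conj []) event))

(def stream {:version 3 :transactions [[:a] [:b :c] [:d]]})

(current-state record-event nil stream)
;; => {:version 3, :state {:applied [:a :b :c :d]}}

(current-state record-event {:version 2 :state {:applied [:a :b :c]}} stream)
;; => {:version 3, :state {:applied [:a :b :c :d]}}
```

Both calls produce the same result, which is the point: the snapshot only saves work and can be thrown away at any time.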

Another way of looking at this is that the aggregate state is just a projection of the events generated by that aggregate. Greg Young talks about projections of event streams in his talk Event Store As a Read Model. There is really nothing special about the aggregate state! The aggregate state is just something the application service uses to perform commands that generate new events.

Another important point is that since all domain code is pure (has no side-effects) it is really simple to test!
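For example, a test of the replay logic needs no mocks or setup, only data in and data out. The sketch below uses clojure.test; representing events as plain maps with a :type key is an assumption made to keep it short.

```clojure
(require '[clojure.test :refer [deftest is run-tests]])

;; A pure function: the same state and event always give the same new state.
(defn apply-event [state event]
  (case (:type event)
    :game-created (assoc state :state :started :creator (:creator event))
    :game-won     (assoc state :state :completed :winner (:winner event))))

(deftest replaying-events-yields-final-state
  (is (= {:state :completed :creator :p1 :winner :p2}
         (reduce apply-event {} [{:type :game-created :creator :p1}
                                 {:type :game-won :winner :p2}]))))

(deftest replay-does-not-depend-on-previous-runs
  (let [events [{:type :game-created :creator :p1}]]
    (is (= (reduce apply-event {} events)
           (reduce apply-event {} events)))))

(run-tests)
```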

Hope you enjoyed this exploration in Clojure. A big thanks to Ulrik Sandberg for reviewing and rewriting my messy Clojure code in a more idiomatic style. As always, let me know what you think!

30 Comments

  1. Hi,

    There is a problem if you store the events like (ar-id events): it is not possible to guarantee the order of events when replaying.

    • Not really sure I understand what you mean. The implementation of the eventstore is responsible for maintaining the order of the events. If necessary it can enhance the event representation with extra information that might be necessary to maintain order, for example when storing in a SQL database you typically add an additional column for event order. This extra information doesn’t have to be exposed to the application developer.

  2. Florian

    Thanks for this interesting article.
    I not only learned about event stores but also a lot about Clojure. I’m a Clojure beginner. What do you think about my implementation of the compare-moves function? A little less code, but I am not sure which is more idiomatic.

    (defn compare-moves [a b]
      ({[:rock :rock] :tie
        [:rock :paper] :loss
        [:rock :scissor] :victory
        [:paper :rock] :victory
        [:paper :paper] :tie
        [:paper :scissor] :loss
        [:scissor :rock] :loss
        [:scissor :paper] :victory
        [:scissor :scissor] :tie} [a b]))

    • I like it! It has the same clarity, but is probably more performant as it does not involve a multimethod.

    • Ulrik Sandberg

      Here’s another go. First a helper function in the form of a map:

      (def beats {:rock :paper :paper :scissors :scissors :rock})

      Testing:

      user> (beats :rock)
      :paper
      user> (beats :scissors)
      :rock

      Then the actual comparison, in the form of a function with a conditional:

      (defn compare-moves [a b]
        (cond
          (= a b) :tie
          (= a (beats b)) :victory
          :else :loss))

      Testing:

      user> (compare-moves :rock :paper)
      :loss

      But in that code, we see that the predicate is ‘=’ in all clauses, and the first argument is ‘a’ in all clauses. We can use ‘condp’, which takes a predicate, a first argument, and then pairs of second arguments and their corresponding matches. The last one is the else value.

      (defn compare-moves [a b]
        (condp = a
          b :tie
          (beats b) :victory
          :loss))

      Testing:

      user> (compare-moves :rock :paper)
      :loss
      user> (compare-moves :rock :scissors)
      :victory
      user> (compare-moves :scissors :paper)
      :victory

    • And here’s core.logic version ;)

      (use 'clojure.core.logic)

      (facts beats [[:rock :scissor] [:paper :rock] [:scissor :paper]])

      (defn play [hand1 hand2]
        (run* [result]
          (conde
            ((beats hand1 hand2) (== result :win))
            ((beats hand2 hand1) (== result :loose))
            ((== hand1 hand2) (== result :tie)))))

      ; user=> (play :rock :rock)
      ; (:tie)
      ; user=> (play :rock :paper)
      ; (:loose)
      ; user=> (play :scissor :paper)
      ; (:win)
      ; user=>

  3. Martin

    Shouldn’t this sentence:

    “The EventSource almost has the same signature as in the Java example.”

    read:

    “The *EventStore* almost has the same signature as in the Java example.”

    ?

  4. Martin

    I am interested in what specific problems you found in using an atom for the EventStore. Rich did a comparison here: https://groups.google.com/d/msg/clojure/dK6x_QpCpvo/OitIryoFSAgJ Please note that Clojure performance has improved quite a lot since.

    • I tried using a ref. The problem is that it creates a global lock which means that all modifications happen serially. Here are the results of my non-scientific microbenchmark:

      Single threaded, 10000 aggregates
      ConcurrentHashMap 8 ms
      ref with map 35 ms

      Multithreaded with 10000 aggregates, 10 pmap partitions (no contention / aggregate)
      ConcurrentHashMap 6 ms
      ref with map 150 ms

      Multithreaded with 10000 events, 10 pmap partitions (only 10 aggregates, that is lots of contention / aggregate)
      ConcurrentHashMap 100 ms
      ref with map 150 ms

      Multithreaded with 10000 events, 10 pmap partitions (100 aggregates, that is some contention / aggregate)
      ConcurrentHashMap 12 ms
      ref with map 150 ms

      Of course, for this particular problem it does not matter as the in-memory-event-store is only used for testing….

      • Martin

        I think you want to use an atom for this scenario. Refs are only necessary if you need transactions.

    • Feel free to implement the EventStore protocol using an atom and I’ll have a look at it!

    • Here is the code for the simple parallel test. It is neither nice nor scientific… :-)

      (defn measure-fn [fun args]
        (def before (System/currentTimeMillis))
        (apply fun args)
        (def after (System/currentTimeMillis))
        (print "time " (- after before)))

      (defn append-event [eventStore aggregateId]
        (let [es (retrieve-event-stream eventStore aggregateId)]
          (append-events eventStore aggregateId es ["event"])))

      (defn pappend-test [totalCount partionSize eventStore]
        (count (pmap
                 (fn [ids]
                   (count (map (fn [aggregateId] (append-event eventStore aggregateId)) ids)))
                 (partition partionSize (range 0 totalCount)))))

      (measure-fn pappend-test [10000 10 inMemoryEventStore])

      • Martin

        I decided to use Hugo Duncan’s excellent Criterium library to do the benchmark.

        Please see this gist for the code:

        https://gist.github.com/maacl/5340142

        I have intentionally kept the Clojure solution as close to the Java one as possible (and yes, I know that the exception will never be thrown :-)). It could be much more idiomatic.

        The in-memory-event-store should be renamed to j-in-memory-event-store and the benchmarking code should be run from main.clj. Criterium [criterium "0.3.1"] should be added to the project.clj as a dependency and to main.clj as a require.

        The benchmark results are as follows on my laptop:

        Java ConcurrentHashMap:
        (crit/quick-bench (apply pappend-test [10000 10 f/j-in-memory-event-store]))

        WARNING: Final GC required 137.6310174495075 % of runtime
        Evaluation count : 72 in 6 samples of 12 calls.
        Execution time mean : 9.508264 ms
        Execution time std-deviation : 1.229543 ms
        Execution time lower quantile : 8.931417 ms ( 2.5%)
        Execution time upper quantile : 11.635167 ms (97.5%)

        Found 1 outliers in 6 samples (16.6667 %)
        low-severe 1 (16.6667 %)
        Variance from outliers : 31.6030 % Variance is moderately inflated by outliers

        Clojure atom hashmap:
        (crit/quick-bench (apply pappend-test [10000 10 f/c-in-memory-event-store]))

        WARNING: Final GC required 194.0910584837865 % of runtime
        Evaluation count : 48 in 6 samples of 8 calls.
        Execution time mean : 11.328042 ms
        Execution time std-deviation : 1.287739 ms
        Execution time lower quantile : 10.272500 ms ( 2.5%)
        Execution time upper quantile : 13.421969 ms (97.5%)

        Found 1 outliers in 6 samples (16.6667 %)
        low-severe 1 (16.6667 %)
        Variance from outliers : 31.0942 % Variance is moderately inflated by outliers

    • The reason I used a ref is that both replace and putIfAbsent must check and then perform the modification atomically, i.e. both the if statement and the modification must be done within a transaction. Not sure if this is possible with an atom?

      Your implementation is not thread safe and it will lose some modifications.
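      For reference, a check-then-modify step can be made atomic with an atom by doing the check inside the function passed to swap!, since swap! re-runs that function against the latest value whenever another thread got in first. The sketch below is an assumption about how that could look for this protocol, not code from the post or the gist; the constructor name is hypothetical.

```clojure
(import '(java.util ConcurrentModificationException))

(defprotocol EventStore
  (retrieve-event-stream [this aggregate-id])
  (append-events [this aggregate-id previous-es events]))

(def empty-stream {:version 0 :transactions []})

(defn atom-event-store []
  (let [streams (atom {})]
    (reify EventStore
      (retrieve-event-stream [_ aggregate-id]
        (get @streams aggregate-id empty-stream))
      (append-events [_ aggregate-id previous-es events]
        (swap! streams
               (fn [m]
                 ;; The version check runs against the value being swapped,
                 ;; so check and modification cannot be separated by another writer.
                 (let [current (get m aggregate-id empty-stream)]
                   (when-not (= (:version current) (:version previous-es))
                     (throw (ConcurrentModificationException. "Stale event stream")))
                   (assoc m aggregate-id
                          (-> previous-es
                              (update-in [:version] inc)
                              (update-in [:transactions] conj events))))))
        nil))))
```

      All aggregates still share one atom in this sketch, so writers to different aggregates can force each other to retry the swap, which is the bottleneck concern raised above.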

      • Martin

        Duh on my part.

        Just for my understanding (and I might be missing something fundamental about how ConcurrentHashMap works) but are the retrieve-event-stream and append-events threadsafe in your implementation and if so why?

    • ConcurrentHashMap replace and putIfAbsent perform atomic check+modification and therefore my implementation is threadsafe, see the Javadocs for details:

      http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentHashMap.html#replace(K, V, V)

      The reason I throw a ConcurrentModificationException is if another thread has modified a particular event stream since my current thread retrieved it. This has nothing to do with ConcurrentHashMap, instead this is part of the event store behavior. If this happens I simply retry, that is retrieve the event stream again and perform the same modification. I did not include this in the handle-command function because I wanted to simplify the example.

      Notice that the command may have a different result (ie different events) when it is retried since the aggregate state is different!
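      A sketch of such a retry wrapper, assuming a hypothetical helper called with-retry and a fixed maximum number of attempts (neither is part of the code in the post):

```clojure
(import '(java.util ConcurrentModificationException))

(defn with-retry
  "Calls (f); when it throws ConcurrentModificationException, calls it again,
  up to max-attempts times in total. The last failure is rethrown."
  [max-attempts f]
  (loop [attempt 1]
    ;; recur is not allowed inside catch, so the catch returns a marker instead.
    (let [result (try
                   {:value (f)}
                   (catch ConcurrentModificationException e
                     (if (< attempt max-attempts)
                       ::retry
                       (throw e))))]
      (if (= result ::retry)
        (recur (inc attempt))
        (:value result)))))

;; Intended use, with handle-command as described in the post:
;; (with-retry 3 #(handle-command command event-store))
```

      Because f is called again from scratch, the event stream is retrieved again and the command is executed against the fresh state on every attempt.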

      • Martin

        Got you. Was confused by the fact that HandleCommand in itself did not appear safe, since it might be appending to a changed event stream. This is the part that you would normally use a ref for, which will handle the retry for you.

    • Yes, however the handle-command method is designed primarily with real persistence in mind and not an in-memory version. :-)

  5. Geraint Williams

    Hi Jan,

    I really enjoyed the EventStore / Datomic talk you gave (I found it on Vimeo). I have been looking for some perspective on how Datomic could fit into the Event Sourcing pattern.

    I am learning Clojure and trying to tackle an Event Sourcing problem using it, and I found this approach interesting.
    However, the commands vs events distinction that you mentioned in your talk, and that you cover here, puzzles me a bit.

    Taking your terms, I always saw the commands as being the Events in terms of event sourcing, as they are the single point of truth that you need to recreate your domain state from nothing.

    Isn’t it the commands you would replay rather than the events, as many events are simply functions of the state plus the previous event (e.g. GameWonEvent) therefore while important for distributing information to interested other systems – contain no novel information?

    • Thanks!

      Confusing commands and events is quite common as their interpretation depends on your perspective. I use events and commands in the way Greg Young uses them, for example have a look at this discussion: https://groups.google.com/forum/#!topic/dddcqrs/bnGiUMQFi4g

      I quote:

      Command is a message.
      Event is a message.

      An event is something that has happened.
      A command is something you want me to do.

      I must accept an event.
      I can say no to a command.

      ———–

      The problem with replaying commands is that your business logic may change over time. For example, at one point a MakeMoveCommand would set you as the winner of the game. When we change the business logic, the same command given the same state might have a different result and you no longer win the game.

      Hope this helps!

  6. Daniel Marjenburgh

    Hi Jan,

    I know this post is a year old, but I recently got interested in event sourcing and came across this post.

    As the only piece of mutable state you have in your code is the event-store, you actually can use an atom. It has the benefit of making the state mutation a bit more isolated to one place and it also makes your data (the event-streams inside the event-store) visible. You get code that retries in case of concurrent commands for free.

    (defrecord ImmutableEventStore [state]
      EventStore
      (retrieve-event-stream [this aggregate-id]
        (get state aggregate-id {:version 0 :transactions []}))
      (append-events [this aggregate-id previous-es events]
        ;; assoc-in on [:state aggregate-id] keeps the streams of all other aggregates
        (assoc-in this [:state aggregate-id]
                  (-> previous-es
                      (update-in [:version] inc)
                      (update-in [:transactions] conj events)))))

    (def immutable-store (atom (->ImmutableEventStore {})))

    The handle-command fn can be reused:
    (defn handle-command2 [command]
      (swap! immutable-store #(handle-command command %)))

    Benchmark of the original code:
    (quick-bench (let [move2 (rand-nth [:rock :paper :scissors])
                       id (java.util.UUID/randomUUID)]
                   (handle-command (->CreateGameCommand id :p1 :rock) in-memory-event-store)
                   (handle-command (->DecideMoveCommand id :p2 move2) in-memory-event-store)))
    Evaluation count : 67572 in 6 samples of 11262 calls.
    Execution time mean : 8.901787 µs
    Execution time std-deviation : 89.321163 ns
    Execution time lower quantile : 8.804187 µs ( 2.5%)
    Execution time upper quantile : 9.008592 µs (97.5%)
    Overhead used : 8.900853 ns

    Benchmark of the atom version:
    (quick-bench (let [move2 (rand-nth [:rock :paper :scissors])
                       id (java.util.UUID/randomUUID)]
                   (handle-command2 (->CreateGameCommand id :p1 :rock) immutable-store)
                   (handle-command2 (->DecideMoveCommand id :p2 move2) immutable-store)))
    WARNING: Final GC required 277.8610282668765 % of runtime
    Evaluation count : 49362 in 6 samples of 8227 calls.
    Execution time mean : 12.614858 µs
    Execution time std-deviation : 586.619330 ns
    Execution time lower quantile : 12.169293 µs ( 2.5%)
    Execution time upper quantile : 13.293305 µs (97.5%)
    Overhead used : 8.900853 ns

    It went from ~8.9µs to ~12.6µs on my machine. It could probably be optimised more though.

    • Jan Kronquist

      I like your implementation of the EventStore protocol. It is clean and clear what is going on!

      However, as I previously mentioned, the reason for using ConcurrentHashMap was to allow parallel processing of commands. Not that this matters in a “development” implementation, which the in-memory variant most likely is. The point I was getting at was more that for the real implementation you need to think about a solution that scales out to both multiple CPUs and multiple machines. By using ConcurrentHashMap I tried to illustrate this difficulty.

  7. Thanks for the article. I enjoyed it.

    I understood commands as being the implements of change and events as mere messages (DTOs) reporting changes. I would have thought the command itself would handle the actual state mutation and then, upon success, produce the events. In your example, your commands simply validate before producing events and those events via apply-event actually enact change.

    Why is mutation tied to the event and not to the command? Your approach looks solid; I’m simply looking for a way to explain it to myself that is more obvious.

    • Jan Kronquist

      No, commands do not handle the state change. A command is a request that something happen, and an event indicates that something did happen. The aggregate is responsible for handling the commands and publishing events. Once events have been published, they change the state of the aggregate. Please see more details in my other post about event sourcing. Hope this helps!

