Monday, January 31, 2011

CQRS with Akka actors and functional domain models

Fighting with impedance mismatch has been quite a losing battle so far in the development of software systems. We fight mismatch to handle stateful interactions with a stateless protocol. We fight mismatch of paradigms between the user interface layers, domain layers and data layers. Nothing concrete has emerged to date, though there have been quite a number of efforts to keep the organization of persistent data as close as possible to the way the domain layer uses it.

Command Query separation is nothing new. It's yet another attempt to manage impedance mismatch between how your application uses data and how the underlying store manages data so that transactional updates can be served with equal agility as read-only queries. Bertrand Meyer made this distinction long back when he mentioned that ..

The features that characterize a class are divided into commands and queries. A command serves to modify objects, a query to return information about objects.

Processing a command involves manipulation of state - hence the underlying data model needs to be organized in a way that makes updates easier. A query needs to return data in the format the user wants to view it. Hence it makes sense to organize your storage likewise so that we don't need to process expensive joins in order to process queries. This leads to a dichotomy in the way the application, as a whole, requires processing of data. Command Query Responsibility Segregation (CQRS) endorses this separation. Commands update state - hence produce side-effects. Queries are like pure functions and should be designed using applicative, completely side-effect free approaches. So, the Command Query Separation principle, as Bertrand Meyer said, is ..

Functions should not produce abstract side-effects.

Greg Young has delivered some great sessions on DDD and CQRS. In 2008 he said "A single model cannot be appropriate for reporting, searching and transactional behavior". We have at least two models - one that processes commands and feeds changes to another model which serves user queries and reports. The transactional behavior of the application gets executed through the rich domain model of aggregates and repositories, while the queries are directly served from a de-normalized data model.

CQRS and Event Sourcing

One other concept that goes alongside CQRS is that of Event Sourcing. I blogged about some of the benefits that it has quite some time back and implemented event sourcing using Scala actors. The point where event sourcing meets CQRS is how we model the transactions of the domain (resulting from commands) as a sequence of events in the underlying persistent store. Modeling persistence of transactions as an event stream helps record updates as append only event snapshots that can be replayed as and when required. All updates in the domain model are now being translated into inserts in the persistence model. And this gives us an explicit view of all state changes in the domain model.
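
To make the "updates become inserts" idea concrete, here's a minimal sketch of an append-only log whose current state is recovered by replaying it. Position, Bought and Sold are hypothetical names used only for illustration; they are not part of the trade model that follows.

```scala
object EventLogSketch {
  // Hypothetical state and events, for illustration only
  final case class Position(quantity: BigDecimal)

  sealed trait Event
  final case class Bought(quantity: BigDecimal) extends Event
  final case class Sold(quantity: BigDecimal) extends Event

  // Applying one event yields a new state; nothing is mutated
  def applyEvent(state: Position, event: Event): Position = event match {
    case Bought(q) => state.copy(quantity = state.quantity + q)
    case Sold(q)   => state.copy(quantity = state.quantity - q)
  }

  // Every domain update becomes an insert at the end of the log
  def append(log: Vector[Event], event: Event): Vector[Event] = log :+ event

  // The current state is a left fold over the whole stream
  def replay(initial: Position, log: Vector[Event]): Position =
    log.foldLeft(initial)((state, event) => applyEvent(state, event))
}
```

Since the log is never overwritten, replaying a prefix of it gives the state as of any earlier point, which is where the explicit view of all state changes comes from.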

Over the last few days I have been playing around implementing CQRS and Event Sourcing within a domain model using the principles of functional programming and actor based asynchronous messaging. One of the big challenges is to model updates in a functional way and store them as sequences of event streams. In this post, I will share some of the experiences and implementation snippets that I came up with over the last few days. The complete implementation, so far, can be found in my github repository. It's very much a work in progress, which I hope to enrich more and more as I get some time.

A simple domain model

First the domain model and the aggregate root that will be used to publish events .. It's a ridiculously simple model for a security trade, with lots and lots of stuff elided for simplicity ..

import java.util.{Calendar, Date}

// the main domain class
case class Trade(account: Account, instrument: Instrument, refNo: String, 
  market: Market, unitPrice: BigDecimal, quantity: BigDecimal, 
  tradeDate: Date = Calendar.getInstance.getTime, valueDate: Option[Date] = None, 
  taxFees: Option[List[(TaxFeeId, BigDecimal)]] = None, 
  netAmount: Option[BigDecimal] = None) {

  // equality is based on the reference number only; a non-Trade is never equal
  override def equals(that: Any) = that match {
    case t: Trade => refNo == t.refNo
    case _        => false
  }
  override def hashCode = refNo.hashCode
}


For simplicity, we make reference number a unique identifier for a trade. So all comparisons and equalities will be based on reference numbers only.

In a typical application, the entry point for users is the service layer that exposes facade methods that render use cases for the business. In a trading application, two of the most common services that need to be performed on a security trade are its value date computation and its enrichment. So when the trade passes through its processing pipeline it gets its value date updated, then gets enriched with the applicable taxes and fees, and finally with its net cash value.

If you are a client using these services (again, overly elided for simplicity) you may have the following service methods ..

class TradingClient {
  // create a trade : wraps the model method
  def newTrade(account: Account, instrument: Instrument, refNo: String, market: Market,
    unitPrice: BigDecimal, quantity: BigDecimal, tradeDate: Date = Calendar.getInstance.getTime) =
      //..

  // enrich trade
  def doEnrichTrade(trade: Trade) = //..

  // add value date
  def doAddValueDate(trade: Trade) = //..

  // a sample query
  def getAllTrades = //..
}


In a typical implementation these methods will invoke the domain artifacts of repositories that will either query aggregate roots or do updates on them before being persisted in the underlying store. In a CQRS implementation, the domain model will be updated but the persistent store will record these updates as event streams.

So now we have the first problem - how do we represent updates in the functional world so that we can compose them later when we need to snapshot the persistent aggregate root?

Lenses FTW

I used type lenses for representing updates functionally. Lenses solve the problem of representing updates so that they can be composed. A lens between a set of source structures S and a set of target structures T is a pair of functions:
- get from S to T
- putback from T x S to S

For more on lenses, have a look at this presentation by Benjamin Pierce. scalaz contains lenses as part of its distribution and models a lens as a case class containing a pair of get and set functions ..

case class Lens[A,B](get: A => B, set: (A,B) => A) extends Immutable { //..


Here are some examples from my domain model for updating a trade with its value date or enriching it with tax/fee values and net cash value ..

// add tax/fees
val taxFeeLens: Lens[Trade, Option[List[(TaxFeeId, BigDecimal)]]] = 
  Lens((t: Trade) => t.taxFees, 
       (t: Trade, tfs: Option[List[(TaxFeeId, BigDecimal)]]) => t.copy(taxFees = tfs))

// add net amount
val netAmountLens: Lens[Trade, Option[BigDecimal]] = 
  Lens((t: Trade) => t.netAmount, 
       (t: Trade, n: Option[BigDecimal]) => t.copy(netAmount = n))

// add value date
val valueDateLens: Lens[Trade, Option[Date]] = 
  Lens((t: Trade) => t.valueDate, 
       (t: Trade, d: Option[Date]) => t.copy(valueDate = d))
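
The lenses above each touch a single top-level field, so they don't show the composition that makes lenses attractive. Here's a minimal sketch of that, using a hand-rolled Lens with the same get/set shape as the one shown earlier. The andThen method and the Listing and Exchange classes are assumptions made up for this illustration; they are neither the scalaz API nor part of the trade model.

```scala
object LensSketch {
  // Same get/set shape as the Lens above, without anything scalaz adds
  final case class Lens[A, B](get: A => B, set: (A, B) => A) {
    // Compose with a lens that looks one level further into B
    def andThen[C](inner: Lens[B, C]): Lens[A, C] =
      Lens[A, C](
        a => inner.get(get(a)),
        (a, c) => set(a, inner.set(get(a), c))
      )
  }

  // Hypothetical nested model, for illustration only
  final case class Exchange(code: String)
  final case class Listing(symbol: String, exchange: Exchange)

  val exchangeLens: Lens[Listing, Exchange] =
    Lens[Listing, Exchange](l => l.exchange, (l, e) => l.copy(exchange = e))

  val codeLens: Lens[Exchange, String] =
    Lens[Exchange, String](e => e.code, (e, c) => e.copy(code = c))

  // Reads and updates the nested code directly from a Listing
  val exchangeCodeLens: Lens[Listing, String] = exchangeLens.andThen(codeLens)
}
```

Calling exchangeCodeLens.set(listing, "TSE") returns a new Listing with the nested code replaced and leaves the original value untouched.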


We will use the above lenses for updating our aggregate root and also wrap them into closures for subsequent feed into the event stream for persistent storage. In this example I have implemented in-memory persistence for both the command and the query store. Persistence into an on disk database will be available very soon at a github repository near you :)

Combinators that abstract state processing

Let's now define a couple of combinators that encapsulate our transactional service method implementations within the domain model. Note how the lenses have also been abstracted away from the client API as implementation artifacts. For details of these implementations please visit the github repo that contains a working model along with test cases.

// closure that enriches a trade with tax/fee information and net cash value
val enrichTrade: Trade => Trade = {trade =>
  val taxes = for {
    taxFeeIds      <- forTrade // get the tax/fee ids for a trade
    taxFeeValues   <- taxFeeCalculate // calculate tax fee values
  }
  yield(taxFeeIds map taxFeeValues)
  val t = taxFeeLens.set(trade, taxes(trade))
  netAmountLens.set(t, t.taxFees.map(_.foldl(principal(t))((a, b) => a + b._2)))
}

// closure for adding a value date
val addValueDate: Trade => Trade = {trade =>
  val c = Calendar.getInstance
  c.setTime(trade.tradeDate)
  c.add(Calendar.DAY_OF_MONTH, 3)
  valueDateLens.set(trade, Some(c.getTime))
}
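
The for comprehension in enrichTrade never mentions the trade, because it only builds a function that still awaits one. Here's a minimal sketch of that idea in plain Scala. The Reader wrapper, Deal, feeIdsFor, feeFor and the rates are all hypothetical stand-ins (the real code relies on scalaz for this and on forTrade and taxFeeCalculate from the repository).

```scala
object ReaderSketch {
  // Thin wrapper that gives A => B the map/flatMap a for comprehension needs
  final case class Reader[A, B](run: A => B) {
    def map[C](f: B => C): Reader[A, C] =
      Reader[A, C](a => f(run(a)))
    // The same input a is handed to both steps
    def flatMap[C](f: B => Reader[A, C]): Reader[A, C] =
      Reader[A, C](a => f(run(a)).run(a))
  }

  // Hypothetical stand-in for a trade
  final case class Deal(principal: BigDecimal)

  // Hypothetical rates, for illustration only
  val rates: Map[String, BigDecimal] =
    Map("TradeTax" -> BigDecimal("0.2"), "Commission" -> BigDecimal("0.1"))

  // Which tax/fee ids apply to a deal
  val feeIdsFor: Reader[Deal, List[String]] =
    Reader[Deal, List[String]](_ => List("TradeTax", "Commission"))

  // Given a deal, a function that prices a single tax/fee id
  val feeFor: Reader[Deal, String => (String, BigDecimal)] =
    Reader[Deal, String => (String, BigDecimal)](deal =>
      id => (id, deal.principal * rates(id)))

  // Nothing runs here; this only composes a function awaiting a Deal
  val fees: Reader[Deal, List[(String, BigDecimal)]] =
    for {
      ids  <- feeIdsFor
      calc <- feeFor
    } yield ids.map(calc)
}
```

The deal is supplied only at the very end, via fees.run(deal), in the same way that taxes(trade) supplies the trade above.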


We will now use these combinators to implement our transactional services which the TradingClient will invoke. Each of these service methods will do 2 things :-

1. effect the closure on the domain model and
2. as a side-effect stream the event into the command store

Sounds like a kestrel .. doesn't it ? Well here's a kestrel combinator and the above service methods realized in my CQRS implementation ..

// refer To Mock a Mockingbird
private[service] def kestrel[T](trade: T, proc: T => T)(effect: => Unit) = {
  val t = proc(trade)
  effect
  t
}

// enrich trade
def doEnrichTrade(trade: Trade) = 
  kestrel(trade, enrichTrade) { 
    ts ! TradeEnriched(trade, enrichTrade)
  }

// add value date
def doAddValueDate(trade: Trade) = 
  kestrel(trade, addValueDate) { 
    ts ! ValueDateAdded(trade, addValueDate)
  }
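
If you want to try the kestrel without an actor system, here's a minimal sketch where the side-effect appends to an in-memory journal instead of sending a message. The journal and doubleAndRecord are hypothetical names made up for this illustration.

```scala
object KestrelSketch {
  import scala.collection.mutable.ListBuffer

  // Same shape as the kestrel above: apply proc, run the effect, return the result
  def kestrel[T](value: T, proc: T => T)(effect: => Unit): T = {
    val result = proc(value)
    effect
    result
  }

  // Hypothetical stand-in for the actor send: an in-memory journal
  val journal: ListBuffer[String] = ListBuffer.empty[String]

  // Returns the processed value and records the event as a side-effect
  def doubleAndRecord(n: Int): Int =
    kestrel[Int](n, _ * 2) {
      journal += s"Doubled($n)"
      ()
    }
}
```

The caller only ever sees the processed value; the recording happens on the side, which is what lets the service methods keep a plain Trade => Trade feel.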


Back to Akka!

It was only to be expected that I would use Akka for transporting the event down to the command store. And this transport is implemented as an asynchronous side-effect of the service methods - just what the doctor ordered for an actor use case :)

With Event Sourcing and CQRS, one of the things that you would require is the ability to snapshot your persistent versions of the aggregate root. The current implementation is simple and does zero-based snapshotting, i.e. every time you ask for a snapshot, it replays the whole stream for that trade and gives you the current state. In typical real world systems, you do interval snapshotting and start replaying from the latest available snapshot in case you want to get the current state.
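
Interval snapshotting is not part of the current implementation, but here's a minimal sketch of how it could look. Everything in it is an assumption for illustration: the Journal class, the Int state standing in for a Trade, and the interval of 3 events.

```scala
object SnapshotSketch {
  // Stand-in for Trade => Trade
  type Update = Int => Int

  // The event stream plus the latest snapshot and the number of events it covers
  final case class Journal(
    initial: Int,
    events: Vector[Update] = Vector.empty,
    snapshot: Option[(Int, Int)] = None, // (events covered, state at that point)
    interval: Int = 3                    // hypothetical: snapshot every 3 events
  ) {
    // Replay only the events recorded after the latest snapshot
    def current: Int = {
      val (covered, state) = snapshot.getOrElse((0, initial))
      events.drop(covered).foldLeft(state)((s, e) => e(s))
    }

    // Append an event and take a new snapshot whenever the interval is reached
    def append(e: Update): Journal = {
      val next = copy(events = events :+ e)
      if (next.events.size % interval == 0)
        next.copy(snapshot = Some((next.events.size, next.current)))
      else next
    }
  }
}
```

With zero-based snapshotting the fold always starts from the initial state; here it starts from the latest snapshot, so the cost of asking for the current state is bounded by the interval rather than by the length of the stream.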

Here's our command store modeled as an Akka actor that processes the various events that it receives from an upstream server ..

// CommandStore modeled as an actor
class CommandStore(qryStore: ActorRef) extends Actor {
  private var events = Map.empty[Trade, List[TradeEvent]]

  def receive = {
    case m@TradeEnriched(trade, closure) => 
      events += ((trade, events.getOrElse(trade, List.empty[TradeEvent]) :+ closure))
      qryStore forward m
    case m@ValueDateAdded(trade, closure) => 
      events += ((trade, events.getOrElse(trade, List.empty[TradeEvent]) :+ closure))
      qryStore forward m
    case Snapshot => 
      self.reply(events.keys.map {trade =>
        events(trade).foldLeft(trade)((t, e) => e(t))
      })
  }
}


Note how the Snapshot message is processed as a fold over all the accumulated closures starting with the base trade. Also the command store adds the event to its repository (which is currently an in memory collection) and forwards the event to the query store. There we can have the trade modeled as per the requirements of the query / reporting client. For simplicity the current example assumes the model as the same as our domain model presented above.
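
To give a flavor of what a query-side model shaped for a report might look like, here's a minimal sketch that projects trades into one de-normalized row per account. TradeSnapshot, AccountSummary and project are hypothetical names; the example in this post simply reuses the domain model on the query side.

```scala
object ReadModelSketch {
  // Hypothetical, flattened stand-in for the latest state of a trade
  final case class TradeSnapshot(refNo: String, accountNo: String, netAmount: BigDecimal)

  // De-normalized row shaped for a "net amount per account" report
  final case class AccountSummary(accountNo: String, tradeCount: Int, totalNet: BigDecimal)

  // Fold the trades into one row per account, so the query needs no joins
  def project(trades: Iterable[TradeSnapshot]): Map[String, AccountSummary] =
    trades.foldLeft(Map.empty[String, AccountSummary]) { (view, t) =>
      val row = view.getOrElse(t.accountNo, AccountSummary(t.accountNo, 0, BigDecimal(0)))
      view.updated(
        t.accountNo,
        row.copy(tradeCount = row.tradeCount + 1, totalNet = row.totalNet + t.netAmount)
      )
    }
}
```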

Here's the query store, also an actor, that persists the trades on receiving relevant events from the command store. In effect the command store responds to messages that it receives from an upstream TradingServer and asynchronously updates the query store with the latest state of the trade.

// QueryStore modeled as an actor
class QueryStore extends Actor {
  private var trades = new collection.immutable.TreeSet[Trade]()(Ordering.by(_.refNo))

  def receive = {
    case TradeEnriched(trade, closure) => 
      trades += trades.find(_ == trade).map(closure(_)).getOrElse(closure(trade))
    case ValueDateAdded(trade, closure) => 
      trades += trades.find(_ == trade).map(closure(_)).getOrElse(closure(trade))
    case QueryAllTrades =>
      self.reply(trades.toList)
  }
}


And here's a sample sequence diagram that illustrates the interactions that take place for a sample service call by the client in the CQRS implementation ..


The full implementation also contains the complete wiring of the above abstractions along with Akka's fault tolerant supervision capabilities. A complete test case is also included along with the distribution.

Have fun!

19 comments:

hedefalk said...

Great post! Inspirational!

jfr said...

Could You explain how the closure "enrichTrade" works. It uses the function forTrade (in the for expression) but I just don't know how the trade is passed to "forTrade"

Regards, Jan

Debasish said...

The for comprehension yields a composition of taxFeeIds and taxFeeValues, which is a function that takes a trade as input. Have a look at the very next line where I invoke taxes(trade), which then supplies the necessary trade argument to forTrade. This is a technique for implementing the Reader monad in Scala. You can get a better understanding of the concept by looking at an earlier blog that I wrote on this subject (http://debasishg.blogspot.com/2010/12/case-study-of-cleaner-composition-of.html).

jfr said...

Thanks for the answer. After reading your comment and playing with the scala repl it looks rather obvious. Must be the years of imperative programming. ;-)

Regards, Jan

Erik van Oosten said...

Axon, a CQRS framework, makes a distinction between commands and events. The first 'notify' the system that the external world has changed 'MovedAddressCommand'. This is then translated to one or more events 'ChangedAddressEvent' that are stored in the event store for later replay. Only events are received by the query components. In addition, events can be captured by other aggregates and can in turn lead to more events.
Commands are not stored unless for additional auditing.

You do not seem to make this distinction. Is this on purpose?

Could you explain why lenses are important? As I understand lenses, they are about having an external view to the actual aggregate. Isn't this already filled in by the query components.

Debasish said...

I think the reason you feel that I don't make this difference is the naming of CommandStore. It should be renamed to EventStore, because that's what it stores. In the implementation doEnrichTrade, doAddValueDate are the commands which result in events like TradeEnriched and ValueDateAdded, which are then stored.

Lenses offer a way to update aggregates "functionally". You can find more information on lenses in the presentation by Benjamin Pierce, which I linked in the main article.

Anonymous said...

I saw the comment about lenses and your response. While I understand in general that lenses are useful, I am not sure what value they provide in this particular example.
For example, as far as I can tell the only usage are the calls to set() in the closures enrichTrade and addValueDate and seem to obscure rather than clarify the intent of the code. Am I missing something?

Debasish said...

Lenses are usually used for functionally updating nested structures. They are somewhat like zippers. The main advantage of using lenses is that they are composable, though we don't use any composability in this example. Not sure why you feel using lenses makes the code look obscure.

oxbow said...

Thanks - I understand lenses a little better now. I'm interested in what looks like the apparent non-scalability of using this in a real system, however. Could you perhaps elucidate what a "collapseHistory" method might look like in your CommandQueryStore? That is, how can I collapse a sequence of (Trade => Trade) updates into a single (Trade => Trade) update? Otherwise the CommandQueryStore is one big memory leak.

Debasish said...

Hi oxbow -

The collapsing takes place when the CommandStore receives a Snapshot message. All accumulated closures on a specific trade are executed in sequence to result in the latest updated snapshot of the trade. Have a look at the sample code in the post how I do that using a fold.

Thanks.

Buff said...

After reading this post I keep thinking of more places it can be applied, and the advantages of doing so (cheap undo, free audit trail etc)

Do you have any thoughts on the best way to persist the commands? It would be a shame to only be able to use the technique for short-lived purposes.

Banq said...

Here is a Java CQRS sample with the Jdon framework, which is like Akka actors: http://jivejdon.blogspot.com/2011/09/ddd-dci-and-domain-events-example.html

can you tell me what difference with them? or why use FP? FP is not fit for DDD that is a static entity model class .

Debasish said...

@Banq

Not sure why you feel FP is not suitable for DDD. The advantage that a functional domain model gives you is immutability. We don't mutate shared state. Hence you can safely share your domain model components with other application components. In the example, note how all updates are done functionally using lenses. When you are doing DDD you can do similar stuff with aggregates.

Anonymous said...

I have an Event Sourcing app in production for 9 years. (Full audit trail is a strong requirement in our domain.)
It works very well (impossible to override data by error), it is fast (because current state is in-memory) and very flexible (no schema update needed, POJO-based queries).

But my boss still doesn't understand why I don't use a plain SQL database. :((

Debasish Ghosh said...

I am also seriously exploring Event Sourcing as an option in practical real world applications. Event Sourcing, asynchronous messaging and functional programming go very well together. I will be speaking on this same topic at PhillyETE next week. Please do drop by if you are around :-)

Anonymous said...

Well, I'm on the wrong side of the Atlantic ;)

Yes, when I wrote my app (classic OO oriented), I was not aware of the advantages of the functional approach.
Event Sourcing + immutable structures seem to match very well.
But maybe it would require a bit more memory for the "current" data model in the heap than the mutable OO approach (?). Maybe that's the price to pay for more simplicity and robustness.

Tobi said...

This is still a great post. Getting some inspiration from it for my current project. Regarding the FP/DDD discussion ... this presentation gives a nice intro why those concepts fit together (http://skillsmatter.com/podcast/design-architecture/ddd-functional-programming).

Otavio Macedo said...

What about consistency? If I understand it correctly, the query store is fed with updates asynchronously. So, the query store may not always have the latest state.

Debasish Ghosh said...

That's true .. Not all use cases need to have the latest update fed into the query subsystem instantaneously. e.g. My bank account does not always have the latest debit or credit reflected in the query screen the moment they happen. And you can always control the frequency at which the query subsystem gets refreshed from the updates.