Monday, March 28, 2011

Killing a var and Threading a State

In my earlier post on CQRS using functional domain models and Akka actors, I had implemented a data store that accumulates all events associated with individual trades. We call this EventSourcing that allows us to rollback our system in time and replay all events over the earlier snapshot to bring it up to date. This has many uses to detect any problems that might have occured in the past or to profile your system on a retroactive basis. This post is not about event sourcing or the virtues of CQRS.

In this post I start taking cue from the EventSourcing example and discuss some strategies of improving some aspects of the domain model. This is mostly related to raising the level of abstraction at which to program the solution domain. Consider this post to be a random rant to record some of my iterations in the evolution of the domain model.

The code snippets that I present below may sometimes look out of context since they're part of a bigger model. The entire prototype is there in my github repository ..

Story #1 : How to kill a var

Have a look at the event store that I implemented earlier ..

class EventStore extends Actor with Listeners {
  private var events = Map.empty[Trade, List[TradeEvent]]

  def receive = //..
  //..
}


With an actor based implementation the mutable var events is ok since the state is confined within the actor itself. In another implementation where I was using a different synchronous event store, I had to get around this mutable shared state. It was around this time Tony Morris published his Writer Monad in Scala. This looked like a perfect fit .. Here's the implementation of the abstraction that logs all events, shamelessly adopted from Tony's Logging Without Side-effects example ..

import TradeModel._
object EventLog {
  type LOG = List[(Trade, TradeEvent)]
}

import EventLog._

case class EventLogger[A](log: LOG, a: A) {
  def map[B](f: A => B): EventLogger[B] =
    EventLogger(log, f(a))

  def flatMap[B](f: A => EventLogger[B]): EventLogger[B] = {
    val EventLogger(log2, b) = f(a)
    EventLogger(log ::: log2 /* accumulate */, b)
  }
}

object EventLogger {
  implicit def LogUtilities[A](a: A) = new {
    def nolog =
      EventLogger(Nil /* empty */, a)

    def withlog(log: (Trade, TradeEvent)) =
      EventLogger(List(log), a)

    def withvaluelog(log: A => (Trade, TradeEvent)) =
      withlog(log(a))
  }
}


and here's a snippet that exercises the logging process ..

import EventLogger._

val trd = makeTrade("a-123", "google", "r-123", HongKong, 12.25, 200).toOption.get

val r = for {
  t1 <- enrichTrade(trd) withlog (trd, enrichTrade)
  t2 <- addValueDate(t1) withlog (trd, addValueDate)
} yield t2


Now I can check what events have been logged and do some processing on the event store ..

// get the log from the EventLogger grouped by trade
val m = r.log.groupBy(_._1)

// play the event on the trades to get the current snapshot
val x =
  m.keys.map {=>
    m(t).map(_._2).foldLeft(t)((a,e) => e(a))
  }

// check the results
x.size should equal(1)
x.head.taxFees.get.size should equal(2) 
x.head.netAmount.get should equal(3307.5000)


Whenever you're appending data to an abstraction consider using the Writer monad. With this I killed the var events from modeling my event store.

Story #2: Stateful a la carte

This is a story that gives a tip for handling changing state of a domain model in a functional way. When the state does not change and you're using an abstraction repeatedly for reading, you have the Reader monad.

// enrichment of trade
// Reader monad
val enrich = for {
  taxFeeIds      <- forTrade // get the tax/fee ids for a trade
  taxFeeValues   <- taxFeeCalculate // calculate tax fee values
  netAmount      <- enrichTradeWith // enrich trade with net amount
}
yield((taxFeeIds map taxFeeValues) map netAmount)

val trd = makeTrade("a-123", "google", "r-123", HongKong, 12.25, 200)
(trd map enrich) should equal(Success(Some(3307.5000)))


Note how we derive the enrichment information for the trade keeping the original abstraction immutable - Reader monad FTW.

But what happens when you need to handle state that changes in the lifecycle of the abstraction ? You can use the State monad itself. It allows you to thread a changing state across a sequence transparently at the monad definition level. Here's how I would enrich a trade using the State monad as implemented in scalaz ..

val trd = makeTrade("a-123", "google", "r-123", HongKong, 12.25, 200).toOption.get

val x =
  for {
    _ <- init[Trade]
    _ <- modify((t: Trade) => refNoLens.set(t, "XXX-123"))
    u <- modify((t: Trade) => taxFeeLens.set(t, some(List((TradeTax, 102.25), (Commission, 25.65)))))
  } yield(u)

~> trd == trd.copy(refNo = "XXX-123")


In case you're jsut wondering what's going around above, here's a bit of an explanation. init initializes the state for Trade. init is defined as ..

def init[S]: State[S, S] = state[S, S](=> (s, s))


modify does the state change with the function passed to it as the argument ..

def modify[S](f: S => S) = init[S] flatMap (=> state(=> (f(s), ())))


We apply the series of modify to enrich the trade. The actual threading takes place transparently through the magic of comprehensions.

There is another way of securely encapsulating stateful computations that allow in-place updates - all in the context of functional programming. This is the ST monad which has very recently been inducted into scalaz. But that is the subject of another post .. sometime later ..

2 comments:

Opensource Solutions said...

It’s a great post, you really are a good writer! I’m so glad someone like you have the time, efforts and dedication writing, for this kind of article… Helpful, And Useful.. Very nice post!

Thesis Writing said...

Thank you for sharing such relevant topic with us. I really love all the great stuff you provide. Thanks again and keep it coming.