Sunday, March 21, 2010

Thinking Asynchronous - Domain Modeling using Akka Transactors - Part 1

Regular readers of this blog will know by now that I am a big fan of a clean domain model. Domain driven design, as espoused by Eric Evans, is the way to go when you are modeling a complex domain and want your model to survive well into the future. Recently I have been experimenting with domain driven design using asynchronous message passing techniques, particularly in the services and storage layers.

The Repository, as Eric says, is the domain centric abstraction on top of your data storage layer. It gives your model back the feeling that you are dealing with domain concepts instead of marshalling data across your storage layers. Typically you have contracts for repositories at the aggregate root level. The underlying implementation commits to a platform (like JPA) and ensures that the object graph of the aggregate root rests in peace within the relational database. It need not be a relational database: it can be a file system or a NoSQL database. That's the power of abstraction that Repositories add to your model.

Ever since I started playing around with Erlang, I have been toying with thoughts of making repositories asynchronous. I blogged some of my thoughts in this post and even implemented a prototype using Scala actors.

Enter Akka and its lightweight actor model that offers transaction support over an STM. Akka offers seamless integration with a variety of persistence engines like Cassandra, MongoDB and Redis. It has plans of adding to this list many of the relational stores as well. The richness of the Akka stack makes for a strong case in designing a beautiful asynchronous repository abstraction.

Consider a very simple domain model for a Bank Account ..

case class Account(no: String, 
  name: String, 
  dateOfOpening: Date, 
  dateOfClose: Option[Date],
  balance: Float)

We can model typical operations on a bank account, such as opening a new account, querying the balance of an account, or posting an amount to an account, through message dispatch. Typical messages will look like the following in Scala ..

sealed trait AccountEvent
case class Open(from: String, no: String, name: String) extends AccountEvent
case class New(account: Account) extends AccountEvent
case class Balance(from: String, no: String) extends AccountEvent
case class Post(from: String, no: String, amount: Float) extends AccountEvent


Note all messages are immutable Scala objects, which will be dispatched by the client, intercepted by a domain service, which can optionally do some processing and validation, and then finally forwarded to the Repository.
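
To make the immutability point concrete, here is a minimal sketch in plain Scala with no Akka dependency. It repeats the definitions above so that it stands alone; the wrapper object AccountMessages and the describe helper are hypothetical names used only for illustration and are not part of the repository code in this post.

```scala
import java.util.Date

object AccountMessages {
  case class Account(no: String, name: String, dateOfOpening: Date,
                     dateOfClose: Option[Date], balance: Float)

  sealed trait AccountEvent
  case class Open(from: String, no: String, name: String) extends AccountEvent
  case class New(account: Account) extends AccountEvent
  case class Balance(from: String, no: String) extends AccountEvent
  case class Post(from: String, no: String, amount: Float) extends AccountEvent

  // Hypothetical helper: AccountEvent is sealed, so the compiler can
  // warn when a match like this one misses a message type.
  def describe(e: AccountEvent): String = e match {
    case Open(from, no, name)   => s"$from opens account $no for $name"
    case New(a)                 => s"store account ${a.no}"
    case Balance(from, no)      => s"$from asks for the balance of $no"
    case Post(from, no, amount) => s"$from posts $amount to $no"
  }
}
```

Since case classes are immutable, "changing" a message means building a new one, for example with Post("teller-1", "A-100", 50f).copy(amount = 75f); the original instance is left untouched, which is what makes these values safe to hand from one actor to another.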

In this post we will look at the final stage in the lifecycle of a message, which is how it gets processed by the Repository. In the next post we will integrate the whole along with an abstraction for a domain service. Along the way we will see much of the goodness that Akka transactors offer, including support for fault tolerant processing in the event of system crashes.

trait AccountRepository extends Actor

class RedisAccountRepository extends AccountRepository {
  lifeCycle = Some(LifeCycle(Permanent))    
  val STORAGE_ID = "account.storage"

  // actual persistent store
  private var accounts = atomic { RedisStorage.getMap(STORAGE_ID) }

  def receive = {
    case New(a) => 
      atomic {
        accounts.+=(a.no.getBytes, toByteArray[Account](a))
      }

    case Balance(from, no) =>
      val b = atomic { accounts.get(no.getBytes) }
      b match {
        case None => reply(None)
        case Some(a) => 
          val acc = fromByteArray[Account](a).asInstanceOf[Account]
          reply(Some(acc.balance))
      }

      //.. other message handlers
  }

  override def postRestart(reason: Throwable) = {
    accounts = RedisStorage.getMap(STORAGE_ID)  
  }
}


The above snippet implements a message based Repository abstraction with an underlying implementation in Redis. Redis is an advanced key/value store that offers persistence for a suite of data structures like Lists, Sets, Hashes and more. Akka offers transparent persistence to a Redis storage through a common set of abstractions. In the above code you can change RedisStorage.getMap(STORAGE_ID) to CassandraStorage.getMap(..) and switch your underlying storage to Cassandra.
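
The idea of swapping the store without touching the repository logic can be sketched in plain Scala. This is not Akka's persistence API: ByteStore, InMemoryStore and BalanceRepository are hypothetical names, the store lives in memory, and only the balance is serialized (as 4 raw bytes) to keep the example short.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

object StorageSketch {
  // Hypothetical interface: the minimum a repository needs from a
  // map-like store. A Redis- or Cassandra-backed class could implement it.
  trait ByteStore {
    def put(key: String, value: Array[Byte]): Unit
    def get(key: String): Option[Array[Byte]]
  }

  // In-memory stand-in for a real persistent store.
  final class InMemoryStore extends ByteStore {
    private val data = mutable.Map.empty[String, Array[Byte]]
    // Copy on the way in and out so callers cannot mutate stored bytes.
    def put(key: String, value: Array[Byte]): Unit = data.update(key, value.clone())
    def get(key: String): Option[Array[Byte]] = data.get(key).map(_.clone())
  }

  // Repository logic written only against ByteStore.
  final class BalanceRepository(store: ByteStore) {
    def save(no: String, balance: Float): Unit =
      store.put(no, ByteBuffer.allocate(4).putFloat(balance).array())

    def balance(no: String): Option[Float] =
      store.get(no).map(bytes => ByteBuffer.wrap(bytes).getFloat())
  }
}
```

With this shape, new BalanceRepository(new InMemoryStore) and a repository built on any other ByteStore implementation share the same code path, which is the property the getMap switch above relies on.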

The above Repository works through asynchronous message passing modeled with Akka actors. Here are some of the salient points in the implementation ..

  1. Akka is based on the let-it-crash philosophy. You can design supervisor hierarchies that will be responsible for controlling the lifecycles of your actors. In the Actor abstraction you can configure how you would like to handle a crash. LifeCycle(Permanent) means that the actor will always be restarted by the supervisor in the event of a crash. It can also be LifeCycle(Temporary), which means that it will not be restarted and will be shut down using the shutdown hook that you provide. In our case we make the Repository resilient to crashes.

  2. accounts is the handle to a Map that gets persisted in Redis. Here we store all accounts that the clients open hashed by the account number. Have a look at the New message handler in the implementation.

  3. With Akka you can also provide a restart hook for when your repository crashes and gets restarted automatically by the supervisor. postRestart is the hook where we re-initialize the Map structure.

  4. Akka uses Multiverse, a Java based STM implementation, for transaction handling. In the code, mark your transactions using atomic {} and the underlying STM will take care of the rest. Instead of atomic, you can also use monadic for-comprehensions for annotating your transaction blocks. Have a look at the Akka documentation for details.
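
The restart behaviour described in points 1 and 3 can be illustrated with a toy model in plain Scala. This is only a sketch of the idea, not how Akka supervisors are implemented: Worker, CountingWorker and supervise are hypothetical names, and the "supervisor" is just a function that catches a failure, invokes the restart hook and carries on with the next message.

```scala
import scala.util.{Failure, Success, Try}

object RestartSketch {
  // Hypothetical stand-in for an actor with a restart hook.
  trait Worker {
    def handle(msg: String): String
    def postRestart(reason: Throwable): Unit
  }

  final class CountingWorker extends Worker {
    var restarts = 0
    private var cache: Map[String, String] = Map("A-100" -> "open")

    def handle(msg: String): String =
      if (msg == "boom") throw new IllegalStateException("simulated crash")
      else cache.getOrElse(msg, "unknown")

    // Re-acquire state after a crash, much as the repository
    // re-fetches its map handle in postRestart.
    def postRestart(reason: Throwable): Unit = {
      restarts += 1
      cache = Map("A-100" -> "open")
    }
  }

  // Toy "permanent" supervision: a failing message yields None, the
  // hook runs, and the worker stays in service for later messages.
  def supervise(w: Worker, msgs: List[String]): List[Option[String]] =
    msgs.map { m =>
      Try(w.handle(m)) match {
        case Success(r) => Some(r)
        case Failure(e) => w.postRestart(e); None
      }
    }
}
```

The point of the sketch is that the crash is handled outside the worker's own message handling code, so handle contains no defensive try/catch of its own.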


Asynchronous message based implementations decouple the end points, do not block, and make a distributed system easier to manage. Typical implementations of actor based models are very lightweight. An Akka actor takes around 600 bytes, which means that you can have millions of actors even on a commodity machine. Akka supports various types of message passing semantics which you can use to organize interactions between your collaborating objects.

In the next post we will see how the Repository interacts with Domain Services to implement client request handling. And yeah, you got it right - we will use more of Akka actors.

11 comments:

Henrik Engström said...

Great blog post!

I will use these thoughts when I implement my next project.
//Henrik

Rich said...

I've been playing with some of the same ideas. Although my experimentation has been along the lines of trying to implement DDD with CQRS.

One of the problems I'm wrestling with is that I don't want to start off with the complexity of making everything eventually consistent. So my repository should store the event and also forward it along an event bus - which ideally would be another actor - for other bounded contexts, like the reporting context, to handle.

But I'd like the transactions in the domain and reporting contexts to be the same - or, at least, if one fails then they should both fail. That way I don't have to mess with compensating actions or anything like that. I haven't quite figured out how I'm going to handle that.

Ideally, everything would also be asynchronous as well, but I haven't quite figured out how to achieve all those goals.

Debasish said...

Rich:

Distributed transactions along with support for distributed STM will be coming in Akka in the near future. Please have a look at this discussion http://groups.google.com/group/akka-user/browse_thread/thread/6e523c0aba7323c1. Akka uses Multiverse as the STM and Multiverse will be implementing distributed transactions pretty soon.

Thanks.

Santi said...

Nice post, once again :)

I'm not sure about the concurrency in this approach. The mailbox of the Repository actor is processed serially, so you can only do one of these (New, Balance, etc) queries at a time. Doesn't that become a major bottleneck in a busy system?

Santi

Debasish said...

Santi:

Concurrency in an actor based system is achieved through multiple instances of actors processing messages from their own mailboxes. As I mentioned, since actors are lightweight entities, you can have millions of them even on commodity hardware.

If your system is dependent on message ordering then actors may not be the ideal way to model it. The most effective way of scaling with actors is to use asynchronous messaging, modeled using ! in Akka. Besides !, Akka also offers other semantics of message passing, which you can find in the Akka documentation.

Santi said...

Ah I see, so you would just run multiple instances of the Repository actor. I suppose that makes sense, with the atomic blocks.

Adjusting to the actor mindset is a bit hard, coming from the traditional shared state concurrency model. Your blog helps a lot. :)

Thanks!

Ross McDonald said...

This is great stuff Debasish.

I am all for keeping a clean domain model, nothing irritates me more than JAXB or JPA annotations polluting my domain classes.

Your solution reveals to me how nice it is to create a repository abstraction, enabling me just to post a simple key/value pair into something like Redis.

How does this scale when we swap a JPA store in, instead of the simpler store?

Do you have any ideas how we could layer the JPA annotations over our entities' attributes in some kind of automated, reproducible way?

-- Ross

Erick Fleming said...

Thanks for the post. The title is a bit confusing, since you're not actually using Akka "transactors" in your sample code.

Or is that a generic term applied to Actors with STM?

Debasish said...

Erick -

Actually I was not using transactors in the sample code. Instead I was using the atomic {} construct for transactions. But I used the term in the title more as a generic concept of unifying actors and STM in Akka.

Anonymous said...

I guess this is a Scala question.
Can you explain how toByteArray[Account](a) and fromByteArray[Account](a) work?

Debasish said...

Anonymous -

I was using sbinary for serialization. Have a look at http://github.com/DRMacIver/sbinary/blob/master/src/operations.scala for fromByteArray and toByteArray ..