Sunday, March 28, 2010

Domain Services and Bounded Context using Akka - Part 2

In Part 1 of this series you saw how we can model a domain repository as an actor in Akka. It gives you declarative transaction semantics through Akka's STM and pluggable persistence engine support over a variety of data stores. As a result the domain model becomes cleaner. The repository that you design can take advantage of Akka's fault tolerance capabilities through supervisors that offer configurable lifecycle strategies.

Another important artifact of a domain model is the domain service. A domain service is not necessarily focused on any particular entity; it is mostly centered around the verbs of the system. It models actions or use cases involving multiple entities and is usually implemented as a stateless abstraction.

Using Akka you model a service as yet another actor. Domain services are coarse-grained abstractions and are the ones that receive requests from clients. A service can invoke other services or use other entities to do the job it is supposed to do. A busy service gets requests from lots of consumers: not only does it need to be stable itself, it also needs to ensure that all of the services it collaborates with stay alive while serving requests.

One of the services that it interacts with is the Domain Repository, which I discussed in the last post.

When you design a domain service using Akka actors, you can make the service and the services it collaborates with fault-tolerant through declarative, minimal programming effort. The Akka runtime offers all the machinery needed to make the implementation of fault-tolerant services quite easy.

Consider the following domain service for management of Accounts, continuing our earlier example from the last post ..

trait AccountServer extends Actor {
  // handle crash
  faultHandler = Some(OneForOneStrategy(5, 5000))
  trapExit = List(classOf[Exception])
  
  // abstract val : the Repository Service
  val storage: AccountRepository

  // message handler
  def receive = {
    case Open(no, name) => 
      storage ! New(Account(no, name, Calendar.getInstance.getTime, None, 100))
    case msg @ Balance(_) => storage forward msg
    case msg @ Post(_, _) => storage forward msg
    case msg @ OpenMulti(as) => storage forward msg
  }
  
  // shutdown hook
  override def shutdown = { 
    unlink(storage)
    storage.stop
  }
}


The message handler is a standard one that forwards client requests to the repository. Note the use of the abstract val storage: AccountRepository, which lets you defer committing to a concrete implementation class until the service object is instantiated.

AccountServer plays the role of a supervisor for the repository actor. The first two lines of code define the supervision strategy. OneForOneStrategy says that only the component that has crashed will be restarted. You can also make it AllForOne, in which case the supervising actor restarts all of the actors it supervises if any one of them crashes. trapExit defines the list of exceptions in the linked actor for which the supervising actor will take action.
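
As an illustration, here is a minimal sketch of the same supervision hooks configured with an all-for-one policy, assuming AllForOneStrategy takes the same (maximum restarts, time window) arguments that OneForOneStrategy does here ..

trait AllForOneAccountServer extends Actor {
  // if any supervised actor crashes, restart all of them,
  // at most 5 times within a 5000 ms window
  faultHandler = Some(AllForOneStrategy(5, 5000))
  trapExit = List(classOf[Exception])

  //.. rest of the service stays the same as in AccountServer
}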

AccountServer is the supervising actor for the repository. When it is shut down, it has to be unlinked from the linked actors. This we do in the shutdown hook of the AccountServer.

But how do we link the Repository actor to our domain service actor? Note that AccountServer is not a concrete object yet. We need to instantiate a concrete implementation of AccountRepository, assign it to storage in AccountServer, and link the two during this instantiation.

We define another trait that resolves the abstract val declared in AccountServer and provides a concrete instance of AccountRepository. spawnLink not only starts an instance of the Redis-based repository implementation, it also links the repository actor with the AccountServer ..

trait RedisAccountRepositoryFactory { this: Actor =>
  val storage: AccountRepository = spawnLink(classOf[RedisAccountRepository]) 
}


Now we have all the components needed to instantiate a fault tolerant domain service object.

object AccountService extends 
  AccountServer with 
  RedisAccountRepositoryFactory {

  // start the service
  override def start: Actor = {
    super.start
    RemoteNode.start("localhost", 9999)
    RemoteNode.register("account:service", this)
    this
  }
}


Have a look at the start method, which starts the service on a remote node. We start a remote node and register the current service under the id "account:service". Any client that needs to use the service can get hold of the service actor by specifying this id, as in the following snippet ..

class AccountClient(val client: String) { 
  import Actor.Sender.Self
  val service = RemoteClient.actorFor("account:service", "localhost", 9999)

  def open(no: String, name: String) = service ! Open(no, name)
  def balance(no: String): Option[Float] = 
    (service !! Balance(no)).getOrElse(
      throw new Exception("cannot get balance from server"))
  def post(no: String, amount: Float) = service ! Post(no, amount)
  def openMulti(as: List[(String, String)]) = service !!! OpenMulti(as)
}


Services defined using Akka can be made to run on remote nodes without much engineering effort; the Akka runtime offers APIs for doing exactly that. However, Akka never tries to hide the realities of distribution from you. You need to be aware of your distribution requirements and of your process and supervision hierarchies. Akka lets you define them at the application level, doing the heavy lifting within its underlying implementation. This principle is inspired by Erlang's philosophy and is in sharp contrast to the RPC way of defining APIs. Along with the benefits of message-based computation that decouples the sender and the receiver, Akka also enables you to handle state using its built-in STM and pluggable storage engines.

When you model a complex domain, you need to deal with multiple contexts, which Eric calls Bounded Contexts. Within a specific context you have a cohesive model with a set of domain behavior and abstractions. The interpretations of the same abstractions may change when you move to a different context within the same application. As a domain modeler you need to define your context boundaries very carefully using context maps and implement appropriate translation maps between multiple contexts.

Messaging is a great way to implement translation between contexts. You process messages within a context using domain services as above, and at the end forward the same message, or a translated one, to the other contexts of the application. This can take the form of a push, or you can implement a publish/subscribe model between the related contexts. In the latter case, you can use the Akka-Camel integration, which allows actors to send and receive messages through Camel endpoints. In either case Akka gives you a world of options for implementing loosely coupled domain contexts that form the components of your domain model.
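
Here is a minimal sketch of the push style of translation: an event raised in the accounts context is translated into the vocabulary of another context and forwarded to an actor there. The event and actor names (AccountOpened, SendWelcome, AccountContextTranslator) are purely illustrative and not part of the example developed above ..

// event raised in the accounts context (illustrative)
case class AccountOpened(no: String, name: String)

// the corresponding message in a notification context (illustrative)
case class SendWelcome(accountNo: String, customerName: String)

class AccountContextTranslator(notification: Actor) extends Actor {
  def receive = {
    // translate the accounts-context event into the other
    // context's vocabulary and push it across
    case AccountOpened(no, name) => notification ! SendWelcome(no, name)
  }
}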

Sunday, March 21, 2010

Thinking Asynchronous - Domain Modeling using Akka Transactors - Part 1

Followers of this blog must know by now that I am a big fan of a clean domain model. And domain driven design, as espoused by Eric Evans, is the way to go when you are modeling a complex domain and would like your model to survive for quite some time into the future. Recently I have been experimenting a bit with domain driven design using asynchronous message passing techniques, particularly in the services and the storage layer.

The Repository, as Eric says, is the domain-centric abstraction on top of your data storage layer. It gives your model back the feeling that you are dealing with domain concepts instead of marshalling data across your storage layers. Typically you have contracts for repositories at the aggregate root level. The underlying implementation commits to a platform (like JPA) and ensures that the object graph of the aggregate root rests in peace within the relational database. It need not be a relational database - it can be a file system, it can be a NoSQL database. That's the power of abstraction that Repositories add to your model.

Ever since I started playing around with Erlang, I have been toying with thoughts of making repositories asynchronous. I blogged some of my thoughts in this post and even implemented a prototype using Scala actors.

Enter Akka and its lightweight actor model that offers transaction support over an STM. Akka offers seamless integration with a variety of persistence engines like Cassandra, MongoDB and Redis, and there are plans to add many of the relational stores to this list as well. The richness of the Akka stack makes a strong case for designing a beautiful asynchronous repository abstraction.

Consider a very simple domain model for a Bank Account ..

case class Account(no: String, 
  name: String, 
  dateOfOpening: Date, 
  dateOfClose: Option[Date],
  balance: Float)

We can model typical operations on a bank account, like opening a new account, querying the balance of an account and posting an amount to an account, through message dispatch. Typical messages will look like the following in Scala ..

sealed trait AccountEvent
case class Open(from: String, no: String, name: String) extends AccountEvent
case class New(account: Account) extends AccountEvent
case class Balance(from: String, no: String) extends AccountEvent
case class Post(from: String, no: String, amount: Float) extends AccountEvent


Note that all messages are immutable Scala objects. They are dispatched by the client, intercepted by a domain service, which can optionally do some processing and validation, and finally forwarded to the Repository.

In this post we will look at the final stage in the lifecycle of a message, which is how it gets processed by the Repository. In the next post we will integrate the whole thing along with an abstraction for a domain service. Along the way we will see much of the goodness that Akka transactors offer, including support for fault-tolerant processing in the event of system crashes.

trait AccountRepository extends Actor

class RedisAccountRepository extends AccountRepository {
  lifeCycle = Some(LifeCycle(Permanent))    
  val STORAGE_ID = "account.storage"

  // actual persistent store
  private var accounts = atomic { RedisStorage.getMap(STORAGE_ID) }

  def receive = {
    case New(a) => 
      atomic {
        accounts.+=(a.no.getBytes, toByteArray[Account](a))
      }

    case Balance(from, no) =>
      val b = atomic { accounts.get(no.getBytes) }
      b match {
        case None => reply(None)
        case Some(a) => 
          val acc = fromByteArray[Account](a).asInstanceOf[Account]
          reply(Some(acc.balance))
      }

    //.. other message handlers
  }

  override def postRestart(reason: Throwable) = {
    accounts = RedisStorage.getMap(STORAGE_ID)  
  }
}


The above snippet implements a message based Repository abstraction with an underlying implementation in Redis. Redis is an advanced key/value store that offers persistence for a suite of data structures like Lists, Sets, Hashes and more. Akka offers transparent persistence to a Redis storage through a common set of abstractions. In the above code you can change RedisStorage.getMap(STORAGE_ID) to CassandraStorage.getMap(..) and switch your underlying storage to Cassandra.
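
As a rough sketch, and assuming CassandraStorage exposes getMap with the same shape as RedisStorage (the class name below is illustrative), the switch could look like this ..

class CassandraAccountRepository extends AccountRepository {
  lifeCycle = Some(LifeCycle(Permanent))
  val STORAGE_ID = "account.storage"

  // the only line that commits to a particular store changes
  private var accounts = atomic { CassandraStorage.getMap(STORAGE_ID) }

  def receive = {
    case New(a) =>
      atomic {
        accounts.+=(a.no.getBytes, toByteArray[Account](a))
      }
    //.. other message handlers, identical to the Redis based version
  }

  override def postRestart(reason: Throwable) = {
    accounts = CassandraStorage.getMap(STORAGE_ID)
  }
}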

The Repository works through asynchronous message passing modeled with Akka actors. Here are some of the salient points of the implementation ..

  1. Akka is based on the let-it-crash philosophy. You can design supervisor hierarchies that will be responsible for controlling the lifecycles of your actors. In the Actor abstraction you can configure how you would like to handle a crash. LifeCycle(Permanent) means that the actor will always be restarted by the supervisor in the event of a crash. It can also be LifeCycle(Temporary), which means that it will not be restarted and will be shut down using the shutdown hook that you provide. In our case we make the Repository resilient to crashes.

  2. accounts is the handle to a Map that gets persisted in Redis. Here we store all accounts that the clients open hashed by the account number. Have a look at the New message handler in the implementation.

  3. With Akka you can also provide a restart hook for when your repository crashes and gets restarted automatically by the supervisor. postRestart is the hook where we re-initialize the Map structure.

  4. Akka uses Multiverse, a Java-based STM implementation, for transaction handling. In the code, mark your transactions using atomic {} and the underlying STM will take care of the rest. Instead of atomic, you can also use monadic for-comprehensions to annotate your transaction blocks; have a look at the Akka documentation for details. A small sketch follows this list.
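
Purely as an illustration (the actual handler is elided in the snippet above), here is what a Post handler inside the repository might look like, with the read-modify-write wrapped in a single atomic block and the reply kept outside the transaction ..

    case Post(from, no, amount) =>
      val updated = atomic {
        // read, update and write back the account within one transaction
        accounts.get(no.getBytes) map { bytes =>
          val acc = fromByteArray[Account](bytes).asInstanceOf[Account]
          val newAcc = acc.copy(balance = acc.balance + amount)
          accounts.+=(no.getBytes, toByteArray[Account](newAcc))
          newAcc
        }
      }
      // reply is a side effect, so it stays outside the atomic block
      reply(updated map (_.balance))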


Asynchronous message-based implementations decouple the endpoints, do not block, and make distributing your system more manageable. Typical implementations of actor-based models are very lightweight; Akka actors take around 600 bytes each, which means that you can have millions of actors even on a commodity machine. Akka supports various types of message-passing semantics which you can use to organize the interactions between your collaborating objects.
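
As a quick, illustrative sketch, the different send semantics could be exercised against the repository roughly like this, using the fire-and-forget, blocking request-reply and future-returning operators (the same ones the AccountClient of Part 2 above uses); the values are made up for the example ..

import Actor.Sender.Self  // implicit sender, as in the AccountClient above

val repository = new RedisAccountRepository
repository.start

// fire-and-forget: no reply is expected
repository ! New(Account("acc-1", "debasish", Calendar.getInstance.getTime, None, 100))

// send-and-receive-eventually: blocks for the reply (or times out)
val balance = (repository !! Balance("client-1", "acc-1")).getOrElse(None)

// send-and-receive-future: returns immediately with a future for the reply
val balanceFuture = repository !!! Balance("client-1", "acc-1")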

In the next post we will see how the Repository interacts with Domain Services to implement client request handling. And yeah, you got it right - we will use more of Akka actors.