Sunday, August 02, 2009

MongoDB for Akka Persistence

Actors and message passing have proven to be great allies in implementing certain classes of concurrent applications. Message-passing concurrency promotes loosely coupled application components, which in turn makes applications much easier to scale out. But as Jonas Boner discusses in his JavaOne 2009 presentation, many real-world applications have to deal with shared state, transactions and atomicity of operations. Software Transactional Memory, as implemented in Clojure and Haskell, is a viable option for these use cases.

Akka, designed by Jonas Boner, offers Transactors, which combine the benefits of actors and STM with a pluggable storage model. It provides a unified set of data structures managed by the STM and backed by a variety of storage engines. It currently supports Cassandra as the storage engine out of the box.

Over the weekend I was trying out MongoDB as yet another out-of-the-box persistence option for Akka transactors. MongoDB is a high-performance, schema-free, document-oriented database that stores documents as BSON, a binary-encoded extension of JSON. The main storage abstraction is a Collection, which can loosely be equated to a table in a relational database. Besides its support for replication, fault tolerance and sharding, what makes MongoDB much easier to use is its rich querying facility. It supports many built-in query capabilities, including conditional operators, regular expressions and powerful variants of SQL where clauses on the document model. Here are some examples of query filters ..

db.myCollection.find( { $where: "this.a > 3" });
db.myCollection.find( { "field" : { $gt: value1, $lt: value2 } } );  // value1 < field < value2

and useful convenience functions ..

db.students.find().limit(10).forEach( ... )  // limit the fetch count
db.students.find().skip(..) // skip some records
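For readers more at home in Scala than in the Mongo shell, the range filter and the skip/limit helpers behave much like `filter`, `drop` and `take` on an in-memory collection. The sketch below is only an analogy over plain Scala maps: `QueryAnalogy`, `docs` and the sample values are made up for illustration, and no MongoDB driver is involved.

```scala
object QueryAnalogy {
  // Hypothetical sample documents, each with a single numeric "field"
  val docs: List[Map[String, Int]] =
    List(1, 4, 6, 7, 9, 12).map(n => Map("field" -> n))

  // Analogue of find({ "field": { $gt: lo, $lt: hi } })
  def inRange(lo: Int, hi: Int): List[Map[String, Int]] =
    docs.filter(d => d("field") > lo && d("field") < hi)

  // Analogue of find(...).skip(skip).limit(limit)
  def page(lo: Int, hi: Int, skip: Int, limit: Int): List[Map[String, Int]] =
    inRange(lo, hi).drop(skip).take(limit)
}
```

The real queries run on the server against the collection, so the analogy covers only the semantics of the filters, not where the work happens.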

In Akka we can have a collection in MongoDB that can be used to store all transacted data keyed on a transaction id. The set of data can be stored in a HashMap as key-value pairs. Have a look at the following diagram for the scheme of data storage using MongoDB Collections ..
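The storage scheme can also be sketched in code. This is a rough model only: a plain Scala list stands in for the Mongo collection, and `TxDocument`, the ids and the entries are invented for illustration. Each document pairs one transaction id with the map of data written under it.

```scala
object LayoutSketch {
  // One document per transaction id: { key: <id>, value: { k1: v1, ... } }
  final case class TxDocument(key: String, value: Map[String, String])

  // Hypothetical contents of the collection
  val collection: List[TxDocument] = List(
    TxDocument("tx-1", Map("name" -> "akka", "lang" -> "scala")),
    TxDocument("tx-2", Map("name" -> "mongo"))
  )

  // Find the document for a transaction id, then the value for a key
  def lookup(txId: String, k: String): Option[String] =
    collection.find(_.key == txId).flatMap(_.value.get(k))
}
```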

Akka TransactionalState offers factory methods that create persistent data structures backed by the appropriate storage engine, depending on the configuration ..

class TransactionalState {
  // pick the map implementation that matches the configured storage engine
  def newPersistentMap(
    config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = 
    config match {
    case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
    case MongoStorageConfig() => new MongoPersistentTransactionalMap
  }

  def newPersistentVector(
    config: PersistentStorageConfig): TransactionalVector[AnyRef] = 
    config match {
    //.. one case per storage engine, as for the map
  }

  def newPersistentRef(
    config: PersistentStorageConfig): TransactionalRef[AnyRef] = 
    config match {
    //.. one case per storage engine, as for the map
  }
}
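The dispatch idea can be tried in isolation. The following is a minimal sketch, assuming simplified stand-ins for the Akka types: `DispatchSketch`, `CassandraMap`, `MongoMap` and the `backend` member are hypothetical names, not part of Akka. Sealing the configuration trait lets the compiler warn when a new storage engine is added without a matching case.

```scala
object DispatchSketch {
  // Hypothetical stand-ins for Akka's configuration and map types
  sealed trait PersistentStorageConfig
  final case class CassandraStorageConfig() extends PersistentStorageConfig
  final case class MongoStorageConfig() extends PersistentStorageConfig

  trait TransactionalMap { def backend: String }
  class CassandraMap extends TransactionalMap { val backend = "cassandra" }
  class MongoMap extends TransactionalMap { val backend = "mongo" }

  // The configuration value alone decides which implementation is built
  def newPersistentMap(config: PersistentStorageConfig): TransactionalMap =
    config match {
      case CassandraStorageConfig() => new CassandraMap
      case MongoStorageConfig()     => new MongoMap
    }
}
```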

and each transactional data structure defines the transaction semantics for the underlying structure that it encapsulates. For example, for a PersistentTransactionalMap we have the following APIs ..

abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {

  // writes made inside a transaction are buffered here until commit
  protected[kernel] val changeSet = new HashMap[K, V]

  // implemented by the storage-backed subclass
  def getRange(start: Int, count: Int): List[Tuple2[K, V]]

  // ---- For Transactional ----
  override def begin = {}
  override def rollback = changeSet.clear

  //.. additional map semantics .. get, put etc.
}
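To see the change-set idea on its own, here is a simplified in-memory sketch. It is not the Akka implementation: `BufferedMap` and its `committed` store are hypothetical, and two behaviours are assumptions of this sketch rather than something the code above states, namely that reads see pending writes first and that `commit` empties the change set after flushing it.

```scala
object ChangeSetSketch {
  import scala.collection.mutable

  // Simplified, in-memory stand-in for a persistent transactional map
  class BufferedMap {
    private val committed = mutable.HashMap.empty[String, String] // the "store"
    private val changeSet = mutable.HashMap.empty[String, String] // pending writes

    // Writes only touch the change set
    def put(k: String, v: String): Unit = changeSet(k) = v

    // Reads see pending writes first, then committed data (an assumption)
    def get(k: String): Option[String] =
      changeSet.get(k).orElse(committed.get(k))

    // Flush pending writes to the store, then start afresh
    def commit(): Unit = { committed ++= changeSet; changeSet.clear() }

    // Discard pending writes; committed data is untouched
    def rollback(): Unit = changeSet.clear()
  }
}
```

Because nothing reaches the store before `commit`, a rollback never has to undo a write; it only forgets the buffered ones.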

A concrete implementation defines the rest of the semantics used to handle transactional data. The concrete implementation is parameterized with the actual storage engine that can be plugged in for specific implementations.

trait ConcretePersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] {
  // supplied by the concrete, engine-specific subclass
  val storage: Storage

  override def getRange(start: Int, count: Int) = {
    try {
      storage.getMapStorageRangeFor(uuid, start, count)
    } catch {
      case e: Exception => Nil
    }
  }

  // ---- For Transactional ----
  override def commit = {
    // flush the buffered writes to the storage engine
    storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
  }

  override def contains(key: String): Boolean = {
    try {
      storage.getMapStorageEntryFor(uuid, key).isDefined
    } catch {
      case e: Exception => false
    }
  }

  //.. others 
}

Note the use of abstract val in the above implementation that will be concretized when we make a Mongo map ..

class MongoPersistentTransactionalMap 
  extends ConcretePersistentTransactionalMap {
  val storage = MongoStorage
}
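The abstract val pattern is easy to try outside Akka. In this minimal sketch every name (`AbstractValSketch`, `InMemoryStorage`, `MongoLikeStorage`, `StorageBackedMap`, `describe`) is hypothetical: the trait is written against an abstract `storage` member, and each concrete class plugs in an engine with a single line.

```scala
object AbstractValSketch {
  // Hypothetical, minimal storage interface
  trait Storage { def name: String }
  object InMemoryStorage extends Storage { val name = "in-memory" }
  object MongoLikeStorage extends Storage { val name = "mongo-like" }

  // The trait codes against an abstract val ...
  trait StorageBackedMap {
    val storage: Storage
    def describe: String = "map backed by " + storage.name
  }

  // ... and each concrete class supplies the engine
  class InMemoryMap extends StorageBackedMap { val storage = InMemoryStorage }
  class MongoLikeMap extends StorageBackedMap { val storage = MongoLikeStorage }
}
```

One caveat with this style: an abstract val is only initialized when the subclass constructor runs, so the trait should use it from methods, as `describe` does here, rather than from its own initializer.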

For the Storage part, we have another trait which abstracts the storage specific APIs ..

trait Storage extends Logging {
  def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]): Unit
  def removeMapStorageFor(name: String): Unit
  def getMapStorageEntryFor(name: String, key: String): Option[AnyRef]
  def getMapStorageSizeFor(name: String): Int
  def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]]
  def getMapStorageRangeFor(name: String, start: Int, 
    count: Int): List[Tuple2[String, AnyRef]]
}

I am in the process of writing a concrete implementation of the storage using MongoDB, which will look like the following ..

import com.mongodb.{BasicDBObject, Mongo}

object MongoStorage extends Storage {
  val KEY = "key"
  val VALUE = "val"
  val db = new Mongo(..);  // needs to come from configuration
  val COLLECTION = "akka_coll"
  val coll = db.getCollection(COLLECTION)
  private[this] val serializer: Serializer = ScalaJSON

  override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) {
    import java.util.{Map, HashMap}
    val m: Map[String, AnyRef] = new HashMap
    // serialize each value before putting it in the map
    for ((k, v) <- entries) {
      m.put(k, serializer.out(v))
    }
    // one document per name: { key: name, val: { k: serialized v, .. } }
    coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
  }

  override def removeMapStorageFor(name: String) = {
    val q = new BasicDBObject
    q.put(KEY, name)
    coll.remove(q)  // delete the document(s) stored under this name
  }
  //.. others
}

As the diagram above illustrates, every transaction will have its own DBObject in the Mongo Collection, which will store a HashMap that contains the transacted data set. Using MongoDB's powerful query APIs we can always get to a specific key/value pair for a particular transaction as ..

// form the query object with the transaction id
val q = new BasicDBObject
q.put(KEY, name)

// 1. use the query object to get the DBObject (findOne)
// 2. extract the VALUE which has the HashMap of transacted data set
// 3. query on the HashMap on the passed in key to get the value
// 4. use the scala-json serializer to get back the Scala object
serializer.in(
  coll.findOne(q)
      .get(VALUE).asInstanceOf[JMap[String, AnyRef]]
      .get(key).asInstanceOf[Array[Byte]], None)
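The four steps can be exercised without a running MongoDB. The sketch below is an approximation under stated assumptions: `java.util.Map` instances stand in for `DBObject`, a list stands in for the collection, and the `out`/`in` pair is a hypothetical UTF-8 string serializer standing in for the scala-json one. It also wraps each step in `Option`, since a real lookup has to cope with a missing document or key.

```scala
object LookupSketch {
  import java.util.{HashMap => JHashMap, Map => JMap}
  import java.nio.charset.StandardCharsets.UTF_8

  val KEY = "key"
  val VALUE = "val"

  // Hypothetical serializer: strings to UTF-8 bytes and back
  def out(s: String): Array[Byte] = s.getBytes(UTF_8)
  def in(bytes: Array[Byte]): String = new String(bytes, UTF_8)

  // Build one stored document: { key: name, val: { k: bytes, ... } }
  def document(name: String, entries: List[(String, String)]): JMap[String, AnyRef] = {
    val data = new JHashMap[String, AnyRef]
    for ((k, v) <- entries) data.put(k, out(v))
    val doc = new JHashMap[String, AnyRef]
    doc.put(KEY, name)
    doc.put(VALUE, data)
    doc
  }

  // Steps 1 to 4, with Option guarding a missing document or key
  def entryFor(docs: List[JMap[String, AnyRef]], name: String, key: String): Option[String] =
    docs.find(_.get(KEY) == name)                    // 1. find the document
      .flatMap(d => Option(d.get(VALUE)))            // 2. extract the VALUE map
      .map(_.asInstanceOf[JMap[String, AnyRef]])
      .flatMap(m => Option(m.get(key)))              // 3. look up the key
      .map(b => in(b.asInstanceOf[Array[Byte]]))     // 4. deserialize
}
```

The result type lines up with `getMapStorageEntryFor` in the `Storage` trait, which also returns an `Option`.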

MongoDB looks like a cool storage engine and has already been used in production as a performant key/value store. It also looks promising as the backing storage engine for persistent transactional actors. Akka transactors look poised to evolve into a platform that can deliver the goods for stateful, STM-based as well as stateless, message-passing-based concurrent applications. I plan to complete the implementation in the near future and, if Jonas agrees, I will be more than willing to contribute it to the Akka master.

Open source is as much about contributing, as it is about using ..


Andreas Kollegger said...

This is great stuff. With akka-persistence-mongo, it looks like objects are saved as binary data. How difficult would it be to have the option of saving the json string instead?

thetuxracer said...

Is there a way I can do CRUD with scala+mongoDB?

Unknown said...

For implementing CRUD operations you need a driver which gives you suitable APIs. Here is one ..