Akka, designed by Jonas Boner, offers Transactors, which combine the benefits of actors and STM, along with a pluggable storage model. It provides a unified set of data structures managed by the STM and backed by a variety of storage engines. Out of the box, it currently supports Cassandra as the storage engine.
Over the weekend I was trying out MongoDB as yet another out-of-the-box persistence option for Akka transactors. MongoDB is a high-performance, schema-free, document-oriented database that stores documents as BSON, a binary-encoded extension of JSON. The main storage abstraction is a Collection, which can loosely be equated to a table in a relational database. Besides replication, fault tolerance and sharding, what makes MongoDB much easier to use is its rich querying facilities: built-in conditional operators, regular expressions and powerful variants of SQL where clauses on the document model. Here are some examples of query filters ..
db.myCollection.find( { $where: "this.a > 3" });
db.myCollection.find( { "field" : { $gt: value1, $lt: value2 } } ); // value1 < field < value2
and useful convenience functions ..
db.students.find().limit(10).forEach( ... ) // limit the fetch count
db.students.find().skip(..) // skip some records
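To make the semantics of these filters concrete without a running server, here is a minimal in-memory sketch. It is not the MongoDB driver: `QuerySemantics`, `between` and `find` are hypothetical names, documents are plain Scala maps, and only `Int` fields are compared. MongoDB applies skip before limit no matter in which order the cursor methods are chained, which the `drop`/`take` order below mirrors.

```scala
// Hypothetical in-memory stand-in for a collection: each document is a Map of field -> value.
// It only mimics the semantics of the shell filters above.
object QuerySemantics {
  type Doc = Map[String, Any]

  // Like { field : { $gt: lo, $lt: hi } }: both bounds are exclusive, and documents
  // that lack the field (or hold a non-Int value) do not match.
  def between(field: String, lo: Int, hi: Int)(doc: Doc): Boolean =
    doc.get(field) match {
      case Some(v: Int) => v > lo && v < hi
      case _            => false
    }

  // Like find(filter).skip(n).limit(m): filter first, then skip, then limit.
  def find(coll: Seq[Doc], filter: Doc => Boolean,
           skip: Int = 0, limit: Int = Int.MaxValue): Seq[Doc] =
    coll.filter(filter).drop(skip).take(limit)
}
```

For example, `find(docs, between("a", 2, 9), skip = 1, limit = 3)` over documents with `a` from 1 to 10 keeps 3..8, skips 3 and returns the documents with 4, 5 and 6.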
In Akka we can have a MongoDB collection that stores all transacted data keyed on a transaction id. The data set for each transaction can be stored in a HashMap as key-value pairs. The following diagram shows the scheme of data storage using MongoDB collections ..
[Diagram: scheme of data storage for Akka transactions using MongoDB collections]
Akka's TransactionalState offers factory methods that return persistent data structures backed by the appropriate storage engine, depending on the configuration ..
class TransactionalState {
  def newPersistentMap(
      config: PersistentStorageConfig): TransactionalMap[String, AnyRef] =
    config match {
      case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
      case MongoStorageConfig() => new MongoPersistentTransactionalMap
    }
  def newPersistentVector(
      config: PersistentStorageConfig): TransactionalVector[AnyRef] =
    config match {
      //..
    }
  def newPersistentRef(
      config: PersistentStorageConfig): TransactionalRef[AnyRef] =
    config match {
      //..
    }
  //..
}
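The same configuration-driven factory can be sketched on its own. The types below (`StorageConfig`, `PersistentMap`, `StorageFactory`) are hypothetical stand-ins for Akka's, not its actual API. One design choice worth considering is a sealed config hierarchy with case objects where no parameters are needed, so the compiler can warn when a `match` misses a storage engine.

```scala
// Hypothetical stand-ins for Akka's storage configs and persistent maps.
sealed trait StorageConfig
case object CassandraConfig extends StorageConfig
case object MongoConfig extends StorageConfig

trait PersistentMap { def backend: String }
class CassandraMap extends PersistentMap { val backend = "cassandra" }
class MongoMap extends PersistentMap { val backend = "mongo" }

object StorageFactory {
  // StorageConfig is sealed, so adding a new engine without a case here
  // produces a non-exhaustive match warning at compile time.
  def newPersistentMap(config: StorageConfig): PersistentMap = config match {
    case CassandraConfig => new CassandraMap
    case MongoConfig     => new MongoMap
  }
}
```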
Each transactional data structure defines the transaction semantics for the underlying structure it encapsulates. For example, a PersistentTransactionalMap has the following APIs ..
import scala.collection.mutable.HashMap
abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
  // uncommitted writes of the current transaction
  protected[kernel] val changeSet = new HashMap[K, V]
  def getRange(start: Int, count: Int): List[Tuple2[K, V]]
  // ---- For Transactional ----
  override def begin = {}
  override def rollback = changeSet.clear
  //.. additional map semantics .. get, put etc.
}
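The role of changeSet can be illustrated in isolation: writes inside a transaction are buffered, reads see the buffer before committed data, and rollback just clears the buffer. `BufferedMap` is a hypothetical sketch that assumes committed data lives in a local map; in Akka that role is played by the pluggable storage engine.

```scala
import scala.collection.mutable

// Hypothetical sketch of the changeSet idea, not Akka's implementation.
class BufferedMap[K, V] {
  private val committed = mutable.HashMap.empty[K, V] // stands in for the storage engine
  private val changeSet = mutable.HashMap.empty[K, V] // writes of the current transaction

  def put(k: K, v: V): Unit = changeSet.update(k, v)
  // Read-your-writes: uncommitted values shadow committed ones.
  def get(k: K): Option[V] = changeSet.get(k).orElse(committed.get(k))
  def commit(): Unit = { committed ++= changeSet; changeSet.clear() }
  def rollback(): Unit = changeSet.clear()
}
```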
A concrete implementation defines the rest of the semantics for handling transactional data. It is parameterized with the actual storage engine, which can be plugged in for each specific implementation.
trait ConcretePersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] {
  val storage: Storage
  override def getRange(start: Int, count: Int) = {
    verifyTransaction
    try {
      storage.getMapStorageRangeFor(uuid, start, count)
    } catch {
      case e: Exception => Nil
    }
  }
  // ---- For Transactional ----
  override def commit = {
    storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
    changeSet.clear
  }
  override def contains(key: String): Boolean = {
    try {
      verifyTransaction
      storage.getMapStorageEntryFor(uuid, key).isDefined
    } catch {
      case e: Exception => false
    }
  }
  //.. others
}
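The commit and getRange paths can be sketched against a trimmed-down storage interface. `RangeStorage` and `TxMapSketch` are hypothetical names, and `Try(...).getOrElse(Nil)` is swapped in as a rough alternative to the catch-all `case e: Exception => Nil` (it catches non-fatal throwables rather than exactly `Exception`).

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical two-method storage interface, trimmed from the Storage trait.
trait RangeStorage {
  def insertEntries(name: String, entries: List[(String, AnyRef)]): Unit
  def range(name: String, start: Int, count: Int): List[(String, AnyRef)]
}

// Sketch: commit flushes the changeSet under the transaction id,
// and a storage failure during getRange degrades to an empty list.
class TxMapSketch(uuid: String, storage: RangeStorage) {
  private val changeSet = mutable.HashMap.empty[String, AnyRef]

  def put(k: String, v: AnyRef): Unit = changeSet.update(k, v)

  def commit(): Unit = {
    storage.insertEntries(uuid, changeSet.toList)
    changeSet.clear()
  }

  def getRange(start: Int, count: Int): List[(String, AnyRef)] =
    Try(storage.range(uuid, start, count)).getOrElse(Nil)
}
```

Swallowing failures keeps callers simple, at the cost of making "storage down" indistinguishable from "no data".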
Note the use of the abstract val storage in the above implementation, which is concretized when we make a Mongo map ..
class MongoPersistentTransactionalMap
    extends ConcretePersistentTransactionalMap {
  val storage = MongoStorage
}
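One caveat with the abstract val pattern is initialization order: a trait body runs before the subclass assigns its vals, so a strict val in the trait that reads `storage` would see `null`. The names below (`Store`, `MemStore`, `StoreBackedMap`, `MemPersistentMap`) are hypothetical and only illustrate the pattern.

```scala
// Hypothetical names illustrating the abstract-val pattern.
trait Store { def name: String }
object MemStore extends Store { val name = "in-memory" }

trait StoreBackedMap {
  val storage: Store // abstract: each concrete map plugs in its engine

  // A strict `val label = storage.name` here would throw a NullPointerException,
  // because this trait is initialized before the subclass assigns `storage`.
  // A lazy val (or a def) defers the access until after construction.
  lazy val label: String = "map backed by " + storage.name
}

class MemPersistentMap extends StoreBackedMap {
  val storage = MemStore
}
```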
For the storage part, we have another trait that abstracts the storage-specific APIs ..
trait Storage extends Logging {
  def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]): Unit
  def removeMapStorageFor(name: String): Unit
  def getMapStorageEntryFor(name: String, key: String): Option[AnyRef]
  def getMapStorageSizeFor(name: String): Int
  def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]]
  def getMapStorageRangeFor(name: String, start: Int,
      count: Int): List[Tuple2[String, AnyRef]]
}
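Because every engine sits behind the same trait, a trivial in-memory engine makes a convenient test double before wiring in MongoDB. This is an illustrative sketch, not part of Akka: `MapStorage` mirrors the signatures above without `Logging`, and `InMemoryStorage` keeps one insertion-ordered map per transaction id.

```scala
import scala.collection.mutable

// Hypothetical mirror of the Storage signatures (Logging omitted so it compiles alone).
trait MapStorage {
  def insertMapStorageEntriesFor(name: String, entries: List[(String, AnyRef)]): Unit
  def removeMapStorageFor(name: String): Unit
  def getMapStorageEntryFor(name: String, key: String): Option[AnyRef]
  def getMapStorageSizeFor(name: String): Int
  def getMapStorageFor(name: String): List[(String, AnyRef)]
  def getMapStorageRangeFor(name: String, start: Int, count: Int): List[(String, AnyRef)]
}

// In-memory engine: one insertion-ordered map per transaction id.
object InMemoryStorage extends MapStorage {
  private val maps = mutable.HashMap.empty[String, mutable.LinkedHashMap[String, AnyRef]]

  def insertMapStorageEntriesFor(name: String, entries: List[(String, AnyRef)]): Unit =
    maps.getOrElseUpdate(name, mutable.LinkedHashMap.empty[String, AnyRef]) ++= entries

  def removeMapStorageFor(name: String): Unit = maps -= name

  def getMapStorageEntryFor(name: String, key: String): Option[AnyRef] =
    maps.get(name).flatMap(_.get(key))

  def getMapStorageSizeFor(name: String): Int = maps.get(name).fold(0)(_.size)

  def getMapStorageFor(name: String): List[(String, AnyRef)] =
    maps.get(name).fold(List.empty[(String, AnyRef)])(_.toList)

  // Range over insertion order; out-of-bounds requests are simply truncated.
  def getMapStorageRangeFor(name: String, start: Int, count: Int): List[(String, AnyRef)] =
    getMapStorageFor(name).slice(start, start + count)
}
```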
I am in the process of writing a concrete MongoDB implementation of Storage, which will look like the following ..
object MongoStorage extends Storage {
  val KEY = "key"
  val VALUE = "val"
  val db = new Mongo(..) // needs to come from configuration
  val COLLECTION = "akka_coll"
  val coll = db.getCollection(COLLECTION)
  private[this] val serializer: Serializer = ScalaJSON
  // one document per transaction: { KEY -> transaction id, VALUE -> map of key -> serialized value }
  override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]): Unit = {
    import java.util.{Map, HashMap}
    val m: Map[String, AnyRef] = new HashMap
    for ((k, v) <- entries) {
      m.put(k, serializer.out(v))
    }
    coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
  }
  // remove the document(s) stored for this transaction id
  override def removeMapStorageFor(name: String): Unit = {
    val q = new BasicDBObject
    q.put(KEY, name)
    coll.remove(q)
  }
  //.. others
}
As the diagram above illustrates, every transaction has its own DBObject in the Mongo collection, which stores a HashMap containing the transacted data set. Using MongoDB's query APIs we can always get to a specific key/value pair for a particular transaction ..
import java.util.{Map => JMap}
// form the query object with the transaction id
val q = new BasicDBObject
q.put(KEY, name)
// 1. use the query object to get the DBObject (findOne returns null if there is none)
// 2. extract the VALUE which has the HashMap of transacted data set
// 3. query on the HashMap on the passed in key to get the value
// 4. use the scala-json serializer to get back the Scala object
// Any step may come up empty, so each is wrapped in an Option instead of dereferencing null;
// the result is an Option[AnyRef], as getMapStorageEntryFor expects.
Option(coll.findOne(q))
  .flatMap(dbo => Option(dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]))
  .flatMap(m => Option(m.get(key).asInstanceOf[Array[Byte]]))
  .map(bytes => serializer.in(bytes, None))
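The document layout and the four lookup steps can be exercised without a server. In this sketch the "collection" is a `ListBuffer` of `java.util.Map` documents and the "serializer" is plain UTF-8 encoding of strings; both are assumptions standing in for the Mongo collection and scala-json, and `DocLayoutSketch` is a hypothetical name.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.mutable.ListBuffer

// In-memory sketch of the layout: one document per transaction holding
// { "key" -> transaction id, "val" -> map of key -> serialized bytes }.
object DocLayoutSketch {
  val KEY = "key"
  val VALUE = "val"
  val coll = ListBuffer.empty[JMap[String, AnyRef]]

  def insert(name: String, entries: List[(String, String)]): Unit = {
    val m: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
    for ((k, v) <- entries) m.put(k, v.getBytes(UTF_8)) // stand-in serializer
    val doc: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
    doc.put(KEY, name)
    doc.put(VALUE, m)
    coll += doc
  }

  // Steps 1-4 from the text, each possibly-missing level wrapped in Option.
  def lookup(name: String, key: String): Option[String] =
    coll.find(doc => doc.get(KEY) == name)                                       // 1. first match, like findOne
      .flatMap(doc => Option(doc.get(VALUE).asInstanceOf[JMap[String, AnyRef]])) // 2. the VALUE map
      .flatMap(m => Option(m.get(key).asInstanceOf[Array[Byte]]))                // 3. the entry's bytes
      .map(bytes => new String(bytes, UTF_8))                                    // 4. deserialize
}
```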
MongoDB looks like a cool storage engine and has already been used in production as a performant key/value store. It also looks promising as the backing storage engine for persistent transactional actors. Akka transactors look poised to evolve into a platform that can deliver the goods for stateful, STM-based as well as stateless, message-passing-based concurrent applications. I plan to complete the implementation in the near future and, if Jonas agrees, I will be more than willing to contribute it to the Akka master.
Open source is as much about contributing as it is about using ..
4 comments:
nice
This is great stuff. With akka-persistence-mongo, it looks like objects are saved as binary data. How difficult would it be to have the option of saving the json string instead?
Is there a way I can do CRUD with Scala + MongoDB?
For implementing CRUD operations you need a driver which gives you suitable APIs. Here is one .. http://novus.github.com/docs/casbah/sphinx/html/