Actors and message passing have proven to be great allies in implementing certain classes of concurrent applications. Message-passing concurrency promotes loosely coupled application components, which makes it a natural fit for systems that need to scale out. But as Jonas Bonér discusses in his JavaOne 2009 presentation, many real-world applications still have to deal with shared state, transactions and atomicity of operations. Software Transactional Memory (STM), as implemented in Clojure and Haskell, is a viable option for these use cases.
Akka, designed by Jonas Bonér, offers Transactors, which combine the benefits of actors and STM, along with a pluggable storage model. It provides a unified set of data structures managed by the STM and backed by a variety of storage engines. It currently supports Cassandra as the storage model out of the box.
Over the weekend I was trying out MongoDB as yet another out-of-the-box persistence option for Akka transactors. MongoDB is a high-performance, schema-free, document-oriented database that stores documents in the form of BSON, a binary-encoded, extended form of JSON. The main storage abstraction is a Collection, which can loosely be equated to a table in a relational database. Besides replication, fault tolerance and sharding, what makes MongoDB much easier to use is its rich querying facilities. It supports many built-in query capabilities, including conditional operators, regular expressions and powerful variants of SQL where clauses on the document model. Here are some examples of query filters ..
db.myCollection.find( { $where: "this.a > 3" });
db.myCollection.find( { "field" : { $gt: value1, $lt: value2 } } );
and useful convenience functions ..
db.students.find().limit(10).forEach( ... )
db.students.find().skip(..)
In Akka we can have a collection in MongoDB that stores all transacted data keyed on a transaction id. The data set for each transaction can be stored in a HashMap as key-value pairs. Have a look at the following diagram for the scheme of data storage using MongoDB Collections ..
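The storage scheme just described can be modelled in a few lines of plain Scala. This is only a minimal sketch with no MongoDB involved: `TxDocument`, `lookup` and the sample data are hypothetical names made up for illustration, and a `Seq` stands in for the collection.

```scala
object StorageSchemeSketch {
  // Hypothetical stand-in for one document in the collection:
  // `key` holds the transaction id, `value` holds that transaction's data set.
  final case class TxDocument(key: String, value: Map[String, String])

  // Find the document for a transaction id, then the entry inside its map.
  def lookup(coll: Seq[TxDocument], txId: String, field: String): Option[String] =
    coll.find(_.key == txId).flatMap(_.value.get(field))

  // Sample "collection" with one document per transaction.
  val sample: Seq[TxDocument] = Seq(
    TxDocument("tx-1", Map("name" -> "alice", "city" -> "paris")),
    TxDocument("tx-2", Map("name" -> "bob"))
  )
}
```

Each transaction owns exactly one document, so reaching a single key/value pair is a two-step lookup: first by transaction id, then by key inside the embedded map.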
Akka's TransactionalState offers APIs to publish the appropriate storage engines depending on the configuration ..
class TransactionalState {
  def newPersistentMap(
    config: PersistentStorageConfig): TransactionalMap[String, AnyRef] =
    config match {
      case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
      case MongoStorageConfig() => new MongoPersistentTransactionalMap
    }
  def newPersistentVector(
    config: PersistentStorageConfig): TransactionalVector[AnyRef] =
    config match {
    }
  def newPersistentRef(
    config: PersistentStorageConfig): TransactionalRef[AnyRef] =
    config match {
    }
}
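The configuration-driven selection above can be tried out in isolation. The following is a minimal sketch, not Akka's actual code: `StorageConfig`, `KeyValueStore` and the two store classes are hypothetical names, and making the config type a sealed trait is an assumption made here so that the compiler can check the match for exhaustiveness.

```scala
object StorageSelectionSketch {
  // Hypothetical configuration hierarchy; sealed so that a missing case is flagged.
  sealed trait StorageConfig
  case object CassandraConfig extends StorageConfig
  case object MongoConfig extends StorageConfig

  // Hypothetical common interface for the pluggable backends.
  trait KeyValueStore { def name: String }
  final class CassandraStore extends KeyValueStore { val name = "cassandra" }
  final class MongoStore extends KeyValueStore { val name = "mongo" }

  // The factory picks a backend purely from the configuration value.
  def newStore(config: StorageConfig): KeyValueStore = config match {
    case CassandraConfig => new CassandraStore
    case MongoConfig     => new MongoStore
  }
}
```

Callers depend only on the common interface, so adding a new storage engine means adding one config case and one branch in the factory.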
Each transactional data structure defines the transaction semantics for the underlying structure that it encapsulates. For example, for a PersistentTransactionalMap we have the following APIs ..
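abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
  // pending writes for the current transaction
  protected[kernel] val changeSet = new HashMap[K, V]
  // explicit result type, so that overrides can return the range read from storage
  def getRange(start: Int, count: Int): List[Tuple2[K, V]]
  override def begin = {}
  override def rollback = changeSet.clear
}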
A concrete implementation defines the rest of the semantics used to handle transactional data. The concrete implementation is parameterized with the actual storage engine that can be plugged in for specific implementations.
trait ConcretePersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] {
  val storage: Storage
  override def getRange(start: Int, count: Int) = {
    verifyTransaction
    try {
      storage.getMapStorageRangeFor(uuid, start, count)
    } catch {
      case e: Exception => Nil
    }
  }
  override def commit = {
    storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
    changeSet.clear
  }
  override def contains(key: String): Boolean = {
    try {
      verifyTransaction
      storage.getMapStorageEntryFor(uuid, key).isDefined
    } catch {
      case e: Exception => false
    }
  }
}
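The commit and rollback behaviour above (writes collect in the change set, commit flushes them to storage, rollback discards them) can be sketched without Akka or a database. Everything here is hypothetical and simplified for illustration: `InMemoryBackend` stands in for a real storage engine, and `BufferedMap` is not the Akka class.

```scala
import scala.collection.mutable

object ChangeSetSketch {
  // Hypothetical in-memory backend standing in for a real storage engine.
  final class InMemoryBackend {
    private val committed = mutable.Map.empty[String, String]
    def insertAll(entries: List[(String, String)]): Unit =
      entries.foreach { case (k, v) => committed.update(k, v) }
    def get(key: String): Option[String] = committed.get(key)
  }

  // Writes are buffered locally and only reach the backend on commit.
  final class BufferedMap(backend: InMemoryBackend) {
    private val changeSet = mutable.LinkedHashMap.empty[String, String]

    def put(key: String, value: String): Unit = changeSet.update(key, value)

    // Pending writes shadow committed values.
    def get(key: String): Option[String] =
      changeSet.get(key).orElse(backend.get(key))

    // Flush the pending writes, then start with an empty change set.
    def commit(): Unit = {
      backend.insertAll(changeSet.toList)
      changeSet.clear()
    }

    // Discard the pending writes; the backend is never touched.
    def rollback(): Unit = changeSet.clear()
  }
}
```

Because the backend only sees data at commit time, a rollback costs nothing on the storage side: it is just a local `clear()`.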
Note the use of an abstract val (storage) in ConcretePersistentTransactionalMap, which is concretized when we make a Mongo map ..
class MongoPersistentTransactionalMap
  extends ConcretePersistentTransactionalMap {
  val storage = MongoStorage
}
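For readers new to the idiom, here is a minimal, self-contained sketch of an abstract val being concretized by a subclass. The names `Engine`, `FakeMongo`, `Persistent` and `MongoBacked` are hypothetical and exist only for this illustration.

```scala
object AbstractValSketch {
  // Hypothetical storage engine interface and one fake implementation.
  trait Engine { def describe: String }
  object FakeMongo extends Engine { val describe = "mongo" }

  trait Persistent {
    val storage: Engine                       // abstract val: declared, not defined
    def backend: String = storage.describe    // may use it in methods called later
  }

  // The concrete class supplies the value, fixing the engine for this type.
  class MongoBacked extends Persistent {
    val storage = FakeMongo
  }
}
```

One caveat with this idiom: the abstract val is still null while the trait's own initializer runs, so the trait should touch it only from methods (as above) or from lazy vals, not from eagerly initialized fields.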
For the Storage part, we have another trait which abstracts the storage specific APIs ..
trait Storage extends Logging {
  def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]])
  def removeMapStorageFor(name: String)
  def getMapStorageEntryFor(name: String, key: String): Option[AnyRef]
  def getMapStorageSizeFor(name: String): Int
  def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]]
  def getMapStorageRangeFor(name: String, start: Int,
    count: Int): List[Tuple2[String, AnyRef]]
}
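Before wiring in a real database, it can help to exercise this contract against an in-memory implementation. The sketch below is an assumption-laden stand-in: `MapStorage` copies the method names from the trait above but drops `Logging`, `InMemoryStorage` is a hypothetical class, and reading `start` and `count` as offsets in insertion order is a guess at the intended range semantics.

```scala
import scala.collection.mutable

object InMemoryStorageSketch {
  // Same method names as the Storage trait above, with explicit Unit results.
  trait MapStorage {
    def insertMapStorageEntriesFor(name: String, entries: List[(String, AnyRef)]): Unit
    def removeMapStorageFor(name: String): Unit
    def getMapStorageEntryFor(name: String, key: String): Option[AnyRef]
    def getMapStorageSizeFor(name: String): Int
    def getMapStorageFor(name: String): List[(String, AnyRef)]
    def getMapStorageRangeFor(name: String, start: Int, count: Int): List[(String, AnyRef)]
  }

  // Hypothetical in-memory backend: one insertion-ordered map per name.
  final class InMemoryStorage extends MapStorage {
    private val maps =
      mutable.Map.empty[String, mutable.LinkedHashMap[String, AnyRef]]

    def insertMapStorageEntriesFor(name: String, entries: List[(String, AnyRef)]): Unit = {
      val m = maps.getOrElseUpdate(name, mutable.LinkedHashMap.empty[String, AnyRef])
      entries.foreach { case (k, v) => m.update(k, v) }
    }

    def removeMapStorageFor(name: String): Unit = {
      maps.remove(name)
      ()
    }

    def getMapStorageEntryFor(name: String, key: String): Option[AnyRef] =
      maps.get(name).flatMap(_.get(key))

    def getMapStorageSizeFor(name: String): Int =
      maps.get(name).map(_.size).getOrElse(0)

    def getMapStorageFor(name: String): List[(String, AnyRef)] =
      maps.get(name).map(_.toList).getOrElse(Nil)

    // Assumed semantics: skip `start` entries, then take up to `count`.
    def getMapStorageRangeFor(name: String, start: Int, count: Int): List[(String, AnyRef)] =
      getMapStorageFor(name).slice(start, start + count)
  }
}
```

An implementation like this is useful mainly as a test double: the transactional map logic can be checked against it without a running database.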
I am in the process of writing a concrete implementation of the storage using MongoDB, which will look like the following ..
object MongoStorage extends Storage {
  val KEY = "key"
  val VALUE = "val"
  val db = new Mongo(..)
  val COLLECTION = "akka_coll"
  val coll = db.getCollection(COLLECTION)
  private[this] val serializer: Serializer = ScalaJSON
  override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) {
    import java.util.{Map, HashMap}
    val m: Map[String, AnyRef] = new HashMap
    for ((k, v) <- entries) {
      m.put(k, serializer.out(v))
    }
    coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
  }
  override def removeMapStorageFor(name: String) = {
    val q = new BasicDBObject
    q.put(KEY, name)
    coll.remove(q)
  }
}
As the diagram above illustrates, every transaction will have its own DBObject in the Mongo Collection, which will store a HashMap that contains the transacted data set. Using MongoDB's powerful query APIs we can always get to a specific key/value pair for a particular transaction as ..
import java.util.{Map => JMap} // JMap is assumed to be an alias for java.util.Map
val q = new BasicDBObject
q.put(KEY, name)
serializer.in(
  coll.findOne(q)
    .get(VALUE).asInstanceOf[JMap[String, AnyRef]]
    .get(key).asInstanceOf[Array[Byte]], None)
MongoDB looks like a cool storage engine and has already been used in production as a performant key/value store. It looks promising as the backing storage engine for persistent transactional actors as well. Akka transactors look poised to evolve into a platform that can deliver the goods for stateful STM-based as well as stateless message-passing-based concurrent applications. I plan to complete the implementation in the near future and, if Jonas agrees, I will be more than willing to contribute it to the Akka master.
Open source is as much about contributing, as it is about using ..