## Sunday, October 04, 2009

### Pluggable Persistent Transactors with Akka

NoSQL is here. Just as we use multiple programming languages, we are now thinking about applying the same paradigm to storage. And why not? If we can use an alternate language to be more expressive for a specific problem, why not use an alternate form of storage that is a better fit for the requirement?

More and more projects are using alternate forms of storage to persist the various kinds of data that an application needs to handle. Of course relational databases have their own place in this stack. The difference is that people today are not being pedantic about their use, and are no longer treating the RDBMS as the universal hammer for every nail they see in the application.

Consider an application that needs durability for transactional data structures. I want to model a transactional banking system, with basic debit and credit operations, using a message-based model. But the operations have to be persistent: the balance needs to be durable and all transactions need to be persisted to disk. It doesn't matter what structures are stored underneath - all I need is some key/value interface that lets me store the balances keyed by account number, along with a log of the transactions. I don't even need to care what form of storage I use at the backend. It can be any database, any key-value store, Terracotta or anything. Will you give me the flexibility to make the storage pluggable? Well, that's a bonus!
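To make the requirement concrete, here is a minimal plain-Scala sketch of what "some key/value interface" with a swappable backend could look like. The names `KeyValueStore`, `InMemoryStore` and `Ledger` are hypothetical and are not part of Akka; the in-memory map merely stands in for a durable store.

```scala
import scala.collection.mutable

// Hypothetical storage abstraction: illustrative names, not Akka's API.
trait KeyValueStore {
  def get(key: String): Option[BigInt]
  def put(key: String, value: BigInt): Unit
}

// One possible backend: a plain in-memory map. A class backed by a
// database or a key-value store could implement the same two methods.
final class InMemoryStore extends KeyValueStore {
  private val data = mutable.Map.empty[String, BigInt]
  def get(key: String): Option[BigInt] = data.get(key)
  def put(key: String, value: BigInt): Unit = data.update(key, value)
}

// The account logic depends only on the trait, so the backend is swappable.
final class Ledger(store: KeyValueStore) {
  def balance(accountNo: String): BigInt =
    store.get(accountNo).getOrElse(BigInt(0))

  def credit(accountNo: String, amount: BigInt): BigInt = {
    val updated = balance(accountNo) + amount
    store.put(accountNo, updated)
    updated
  }
}

object PluggableStoreDemo {
  def main(args: Array[String]): Unit = {
    val ledger = new Ledger(new InMemoryStore)
    ledger.credit("a-123", BigInt(5000))
    println(ledger.balance("a-123"))
  }
}
```

Because `Ledger` only sees the trait, switching storage means passing a different `KeyValueStore` to its constructor; nothing in the account logic changes.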

Enter Akka .. and its pluggable persistence layer, which you can nicely marry to its message-passing, actor-based interface. Consider the following messages for processing debit/credit operations ..

```scala
case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: Actor)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
case class Credit(accountNo: String, amount: BigInt)
case object LogSize
```

In the above messages, the failer actor is used to report the failure when a debit fails. We also want all of the above operations to be transactional, which Akka lets us declare on the actor. Here's the basic actor definition ..

```scala
class BankAccountActor extends Actor {
  makeTransactionRequired

  private val accountState =
    TransactionalState.newPersistentMap(MongoStorageConfig())

  private val txnLog =
    TransactionalState.newPersistentVector(MongoStorageConfig())
  //..
}
```

• makeTransactionRequired makes the actor transactional

• accountState is a persistent Map that plugs into MongoDB-based storage, as is evident from the config parameter. In a real-life application, this would be further abstracted into a configuration file. Earlier I had blogged about the implementation of the MongoDB layer for Akka persistence. accountState offers the key/value interface that the actor uses to maintain the durable snapshot of all balances.

• txnLog is a persistent vector, once again backed by MongoDB storage, and stores the log of all transactions that occur in the system

Let us now look at the actor's receive method, which handles the incoming messages and processes the debit/credit operations ..

```scala
class BankAccountActor extends Actor {
  makeTransactionRequired

  private val accountState =
    TransactionalState.newPersistentMap(MongoStorageConfig())

  private val txnLog =
    TransactionalState.newPersistentVector(MongoStorageConfig())

  def receive: PartialFunction[Any, Unit] = {
    // check balance
    case Balance(accountNo) =>
      txnLog.add("Balance:" + accountNo)
      reply(accountState.get(accountNo).get)

    // debit amount: can fail
    case Debit(accountNo, amount, failer) =>
      txnLog.add("Debit:" + accountNo + " " + amount)
      val m: BigInt =
        accountState.get(accountNo) match {
          case None => 0
          case Some(v) => {
            val JsNumber(n) = v.asInstanceOf[JsValue]
            BigInt(n.toString)
          }
        }
      accountState.put(accountNo, (m - amount))
      if (amount > m)
        failer !! "Failure"
      reply(m - amount)
    //..
  }
}
```
Here we have the implementation of two messages -

• Balance reports the current balance and

• Debit does a debit operation on the balance

Note that the interfaces these implementations use are in no way dependent on MongoDB-specific APIs. Akka offers a uniform key/value API across all supported persistent storage. And each of the above pattern-matched message-processing fragments offers transaction semantics. This is pluggability!

Credit looks very similar to Debit. However, a more interesting use case is the MultiDebit operation, which performs several debits behind a single transactional message. Just like your relational database's ACID semantics, the transactional semantics of Akka offer atomicity over this message. Either the whole MultiDebit goes through or it is rolled back.

```scala
class BankAccountActor extends Actor {
  //..
  def receive: PartialFunction[Any, Unit] = {
    // many debits: can fail
    // demonstrates true rollback even if multiple puts have been done
    case MultiDebit(accountNo, amounts, failer) =>
      txnLog.add("MultiDebit:" + accountNo + " " + amounts.map(_.intValue).foldLeft(0)(_ + _))
      val m: BigInt =
        accountState.get(accountNo) match {
          case None => 0
          case Some(v) => BigInt(v.asInstanceOf[String])
        }
      var bal: BigInt = 0
      amounts.foreach { amount =>
        bal = bal + amount
        accountState.put(accountNo, (m - bal))
      }
      if (bal > m) failer !! "Failure"
      reply(m - bal)
    //..
  }
}
```
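The all-or-nothing behaviour can be illustrated without Akka. The following is only a plain-Scala sketch of the idea, not how Akka implements its transactions: writes go to a staged copy and are committed only if the whole block completes. `AtomicAccounts`, `Txn` and `atomically` are hypothetical names, and the sketch signals failure by throwing directly, which is an assumption since the post does not show how the failer actor triggers the rollback.

```scala
// Hypothetical stand-in for a transactional store; not Akka's implementation.
final class AtomicAccounts {
  private var balances = Map.empty[String, BigInt]
  private var log = Vector.empty[String]

  // Staged view of the state that a single operation works against.
  private final class Txn(var balances: Map[String, BigInt], var log: Vector[String])

  // Runs `body` on a staged copy; an exception skips the commit entirely.
  private def atomically[A](body: Txn => A): A = {
    val txn = new Txn(balances, log)
    val result = body(txn)
    balances = txn.balances
    log = txn.log
    result
  }

  def balance(accountNo: String): BigInt =
    balances.getOrElse(accountNo, BigInt(0))

  def logSize: Int = log.size

  def credit(accountNo: String, amount: BigInt): BigInt = atomically { txn =>
    txn.log = txn.log :+ ("Credit:" + accountNo + " " + amount)
    val updated = txn.balances.getOrElse(accountNo, BigInt(0)) + amount
    txn.balances = txn.balances.updated(accountNo, updated)
    updated
  }

  def multiDebit(accountNo: String, amounts: List[BigInt]): BigInt = atomically { txn =>
    txn.log = txn.log :+ ("MultiDebit:" + accountNo + " " + amounts.sum)
    val opening = txn.balances.getOrElse(accountNo, BigInt(0))
    var debited = BigInt(0)
    amounts.foreach { amount =>
      debited = debited + amount
      // several staged puts happen before the overdraw check
      txn.balances = txn.balances.updated(accountNo, opening - debited)
    }
    if (debited > opening) throw new RuntimeException("Failure")
    opening - debited
  }
}

object AtomicDemo {
  def main(args: Array[String]): Unit = {
    val accounts = new AtomicAccounts
    accounts.credit("a-123", BigInt(5000))
    val outcome = scala.util.Try(
      accounts.multiDebit("a-123", List[BigInt](500, 2000, 1000, 3000)))
    println(outcome.isFailure)
    println(accounts.balance("a-123"))
    println(accounts.logSize)
  }
}
```

As in the handler above, the balance is written several times before the check runs, and the log entry is added first. When the block fails, neither the intermediate balances nor the log entry become visible.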

Now that we have the implementation in place, let's look at the test cases that exercise them ..

First a successful debit test case. Note how we have a separate failer actor that reports failure of operations to the caller.

```scala
@Test
def testSuccessfulDebit = {
  val bactor = new BankAccountActor
  bactor.start
  val failer = new PersistentFailerActor
  failer.start
  bactor !! Credit("a-123", 5000)
  bactor !! Debit("a-123", 3000, failer)
  val b = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n) = b
  assertEquals(BigInt(2000), BigInt(n.toString))

  bactor !! Credit("a-123", 7000)
  val b1 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n1) = b1
  assertEquals(BigInt(9000), BigInt(n1.toString))

  bactor !! Debit("a-123", 8000, failer)
  val b2 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n2) = b2
  assertEquals(BigInt(1000), BigInt(n2.toString))
  assertEquals(7, (bactor !! LogSize).get)
}
```
And now the interesting MultiDebit that illustrates the transaction rollback semantics ..

```scala
@Test
def testUnsuccessfulMultiDebit = {
  val bactor = new BankAccountActor
  bactor.start
  bactor !! Credit("a-123", 5000)
  val b = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n) = b
  assertEquals(BigInt(5000), BigInt(n.toString))

  val failer = new PersistentFailerActor
  failer.start
  try {
    bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
    fail("should throw exception")
  } catch { case e: RuntimeException => {} }

  val b1 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n1) = b1
  assertEquals(BigInt(5000), BigInt(n1.toString))

  // should not count the failed one
  assertEquals(3, (bactor !! LogSize).get)
}
```
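In the snippet above, the balance remains at 5000 when the debit fails while processing the final amount of the list passed to the MultiDebit message. The log size of 3 counts the Credit and the two Balance queries; the entry added by the failed MultiDebit is rolled back along with the balance updates.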
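The arithmetic behind that failure is easy to check. This small standalone sketch (the object name `MultiDebitArithmetic` is made up for illustration) computes the running total of the debits from the test and finds where it first exceeds the opening balance of 5000:

```scala
object MultiDebitArithmetic {
  val opening: BigInt = BigInt(5000)
  val amounts: List[BigInt] = List[BigInt](500, 2000, 1000, 3000)

  // Running total after each amount: 500, 2500, 3500, 6500
  val runningTotals: List[BigInt] = amounts.scanLeft(BigInt(0))(_ + _).tail

  // Zero-based index of the first amount that pushes the total past the balance
  val firstOverdraw: Int = runningTotals.indexWhere(_ > opening)

  def main(args: Array[String]): Unit = {
    println(runningTotals)
    println(firstOverdraw)
  }
}
```

The total stays within 5000 for the first three amounts and reaches 6500 only with the fourth, so `firstOverdraw` is 3. Note that the MultiDebit handler itself performs all the puts first and checks the total once at the end.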

Relational databases will always remain for the use case they serve best - persistence of data that needs a true relational model. NoSQL is gradually making its place in the application stack for the complementary set of use cases that need a much more loosely coupled model, a key/value database or a document-oriented database. Apart from easier manageability, another big advantage of using these databases is that they do not need big, ceremonious ORM layers between the application model and the data model. This is because what you see is what you store (WYSIWYS): there is no paradigm mismatch that needs to be bridged.

Daniel said...

Interesting. But why isn't

```scala
case Some(v) => {
  val JsNumber(n) = v.asInstanceOf[JsValue]
  BigInt(n.toString)
}
```

reduced to

```scala
case Some(JsNumber(n)) =>
  BigInt(n.toString)
```

?

masseyis said...

Just stumbled across this while looking at Akka for STM. It looks like the hidden magic you're not showing here is that your 'failer' Actor is throwing a RuntimeException and that's what rolls back the transaction. Is that right? That doesn't seem very scala-ish. Isn't there a more explicit commit/rollback structure?