Sunday, October 04, 2009

Pluggable Persistent Transactors with Akka

NoSQL is here. Just as we use multiple programming languages, we are now thinking of applying the same paradigm to storage. And why not? If we can use an alternate language to be more expressive for a specific problem, why not use an alternate form of storage that is a better fit for the requirement?

More and more projects are using alternate forms of storage to persist the various kinds of data that an application needs to handle. Of course relational databases have their own place in this stack - the difference is that people today are no longer dogmatic about their use, and do not treat the RDBMS as the universal hammer for every nail they see in the application.

Consider an application that needs durability for transactional data structures. I want to model a transactional banking system, with basic debit/credit operations, using a message-based model. But the operations have to be persistent. The balance needs to be durable and all transactions need to be persisted on disk. It doesn't matter what structures are stored underneath - all I need is a key/value interface that lets me store the balances keyed by account number, plus a log of the transactions. I don't even want to care what form of storage sits at the backend. It can be any database, any key-value store, Terracotta or anything. Will you give me the flexibility to make the storage pluggable? Well, that's a bonus!

Enter Akka .. and its pluggable persistence layer that you can nicely marry to its message passing actor based interface. Consider the following messages for processing debit/credit operations ..


case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: Actor)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
case class Credit(accountNo: String, amount: BigInt)
case object LogSize



In the above messages, the failer actor is used to report a failure when a debit cannot be honoured. We also want all of the above operations to be transactional, which Akka lets us declare. Here's the basic actor definition ..

class BankAccountActor extends Actor {
  makeTransactionRequired
  private val accountState = 
    TransactionalState.newPersistentMap(MongoStorageConfig())
  private val txnLog = 
    TransactionalState.newPersistentVector(MongoStorageConfig())
  //..
}



  • makeTransactionRequired makes the actor transactional

  • accountState is a persistent Map that plugs into a MongoDB-based storage, as is evident from the config parameter. In a real-life application, this choice would be abstracted further into a configuration file. Earlier I had blogged about the implementation of the MongoDB layer for Akka persistence. accountState offers the key/value interface that the actor uses to maintain the durable snapshot of all balances.

  • txnLog is a persistent vector, once again backed by MongoDB storage, that stores the log of every transaction that occurs in the system


Let us now look at the actor's receive method, which handles incoming messages and processes the debit/credit operations ..

class BankAccountActor extends Actor {
  makeTransactionRequired
  private val accountState = 
    TransactionalState.newPersistentMap(MongoStorageConfig())
  private val txnLog = 
    TransactionalState.newPersistentVector(MongoStorageConfig())

  def receive: PartialFunction[Any, Unit] = {
    // check balance
    case Balance(accountNo) =>
      txnLog.add("Balance:" + accountNo)
      reply(accountState.get(accountNo).get)

    // debit amount: can fail
    case Debit(accountNo, amount, failer) =>
      txnLog.add("Debit:" + accountNo + " " + amount)
      val m: BigInt =
      accountState.get(accountNo) match {
        case None => 0
        case Some(v) => {
          val JsNumber(n) = v.asInstanceOf[JsValue]
          BigInt(n.toString)
        }
      }
      accountState.put(accountNo, (m - amount))
      if (amount > m)
        failer !! "Failure"
      reply(m - amount)

    //..
  }
}


Here we have the implementation of two messages -

  • Balance reports the current balance and

  • Debit does a debit operation on the balance


Note that the interfaces these implementations use are in no way dependent on MongoDB-specific APIs. Akka offers a uniform key/value API across all supported persistent storage backends, and each of the pattern-matched message-processing fragments above offers transaction semantics. This is pluggability!
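
To make the idea of a uniform key/value API concrete, here is a minimal sketch in plain Scala. It is not Akka's actual interface: KeyValueStore, HashMapStore, TreeMapStore and Accounts are hypothetical names invented for illustration, and both backends are simple in-memory stand-ins for real storage engines.

```scala
import scala.collection.mutable

// Hypothetical storage abstraction; NOT Akka's actual API.
trait KeyValueStore {
  def get(key: String): Option[BigInt]
  def put(key: String, value: BigInt): Unit
}

// Backend 1 (assumption): a hash map kept in memory.
final class HashMapStore extends KeyValueStore {
  private val data = mutable.HashMap.empty[String, BigInt]
  def get(key: String): Option[BigInt] = data.get(key)
  def put(key: String, value: BigInt): Unit = data.update(key, value)
}

// Backend 2 (assumption): a sorted tree map, standing in for a
// different storage engine behind the same interface.
final class TreeMapStore extends KeyValueStore {
  private val data = mutable.TreeMap.empty[String, BigInt]
  def get(key: String): Option[BigInt] = data.get(key)
  def put(key: String, value: BigInt): Unit = data.update(key, value)
}

// Account logic depends only on the trait, so the backend can be swapped
// without touching this class.
final class Accounts(store: KeyValueStore) {
  def balance(accountNo: String): BigInt =
    store.get(accountNo).getOrElse(BigInt(0))

  def credit(accountNo: String, amount: BigInt): BigInt = {
    val updated = balance(accountNo) + amount
    store.put(accountNo, updated)
    updated
  }
}
```

Swapping new Accounts(new HashMapStore) for new Accounts(new TreeMapStore) changes the backend and nothing else, which is the property the actor above gets from the storage config parameter.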

Credit looks very similar to Debit. However, a more interesting use case is the MultiDebit operation, which exercises the transactional interface. Just like the ACID semantics of your relational database, the transactional semantics of Akka offer atomicity over this message. Either the whole MultiDebit will pass or it will be rolled back.


class BankAccountActor extends Actor {

  //..
  def receive: PartialFunction[Any, Unit] = {

    // many debits: can fail
    // demonstrates true rollback even if multiple puts have been done
    case MultiDebit(accountNo, amounts, failer) =>
      txnLog.add("MultiDebit:" + accountNo + " " + amounts.map(_.intValue).foldLeft(0)(_ + _))
      val m: BigInt =
      accountState.get(accountNo) match {
        case None => 0
        case Some(v) => BigInt(v.asInstanceOf[String])
      }
      var bal: BigInt = 0
      amounts.foreach {amount =>
        bal = bal + amount
        accountState.put(accountNo, (m - bal))
      }
      if (bal > m) failer !! "Failure"
      reply(m - bal)
    
    //..
  }
}
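
The all-or-nothing behaviour of MultiDebit can be illustrated without Akka. What follows is a minimal sketch under stated assumptions: SketchAccount and atomically are hypothetical names, the snapshot-and-restore approach is a stand-in chosen for illustration rather than how Akka's transactions work internally, and the failure is raised inline instead of by a separate failer actor.

```scala
// Hypothetical in-memory stand-in for the persistent map and the
// transaction log. Snapshot/restore is an assumption for illustration,
// not Akka's STM.
final class SketchAccount {
  private var balances = Map.empty[String, BigInt]
  private var log = Vector.empty[String]

  // Run `body`; if it throws, restore both structures and rethrow.
  private def atomically[A](body: => A): A = {
    val savedBalances = balances
    val savedLog = log
    try body
    catch {
      case e: RuntimeException =>
        balances = savedBalances
        log = savedLog
        throw e
    }
  }

  def balance(accountNo: String): BigInt =
    balances.getOrElse(accountNo, BigInt(0))

  def logSize: Int = log.size

  def credit(accountNo: String, amount: BigInt): Unit = atomically {
    log = log :+ s"Credit:$accountNo $amount"
    balances = balances.updated(accountNo, balance(accountNo) + amount)
  }

  def multiDebit(accountNo: String, amounts: List[BigInt]): BigInt = atomically {
    log = log :+ s"MultiDebit:$accountNo ${amounts.sum}"
    val m = balance(accountNo)
    var bal = BigInt(0)
    amounts.foreach { amount =>
      bal += amount
      // Intermediate writes happen before the check, as in the actor.
      balances = balances.updated(accountNo, m - bal)
    }
    if (bal > m) throw new RuntimeException("insufficient funds")
    m - bal
  }
}
```

Even though several writes and a log entry are made before the check, a failed multiDebit leaves both the balance and the log size exactly as they were before the call.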



Now that we have the implementation in place, let's look at the test cases that exercise them ..

First a successful debit test case. Note how we have a separate failer actor that reports failure of operations to the caller.

@Test
def testSuccessfulDebit = {
  val bactor = new BankAccountActor
  bactor.start
  val failer = new PersistentFailerActor
  failer.start
  bactor !! Credit("a-123", 5000)
  bactor !! Debit("a-123", 3000, failer)
  val b = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n) = b
  assertEquals(BigInt(2000), BigInt(n.toString))

  bactor !! Credit("a-123", 7000)
  val b1 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n1) = b1
  assertEquals(BigInt(9000), BigInt(n1.toString))

  bactor !! Debit("a-123", 8000, failer)
  val b2 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n2) = b2
  assertEquals(BigInt(1000), BigInt(n2.toString))
  assertEquals(7, (bactor !! LogSize).get)
}


And now the interesting MultiDebit that illustrates the transaction rollback semantics ..

@Test
def testUnsuccessfulMultiDebit = {
  val bactor = new BankAccountActor
  bactor.start
  bactor !! Credit("a-123", 5000)
  val b = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n) = b
  assertEquals(BigInt(5000), BigInt(n.toString))

  val failer = new PersistentFailerActor
  failer.start
  try {
    bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
    fail("should throw exception")
  } catch { case e: RuntimeException => {}}

  val b1 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n1) = b1
  assertEquals(BigInt(5000), BigInt(n1.toString))

  // should not count the failed one
  assertEquals(3, (bactor !! LogSize).get)
}


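In the snippet above, the cumulative debit exceeds the balance only once the final amount in the list passed to the MultiDebit message is included (500 + 2000 + 1000 + 3000 = 6500), so the debit fails and the balance remains at 5000. The log size of 3 confirms that the log entry of the failed message was rolled back as well.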
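
The arithmetic behind this test can be checked with a few lines of plain Scala. This is only an illustrative sketch; RunningTotals and its members are hypothetical names, not part of the test suite above.

```scala
object RunningTotals {
  // Opening balance after the Credit of 5000 in the test.
  val opening: BigInt = BigInt(5000)
  val amounts: List[BigInt] = List(500, 2000, 1000, 3000).map(BigInt(_))

  // Cumulative debit after each amount: 500, 2500, 3500, 6500.
  val cumulative: List[BigInt] = amounts.scanLeft(BigInt(0))(_ + _).tail

  // Zero-based index of the first amount at which the cumulative
  // debit exceeds the opening balance.
  val firstOverdraw: Int = cumulative.indexWhere(_ > opening)
}
```

The first three amounts leave the account in credit; only the fourth (index 3) pushes the cumulative debit past 5000, which is why the whole message has to be undone rather than just its last step.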

Relational databases will always remain for the use case that they serve best - persistence of data that needs a true relational model. NoSQL is gradually making its place in the application stack for the complementary set of use cases that need a much more loosely coupled model, a key/value database or a document-oriented database. Apart from easier manageability, another big advantage of using these databases is that they do not need big ceremonious ORM layers between the application model and the data model. This is because what you see is what you store (WYSIWYS): there is no paradigm mismatch that needs to be bridged.

2 comments:

Daniel said...

Interesting. But why isn't

case Some(v) => {
val JsNumber(n) = v.asInstanceOf[JsValue]
BigInt(n.toString)
}

reduced to

case Some(JsNumber(n)) =>
BigInt(n.toString)

?

masseyis said...

Just stumbled across this while looking at Akka for STM. It looks like the hidden magic you're not showing here is that your 'failer' Actor is throwing a RuntimeException and that's what rolls back the transaction. Is that right? That doesn't seem very scala-ish. Isn't there a more explicit commit/rollback structure?