Wednesday, August 27, 2008

Actors in the Service Layer - Asynchronous and Concurrent

Exploit the virtues of immutability. Design your application around stateless abstractions interacting with each other through asynchronous message passing. These are some of the mantras that I have been trying to grok recently. In a typical Java EE application, we design the service layer to be maximally stateless. What this means is that each individual service has localized mutability interacting with other services on a shared-nothing basis. Asynchronous message passing offers some interesting avenues towards scaling up the throughput of such service layers in an application.

Imagine the service layer of your domain model has the following API ..

class SettlementService {
  public List<Completion> processCompletion(List<Settlement> settlements) {
    List<Completion> completions = new ArrayList<Completion>();
    for(Settlement s : settlements) {
      // completion logic : complex
    return completions;

The method processCompletion() takes a collection of Settlement objects and for each of them does pretty complex business logic processing before returning the collection of completed settlements. We are talking about settlement of trades as part of back office processing logic of financial services.

The API was fine and worked perfectly during all demo sessions and the prototypes that we did. The client was happy that all of his settlements were being processed correctly and within perfectly acceptable limits of latency.

One day the stock exchange had a great day, the traders were happy and the market saw a phenomenal upsurge of trading volumes. Accordingly our back-office solution also received part of the sunshine and all numbers started moving up. Lots of trades to process, lots of settlements to complete, and this is exactly when latency reared it's ugly head. A detailed profiling revealed the bottleneck in the call of processCompletion() trying to complete lots of settlements synchronously and the entire user experience sucked big time.

Going Asynchronous ..

Asynchronous APIs scale, asynchronous interactions can be parallelized, asynchronous communication model encourages loose coupling between the producer and the consumer entities. Elsewhere I have documented usage of actor based modeling for asynchronous processing. Actors provide natural asynchrony, and Erlang is a great testimony to the fact that asynchronous message passing based systems scale easily, offer better approaches towards distribution, reliability and fault tolerance. And if you can parallelize your business process, then you can allocate each isolation unit to a separate actor that can run concurrently over all the cores of your system. Though this is only feasible for languages or runtime that can create processes on the cheap. Erlang does it with green threads, Scala does it through some form of continuations on the JVM.

With Scala Actors ..

In the above example, processing of every settlement completion for a unique combination of client account and security can be considered as an isolated task that can be safely allocated to an actor. The following snippet is a transformation of the above code that incorporates asynchrony using Scala actors. For brevity and simplicity of code, I have ignored the business constraint of uniqueness depending on client account and security, and allocated every completion processing to a separate actor.

class SettlementService {

  case class Settlement
  case class Completion(s: Settlement)

  def processCompletion(settlements: List[Settlement]) = {
    val buffer = new Array[Completion](settlements.length)
    val cs =
      for(idx <- (0 until settlements.length).toList) yield {
        scala.actors.Futures.future {
          buffer(idx) = doComplete(settlements(idx))

  def doComplete(s: Settlement): Completion = {
    // actual completion logic for a settlement
    new Completion(s)

The above code snippet uses Scala's future method that invokes the actor behind the scenes ..

def future[T](body: => T): Future[T] = {
  case object Eval
  val a = {
    Actor.react {
      case Eval => Actor.reply(body)
  a !! (Eval, { case any => any.asInstanceOf[T] })

The actors are scheduled asynchronously in parallel and block the underlying thread only during the time slice when a received Scala message matches the pattern specified in the partial function that forms the react block. Scala actors are not implemented on a thread-per-actor model - hence invoking actors are way cheaper than starting JVM threads. Scala actors are threadless, event based and can be forked in thousands on a commodity machine.

What the actor model offers ..

The actor model is all about immutability and a shared-nothing paradigm and encourages a programming style where you can think of modeling your interactions in terms of immutable messages. Service layers of an application are always meant to be stateless, and the actor model makes you think more deeply on this aspect. And once you have statelessness you can achieve concurrency by distributing the stateless components amongst the actors.

Making your Infrastructure Stack Lighter ..

In many cases, asynchronous messaging libraries also help getting rid of additional heavyweight infrastructures from the application stack. In one of our applications, we were using JMS to handle priority messages. Scala and Erlang both support prioritizing messages through timeouts in the receive loop. Consider a scenario from the above application domain, where the system receives Trade messages from all over the places, that need to be processed in the back-office solution before it can be forwarded to the Settlement component. And the business rule mandates that trades for FixedIncome type of securities need to have higher priority in processing than those for Equity instruments. We can have this requirement modeled using the following Scala snippet of actor code (simplified for brevity) ..

trait Instrument
case class Equity(id: Int) extends Instrument
case class FixedIncome(id: Int) extends Instrument

case class Trade(security: Instrument)

val sx = actor {
  loop {
    reactWithin(0) {
      case Trade(i: FixedIncome) => //.. process fixed income trade
      case TIMEOUT =>
        react {
          case Trade(i: Equity) => //.. process equity trade

With a timeout value of 0 in reactWithin, the function first removes all FixedIncome trade messages from the mailbox before entering the inner react loop. Hence Equity trade messages will be processed only when there are no pending FixedIncome trade messages in the mailbox of the actor.

Asynchronous messaging is here ..

People are talking about it, open source implementations of messaging protocols like AMQP and XMPP are also available now. Erlang has demonstrated how to design and implement fault tolerant, distributed systems using the shared nothing, immutable message based programming model. Scala has started imbibing many of the goodness from Erlang/OTP platforms. And Scala runs on the JVM - it is far too natural that I have been thinking of replacing most of my synchronous interfaces at the service layer with Scala actors. Recently I have been experimenting with Erlang based RabbitMQ clusters as the messaging middleware, and got the application layer to scale pretty well with Scala actors.

And Servlet 3.0 ..

The Web layer is also getting async support from the upcoming JSR 315 and Servlet 3.0 spec about to hit the ground. Thanks to some great stuff from Greg Wilkins of Webtide, async servlets will allow applications to suspend and resume request processing and enable and disable the response - a direct support for writing comet style applications. Till date we have been using Jetty continuations and Scala's event based actors for asynchronous processing .. Web applications are definitely going to get a big scaling boost with support for asynchronous servlets.

Friday, August 22, 2008

Facebook Scaling Out across Data Centers

Jason Sobel has an interesting post on scaling out of Facebook on to a new data center in the East Coast at Virginia. A really interesting insight into some of the design decisions that have given us one of the most trafficked sites on the face of the planet today.

Here are two points that struck me on reading the post ..

  • Changing the sql grammar in the replication stream to incorporate eviction of expired items from memcached looks like a hack. A more traditional implementation could have been using triggers or MySQL UDFs to atomicize the entire transaction. But generic solutions always come ironclad with some performance overhead. It's no wonder that Facebook needs to do all specializations, even if that amounts to no ceremony and all hack.

  • Just wondering that Facebook still writes in one data center. With all the CAP theorem and eventual consistency stuff being solved by Amazon, why does Facebook still have this limitation ?

Monday, August 18, 2008

Concurrency Oriented Programming and Side Effects

In my last post on Scala actors, I had mentioned about the actor code being side-effect-free and referentially transparent. James Iry correctly pointed out that Scala react is side-effected, since the partial function that it takes processes a message which is neither a parameter to react nor a value in the lexical scope.

Sure! I should have been more careful to articulate my thoughts. The side-effect that react induces can be a problem if the messages that it processes are not immutable, do share mutable state either amongst themselves or with the actor. In fact concurrency oriented programming is all about side-effects, the better models provide more manageable semantics to abstract them away from the client programmers. Abstracting out the concurrency oriented parts of a large software system is one of the biggest challenges that the industry has been trying to solve for years. And this is where asynchronous message passing model shines, and modules like gen_server of Erlang/OTP provides convenience and correctness. The bottomline is that we can avoid unwanted side-effects and difficult to debug concurrency issues if we keep all messaages immutable without any sharing of mutable state. Thanks James for correcting the thought!

In both Scala and Erlang, the underlying actor model has to deal with concurrency explicitly, manage synchronization of actor mailboxes and deal with issues of message ordering and potential dead- or live-lock problems. If we were to write the threaded versions of the actor code ourselves, we would need to manage the stateful mailboxes of individual actors as blocking queues. With Scala's actor model, this pattern is subsumed within the implementation, thereby ensuring racefree communication between concurrent actors.

Once we play to the rules of the game, we need not have to bother about the side-effect that react induces.

Monday, August 11, 2008

Asynchronous, Functional and automatically Concurrent

The following code fragment is from an earlier post on using Scala actors and AMQP. I thought I would bring this snippet up once again to highlight some of the goodness that functional Scala offers in modeling actor model of concurrent computation.

import scala.actors.Actor

case class Trade(id: Int, security: String, principal: Int, commission: Int)
case class TradeMessage(message: Trade)
case class AddListener(a: Actor)

class TradingService extends Actor {

  def act = loop(Nil)

  def loop(traders: List[Actor]) {
    react {
    case AddListener(a) => loop(:: traders)
    case msg@TradeMessage(t) => traders.foreach(! msg); loop(traders)
    case _ => loop(traders)

An implementation of the Observer design pattern using message passing. Interested traders can register as observers and observe every trade that takes place. But without any mutable state for maintaining the list of observers. Not a very familiar paradigm to the programmers of an imperative language. The trick is to have the list of observers as an argument to the loop() function which is tail called.

Nice .. asynchronous, referentially transparent, side-effect-free functional code. No mutable state, no need of explicit synchronization, no fear of race conditions or deadlocks, since no shared data are being processed concurrently by multiple threads of execution.

Monday, August 04, 2008

Erlang as middleware

Delicious is also using Erlang. Well, that's yet another addition to the Erlangy list of Facebook, SimpleDB, CouchDB, Twitter and many more. All these applications/services rely on the intrinsic scalability of Erlang as a platform. RabbitMQ provides an implementation of AMQP based on Erlang, ejabberd, an XMPP implementation is also Erlang based. EngineYard is also betting on Erlang for Vertebrae, its platform for Cloud Computing. It's like Erlang is carving out it's own niche as the dominant choice of service based backends.

I can make my application scale using distributed hashmap technologies of memcached or in-process JVM clustering techniques of Terracotta or a host of other techniques that treat distribution and scalability as a concern separate from the core application design. But with Erlang/OTP, I start with shared nothing concurrency oriented process design, which can naturally be distributed across the cores of your deployment server. What is a module in the codebase can be made to map to a process in the runtime, instances of which can be distributed transparently to the nodes of your cluster.

Why Erlang ?

Erlang is naturally concurrent, with ultralightweight processes based on green threads that can be spawned in millions on a cluster of commodity hardware. As a functional language, Erlang applications are designed as shared nothing architectures that interact with each other through asynchronous message passing primitives - as if the whole code can be mathematically analyzed. This is unlike an imperative language runtime that offers shared state concurrency through threads and mutexes. Erlang runtime offers dynamic hotswapping of code, you can change code on-the-fly, converting your application to a non stop system. Finally Erlang processes can be organized into supervisor hierarchies that manage the lifetimes of their child processes and automatically restart in case of exceptions and failures. And almost all of these come out of the box through the goodness of platforms like OTP.

But do we find enough Erlang programmers ? Well, the syntax .. umm ..

Isn't your OS mainstream ?

Then why don't you find chores of developers programming with the kernel APIs that your OS publishes ? The OS offers the service which developers use everyday when they open up a host of windows, manage their filesystems, send out an IM or open up the browser to get the latest quotes on their tickers. And all these, being completely oblivious of how the kernel handles scheduling of native threads to serve up your long running requests.

Erlang is becoming mainstream in the same context.

I do not need to know a bit of Erlang to design my trading service that can process millions of messages from my AMQP endpoints in real time. In fact while prototyping for the next version of a trading back-office system, I cooked up all my services using Scala actors, that happily could use RabbitMQ's Erlang based queue and exchange implementation through well-published Java APIs of the client.

I can still architect scalable websites that need not poll Flickr 3 million times a day to fetch 6000 updates, without an iota of Erlang awareness. The technology is called XMPP application server, which scaffolds all the Erlang machinery underneath while exposing easy-to-use client interfaces in your favorite programming language ..

Erlang is becoming mainstream as a middleware service provider. And, one could feel the buzz in OSCON 2008.