Wednesday, August 27, 2008

Actors in the Service Layer - Asynchronous and Concurrent

Exploit the virtues of immutability. Design your application around stateless abstractions interacting with each other through asynchronous message passing. These are some of the mantras that I have been trying to grok recently. In a typical Java EE application, we design the service layer to be maximally stateless. What this means is that each individual service has localized mutability interacting with other services on a shared-nothing basis. Asynchronous message passing offers some interesting avenues towards scaling up the throughput of such service layers in an application.

Imagine the service layer of your domain model has the following API ..

class SettlementService {
  public List<Completion> processCompletion(List<Settlement> settlements) {
    List<Completion> completions = new ArrayList<Completion>();
    for(Settlement s : settlements) {
      // completion logic : complex
    }
    return completions;
  }
}

The method processCompletion() takes a collection of Settlement objects and for each of them does pretty complex business logic processing before returning the collection of completed settlements. We are talking about settlement of trades as part of back office processing logic of financial services.

The API was fine and worked perfectly during all demo sessions and the prototypes that we did. The client was happy that all of his settlements were being processed correctly and within perfectly acceptable limits of latency.

One day the stock exchange had a great day, the traders were happy and the market saw a phenomenal upsurge of trading volumes. Accordingly our back-office solution also received part of the sunshine and all numbers started moving up. Lots of trades to process, lots of settlements to complete, and this is exactly when latency reared its ugly head. Detailed profiling revealed the bottleneck: the call to processCompletion() was trying to complete lots of settlements synchronously, and the entire user experience sucked big time.

Going Asynchronous ..

Asynchronous APIs scale, asynchronous interactions can be parallelized, and an asynchronous communication model encourages loose coupling between the producer and the consumer entities. Elsewhere I have documented the use of actor-based modeling for asynchronous processing. Actors provide natural asynchrony, and Erlang is a great testimony to the fact that systems based on asynchronous message passing scale easily and offer better approaches towards distribution, reliability and fault tolerance. And if you can parallelize your business process, then you can allocate each isolated unit to a separate actor that can run concurrently over all the cores of your system. This is only feasible, though, for languages or runtimes that can create processes on the cheap. Erlang does it with green threads; Scala does it through some form of continuations on the JVM.

With Scala Actors ..

In the above example, processing of every settlement completion for a unique combination of client account and security can be considered as an isolated task that can be safely allocated to an actor. The following snippet is a transformation of the above code that incorporates asynchrony using Scala actors. For brevity and simplicity of code, I have ignored the business constraint of uniqueness depending on client account and security, and allocated every completion processing to a separate actor.

class SettlementService {

  case class Settlement
  case class Completion(s: Settlement)

  def processCompletion(settlements: List[Settlement]) = {
    val buffer = new Array[Completion](settlements.length)
    val cs =
      for(idx <- (0 until settlements.length).toList) yield {
        scala.actors.Futures.future {
          buffer(idx) = doComplete(settlements(idx))
        }
      }
    // wait for every future to finish (applying a Future blocks until it is done)
    cs.foreach(f => f())
    buffer.toList
  }

  def doComplete(s: Settlement): Completion = {
    // actual completion logic for a settlement
    new Completion(s)
  }
}

The above code snippet uses Scala's future method that invokes the actor behind the scenes ..

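def future[T](body: => T): Future[T] = {
  case object Eval
  // spawn an actor that evaluates the body when it receives Eval
  val a = Actor.actor {
    Actor.react {
      case Eval => Actor.reply(body)
    }
  }
  // send Eval and get back a future for the reply
  a !! (Eval, { case any => any.asInstanceOf[T] })
}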

The actors are scheduled asynchronously and in parallel, and occupy the underlying thread only during the time slice when a received message matches a pattern specified in the partial function that forms the react block. Scala actors are not implemented on a thread-per-actor model - hence creating an actor is way cheaper than starting a JVM thread. Scala actors are threadless and event based, and can be forked in the thousands on a commodity machine.
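For readers more at home in Java, here is a rough JDK-only analogue of the same fan-out. It is a sketch, not a port: it uses a fixed-size thread pool rather than actors, and the names PooledSettlementService and poolSize, as well as the bare-bones Settlement and Completion classes, are made up for illustration. What it does share with the actor version is the shape of the solution: one cheap task per settlement, many tasks multiplexed over a handful of threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-ins for the domain types; the real ones carry far more data.
final class Settlement {
  final int id;
  Settlement(int id) { this.id = id; }
}

final class Completion {
  final Settlement settlement;
  Completion(Settlement settlement) { this.settlement = settlement; }
}

class PooledSettlementService {

  // Placeholder for the complex completion logic of a single settlement.
  static Completion doComplete(Settlement s) {
    return new Completion(s);
  }

  // Submits one task per settlement to a fixed pool of poolSize threads
  // and collects the results in the order of the input list.
  static List<Completion> processCompletion(List<Settlement> settlements, int poolSize) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      List<Future<Completion>> futures = new ArrayList<Future<Completion>>();
      for (final Settlement s : settlements) {
        futures.add(pool.submit(new Callable<Completion>() {
          public Completion call() { return doComplete(s); }
        }));
      }
      List<Completion> completions = new ArrayList<Completion>();
      for (Future<Completion> f : futures) {
        completions.add(f.get()); // blocks until that task has finished
      }
      return completions;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

Calling PooledSettlementService.processCompletion(settlements, 4) runs the completions on four threads regardless of how many settlements are in the list. Unlike event-based actors, a task that blocks here still ties up its pool thread, which is one reason the actor model is attractive for this kind of workload.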

What the actor model offers ..

The actor model is all about immutability and a shared-nothing paradigm and encourages a programming style where you can think of modeling your interactions in terms of immutable messages. Service layers of an application are always meant to be stateless, and the actor model makes you think more deeply on this aspect. And once you have statelessness you can achieve concurrency by distributing the stateless components amongst the actors.

Making your Infrastructure Stack Lighter ..

In many cases, asynchronous messaging libraries also help get rid of additional heavyweight infrastructure from the application stack. In one of our applications, we were using JMS to handle priority messages. Scala and Erlang both support prioritizing messages through timeouts in the receive loop. Consider a scenario from the above application domain, where the system receives Trade messages from all over the place, and these need to be processed in the back-office solution before they can be forwarded to the Settlement component. And the business rule mandates that trades for FixedIncome type of securities need to have higher priority in processing than those for Equity instruments. We can have this requirement modeled using the following Scala snippet of actor code (simplified for brevity) ..

import scala.actors.Actor._
import scala.actors.TIMEOUT

trait Instrument
case class Equity(id: Int) extends Instrument
case class FixedIncome(id: Int) extends Instrument

case class Trade(security: Instrument)

val sx = actor {
  loop {
    reactWithin(0) {
      case Trade(i: FixedIncome) => //.. process fixed income trade
      case TIMEOUT =>
        react {
          case Trade(i: Equity) => //.. process equity trade
        }
    }
  }
}

With a timeout value of 0 in reactWithin, the function first removes all FixedIncome trade messages from the mailbox before entering the inner react loop. Hence Equity trade messages will be processed only when there are no pending FixedIncome trade messages in the mailbox of the actor.
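To make the selection rule concrete outside the actor library, here is a small single-threaded Java sketch of a mailbox with the same behaviour. PriorityMailbox and its send and receive methods are hypothetical names invented for this illustration, and the message classes are plain-Java stand-ins for the Scala case classes above; nothing here is part of the Scala actors API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical plain-Java stand-ins for the Scala case classes.
interface Instrument {}

final class Equity implements Instrument {
  final int id;
  Equity(int id) { this.id = id; }
}

final class FixedIncome implements Instrument {
  final int id;
  FixedIncome(int id) { this.id = id; }
}

final class Trade {
  final Instrument security;
  Trade(Instrument security) { this.security = security; }
}

// A single-threaded mailbox that hands out pending FixedIncome trades first.
class PriorityMailbox {
  private final Deque<Trade> messages = new ArrayDeque<Trade>();

  // Appends a message at the tail, preserving arrival order.
  void send(Trade t) {
    messages.addLast(t);
  }

  // Scans the mailbox in arrival order for a FixedIncome trade and removes
  // the first one found. If none is pending, falls back to the oldest
  // remaining message. Returns null when the mailbox is empty.
  Trade receive() {
    for (Iterator<Trade> it = messages.iterator(); it.hasNext(); ) {
      Trade t = it.next();
      if (t.security instanceof FixedIncome) {
        it.remove();
        return t;
      }
    }
    return messages.pollFirst();
  }
}
```

If Equity and FixedIncome trades are sent interleaved, successive calls to receive() return every FixedIncome trade in arrival order before any Equity trade, which is the effect the zero timeout in reactWithin achieves.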

Asynchronous messaging is here ..

People are talking about it, and open source implementations of messaging protocols like AMQP and XMPP are also available now. Erlang has demonstrated how to design and implement fault tolerant, distributed systems using the shared nothing, immutable message based programming model. Scala has started imbibing much of the goodness of the Erlang/OTP platform. And Scala runs on the JVM - it is only natural that I have been thinking of replacing most of my synchronous interfaces at the service layer with Scala actors. Recently I have been experimenting with Erlang based RabbitMQ clusters as the messaging middleware, and got the application layer to scale pretty well with Scala actors.

And Servlet 3.0 ..

The Web layer is also getting async support, with JSR 315 and the Servlet 3.0 spec about to hit the ground. Thanks to some great stuff from Greg Wilkins of Webtide, async servlets will allow applications to suspend and resume request processing and enable and disable the response - direct support for writing comet style applications. To date we have been using Jetty continuations and Scala's event based actors for asynchronous processing .. Web applications are definitely going to get a big scaling boost with support for asynchronous servlets.
