Sunday, March 15, 2009

Real world event based solutions using MINA and Scala Actors

In the QCon London track Architectures in Financial Applications, Oracle presented an interesting solution that seeks to transform reconciliation from an inherently batch process into an incremental, event-driven, asynchronous one. The solution is based on Coherence, their data grid, which addresses the capacity challenge by spreading trades across servers and the availability challenge through the continuous replication that Coherence supports. As part of the presentation, the Coherence-based solution touched upon one important paradigm that can be adopted even outside grid-based architectures to improve system performance and throughput.

Asynchronous processing ..

As the Oracle presentation rightly pointed out, one of the recurring problems in today's investment management solutions is the time pressure that the back office faces in dealing with regular overnight batch programs (like reconciliations, confirmations etc.) and struggling to complete them before the next day's trading starts. Reconciliation is a typical example: it needs to be executed at various levels between entities like traders, brokers, banks and custodians, and with varying periodicities. The main bottleneck is that these processes are executed as monolithic batch programs that operate on monstrous end-of-day databases containing millions of records that need to be processed, joined, validated and finally aggregated for reconciliation.

Enter Event based Reconciliation ..

This solution is not about grids. It is about poor man's asynchronous event based processing that transforms the end-of-day batch job into incremental MapReduce style progressions that move forward with every event, do real time processing and raise alerts and events for investment managers to respond to and make more informed decisions. Since one of the main goals of reconciliation is risk mitigation, I suppose such real time progressions will result in better risk handling and management. You also relieve the time pressure that mandatory completion of today's batch processes implies, and get a more predictable and uniform load distribution across your array of servers.

And this proposed solution uses commodity tools and frameworks, good old proven techniques of asynchronous event processing, and never claims to be as scalable as the Coherence based grid solution that Oracle proposed.

Think asynchronous, think events, think abstraction of transport, think selectors, think message queues, think managed thread-pools and thousands of open sockets and connections. After all you need to have a managed infrastructure that will be able to process your jobs incrementally based on events.

Asynchronous IO abstractions, MINA and Scala actors ..

Instead of one batch process for every reconciliation, we can think of handling reconciliation events. For example, loading STREET trades is an event that can trigger reconciliation with HOUSE trades. Or receipt of Trade Confirmations is an event for reconciliation with our placed Orders. We can set this up nicely using generic socket based end-points that listen for events .. and using event dispatch callbacks that do all the necessary processing. MINA provides nice abstractions for registering sockets, managing IoSessions, handling IO events that happen on your registered sockets and provides all the necessary glue to handle protocol encoding and decoding.

Here is a brief thought sequence ..

We can use MINA's asynchronous I/O service that abstracts the underlying transport's connection.

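import java.util.concurrent.Executors
import org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}

// set up the thread pool
val executor = Executors.newCachedThreadPool()

// set up the socket acceptor with the thread pool
val acceptor = new NioSocketAcceptor(executor, new NioProcessor(executor))

//.. set up other MINA stuff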

Using MINA we can decouple protocol encoding / decoding from the I/O, and have separate abstractions for codec construction. Note how the responsibilities are nicely separated between the I/O session handling, protocol filters and event handling.

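// add protocol encoder / decoder to the acceptor's filter chain
// (the filter name "codec" is an arbitrary label chosen here)
acceptor.getFilterChain.addLast("codec",
  new ProtocolCodecFilter(/* encoder .. */, /* decoder .. */))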
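
To make the separation concrete, here is a minimal sketch of codec logic written as a pair of pure functions, with no I/O involved. The `ReconRequest` shape and the `header|payload` line format are assumptions made purely for illustration; the post only shows that a request carries a `header`. In a MINA setup, logic of this kind would sit inside the encoder and decoder handed to the ProtocolCodecFilter.

```scala
object ReconCodecSketch {
  // Hypothetical message shape: only the `header` field is implied by the post.
  final case class ReconRequest(header: String, payload: String)

  // Assumed wire format for illustration: "<header>|<payload>" on a single line.
  // Only the first '|' separates header from payload.
  def decode(line: String): Either[String, ReconRequest] =
    line.trim.split("\\|", 2) match {
      case Array(header, payload) if header.nonEmpty =>
        Right(ReconRequest(header, payload))
      case _ =>
        Left(s"malformed reconciliation message: $line")
    }

  // Inverse of decode for well-formed requests.
  def encode(request: ReconRequest): String =
    s"${request.header}|${request.payload}"
}
```

Because neither function touches a socket or a session, the protocol can be tested in isolation and swapped without changing the event handling code.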

Now we need to register an IoHandler, which will handle the events and call the various callbacks like messageReceived(), sessionClosed() etc. Here we would like to be more abstract so that we do not have to handle all complexities of thread and lock management ourselves. We can delegate the event handling to Scala actors, which again can optimize on thread usage and help make the model scale.

// set the IoHandler that delegates event handling to the underlying actor
acceptor.setHandler(
  new IoHandlerActorAdapter(session => new ReconHandler(session, ...)))

// bind and listen
acceptor.bind(new InetSocketAddress(address, port))

So, we have a socket endpoint where clients can push messages which result in MINA events that get routed through the IoHandler implementation, IoHandlerActorAdapter and translated to messages which our Scala actor ReconHandler can react to.
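
For readers without the Scala actors library at hand, the core idea of `actor ! message` can be approximated with the JDK alone. The sketch below is not the naggati adapter and not a real actor: it is a hypothetical `Mailbox` that queues session events and handles them one at a time, so the handler needs no locks of its own. For simplicity it uses one thread per mailbox, whereas actors using `react` multiplex many mailboxes over a small thread pool. The event types are redefined here for illustration.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

object MailboxSketch {
  // Hypothetical events loosely mirroring the MinaMessage cases used in this post.
  sealed trait SessionEvent
  final case class MessageReceived(message: AnyRef) extends SessionEvent
  case object SessionClosed extends SessionEvent

  // A stand-in for an actor: a single-threaded executor serializes
  // event handling in the order the events were sent.
  final class Mailbox(handler: SessionEvent => Unit) {
    private val worker: ExecutorService = Executors.newSingleThreadExecutor()

    // Analogous to `actor ! message`: enqueue the event and return immediately.
    def !(event: SessionEvent): Unit =
      worker.execute(new Runnable { def run(): Unit = handler(event) })

    // Stop accepting events and wait up to 5 seconds for queued ones to drain.
    // Returns true if everything was processed in time.
    def shutdown(): Boolean = {
      worker.shutdown()
      worker.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

An I/O callback would then simply do `mailbox ! MailboxSketch.MessageReceived(msg)` and return, leaving the processing to the mailbox's own thread.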

The class IoHandlerActorAdapter is adapted from the naggati DSL for protocol decoding from Twitter ..

class IoHandlerActorAdapter(val actorFactory: (IoSession) => Actor) 
  extends IoHandler {

  //.. other callbacks

  // callback: wrap the received message and forward it to the session's actor
  def messageReceived(session: IoSession, message: AnyRef) = 
    send(session, MinaMessage.MessageReceived(message))

  // send a message to the actor associated with this session,
  // but only if the session's filter accepts this kind of message
  def send(session: IoSession, message: MinaMessage) = {
    val info = IoHandlerActorAdapter.sessionInfo(session)
    for (actor <- info.actor; if info.filter contains MinaMessage.classOfObj(message)) {
      actor ! message
    }
  }
}

and the class ReconHandler is the actor that handles reconciliation messages ..

class ReconHandler(val session: IoSession, ...) 
  extends Actor {

  def act = {
    loop {
      react {
        case MinaMessage.MessageReceived(msg) =>
          // note we have registered a ProtocolCodecFilter
          // hence we get msg as an instance of specific
          // reconciliation message
          doRecon(msg.asInstanceOf[ReconRequest])

        case MinaMessage.ExceptionCaught(cause) => 
          //.. handle

        case MinaMessage.SessionClosed =>
          //.. handle

        case MinaMessage.SessionIdle(status) =>
          //.. handle
      }
    }
  }

  // dispatch on the type of reconciliation requested
  private def doRecon(request: ReconRequest) = {
    request.header match {
      case "TRADE_CONF_ORDER" => 
        //.. handle reconciliation of confirmation with order

      case "TRADE_STREET_HOME" =>
        //.. handle reconciliation of street side with home trades
    }
  }
}

Note that the above strategy relies on incremental progress. It may not be the case that the entire process gets done upfront. We may have to wait till the closing of the trading hours until we receive the last batch of trades or position information from upstream components. But the difference with the big bang batch process is that, by that time we have progressed quite a bit and possibly have raised some alerts as well, which would not have been possible in the earlier strategy of execution. Another way to view it is as an implementation of MapReduce that gets processed incrementally throughout the day on a real time basis and comes up with the eventual result much earlier than a scheduled batch process.
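
The incremental view can be sketched in a few lines of plain Scala. Everything below is hypothetical and only illustrates the idea: `Trade`, `State` and the matching rule (same trade id on opposite sides, equal quantity) are assumptions, not the actual reconciliation logic. Each arriving trade advances the running state by one `step`, and the end-of-day batch result is nothing more than a fold of that same step over all events.

```scala
object IncrementalRecon {
  sealed trait Side
  case object Street extends Side
  case object House extends Side

  // A trade event as it might arrive during the day (hypothetical shape).
  final case class Trade(id: String, side: Side, quantity: Long)

  // Running state: trades still waiting for a counterpart,
  // the number of matches so far, and alerts raised so far.
  final case class State(
    pending: Map[String, Trade] = Map.empty,
    matched: Int = 0,
    alerts: Vector[String] = Vector.empty)

  // Process one event and return the new state; this is the incremental step.
  def step(state: State, trade: Trade): State =
    state.pending.get(trade.id) match {
      case Some(other) if other.side != trade.side =>
        val rest = state.pending - trade.id
        if (other.quantity == trade.quantity)
          state.copy(pending = rest, matched = state.matched + 1)
        else
          state.copy(pending = rest,
            alerts = state.alerts :+ s"quantity break on ${trade.id}")
      case _ =>
        // No counterpart yet (or a repeat from the same side): keep the latest and wait.
        state.copy(pending = state.pending + (trade.id -> trade))
    }

  // The batch view is a fold of the same step over all events.
  def batch(trades: Seq[Trade]): State = trades.foldLeft(State())(step)
}
```

With this shape, an alert for a quantity break is available as soon as the second side of the trade arrives, rather than after the overnight run, and whatever is still in `pending` at the close of trading is exactly the set of trades awaiting the last upstream batch.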


Sergio Bossa said...

Great post!

Have you considered implementing such a "reconciliation service" as a REST-style XML/JSON/ProtoBufs-over-HTTP server, rather than as a generic socket-based server?

jherber said...

Debasish, where do you feel async io protocol endpoints + scala actors + terracotta has advantages over ESB platforms? Developer closer to metal, yet abstraction levels comparable? Lower latency, higher performance (less transforms)? Smaller footprint of code and supporting infrastructure?

Unknown said...

@jherber: I am a bit scared about anything that has "enterprise" as the prefix. It's too much of an upfront payload irrespective of how much u need or use. I am more of a pay-as-you-need guy :).

Unknown said...

@Sergio: Nope, haven't tried it yet .. will be an interesting exercise with lots of asynchronous interactions. Will be interested to know more of your thoughts on such an implementation.