With RedisClientPool you can do some cool stuff in non-blocking mode and get improved throughput for your application. Suppose you have a bunch of operations that you can theoretically execute in parallel, maybe a few disjoint list operations and a few operations on key/values, like the following snippets:
    val clients = new RedisClientPool("localhost", 6379)

    // left push to a list
    def lp(msgs: List[String]) = {
      clients.withClient {client => {
        msgs.foreach(client.lpush("list-l", _))
        client.llen("list-l")
      }}
    }

    // right push to another list
    def rp(msgs: List[String]) = {
      clients.withClient {client => {
        msgs.foreach(client.rpush("list-r", _))
        client.llen("list-r")
      }}
    }

    // key/set operations
    def set(msgs: List[String]) = {
      clients.withClient {client => {
        var i = 0
        msgs.foreach { v =>
          client.set("key-%d".format(i), v)
          i += 1
        }
        Some(1000) // some dummy
      }}
    }
Since Redis is single threaded, you can use client pooling to allocate multiple clients and fork these operations concurrently. Here's a snippet that runs these operations asynchronously using Scala futures:
    // generate some arbitrary values
    val l = (0 until 5000).map(_.toString).toList

    // prepare the list of functions to invoke
    val fns = List[List[String] => Option[Int]](lp, rp, set)

    // schedule the futures
    val tasks = fns map (fn => scala.actors.Futures.future { fn(l) })

    // wait for results
    val results = tasks map (future => future.apply())
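The scala.actors package was deprecated in Scala 2.10 and is not part of the standard library from 2.11 onwards, so on a current Scala version the same fork-and-wait pattern can be sketched with scala.concurrent.Future instead. This is only a rough equivalent, not the code from the original example: lpStub, rpStub and setStub are hypothetical stand-ins for lp, rp and set so that the sketch runs without a Redis server.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForkJoinSketch {
  // Hypothetical stand-ins for lp, rp and set (assumption: no Redis involved).
  // Each one just reports something about the messages it was handed.
  def lpStub(msgs: List[String]): Option[Int] = Some(msgs.length)
  def rpStub(msgs: List[String]): Option[Int] = Some(msgs.length)
  def setStub(msgs: List[String]): Option[Int] = Some(1000) // dummy, as in set

  def run(): List[Option[Int]] = {
    // generate some arbitrary values
    val l = (0 until 5000).map(_.toString).toList

    // prepare the list of functions to invoke
    val fns = List[List[String] => Option[Int]](lpStub, rpStub, setStub)

    // schedule each function on the global execution context
    val tasks: List[Future[Option[Int]]] = fns.map(fn => Future(fn(l)))

    // combine into a single future and block once, with an explicit timeout
    Await.result(Future.sequence(tasks), 30.seconds)
  }
}
```

Future.sequence keeps the results in the same order as the input list, so each result can be matched back to the function that produced it.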
And while we are on the topic of using futures for non-blocking Redis operations, Twitter has a cool library, Finagle, that offers lots of cool composition stuff on Futures and other non-blocking RPC mechanisms. Over the weekend I used some of them to implement scatter/gather algorithms over Redis. I am not going into the details of what I did, but here's a sample dummy example of the kind of thing you can do with RedisClientPool and the Future implementation of Finagle.

The essential idea is to be able to compose futures and write non-blocking code all the way down. This is made possible through the monadic, non-blocking map and flatMap operations and a host of other utility functions that use them. Here's an example:

    def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] = { //..

It uses flatMap and map to collect the results from the given list of futures into a new future of Seq[A].

Let's have a look at a specific example where we push a number of elements into 100 lists concurrently using a pool of futures, backed by an ExecutorService. This is the scatter phase of the algorithm. The function listPush actually does the push using a RedisClientPool, and each of these operations is done within a Future. FuturePool gives you a Future on which you can specify timeouts and exception handlers using Scala closures. Note how we use the combinator collect for concurrent composition of the futures. The future that collect returns completes when all the underlying futures have completed.

After the scatter phase we prepare for the gather phase by pipelining the future computation using flatMap. Unlike collect, flatMap is a combinator for sequential composition. In the following snippet, once allPushes completes, its result is pipelined into the closure that follows, which generates another Future. The whole operation completes only when both futures have completed, or when either of them fails with an exception.

For more details on how to use these combinators on Future abstractions, have a look at the tutorial that the Twitter guys published recently.

    import java.util.concurrent.Executors
    import com.twitter.util.{Future, FuturePool, JavaTimer, TimeoutException}
    // 40.seconds also needs the duration conversions from com.twitter.conversions
    // (the exact import depends on the version of Twitter util in use)

    implicit val timer = new JavaTimer

    // set up Executors
    val futures = FuturePool(Executors.newFixedThreadPool(8))

    // abstracting the flow with future
    private[this] def flow[A](noOfRecipients: Int, opsPerClient: Int, fn: (Int, String) => A) = {
      val fs = (1 to noOfRecipients) map {i =>
        futures {
          fn(opsPerClient, "list_" + i)
        }.within(40.seconds) handle {
          case _: TimeoutException => null.asInstanceOf[A]
        }
      }
      Future.collect(fs)
    }

    // scatter across clients and gather them to do a sum
    def scatterGatherWithList(opsPerClient: Int)(implicit clients: RedisClientPool) = {
      // scatter
      val allPushes: Future[Seq[String]] = flow(100, opsPerClient, listPush)
      val allSum = allPushes flatMap {result =>
        // gather
        val allPops: Future[Seq[Long]] = flow(100, opsPerClient, listPop)
        allPops map {members => members.sum}
      }
      allSum.apply
    }
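To make the difference between concurrent and sequential composition concrete, here is a minimal sketch that uses the standard library's scala.concurrent.Future as a stand-in for Finagle's Future. It is not Finagle's implementation of collect, just one way to write such a combinator with flatMap and map. pushStub and popStub are hypothetical in-memory replacements for listPush and listPop, and the timeout handling is left out because the standard library Future has no counterpart to within.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ScatterGatherSketch {
  // Concurrent composition: the input futures are already running, so
  // flatMap and map only chain their results into one future, in order.
  def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] =
    fs.foldLeft(Future.successful(Seq.empty[A])) { (acc, f) =>
      acc.flatMap(results => f.map(a => results :+ a))
    }

  // Hypothetical stand-ins for listPush and listPop (assumption: no Redis).
  def pushStub(ops: Int, list: String): String = list
  def popStub(ops: Int, list: String): Long = ops.toLong

  // One future per recipient, combined into a single future of all results.
  def flow[A](recipients: Int, ops: Int, fn: (Int, String) => A): Future[Seq[A]] =
    collect((1 to recipients).map(i => Future(fn(ops, "list_" + i))))

  def scatterGather(recipients: Int, ops: Int): Long = {
    // scatter
    val allPushes: Future[Seq[String]] = flow[String](recipients, ops, pushStub)

    // Sequential composition: the gather phase is only scheduled
    // once every push has completed.
    val allSum: Future[Long] = allPushes.flatMap { _ =>
      flow[Long](recipients, ops, popStub).map(_.sum)
    }

    // block only at the very end, with an explicit timeout
    Await.result(allSum, 30.seconds)
  }
}
```

With these stubs every pop reports ops, so scatterGather(100, 5) sums 100 values of 5. The shape is the same as in the Finagle version: collect inside flow for the fan-out, flatMap between the two phases.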
For complete example implementations of patterns like scatter/gather using Redis, have a look at the GitHub repo for scala-redis.