Monday, June 23, 2008

Playing around with parallel maps in Scala

Hacking around with various programming languages is fun. Lots of people have been advocating polyglotism for quite some time now, and I am a firm believer in this mantra (heck! isn't there a programming language by the same name? Yeah, and it runs on the JVM too). Irrespective of whether you use it in your next project or not, learning a new language always gives you yet another orthogonal line of thinking for the problem at hand. But I digress.

Over the last couple of days I have been hacking around with a few implementations of a parallel map in Scala. The one suggested in Scala By Example is based on an actor-per-thread model and, for obvious reasons, does not scale once your system exhausts its ability to spawn new threads. Scala's event-based actors are designed such that receive is modeled not as a wait on a blocked thread, but as a closure that captures the rest of the actor's computation. The basic premise is that we get lots of scalability and something that feels like threadless concurrency on the JVM. In reality, Scala implements a unified model of actors, which are executed through a scheduler and a pool of worker threads at a much finer granularity of concurrency than the thread-per-actor model.

Here is an implementation designed along the lines of Joe Armstrong's pmap, which delivered a near-linear speedup on 7 CPUs ..

import scala.actors.Future

class Mapper[A, B](l: List[A], f: A => B) {
  def pmap = {
    gather(
      for(idx <- (0 until l.length).toList) yield {
        scala.actors.Futures.future {
          f(l(idx))
        }
      }
    )
  }

  def gather(mappers: List[Future[B]]): List[B] = {
    def gatherA(mappers: List[Future[B]], result: List[B]): List[B] = mappers match {
      case m :: ms => gatherA(ms, m() :: result)
      case Nil => result
    }
    gatherA(mappers, Nil).reverse
  }
}

A purely functional implementation. The gather() method has been made tail recursive, which the Scala compiler can optimize into a loop. We all know that functions passed to map are not supposed to have any side effects, right? Hence we can parallelize the invocations across all members of the list, and the above implementation does that. Also, gather() should return a new list in the same order as the original list; the above implementation does that as well.
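For readers trying this on current Scala: scala.actors (and its Futures) has long since been removed from the distribution, so here is a minimal sketch of the same order-preserving parallel map in terms of scala.concurrent.Future. This is my modern translation, not the implementation above:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Order-preserving parallel map: start one Future per element up front,
// then block on each result in the original list order.
def pmap[A, B](l: List[A])(f: A => B): List[B] = {
  val futures = l.map(x => Future(f(x)))  // all invocations are in flight here
  futures.map(fut => Await.result(fut, Duration.Inf))
}

println(pmap(List(1, 2, 3, 4))(x => x * x))  // List(1, 4, 9, 16)
```

Blocking with Await plays the role of the actor future's apply() in the original: it forces each result in input order, so the output list lines up with the input.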

Can I improve the above implementation? In what way? It is functional, and hence composable, side-effect free, and it enjoys the implicit parallelism offered by the pure functional paradigm.

But how much purity is too much, and how much is pragmatic enough?

In the above implementation, the construction of the target list is done sequentially, gathering all results in the order of the original list, and the gathering starts only after all of the generating computations have been initiated. Ideally we can get around this bottleneck by interleaving the generation of the target elements with the construction of the target list. Introduce some impurity in the form of a mutable array that can be concurrently updated while the functions are being invoked on the elements of the original list ..

class Mapper[A, B](l: List[A], f: A => B) {
  def pmap = {
    val buffer = new Array[B](l.length)
    val mappers =
      for(idx <- (0 until l.length).toList) yield {
        scala.actors.Futures.future {
          buffer(idx) = f(l(idx))
        }
      }
    for(mapper <- mappers) mapper()
    buffer
  }
}

With multicores, the latter implementation should perform more efficiently as a drop-in replacement for Scala's List.map function.
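To make it a literal drop-in, the buffer-based version can be exposed as a method next to List.map through library enrichment (an implicit conversion in 2008 Scala; shown here as a modern implicit class on scala.concurrent.Future — the wrapper and its names are mine, not from the post):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

// Hypothetical enrichment: adds a pmap method alongside List.map,
// using the mutable-buffer strategy from the post.
implicit class ParallelList[A](l: List[A]) {
  def pmap[B: ClassTag](f: A => B): List[B] = {
    val buffer = new Array[B](l.length)
    val writers = l.zipWithIndex.map { case (x, idx) =>
      Future { buffer(idx) = f(x) }  // concurrent, index-addressed writes
    }
    writers.foreach(w => Await.result(w, Duration.Inf))
    buffer.toList
  }
}

println(List(1, 2, 3).pmap(_ + 1))  // List(2, 3, 4)
```

The ClassTag bound is a modern necessity for constructing a generic Array; each element lands directly in its slot, so order is preserved without a separate gather pass.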

Daniel said...

Interesting. I was considering doing something like this myself, but never got around to it. :-)

The only problems I see with your implementation are that a) it only accepts List, not Iterable (which isn't *so* much of a problem), and b) it returns an Array, not a List. This is a bit of a stickier issue, I think, since the List#map method *by definition* must return a List. A better approach would be to use a mutable ListBuffer instead of an Array, then use the toList method.

Debasish said...

@Daniel :
Regarding returning a List, I can also do an Array.toList, since Array gets toList from Seq. Hence ..

def pmap: List[B] = {
//.. as before
buffer.toList
}

Daniel said...

Very true. However, IIRC Array#toList has linear complexity, whereas ListBuffer#toList is constant (because it actually uses a List under the surface).

Debasish said...

@Daniel :
Using ListBuffer, the parallel assignment (which I do with buffer(idx) = f(l(idx))) will not work. ListBuffer.+= appends an element in constant time, but does not support concurrent order-preserving appends. And ListBuffer.update(idx, elem) expects an element to already be present at that index, and it takes linear time.
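A minimal illustration of the constraint Debasish describes, on a current scala.collection.mutable.ListBuffer:

```scala
import scala.collection.mutable.ListBuffer

val buf = new ListBuffer[Int]
// update(idx, elem) can only overwrite an existing slot; on an empty
// buffer there is nothing at index 3, so the write fails. Out-of-order
// parallel writes by index therefore need a pre-sized structure like Array.
val threw =
  try { buf.update(3, 42); false }
  catch { case _: IndexOutOfBoundsException => true }
println(threw)  // true
```

An Array of the input's length has every slot allocated up front, which is what lets the futures write their results in any order.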

Anonymous said...

Would it solve the API problem to take an Iterable as a parameter and return an Iterable too?
Both List and Array are iterable, so nothing needs to change but the method's signature.
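A sketch of what the anonymous commenter suggests, widening the signature to Iterable (modern scala.concurrent.Future; the signature and names are mine):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Accepts any strict Iterable and returns an Iterable; the element
// order of the input is preserved in the output. (A strict collection
// is assumed so that the first map starts every future eagerly.)
def pmap[A, B](xs: Iterable[A])(f: A => B): Iterable[B] =
  xs.map(x => Future(f(x))).map(fut => Await.result(fut, Duration.Inf))

println(pmap(List(1, 2, 3))(_ * 10))    // List(10, 20, 30)
println(pmap(Vector(1, 2, 3))(_ * 10))  // Vector(10, 20, 30)
```

Callers keep their concrete collection type at the call site, and only the declared parameter and result types loosen.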

Jorge Ortiz said...
This comment has been removed by the author.
Jorge Ortiz said...

Check out the Scala Map Reduce (SMR) open source project

http://github.com/dlwh/smr/

Debasish said...

@jorge:

Inv said...

Hi, I tested your second implementation like this:

val l = Array.range(0, 40000);
val squares = pmap[Int, Int](l, x => x*x);
for (i <- squares.indices if i % 1000 == 0)
println(i + " " + squares(i))

and it only uses one core of my CPU. Isn't that weird? I'm running it from Eclipse. Cheers

Inv said...

Ok, so from the Scala console it uses both cores of the CPU for a while (100%) and then only one core (50%) for a few seconds.
Anyway, when I double the size of the input, the time grows significantly more than twice, so it seems it doesn't scale linearly. Am I the only one seeing this? Thanks!

val l = List.range(0, 20000);
val m = new Mapper[Int, Double](l, x => Math.sin(1 - Math.sin(x+5)*Math.cos(x-2)));
val squares = m.pmap();

for (i <- squares.indices if i % 1000 == 0)
println(i + " " + squares(i))