This is very cool.

(Thanks to David Nolen, who pointed out errors in the original.)

I hadn't realized that the standard higher order sequence functions compose in a manner that requires quite a bit of run-time machinery. For example nested map calls will cause a separate lazy sequence to be instantiated at each level of nesting, with each level "pulling" results from the next innermost as needed. The function calls are thus temporally interleaved, but the functions are not actually composed. We can see this happening by adding some printlns to some unary functions and then running a nested map over vector data. Since vectors are chunked in 32-element blocks, the function calls will be interleaved in 32-element blocks as well (or for less than 32 elements, not interleaved at all):

user> (defn princ [i] (do (println "incrementing " i) (inc i)))
user> (defn prud [i] (do (println "doubling " i)) (* 2 i))
user> (def foo (map princ [1 2 3 4]))
user> (take 2 foo)
(incrementing  1
incrementing  2
incrementing  3
incrementing  4
2 3)
user> (def foo (map princ (map prud [1 2 3 4])))
user> (take 2 foo)
(doubling  1
doubling  2
doubling  3
doubling  4
incrementing  2
incrementing  4
incrementing  6
incrementing  8
3 5)

If you increase the range to more than 32, you'll see blocks of 32 interleaved (try it yourself or trust me).

Reducers actually do what you want:

user> (def foo (r/map princ (r/map prud [1 2 3 4])))
user> (reduce str (r/take 2 foo))
doubling  1
incrementing  2
doubling  2
incrementing  4
doubling  3
incrementing  6

The functions in standard scala are even worse, actually instantiating non-lazy collection at every step.

scala> (1 to 5).map(x => {println("doubling "+x); 2*x}).map(x=>{println("incrementing "+x);x+1}).take(2)
doubling 1
doubling 2
doubling 3
doubling 4
doubling 5
incrementing 2
incrementing 4
incrementing 6
incrementing 8
incrementing 10
res17: scala.collection.immutable.IndexedSeq[Int] = Vector(3, 5)

If you increase the range to something large, you'll still see doubling and incrementing occuring in separate blocks.

Since 2.8, scala has provided an easy way to create lazy "views," nested mapping over which produces the expected interleaving. Scala doesn't chunk any of its collections, so we can't actually distinguish yet between interleaving and composition. But wait.

scala> (1 to 5) => {println("doubling "+x); 2*x}).map(x=>{println("incrementing "+x);x+1}).take(2).force
doubling 1
incrementing 2
doubling 2
incrementing 4
res18: Seq[Int] = Vector(3, 5)

Since 2.9, scala has offered what they call parallel collections, which are created from normal collections using the par method, and over which map at all can operate in parallel over multiple threads. Rich Hickey intends parallelize these operations in clojure using the reducer framework with its composed functions, but scala didn't do it this way: parallelizing a collection does cause it to be processed in parallel (as you can see from the thread names), but sequential operations are not composed and full intermediate sequences are built:

scala> def tgn = Thread.currentThread.getName
scala> val foo = (1 to 5) => {println(tgn+" doubling "+x); 2*x}).map(x=>{println(tgn+" incrementing "+x);x+1}).take(3)
ForkJoinPool-1-worker-1 doubling 1
ForkJoinPool-1-worker-7 doubling 3
ForkJoinPool-1-worker-1 doubling 2
ForkJoinPool-1-worker-5 doubling 4
ForkJoinPool-1-worker-3 doubling 5
ForkJoinPool-1-worker-7 incrementing 4
ForkJoinPool-1-worker-5 incrementing 8
ForkJoinPool-1-worker-7 incrementing 10
ForkJoinPool-1-worker-1 incrementing 2
ForkJoinPool-1-worker-3 incrementing 6
foo: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(3, 5, 7)

What's more, par unviewifies collections:

scala> (1 to 5).view
res15: scala.collection.SeqView[Int,scala.collection.immutable.IndexedSeq[Int]] = SeqView(...)
scala> (1 to 5).view.par
res16: scala.collection.parallel.ParSeq[Int] = ParArray(1, 2, 3, 4, 5)

So there seems to be no way to have parallelism with any semblance of laziness, let alone composition.

scala> val foo = (1 to 5) => {println(tgn+" doubling "+x); 2*x}).map(x=>{println(tgn+" incrementing "+x);x+1}).take(3)
ForkJoinPool-1-worker-7 doubling 1
ForkJoinPool-1-worker-5 doubling 3
ForkJoinPool-1-worker-1 doubling 2
ForkJoinPool-1-worker-5 doubling 5
ForkJoinPool-1-worker-3 doubling 4
ForkJoinPool-1-worker-5 incrementing 6
ForkJoinPool-1-worker-1 incrementing 8
ForkJoinPool-1-worker-3 incrementing 4
ForkJoinPool-1-worker-7 incrementing 2
ForkJoinPool-1-worker-1 incrementing 10
foo: scala.collection.parallel.ParSeq[Int] = ParArray(3, 5, 7)

A month ago, before learning about views, I tried to roll my own lazily composeable collections with something like the following:

class SeqF[A] (l : Seq[()=>A]) {
  def map[B](f : A =>B) = new SeqF( x => () => f(x())))
  def take(n : Int) = new SeqF(l.take(n))
  def zipWithIndex = new SeqF( x => () => (x._1(),x._2) ) )
  def scanLeft[B](b0 : B)(f : (B,A)=>B) : SeqF[B] = {
    val l2 = l.scanLeft( ()=>b0 )( (b,a) => () => f(b(),a()) )
    new SeqF[B](l2)
  // (Define the rest of the seq operations here!)
   def values =       // execute the composed functions
   def parvalues =  // do so in parallel
object SeqF {
  class SeqFMaker[A](l : Seq[A]) {
    val it = l.iterator
    def wrapf() = new SeqF( x => () => x) )
  implicit def SeqSeqFMaker[A](l : Seq[A]) = new SeqFMaker[A](l)
import SeqF._

A SeqF holds a vector of functions of Unit, and the various higher order methods create new SeqFs whose functions compose with the originals. (The F is for functor, of which this is an example, though containing values in a closure is conceptually more complicated than doing so with a direct reference.) There's a pimpy companion object that enables the wrap method to create a SeqF, which has a values method to execute all the functions and produce a normal collection. I can add a new method, parvalues, which converts the vector of closures into a parallel collection before execution. And it works!

scala> val foo = (1 to 100) => {println(tgn+" doubling "+x); 2*x}).map(x=>{println(tgn+" incrementing "+x);x+1}).take(5).parvalues
ForkJoinPool-1-worker-3 doubling 1
ForkJoinPool-1-worker-3 incrementing 2
ForkJoinPool-1-worker-7 doubling 2
ForkJoinPool-1-worker-7 incrementing 4
ForkJoinPool-1-worker-1 doubling 3
ForkJoinPool-1-worker-1 incrementing 6
ForkJoinPool-1-worker-7 doubling 4
ForkJoinPool-1-worker-5 doubling 5
ForkJoinPool-1-worker-5 incrementing 10
ForkJoinPool-1-worker-7 incrementing 8
foo: scala.collection.parallel.ParSeq[Int] = ParVector(3, 5, 7, 9, 11)

If the code doesn't convince you, there's additional evidence of composition from the threads in which each calculation is taking place. Note how the first element is processed completely in worker-3, the second and fourth in worker-7, the third in worker-1 and the 5th in worker 8; in all cases both doubling and incrementing take place in the same thread, which would be improbable if the functions hadn't been composed.

We can now enumerate several kinds of laziness when nesting maps:

  1. Intermediate collections are fully instantiated, and the unary functions are executed in blocks. This occurs with standard scala collections.
  2. Intermediate collections are instantiated as lazy sequences, so the unary functions will be interleaved (modulo any chunking optimizations in the specific collection). While technically lazy, the computer is actually hauling quite a lot of data between the intermediate sequences. It happens this way with scala views and standard clojure collections.
  3. The unary functions are composed into one, collapsing the nested operation into a single map, the output of which will be instantiated lazily, as needed. The clojure reducers package works this way.
  4. The unary functions are composed into one, collapsing the nested operation into a single map, from which a single instantiated vector of closures is created. My SeqF class operates in this fashion. This may be somewhat less lazy than 3, but maybe more amenable to parallelization.


comments powered by Disqus