This is very cool.
(Thanks to David Nolen, who pointed out errors in the original.)
I hadn't realized that the standard higher-order sequence functions compose in a manner that requires quite a bit of run-time machinery. For example, nested map calls will cause a separate lazy sequence to be instantiated at each level of nesting, with each level "pulling" results from the next innermost as needed. The function calls are thus temporally interleaved, but the functions are not actually composed. We can see this happening by adding some printlns to some unary functions and then running a nested map over vector data. Since vectors are chunked in 32-element blocks, the function calls will be interleaved in 32-element blocks as well (or, for fewer than 32 elements, not interleaved at all):
user> (defn princ [i] (do (println "incrementing " i) (inc i)))
user> (defn prud [i] (do (println "doubling " i) (* 2 i)))
user> (def foo (map princ [1 2 3 4]))
#'user/foo
user> (take 2 foo)
(incrementing 1
incrementing 2
incrementing 3
incrementing 4
2 3)
user> (def foo (map princ (map prud [1 2 3 4])))
#'user/foo
user> (take 2 foo)
(doubling 1
doubling 2
doubling 3
doubling 4
incrementing 2
incrementing 4
incrementing 6
incrementing 8
3 5)
user>
If you increase the range to more than 32, you'll see blocks of 32 interleaved (try it yourself or trust me).
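If you'd rather paste than trust, something like the following (reusing princ and prud from above) should print the first chunk of 32 "doubling" lines followed by 32 "incrementing" lines, even though only two results are requested; output elided here:
user> (def big (map princ (map prud (vec (range 1 101)))))
#'user/big
user> (take 2 big)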
Reducers actually do what you want:
user> (require '[clojure.core.reducers :as r])
nil
user> (def foo (r/map princ (r/map prud [1 2 3 4])))
#'user/foo
user> (reduce str (r/take 2 foo))
doubling 1
incrementing 2
doubling 2
incrementing 4
doubling 3
incrementing 6
"35"
user>
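In effect, the reducer pipeline behaves like a single map over the composition of the two functions: each element flows through both functions before the next one is touched. A rough hand-rolled equivalent (not how reducers are implemented internally, and the chunk of four will still be realized eagerly here) is simply:
user> (take 2 (map (comp princ prud) [1 2 3 4]))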
The standard Scala sequence functions are even worse, actually instantiating a non-lazy collection at every step.
scala> (1 to 5).map(x => {println("doubling "+x); 2*x}).map(x=>{println("incrementing "+x);x+1}).take(2)
doubling 1
doubling 2
doubling 3
doubling 4
doubling 5
incrementing 2
incrementing 4
incrementing 6
incrementing 8
incrementing 10
res17: scala.collection.immutable.IndexedSeq[Int] = Vector(3, 5)
If you increase the range to something large, you'll still see all the doubling and all the incrementing occurring in separate blocks.
Since 2.8, Scala has provided an easy way to create lazy "views"; nested mapping over them produces the expected interleaving. Scala doesn't chunk any of its collections, so we can't yet distinguish between interleaving and composition. But wait.
scala> (1 to 5).view.map(x => {println("doubling "+x); 2*x}).map(x=>{println("incrementing "+x);x+1}).take(2).force
doubling 1
incrementing 2
doubling 2
incrementing 4
res18: Seq[Int] = Vector(3, 5)
Since 2.9, Scala has offered what it calls parallel collections, which are created from normal collections using the par method, and over which map et al. can operate in parallel across multiple threads. Rich Hickey intends to parallelize these operations in Clojure using the reducer framework with its composed functions (a sketch of the Clojure side follows the transcript below), but Scala didn't do it this way: parallelizing a collection does cause it to be processed in parallel (as you can see from the thread names), but the sequential operations are not composed, and full intermediate sequences are built:
scala> def tgn = Thread.currentThread.getName
scala> val foo = (1 to 5).par.map(x => {println(tgn+" doubling "+x); 2*x}).map(x=>{println(tgn+" incrementing "+x);x+1}).take(3)
ForkJoinPool-1-worker-1 doubling 1
ForkJoinPool-1-worker-7 doubling 3
ForkJoinPool-1-worker-1 doubling 2
ForkJoinPool-1-worker-5 doubling 4
ForkJoinPool-1-worker-3 doubling 5
ForkJoinPool-1-worker-7 incrementing 4
ForkJoinPool-1-worker-5 incrementing 8
ForkJoinPool-1-worker-7 incrementing 10
ForkJoinPool-1-worker-1 incrementing 2
ForkJoinPool-1-worker-3 incrementing 6
foo: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(3, 5, 7)
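On the Clojure side of that comparison, a hedged sketch: the reducers library's fold (clojure.core.reducers/fold) is supposed to reduce a vector in parallel on a fork/join pool, splitting it into groups (512 elements by default) while the r/map functions stay composed, so no intermediate collection is built. Dropping the printlns to keep the output sane, it would look something like this (result elided):
user> (r/fold + (r/map inc (r/map #(* 2 %) (vec (range 100000)))))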
What's more, par un-view-ifies collections:
scala> (1 to 5).view
res15: scala.collection.SeqView[Int,scala.collection.immutable.IndexedSeq[Int]] = SeqView(...)
scala> (1 to 5).view.par
res16: scala.collection.parallel.ParSeq[Int] = ParArray(1, 2, 3, 4, 5)
So there seems to be no way to have parallelism with any semblance of laziness, let alone composition.
scala> val foo = (1 to 5).view.par.map(x => {println(tgn+" doubling "+x); 2*x}).map(x=>{println(tgn+" incrementing "+x);x+1}).take(3)
ForkJoinPool-1-worker-7 doubling 1
ForkJoinPool-1-worker-5 doubling 3
ForkJoinPool-1-worker-1 doubling 2
ForkJoinPool-1-worker-5 doubling 5
ForkJoinPool-1-worker-3 doubling 4
ForkJoinPool-1-worker-5 incrementing 6
ForkJoinPool-1-worker-1 incrementing 8
ForkJoinPool-1-worker-3 incrementing 4
ForkJoinPool-1-worker-7 incrementing 2
ForkJoinPool-1-worker-1 incrementing 10
foo: scala.collection.parallel.ParSeq[Int] = ParArray(3, 5, 7)
A month ago, before learning about views, I tried to roll my own lazily composable collections with something like the following:
class SeqF[A] (l : Seq[() => A]) {
  // Each higher-order method composes onto the stored thunks instead of forcing them.
  def map[B](f : A => B) = new SeqF(l.map( x => () => f(x()) ))
  def take(n : Int) = new SeqF(l.take(n))
  def zipWithIndex = new SeqF( l.zipWithIndex.map( x => () => (x._1(), x._2) ) )
  def scanLeft[B](b0 : B)(f : (B, A) => B) : SeqF[B] = {
    val l2 = l.scanLeft( () => b0 )( (b, a) => () => f(b(), a()) )
    new SeqF[B](l2)
  }
  // (Define the rest of the seq operations here!)
  def values = l.map(_())        // execute the composed functions
  def parvalues = l.par.map(_()) // do so in parallel
}
object SeqF {
  // "pimp my library" wrapper that adds wrapf to any Seq
  class SeqFMaker[A](l : Seq[A]) {
    def wrapf() = new SeqF(l.map( x => () => x ))
  }
  implicit def SeqSeqFMaker[A](l : Seq[A]) = new SeqFMaker[A](l)
}
import SeqF._
import SeqF._
A SeqF holds a vector of nullary functions (() => A), and the various higher-order methods create new SeqFs whose functions compose with the originals. (The F is for functor, of which this is an example, though containing values in a closure is conceptually more complicated than doing so with a direct reference.) There's a pimpy companion object that adds a wrapf method to any Seq for creating a SeqF, which in turn has a values method to execute all the functions and produce a normal collection. I can add a new method, parvalues, which converts the vector of closures into a parallel collection before execution. And it works!
scala> val foo = (1 to 100).toVector.wrapf.map(x => {println(tgn+" doubling "+x); 2*x}).map(x=>{println(tgn+" incrementing "+x);x+1}).take(5).parvalues
ForkJoinPool-1-worker-3 doubling 1
ForkJoinPool-1-worker-3 incrementing 2
ForkJoinPool-1-worker-7 doubling 2
ForkJoinPool-1-worker-7 incrementing 4
ForkJoinPool-1-worker-1 doubling 3
ForkJoinPool-1-worker-1 incrementing 6
ForkJoinPool-1-worker-7 doubling 4
ForkJoinPool-1-worker-5 doubling 5
ForkJoinPool-1-worker-5 incrementing 10
ForkJoinPool-1-worker-7 incrementing 8
foo: scala.collection.parallel.ParSeq[Int] = ParVector(3, 5, 7, 9, 11)
If the code doesn't convince you, there's additional evidence of composition in the threads where each calculation takes place. Note how the first element is processed completely in worker-3, the second and fourth in worker-7, the third in worker-1, and the fifth in worker-5; in every case both the doubling and the incrementing happen in the same thread, which would be improbable if the functions hadn't been composed.
We can now enumerate several kinds of laziness when nesting maps:
1. Intermediate collections are fully instantiated, and the unary functions are executed in blocks. This occurs with standard Scala collections.
2. Intermediate collections are instantiated as lazy sequences, so the unary functions will be interleaved (modulo any chunking optimizations in the specific collection). While technically lazy, the computer is actually hauling quite a lot of data between the intermediate sequences. It happens this way with Scala views and standard Clojure collections.
3. The unary functions are composed into one, collapsing the nested operation into a single map, the output of which is instantiated lazily, as needed. The Clojure reducers package works this way.
4. The unary functions are composed into one, collapsing the nested operation into a single map, from which a single instantiated vector of closures is created. My SeqF class operates in this fashion. This may be somewhat less lazy than 3, but perhaps more amenable to parallelization.