Parallel Lines

Select, few

Recently, there's been a lot of interest in DSLs to address the so-called N+1 selects problem, which describes the very common situation where you

  1. Do one database query to get a list of things, say user IDs;
  2. for each, call some sort of processing function that happens to
  3. do another query, say for each user's name.

It can be even worse than that, as in this example, where (for some reason) we want to append the last name of every person in a group to the first name of their best friend. (Maybe they're getting married.)

(defn mangleNamesFromGroup [gid]
  (join ", "   (map  #(str (getFirstName (getBff %)) " " (getLastName %))
                     (getUserIds gid))))

To the database, this looks like a call to select ids in the group, followed by interleaved calls to getBff, getFirstName, getLastName. So it's sort of the 3N+1 selects problem.

Code like this can be easily optimized in different ways, depending on the capabilities of the database. E.g., maybe there are efficient plural versions of all the get functions:

(defn mangleNamesFromGroup [gid]
  (let [ids (getUserIds gid)
        lns (getLastNames ids)
        bfs (getBffs ids)
        fns (getFirstNames bfs)]
    (join "," (map #(str %1 " " %2) ids lns))))

Or, if the database supports joins directly, we could write a special bespoke function to do the whole thing for us.

It's also possible that it's ok to continue doing repeated queries for single values, as long as similar queries are performed at approximately the same time2. We still have to rewrite the code so the queries aren't alternating and blocked on each other:

(defn mangleNamesFromGroup [gid]
  (let [ids (getUserIds gid)
        lns (map getLastName ids)
        bfs (map getBff ids)
        fns (map getFirstName bfs)]
    (join "," (map #(str %1 " " %2) fns lns))))

Now, a wave of getLastNames, followed by getBffs, and then the getFirstNames go out, and assuming that some facility is grouping each batch into a single query, we now have a 3+1=4 problem.

This is not awful, but... no actually it is. Why should we have to contort our code around the best practices of whatever database we happen to be using? Why should we have to refactor our code just because we switch to a new database that wants the batching to be done slightly differently? (And you also might wonder why we wait to do the getBffs until the getLastNames are done.)

Don't you wish you could just sprinkle some pixie dust on the code we started with, and it would somehow all be ok?

Haskell programmers, delicate dreamers that they are, prominently bemoaned the lack of pixie dust. And then they made some.

Haxl arose

In 2014, Simon Marlow (of Parallel and Concurrent Programming in Haskell fame) and collaborators at Facebook published There is no Fork: an Abstraction for Efficient, Concurrent, and Concise Data Access. (There's also a nice talk, and Facebook even open-sourced the code.)

There's a Haxl-inspired package for clojure, and at least three for Scala, though the last of these, Stitch, is a private, internal project at Twitter.

At the risk of over-simplifying, I'm going to divide their idea into two pieces and largely talk about only one of them.

Piece I

One of the pieces is about how to cause code written using standard Haskell language features and library abstractions (do notation, applicative functors - plus a little textual code transformation) to induce some kind of batched I/O behavior. For example, using their Fetch type might look like

  runHaxl env $ do
   ids <- getIds grp
   mapM getBffName
  where
   getBffName id = do
       bff <- getBff id
       fn <- getFirstName bff
       ln <- getLastName id
     fn ++ " " ++ ln

and the scala implementations are along the lines of

    getIds(grp).traverse {id =>
       for(bff <- getBff(id)
           fn  <- getFirstName(bff)
           ln  <- getLastName id)
          yield s"$fn $ln"
     }

(These might be not exactly right, but since I didn't tag this as a Haskell or Scala post, that's ok, yes?) There was, of course, concurrency before Haxl, but, they note

All these previous formulations of concurrency monads used some kind of fork operation to explicitly indicate when to create a new thread of control. In contrast, in this paper there is no fork. The concurrency will be implicit in the structure of the computations we write.

They achieve this by introducing an applicative functor:

This is the key piece of our design: when computations in Fetch are composed using the <*> operator, both arguments of <*> can be explored to search for Blocked computations, which creates the possibility that a computation may be blocked on multiple things simultaneously. This is in contrast to the monadic bind operator, >>=, which does not admit exploration of both arguments, because the right hand side cannot be evaluated without the result from the left.

This means that there's actually slightly more than the usual do magic going on In this case, the getBff and getLastName ought to be completely independent, so they actually preprocess the second do into

     do
       (bff,ln) <- (,) $ (getBff id) <*>  (getLastName id)
       fn <- getFirstName bff
     fn ++ " " ++ ln

using a compiler extension.

There is much handwringing over the need to use an applicative implementation that differs from the default ap defined for monads. Whether or not the controversy interests you, it's not of much significance in Clojure, where multi-argument map-like functions abound, and typological conundra do not keep us up at night.

Moreover, I will argue that the need to use applicatives in the first place is somewhat artificial. Once we admit that we're willing to do some sort of dependency-aware code transformation (as with do), we can avoid applicatives by doing just a little more of it.

Piece II

The language agnostic piece is that code that looks like it's doing normal stuff like looping and fetching is actually constructing a dependency graph. Instead of returning results directly, functions return a mutable Result object that contains either

  1. an actual result value,
  2. a list of other Results that we're waiting on
  3. a description of a query that could fulfill the Result.

The actual query functions are memoized to return a Result object that starts out as a Result(query="whatever"), but at some point in the future can have a value poked into it.

Non-query function calls are jiggered so that, if any of their arguments is a non-value Result, they will themselves return a Result(waiting=...), listing those Results, on which they depend. Eventually, we end up with a tree of nested Results, the leaves of which are queries to fulfill. We scoop up all the leaves, figure out how to fulfill the queries and cram the results back into their Result object, and repeat the program from the beginning.

On the second run, we get a new batch of leaf queries to fulfill, and we repeat until there are no blocks, and a result can be computed.

The really important thing here is that, once we write our program using Haxl, the "scoop" and "figure" work is somewhat decomplected from program logic. I say somewhat, because in writing with the Haxl DSL in the first place, we've already made concessions to the fact that batching is in some way important.

Running mangleNames under a Haxl-like system

It's interesting to trace through our example program, noting what Result trees might be produced in each pass. Actually, the trees I'm noting were produced by a quick-and-dirty Clojure implementation that will be explained in the next section, which you might want to skip to if you prefer to read Clojure code than to imagine it. (Or if you'll be annoyed when you discover that the next second is really a superset of this one.)

Our haxlified mangleNames on the first pass might return something like,

          Result[:waiting (Result[:query ("select id from groups where gid=" 37)])]

indicating that we're blocked on a result that's blocked on our first query.

Assume that our system knows how to run the query, the results get shoved back into the Result, giving us a tree that looks like this:

         Result[:waiting [Result[:value [321 123]]]]

Now we run the program again, and the memoized groups query returns the now fulfilled Result[:value [321 123] object, so we block a little further down. The new tree blocked on a different group of queries:

   Result[:waiting (
     Result[:waiting (
        Result[:waiting (
           Result[:waiting (
              Result[:query ("select bff from stuff where id=" 123)])]
              Result[:query ("select lastName from stuff where id=" 123)])]
        Result[:waiting (
            Result[:waiting (
               Result[:query ("select bff from stuff where id=" 321)])]
               Result[:query ("select lastName from stuff where id=" 321)])])])]

Now we scoop up these two similar queries, let our database combine them which whatever clever magic it prefers, and inject the answers into the Result objects so the tree looks like

         Result[:waiting (
             Result[:waiting (
                 Result[:waiting (
                    Result[:waiting (
                       Result[:value 777])]
                       Result[:value "Trump"])]
                  Result[:waiting (
                      Result[:waiting (
                         Result[:value 888])]
                         Result[:value "Putin"])])])]

and run again, now yielding a batch of similar queries for name

  Result[:waiting (
     Result[:waiting (
        Result[:waiting (
           Result[:query ("select firstName from stuff where id=" 777)])]
        Result[:waiting (
           Result[:query ("select firstName from stuff where id=" 888)])])])]]

whose results we stuff back into their Result holders:

    Result[:waiting (
       Result[:waiting (
          Result[:waiting (
             Result[:value "Mutt"])]
          Result[:waiting
             (Result[:value "Jeff"])])])]

And we run the program a fourth time, finally getting back

           "Mutt Trump, Jeff Putin"

A shoddy implementation in Clojure

It was instructive for me to hack this up in Clojure, even though there's already a a more professional implementation called Muse.

First, note that the Haskell Haxl Result has an IORef in the middle.

data Result a = Done a | Blocked (Seq BlockedRequest) (Fetch a)
data BlockedRequest = forall a . BlockedRequest (Request a) (IORef (FetchStatus a))
data FetchStatus a = NotFetched | FetchSuccess a

They don't actually avoid mutation altogether, and neither will we.

The possibly blocking object

(deftype Result [a])
(defmethod print-method Result [o, w] (.write w (str "Result" @(.a o))))

contains a volatile that will hold [:value final-result], [:query some-query] or [:waiting list-of-results]. I overrode print-method to make the output a little more presentable here. You probably wouldn't do that in real life.

A basic utility function tries to extract results from a potential Result, returning the extracted value if possible:

(defn try-get [r]
  (if-not (instance? Result r) r
          (match [@(.a r)]
                 [[:value v]] v
                 :else        r)))

The next most fundamental operation is to apply a function to a collection...

(defn try-coll [coll f]
  (let [coll (try-get coll)]

or what we hope is a collection, as it might itself be a Result...

    (if (instance? Result coll) coll

or contain unfulfilled Results...

        (let [coll (map try-get coll)
              bs   (filter #(instance? Result %) coll)]

in which case, we note our dependency on them,

          (if (seq bs) (->Result (volatile! [:waiting bs]))

but if the collection is complete, we actually evaluate the function,

              (try-get (f coll)))))))

Our queries are just placeholders:

(def query (memoize (fn [& q] (->Result (volatile! [:query q])))))

And we write some helper methods to invoke functions whose arguments might be Results or which might return them:

(defn block-apply [f & args] (try-coll args #(apply f %)))
(defn block-map [f coll] (try-coll (try-coll coll #(map f %)) identity))

For demo purposes, it will be useful to extract all the query leaf nodes in a tree:

(defn reap [b]
  (if-not (instance? Result b) []
          (match [@(.a b)]
                 [[:query _]] [b]
                 [[:waiting rs]] (mapcat reap rs))))

Testing out the shoddy implementation in Clojure

As promised, this is going to look just like the pretend example in the second before last.

We define some query functions,

(defn getUserIds [gid] (query "select id from groups where gid=" gid))
(defn getFirstName [id] (query "select firstName from stuff where id=" id))
(defn getLastName [id] (query "select lastName from stuff where id=" id))
(defn getBff [id] (query "select bff from stuff where id=" id))

in terms of which we write the program, using our block- helpers to indicate that the map or function application might be blocking:

(defn mangleNamesFromGroup [gid]
   (block-apply join ", "
               (block-map 
                #(block-apply str  (block-apply getFirstName (getBff %)) " "
                              (getLastName %))
                (getUserIds gid))))

If we were doing the queries sequentially, there we would be alternating calls to getBff and getFirstName, which would be difficult to optimize. Let's see what our haxly conniving gets us:

repl> (def x1 (mangleNamesFromGroup 37)) x1

As expected, we're blocked on the first query, which we pretend returns [321 123]:

repl> (defn fulfill [r v] (vreset! (.a r) [:value v]) r)
repl> (-> x1 reap first (fulfill [123 321]))
repl> x1
   Result[:waiting (Result[:value [123 321]])]

Now run the exact same function again,

repl> (def x2 (mangleNamesFromGroup 37))
repl> x2
   Result[:waiting (
     Result[:waiting (
        Result[:waiting (
           Result[:waiting (
              Result[:query ("select bff from stuff where id=" 123)])]
              Result[:query ("select lastName from stuff where id=" 123)])]
        Result[:waiting (
            Result[:waiting (
               Result[:query ("select bff from stuff where id=" 321)])]
               Result[:query ("select lastName from stuff where id=" 321)])])])]

This time we get all the queries depending on the user ids we have, and we fake fulfillment:

repl> (doall (map #(do (fulfill %1 %2)) (reap x2) [777 "Trump" 888 "Putin"]))
repl> x2
         Result[:waiting (
             Result[:waiting (
                 Result[:waiting (
                    Result[:waiting (
                       Result[:value 777])]
                       Result[:value "Trump"])]
                  Result[:waiting (
                      Result[:waiting (
                         Result[:value 888])]
                         Result[:value "Putin"])])])]

and again

repl> (def x3 (mangleNamesFromGroup 37))
repl> x3
  Result[:waiting (
     Result[:waiting (
        Result[:waiting (
           Result[:query ("select firstName from stuff where id=" 777)])]
        Result[:waiting (
           Result[:query ("select firstName from stuff where id=" 888)])])])]]
repl> (doall (map #(do (fulfill %1 %2)) y3 ["Mutt" "Jeff"]))
repl> x3
    Result[:waiting (
       Result[:waiting (
          Result[:waiting (
             Result[:value "Mutt"])]
          Result[:waiting
             (Result[:value "Jeff"])])])]

and again,

repl> (def x4 (mangleNamesFromGroup 37))
repl> x4
           "Mutt Trump, Jeff Putin"

Exactly what problem have we solved?

Or at least what problem would we have solved if we hadn't typed in the query responses manually?

Actually, I think we've solved three problems that are only incidentally related.

Problem 1: We've managed to batch similar queries together. That's nice, but, as noted before, the database might have done this anyway had we simply sent them at approximately the same time.

Problem 2: We've desequentialized individual, interleaved queries, so they can be written to reflect the logical use of their results rather than the preferred timing of execution.

Problem 3: We did this without explicitly forking.

If you're a web developer, you might think that this is like an elixir for gout, as a binge on blocking IO is a bit of a luxury to begin with. Had we been forced from the beginning to write non-blocking code - say, in javascript - the sequentiality problem would likely never have arisen.1 In fact, it might have required willful effort to force the queries to run sequentially and interleaved.

To prove the point, let's temporarily drop the "there is no fork" part of our adventure. We'll mock asynchronous queries as returning a core.async channel that prints to stdout so we can see what it's doing and then, after a short delay, just munges some strings:

(defn aquery [s i] (go (println (str s "(" i ") ")) (<! (timeout 1000)) (str s "-" i)))

The simplest async version of our program just launches the queries and dereferences them immediately, using async/map to turn a sequence of channels into a single channel:

(defn mangleNamesFromGroupAsync [gid]
  (a/<!! (a/map vector 
             (map #(go (str (<! (aquery "getName" (<! (aquery "getBff" %))))
                             " " (<! (aquery "getLastName" %))))
                    (a/<!! (aquery "getIds" gid)))))  )

With some minor reformatting:

repl> (mangleNamesFromGroupAsync 37)
getIds(37) 
    <pause>
getBff(123) getBff(321) 
    <pause>
getName(777) getName(888) 
    <pause>
getLastName(123) getLastName(321) 
    <pause>
["Mutt Trump" "Jeff Putin"]

that all the getBff queries were sent without waiting before the getName queries, but with a lot less fuss and mutation, so maybe we were in nirvana all along...

Not so fast

First, while it might be second nature to clojurescript (or whatever script) developers, this (a/map vector (map #(go (<! aquery ...) ...) ...) business is already a bit removed from plain old (map (aquery ..)). Without thinking about it too much, we complected our code beyond its functional intent to also express an opinion about how asynchronicity should be juggled.

Furthermore, it's still not as asynchronous as it should be. There's no reason for the getBffs and getLastNames to go out in different waves, but we introduced an artificial dependency in the order of arguments to str. To remove the dependency, we need to contort a bit further, explicitly launching as many of our queries as we can, before we need them:

(defn mangleNamesFromGroupAsync2 [gid]
  (a/<!! (a/map vector 
                (map #(go (let [c1 (go (<! (aquery "getName" (<! (aquery "getBff" %)))))
                                c2 (aquery "getLastName" %)]
                            (str (<! c1) " " (<! c2))))
                     (a/<!! (aquery "getIds" gid)))))  )

Now

repl> (mangleNamesFromGroupAsync2 37)
getIds(37) 
    <pause>
getLastName(123) getLastName(321) 
getBff(123) getBff(321) 
    <pause>
getName(777) getName(888) 
    <pause>
["Mutt Trump" "Jeff Putin"]

we have only two pauses.

So even with a certain amount of the magic taken care of by asynchronous constructs, we still have to shovel the code around to get it to behave. Not only have we not rid ourselves of fork, but we've made deploying it correctly a central part of the coding challenge.

A tree is a tree

I mean, if you've looked at a hundred thousand acres or so of trees — you know,
a tree is a tree, how many more do you need to look at?
    - Ronald Reagan

Our naive implementation

(defn mangleNamesFromGroup [gid]
  (join ", "   (map  #(str (getFirstName (getBff %)) " " (getLastName %))
                     (getUserIds gid))))

has an implicit tree structure, defined by its nested parentheses.

      (map   .                                         . )
             |                                         |
          #(str   .    " "           .  )           (getUserIds gid)
                 /                    \
              (getFirstName  . )     (getLastName %)
                             |
                         (getBff %)

Then, when we ran

(defn mangleNamesFromGroup [gid]
   (block-apply join ", "
               (block-map 
                #(block-apply str  (block-apply getFirstName (getBff %)) " "
                              (getLastName %))
                (getUserIds gid))))

we got a Result tree, whose edges were defined by references to child Results in the :waiting list. Admittedly, we didn't get it all at once, but in 3 successive passes:

            :waiting______________
             /                    \
Pass 1    :query                :waiting_______
          getIds                /              \
                         :waiting__          :waiting___
                         /      \  \          /      \  \
Pass 2               :query  :query \      :query :query \
                     getLN  getBff   \      getLN  getBff \
                                   :query                :query
Pass 3                              getFN                 getFN

After we translated

(defn mangleNamesFromGroupAsync [gid]
  (a/<!! (a/map vector 
             (map #(go (str (<! (aquery "getName" (<! (aquery "getBff" %))))
                             " " (<! (aquery "getLastName" %))))
                    (a/<!! (aquery "getIds" gid)))))  )

to

(defn mangleNamesFromGroupAsync2 [gid]
  (a/<!! (a/map vector 
                (map #(go (let [c1 (go (<! (aquery "getName" (<! (aquery "getBff" %)))))
                                c2 (aquery "getLastName" %)]
                            (str (<! c1) " " (<! c2))))
                     (a/<!! (aquery "getIds" gid)))))  )

we get an extremely similar tree, with edges defined by asynchronous waits. (The <n> below are supposed to indicate the nth invocation of the closure.)

                   map__________
                   /            \
                aquery      #(go <0> ........    <1> )
                getIds           /                 \
                               let______           let______
                               /   \    \          /   \    \
                           aquery aquery \     aquery aquery \
                           getLN  getBFF  \    getLN  getBFF  \
                                        aquery               aquery
                                        getLN                 getLN

In this case, there are no formal batches, but the vertical position of the node in the diagram indicates roughly when the query actually occurs. In practice, they may be jittered up and down a bit, and the order in which they are executed is no longer deterministic, but if we, like Haxl, assume that queries are referentially transparent, the programs will return identical results.

Another difference is, as noted previously, that, with no explicit batching, the core.async approach relies on someone else to recognize a run of similar queries, and if the someone lives on the server, this will cause increased network chatter. At the same time, the Haxl style has the ironic side effect of separating computation and querying into distinct, sequential phases, in the name of desequentializing the queries within each phase.

In both cases, similar functional expressions were converted into similar dependency trees, with queries occurring in similar temporal waves. And in both cases, the waves are ordered in the same way, with the deepest dependencies coming first, that is in topological sort order.

We infer that the mechanics of converting a functional expression into parallelized, asynchronous form (which so far, we've done manually) ought to be very similar to the creation of Result trees in a Haxl approach, and that both of them are essentially topological sorts.

That one can achieve batching and parallelism through asynchronous code transformation is not a novel observation. There's a 2014 paper from Ramachandra, Chavan, Guravannavar, and Sudarshan at ITT actually called Program Transformations for Asynchronous and Batched Query Submission. They present a set of formal transformation rules for general procedural programs, and refer to an implementation for Java in their DBridge optimizer. Additionally, about 17 minutes into Jake Donham's Stitch talk, he describes a possible batching strategy as "accumulating calls until you hit some threshold or something like that," which seems to imply that these calls are arriving asynchronously.

It's worth noting that, while the ITT optimizer allows you not just to write batching-oblivious code, but to write exactly the same code as you would if queries were instantaneous, this is not an essential feature of compile-time desequentializing. It's a convenience, but a less important step beyond the main goal of letting code express functional intent in a logical way.

(It may have occurred to the careful reader that I am not carefully distinguishing concurrency and parallelism. This is true but uninteresting.)

Gifts from the Gods

What is best in life - according to the original script of Conan the Barbarian - is to discover that there's some nominally difficult thing that might actually be easy, because you can slough off nearly all the work onto other people. Once we generalize "doing all our queries into batches" to "doing everything in parallel", we can rely on a few great tools in the Clojure tool-shed.

Gift I: Homoiconicity

The first set of tools comes from the fact that algorithms expressed in lisp are vastly easier to transform than in most languages. Haxl derivatives create a Result structure at runtime; DBridge parses java code into data structures to which transformation rules are then applied. Clojure programs are already a data structure, so we get to skip a step and transform the code directly.

The transformation, as noted, is really a topological sort. Given

   (foo form1 form2 ...)

we simply want

   (xform `(foo form1 form2 ...)) 

to become

   `(let [c1 (go ~(xform form1))
          c2 (go ~(xform form2))
          ...                   ]
      (foo (<! c1) (<! c2) ...)

It's clear how this parallelizes independent argument forms; we can also see that this depth-first recursive transformation has the effect of hoisting nested arguments earlier in the evaluation sequence. For a boring function of a function,

    (foo (bar x))

we get

    `(let [c1 (go ~(xform `(bar x)))]
       (foo (<! c1)))

which becomes

    `(let [c1 (go 
                  (let [c2 (go x)]
                    (bar (<! c2))))]
        (foo (<! c1)))

In building up the new structure, we traverse the call graph exactly once, so the whole procedure is O(N), where N is the total number of arguments of all functions in the program. It seems that the Haxl procedure requires re-generating the graph each time leaf nodes are extracted, making it O(N log N). This reflects the fact that the Haxl ordering is stronger than topological: not only is every query run before its result is required (which is of course a necessary ordering), no query is run before the previous batch completes (which is really not necessary).

Once you get past a few ritual incantations, it is almost trivial to produce a macro that does our transformation:

(defmacro parallelize-func-stupid [form]
  (let [[f & args] form                           ;; Extract function and arguments
        cs    (repeatedly (count args) gensym)    ;; Generate some channel names
        pargs (map parallelize-func-stupid args)  ;; Recursively transform the args
        bs    (interleave cs                      ;; Construct the binding block
                   (map (fn [parg] `(a/go ~parg)) pargs))
        args (map (fn [c] `(a/<! ~c)) cs)]         ;; Deref the channels
    `(let [~@bs] (~f ~@args))))

Presto!

Well, an incomplete presto. This will actually blow up for arguments that are not themselves function applications, which means it only works on infinitely sized programs. Also, we've been acting as if go returned a promise that can be dereferenced an arbitrary number of times, which of course it really doesn't. We'll do better below.

Gift II: JVM Fibers and Quasar

So far, all my parallelization examples have used core.async, which is certainly not a bad way to go, but it might be fun to look at an interesting development in the JVM world. Quasar is an implementation of fibers in Java, with a Clojure API called Pulsar.

A fiber presents itself as very much like a normal thread, but much cheaper to create. Like other inexpensive concurrency constructs, fibers are, under the hood, essentially co-routines - multiple streams of execution handing off control to each other cooperatively, so there may be vastly more such streams than the number of threads available (though of course the thread pool does limit the number of fibers that can be actively doing anything at a given time).

In core.async, cooperation is achieved by rewriting code into state-machine form, using macros. In Quasar, it's done by rewriting at the byte-code level, so that methods marked "suspendable" can give up control when waiting on each other. An advantage of the bytecode approach is that stack traces are preserved, making debugging easier, and fibers are said to be more efficient than channels when there are vast numbers of them.

One of the available Pulsar APIs is a drop-in replacement for core.async, but the requirement to wrap channel operations in a go block now feels like an artificial inconvenience. Additionally, what we need for haxilating is not really a CSP channel but a promise, and, while clojure.core.async implements promises on top of channels, the Pulsar promise is closer to the core fiber abstraction, and co.paralleluniverse.pulsar.async channels represent an additional layer of abstraction, so I preferred to use promises directly.

The basic element of a Pulsar program is the suspendable function, which you define with defsfn, e.g.

  (q/defsfn my-suspendable [x] (other-suspendable x))

It's not turtles all the way down: at some point, one of the suspendables will actually give up control while waiting for a callback from some external event, and Pulsar provides an await macro to facilitate that:

  (q/defsfn other-suspendable [x] @(fiber (q/await httpkit/request x)))

Here, the macro expands to call the workhorse httpkit/request with our argument x, plus a callback, and then faux blocks (meaning that the Quasar runtime parks us) until that callback is called. (Actually, Pulsar has already wrapped httpkit/kit in a defreq macro, from which this example is shamelessly copied.) One thing that's cool about these suspendable functions is that they can be used in normal code if you want, but you can identify them with suspendable?, which is going to be useful in determining exactly which forms will benefit from parallelization.

The function parallelization macro in Pulsar-land is pretty much the same as in core.async country, except that we create promises using (q/promise (fn [] form-to-fulfill-promise))):

(defmacro parallelize-func-stupid2 [form]
  (let [[f & args] form
        ps    (repeatedly (count args) gensym)
        pargs (map parallelize-func-stupid2 args)
        bs    (interleave ps (map (fn [parg] `(q/promise (fn [] ~parg))) pargs))
        args  (map (fn [p] `(deref ~p)) ps)]
    `(let [~@bs] (~f ~@args))))

QAXL - Quasar Haxl

Per its evocative suffix, the above parallelization functions are on dimwitted side. Fortunately, we mostly have to train it not to do certain things, like insisting on transforming everything whether or not it needs it.

Qaxl is structured around a central parallelize dispatch function that takes a form and invokes one of several parallelize-particular-forms, which in turn call parallelize recursively on elements of the form as needed: Each of these functions returns a hashmap of

  • :par - which is true if any q/suspendable? functions were found within at any level
  • :form - the parallelized form itself

When :par is false, we know we can just leave that form alone.

In addition to the input form, it will also be necessary to pass a hash-map s2p of symbols to promises. Usually, s2p will be passed down unchanged when processing sub-forms, but it will be augmented in the course of processing a (let ...) form.

The fundamental building block for a number of form handlers is a utility for taking a sequence of these parallelized {:par ... :form ...}, and pulling out the ones where :par is true into let bindings of promises. This is a reduction over a list of forms by a function (fn [[bs args] p] ...), which accumulates bindings into a vector bs, and arguments into a vector args (just assume for now that the main parallelize dispatch function exists):

(defn- par-to-bindings [forms s2p] ;; => [bs args]
  (let [ps        (map #(parallelize % s2p) forms)
        [bs args] (reduce (fn [[bs args] p]
                            (if (:par p)

If p is :parallelizable, then bs is augmented to bind a new promise, and args is augmented to dereference that promise,

                              (let [ch (gensym "p")]
                                [(concat bs [ch `(q/promise (fn [] ~(:form p)))])
                                 (conj args `(deref ~ch))])

while if it's not parallelizable, bs stays the same, and we just append p's form to args:

                              [bs (conj args (:form p))]))
                          [[] []]
                          ps)]
    [ps bs (seq args)]))

With the binding function defined, parallelize-func doesn't have to do much more than build the let if there are promise bindings, or a normal form if there aren't.

(defn- parallelize-func [forms s2p]
  (let [[ps bs forms] (par-to-bindings forms s2p)]
    (if (seq bs)
       {:form `(let [~@bs] (~@forms)) :par true}
       {:form `(~@forms)}

In some cases, we can't just transform the way a function is called, but we need to actually change the function. For example, it would be silly to transform

   (map foo [1 2 3])

into

 (let
  [p30443
   (q/promise (fn [] foo))]
  (map @p30443 [1 2 3])),

We really need a completely new map, which doesn't turn the function itself into a promise, but instead creates promises for each application of the function, derefing them in bulk after they've all been launched.

Haxl and friends have this problem too. You can see the explicit .traverse in the Scala example above, and Haxl actually implements mapM for their fetch class as traverse. In our case, since we've got the whole code form to play with, we can keep the same name, but only bother transforming it if necessary.

We start with our par-to-bindings utility, after which we'll know if there's anything parallel going on:

(defn- parallelize-map [[_ & forms] s2p]
  (let [[ps bs [f & args]] (par-to-bindings forms s2p)
        m1  (if-not (:par (first ps))
              `(map ~f ~@args)

If we do need to parallelize, we do so by mapping twice. First, to create a sequence of promises, and then to deref them:

              `(map deref
                    (doall (map (fn [& xs#]
                                  (q/promise (fn [] (apply ~f xs#))))
                                ~@args ))))]
    {:form (if (seq bs) `(let [~@bs] ~m1) m1) :par (some :par ps)}))

Note the doall, without which laziness will resequentialize our masterpieces.

Not surprisingly, parallelizing existing (let ...) forms is among the more complicated operations, because we need to remember which symbols in the original code correspond to which promises, causing s2p to evolve as we process. As usual, "evolve" means a reduction over something, in this case the original bindings, while accumulating in bs1 the bindings of promises to generated symbols, in bs2 the bindings of original symbols to the deref'd promises, and in s2p the new map of symbol bindings.

(defn parallelize-let [[_ bs & forms] s2p]
  (let [[bs1 bs2] (reduce
                   (fn [[bs1 bs2 s2p] [s form]]

We parallelize the righthand side of each original binding, using the current symbol map:

                     (let [{:keys [form par]} (parallelize form s2p)]
                       (if par
                         (let [ch (gensym "p")]

Bind the promises:

                           [(concat bs1 [ch `(q/promise (fn [] ~form))])

Bind original symbols to the deref'd promises:

                            (concat bs2  [s `(deref ~ch)])

Augment the symbol table:

                            (assoc s2p s ch)])

Unless the rhs wasn't parallelized, in which case we paste the binding as-is:

                         [bs1 (concat bs2 [s form]) s2p])))
                   [[] [] s2p]
                   (partition 2 bs))
        ps  (parallelize-forms forms s2p)]
    {:form `(let [~@(concat bs1 bs2)] ~@(map :form ps)) :par (or (seq bs1) (some :par ps))}))

This relies on dispatching in recursions of parallelize to parallelize-subst, which makes the substitutions dictated by s2p

(defn- parallelize-subst [s s2p]
  (if (symbol? s)
    (if (contains? s2p s)
      {:form `(deref ~(get s2p s)) :s2p s2p :par true}

and, while we're at it, also marks symbols as parallelizable if they represent suspendable functions:

      {:form s :par (some-> s resolve var-get q/suspendable?) :s2p s2p})
    {:form s :s2p s2p}))

Are we done?

If the goal is to mirror the do/mapM style of Haxl, then maybe yes.

With our query faked as a simple Quasar suspendable,

(q/defsfn qquery [s i] (println (str s "(" i ")")) (q/sleep 2000)  (get aanswers [s i]))

(q/sleep is non-blocking in fiber-land), the test program is reasonably pretty:

  (qaxl (map #(str (qquery "getName" (qquery "getBff" %)) (qquery "getLastName" %))
              (qquery "getIds" 37)))

We get

getIds(37)
    <pause>
getLastName(321) getBff(123) getBff(321) getLastName(123)
    <pause>
getName(888) getName(777)
    <pause>
("MuttTrump" "JeffPutin")

which is exactly what we want.

In around a hundred lines of code, we've handled function application, map and simple let (minus the fancy destructuring capabilities), but if the goal is to make arbitrary code fully parallelized simply by wrapping it in our qaxl macro, there's a long way to go. Handling the special forms, macros and higher-order functions of the standard Clojure libraries would be daunting enough. Dealing correctly with anything we might find in a library may be impossible.

Takeaways

  1. Launching waves of asynchronous queries is pretty much equivalent to full-on batching, and if you write non-blocking code for a living, you've probably been doing it without thinking about it.
  2. The transformation into waves is basically a topological sort.
  3. In Haxl, you sort by repeatedly pruning the leaves of a sequence of trees emitted by a monadic expression.
  4. In Qaxl, you sort the normal way, by DFS of the tree.
  5. With both, the concurrency is implicit in the structure of the computation, and there is still no fork.

  1. See, for example, chapter 6 of NodeJS Design Patterns

  2. The couchbase documentation, for example, explains that

    Asynchronous clients inherently batch operations: because the application receives the response at a later stage in the application, it may attain batching via issuing many requests in sequence.

    implying that either the server or the client-side API code is doing something clever when similar queries are made around the same time. This post makes the point more generally. 



Comments

comments powered by Disqus