Select, few
Recently, there's been a lot of interest in DSLs to address the so-called N+1 selects problem, which describes the very common situation where you
- Do one database query to get a list of things, say user IDs;
- for each, call some sort of processing function that happens to
- do another query, say for each user's name.
It can be even worse than that, as in this example, where (for some reason) we want to append the last name of every person in a group to the first name of their best friend. (Maybe they're getting married.)
(defn mangleNamesFromGroup [gid]
(join ", " (map #(str (getFirstName (getBff %)) " " (getLastName %))
(getUserIds gid))))
To the database, this looks like a call to select the ids in the group, followed by interleaved calls to getBff, getFirstName, and getLastName. So it's sort of the 3N+1 selects problem.
Code like this can be easily optimized in different ways, depending on the capabilities of the database. E.g., maybe there are efficient plural versions of all the get functions:
(defn mangleNamesFromGroup [gid]
(let [ids (getUserIds gid)
lns (getLastNames ids)
bfs (getBffs ids)
fns (getFirstNames bfs)]
(join ", " (map #(str %1 " " %2) fns lns))))
Or, if the database supports joins directly, we could write a special bespoke function to do the whole thing for us.
It's also possible that it's ok to continue doing repeated queries for single values, as long as similar queries are performed at approximately the same time [2]. We still have to rewrite the code so the queries aren't alternating and blocked on each other:
(defn mangleNamesFromGroup [gid]
(let [ids (getUserIds gid)
lns (map getLastName ids)
bfs (map getBff ids)
fns (map getFirstName bfs)]
(join ", " (map #(str %1 " " %2) fns lns))))
Now, a wave of getLastNames, followed by getBffs, and then the getFirstNames go out, and assuming that some facility is grouping each batch into a single query, we now have a 3+1=4 problem.
This is not awful, but... no actually it is. Why should we have to
contort our code around the best practices of whatever database we
happen to be using? Why should we have to refactor our code just
because we switch to a new database that wants the batching to be done
slightly differently? (And you might also wonder why we wait to do the getBffs until the getLastNames are done.)
Don't you wish you could just sprinkle some pixie dust on the code we started with, and it would somehow all be ok?
Haskell programmers, delicate dreamers that they are, prominently bemoaned the lack of pixie dust. And then they made some.
Haxl arose
In 2014, Simon Marlow (of Parallel and Concurrent Programming in Haskell fame) and collaborators at Facebook published There is no Fork: an Abstraction for Efficient, Concurrent, and Concise Data Access. (There's also a nice talk, and Facebook even open-sourced the code.)
There's a Haxl-inspired package for Clojure, and at least three for Scala, though the last of these, Stitch, is a private, internal project at Twitter.
At the risk of over-simplifying, I'm going to divide their idea into two pieces and largely talk about only one of them.
Piece I
One of the pieces is about how to cause code written using standard Haskell language features and library abstractions (do notation, applicative functors - plus a little textual code transformation) to induce some kind of batched I/O behavior. For example, using their Fetch type might look like
runHaxl env $ do
  ids <- getIds grp
  mapM getBffName ids
  where
    getBffName id = do
      bff <- getBff id
      fn <- getFirstName bff
      ln <- getLastName id
      return (fn ++ " " ++ ln)
and the Scala implementations are along the lines of
getIds(grp).traverse { id =>
  for (bff <- getBff(id);
       fn <- getFirstName(bff);
       ln <- getLastName(id))
  yield s"$fn $ln"
}
(These might not be exactly right, but since I didn't tag this as a Haskell or Scala post, that's ok, yes?) There was, of course, concurrency before Haxl, but, they note
All these previous formulations of concurrency monads used some kind of fork operation to explicitly indicate when to create a new thread of control. In contrast, in this paper there is no fork. The concurrency will be implicit in the structure of the computations we write.
They achieve this by introducing an applicative functor:
This is the key piece of our design: when computations in Fetch are composed using the <*> operator, both arguments of <*> can be explored to search for Blocked computations, which creates the possibility that a computation may be blocked on multiple things simultaneously. This is in contrast to the monadic bind operator, >>=, which does not admit exploration of both arguments, because the right hand side cannot be evaluated without the result from the left.
This means that there's actually slightly more than the usual do magic going on. In this case, the getBff and getLastName ought to be completely independent, so they actually preprocess the second do into
do
  (bff, ln) <- (,) <$> getBff id <*> getLastName id
  fn <- getFirstName bff
  return (fn ++ " " ++ ln)
using a compiler extension.
There is much handwringing over the need to use an applicative implementation that differs from the default ap defined for monads. Whether or not the controversy interests you, it's not of much significance in Clojure, where multi-argument map-like functions abound, and typological conundra do not keep us up at night.
Moreover, I will argue that the need to use applicatives in the first place is somewhat artificial. Once we admit that we're willing to do some sort of dependency-aware code transformation (as with do), we can avoid applicatives by doing just a little more of it.
Piece II
The language agnostic piece is that code that looks like it's doing normal stuff like looping and fetching is actually constructing a dependency graph. Instead of returning results directly, functions return a mutable Result object that contains either
- an actual result value,
- a list of other Results that we're waiting on, or
- a description of a query that could fulfill the Result.
The actual query functions are memoized to return a Result object that starts out as a Result(query="whatever"), but at some point in the future can have a value poked into it. Non-query function calls are jiggered so that, if any of their arguments is a non-value Result, they will themselves return a Result(waiting=...), listing those Results on which they depend.
Eventually, we end up with a tree of nested Results, the leaves of which are queries to fulfill. We scoop up all the leaves, figure out how to fulfill the queries and cram the results back into their Result objects, and repeat the program from the beginning. On the second run, we get a new batch of leaf queries to fulfill, and we repeat until there are no blocks, and a result can be computed.
The really important thing here is that, once we write our program using Haxl, the "scoop" and "figure" work is somewhat decomplected from program logic. I say somewhat, because in writing with the Haxl DSL in the first place, we've already made concessions to the fact that batching is in some way important.
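The scoop-figure-repeat loop itself is simple enough to sketch. Assuming the Result type and the reap function from the shoddy implementation below, plus a hypothetical fulfill-queries! that batch-executes a collection of :query Results and pokes the answers into them, a driver might look like:

```clojure
;; Hypothetical driver loop; fulfill-queries! is assumed to run the leaf
;; queries as a batch and vreset! each Result to [:value ...].
(defn run-haxlish [program]
  (loop []
    (let [r (program)]
      (if (instance? Result r)
        (do (fulfill-queries! (reap r)) ; scoop up the leaf queries, batch them
            (recur))                    ; and re-run the program from the beginning
        r))))                           ; nothing blocked: r is the final value
```

Here program is a thunk, e.g. #(mangleNamesFromGroup 37); in the REPL session below, the fulfillment step is instead done by hand.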
Running mangleNames under a Haxl-like system
It's interesting to trace through our example program, noting what Result trees might be produced in each pass. Actually, the trees I'm noting were produced by a quick-and-dirty Clojure implementation that will be explained in the next section, which you might want to skip to if you'd rather read Clojure code than imagine it. (Or if you'll be annoyed when you discover that the next section is really a superset of this one.)
Our haxlified mangleNames on the first pass might return something like
Result[:waiting (Result[:query ("select id from groups where gid=" 37)])]
indicating that we're blocked on a result that's blocked on our first query.
Assuming that our system knows how to run the query, the results get shoved back into the Result, giving us a tree that looks like this:
Result[:waiting (Result[:value [123 321]])]
Now we run the program again, and the memoized groups query returns the now fulfilled Result[:value [123 321]] object, so we block a little further down. The new tree is blocked on a different group of queries:
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:query ("select bff from stuff where id=" 123)])]
Result[:query ("select lastName from stuff where id=" 123)])]
Result[:waiting (
Result[:waiting (
Result[:query ("select bff from stuff where id=" 321)])]
Result[:query ("select lastName from stuff where id=" 321)])])])]
Now we scoop up these two similar queries, let our database combine them with whatever clever magic it prefers, and inject the answers into the Result objects so the tree looks like
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:value 777])]
Result[:value "Trump"])]
Result[:waiting (
Result[:waiting (
Result[:value 888])]
Result[:value "Putin"])])])]
and run again, now yielding a batch of similar queries for first names
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:query ("select firstName from stuff where id=" 777)])]
Result[:waiting (
Result[:query ("select firstName from stuff where id=" 888)])])])]
whose results we stuff back into their Result holders:
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:value "Mutt"])]
Result[:waiting
(Result[:value "Jeff"])])])]
And we run the program a fourth time, finally getting back
"Mutt Trump, Jeff Putin"
A shoddy implementation in Clojure
It was instructive for me to hack this up in Clojure, even though there's already a more professional implementation called Muse.
First, note that the Haskell Haxl Result has an IORef in the middle.
data Result a = Done a | Blocked (Seq BlockedRequest) (Fetch a)
data BlockedRequest = forall a . BlockedRequest (Request a) (IORef (FetchStatus a))
data FetchStatus a = NotFetched | FetchSuccess a
They don't actually avoid mutation altogether, and neither will we.
The possibly blocking object
(deftype Result [a])
(defmethod print-method Result [o, w] (.write w (str "Result" @(.a o))))
contains a volatile that will hold [:value final-result], [:query some-query] or [:waiting list-of-results]. I overrode print-method to make the output a little more presentable here. You probably wouldn't do that in real life.
A basic utility function tries to extract results from a potential Result, returning the extracted value if possible:
(defn try-get [r]
(if-not (instance? Result r) r
(match [@(.a r)]
[[:value v]] v
:else r)))
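A few hypothetical examples of its behavior:

```clojure
;; Plain values pass through untouched:
(try-get 5)                                  ;; => 5
;; A fulfilled Result yields its value:
(try-get (->Result (volatile! [:value 7])))  ;; => 7
;; An unfulfilled Result comes back unchanged, still blocking:
(try-get (->Result (volatile! [:query ["select ..." 9]])))
```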
The next most fundamental operation is to apply a function to a collection...
(defn try-coll [coll f]
(let [coll (try-get coll)]
or what we hope is a collection, as it might itself be a Result...
(if (instance? Result coll) coll
or contain unfulfilled Results...
(let [coll (map try-get coll)
bs (filter #(instance? Result %) coll)]
in which case, we note our dependency on them,
(if (seq bs) (->Result (volatile! [:waiting bs]))
but if the collection is complete, we actually evaluate the function,
(try-get (f coll)))))))
Our queries are just placeholders:
(def query (memoize (fn [& q] (->Result (volatile! [:query q])))))
And we write some helper methods to invoke functions whose arguments might be Results or which might return them:
(defn block-apply [f & args] (try-coll args #(apply f %)))
(defn block-map [f coll] (try-coll (try-coll coll #(map f %)) identity))
For demo purposes, it will be useful to extract all the query leaf nodes in a tree:
(defn reap [b]
  (if-not (instance? Result b) []
    (match [@(.a b)]
      [[:query _]] [b]
      [[:waiting rs]] (mapcat reap rs)
      :else [])))
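For example, on a small hand-built tree, reap skips the :waiting interior nodes and collects just the :query leaves:

```clojure
;; Two query leaves, one nested a level deeper; reap returns the
;; seq of the two query Results (q1 then q2):
(let [q1   (->Result (volatile! [:query ["q1"]]))
      q2   (->Result (volatile! [:query ["q2"]]))
      tree (->Result (volatile! [:waiting [q1 (->Result (volatile! [:waiting [q2]]))]]))]
  (reap tree))
```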
Testing out the shoddy implementation in Clojure
As promised, this is going to look just like the pretend example in the section before last.
We define some query functions,
(defn getUserIds [gid] (query "select id from groups where gid=" gid))
(defn getFirstName [id] (query "select firstName from stuff where id=" id))
(defn getLastName [id] (query "select lastName from stuff where id=" id))
(defn getBff [id] (query "select bff from stuff where id=" id))
in terms of which we write the program, using our block- helpers to indicate that the map or function application might be blocking:
(defn mangleNamesFromGroup [gid]
(block-apply join ", "
(block-map
#(block-apply str (block-apply getFirstName (getBff %)) " "
(getLastName %))
(getUserIds gid))))
If we were doing the queries sequentially, we would be alternating calls to getBff and getFirstName, which would be difficult to optimize. Let's see what our haxly conniving gets us:
repl> (def x1 (mangleNamesFromGroup 37))
repl> x1
Result[:waiting (Result[:query ("select id from groups where gid=" 37)])]
As expected, we're blocked on the first query, which we pretend returns [123 321]:
repl> (defn fulfill [r v] (vreset! (.a r) [:value v]) r)
repl> (-> x1 reap first (fulfill [123 321]))
repl> x1
Result[:waiting (Result[:value [123 321]])]
Now run the exact same function again,
repl> (def x2 (mangleNamesFromGroup 37))
repl> x2
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:query ("select bff from stuff where id=" 123)])]
Result[:query ("select lastName from stuff where id=" 123)])]
Result[:waiting (
Result[:waiting (
Result[:query ("select bff from stuff where id=" 321)])]
Result[:query ("select lastName from stuff where id=" 321)])])])]
This time we get all the queries depending on the user ids we have, and we fake fulfillment:
repl> (doall (map #(do (fulfill %1 %2)) (reap x2) [777 "Trump" 888 "Putin"]))
repl> x2
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:value 777])]
Result[:value "Trump"])]
Result[:waiting (
Result[:waiting (
Result[:value 888])]
Result[:value "Putin"])])])]
and again
repl> (def x3 (mangleNamesFromGroup 37))
repl> x3
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:query ("select firstName from stuff where id=" 777)])]
Result[:waiting (
Result[:query ("select firstName from stuff where id=" 888)])])])]
repl> (doall (map #(do (fulfill %1 %2)) (reap x3) ["Mutt" "Jeff"]))
repl> x3
Result[:waiting (
Result[:waiting (
Result[:waiting (
Result[:value "Mutt"])]
Result[:waiting
(Result[:value "Jeff"])])])]
and again,
repl> (def x4 (mangleNamesFromGroup 37))
repl> x4
"Mutt Trump, Jeff Putin"
Exactly what problem have we solved?
Or at least what problem would we have solved if we hadn't typed in the query responses manually?
Actually, I think we've solved three problems that are only incidentally related.
Problem 1: We've managed to batch similar queries together. That's nice, but, as noted before, the database might have done this anyway had we simply sent them at approximately the same time.
Problem 2: We've desequentialized individual, interleaved queries, so they can be written to reflect the logical use of their results rather than the preferred timing of execution.
Problem 3: We did this without explicitly forking.
If you're a web developer, you might think that this is like an elixir for gout, as a binge on blocking IO is a bit of a luxury to begin with. Had we been forced from the beginning to write non-blocking code - say, in javascript - the sequentiality problem would likely never have arisen [1]. In fact, it might have required willful effort to force the queries to run sequentially and interleaved.
To prove the point, let's temporarily drop the "there is no fork" part of our adventure. We'll mock asynchronous queries as returning a core.async channel that prints to stdout so we can see what it's doing and then, after a short delay, just munges some strings:
(defn aquery [s i] (go (println (str s "(" i ") ")) (<! (timeout 1000)) (str s "-" i)))
The simplest async version of our program just launches the queries and dereferences them immediately, using async/map to turn a sequence of channels into a single channel:
(defn mangleNamesFromGroupAsync [gid]
(a/<!! (a/map vector
(map #(go (str (<! (aquery "getName" (<! (aquery "getBff" %))))
" " (<! (aquery "getLastName" %))))
(a/<!! (aquery "getIds" gid))))) )
With some minor reformatting:
repl> (mangleNamesFromGroupAsync 37)
getIds(37)
<pause>
getBff(123) getBff(321)
<pause>
getName(777) getName(888)
<pause>
getLastName(123) getLastName(321)
<pause>
["Mutt Trump" "Jeff Putin"]
We see that all the getBff queries were sent without waiting, before the getName queries, but with a lot less fuss and mutation, so maybe we were in nirvana all along...
Not so fast
First, while it might be second nature to clojurescript (or whatever script) developers, this (a/map vector (map #(go (<! (aquery ...)) ...) ...) business is already a bit removed from plain old (map (aquery ...)). Without thinking about it too much, we complected our code beyond its functional intent to also express an opinion about how asynchronicity should be juggled.
Furthermore, it's still not as asynchronous as it should be. There's no reason for the getBffs and getLastNames to go out in different waves, but we introduced an artificial dependency in the order of arguments to str. To remove the dependency, we need to contort a bit further, explicitly launching as many of our queries as we can, before we need them:
(defn mangleNamesFromGroupAsync2 [gid]
(a/<!! (a/map vector
(map #(go (let [c1 (go (<! (aquery "getName" (<! (aquery "getBff" %)))))
c2 (aquery "getLastName" %)]
(str (<! c1) " " (<! c2))))
(a/<!! (aquery "getIds" gid))))) )
Now
repl> (mangleNamesFromGroupAsync2 37)
getIds(37)
<pause>
getLastName(123) getLastName(321)
getBff(123) getBff(321)
<pause>
getName(777) getName(888)
<pause>
["Mutt Trump" "Jeff Putin"]
we have one fewer pause, since the getLastNames and getBffs now go out together.
So even with a certain amount of the magic taken care of by asynchronous constructs, we still have to shovel the code around to get it to behave. Not only have we not rid ourselves of fork, but we've made deploying it correctly a central part of the coding challenge.
A tree is a tree
I mean, if you've looked at a hundred thousand acres or so of trees — you know,
a tree is a tree, how many more do you need to look at?
- Ronald Reagan
Our naive implementation
(defn mangleNamesFromGroup [gid]
(join ", " (map #(str (getFirstName (getBff %)) " " (getLastName %))
(getUserIds gid))))
has an implicit tree structure, defined by its nested parentheses.
(map . . )
| |
#(str . " " . ) (getUserIds gid)
/ \
(getFirstName . ) (getLastName %)
|
(getBff %)
Then, when we ran
(defn mangleNamesFromGroup [gid]
(block-apply join ", "
(block-map
#(block-apply str (block-apply getFirstName (getBff %)) " "
(getLastName %))
(getUserIds gid))))
we got a Result tree, whose edges were defined by references to child Results in the :waiting list. Admittedly, we didn't get it all at once, but in 3 successive passes:
:waiting______________
/ \
Pass 1 :query :waiting_______
getIds / \
:waiting__ :waiting___
/ \ \ / \ \
Pass 2 :query :query \ :query :query \
getLN getBff \ getLN getBff \
:query :query
Pass 3 getFN getFN
After we translated
(defn mangleNamesFromGroupAsync [gid]
(a/<!! (a/map vector
(map #(go (str (<! (aquery "getName" (<! (aquery "getBff" %))))
" " (<! (aquery "getLastName" %))))
(a/<!! (aquery "getIds" gid))))) )
to
(defn mangleNamesFromGroupAsync2 [gid]
(a/<!! (a/map vector
(map #(go (let [c1 (go (<! (aquery "getName" (<! (aquery "getBff" %)))))
c2 (aquery "getLastName" %)]
(str (<! c1) " " (<! c2))))
(a/<!! (aquery "getIds" gid))))) )
we get an extremely similar tree, with edges defined by asynchronous waits. (The <n> below are supposed to indicate the nth invocation of the closure.)
map__________
/ \
aquery #(go <0> ........ <1> )
getIds / \
let______ let______
/ \ \ / \ \
aquery aquery \ aquery aquery \
getLN getBFF \ getLN getBFF \
aquery aquery
getName getName
In this case, there are no formal batches, but the vertical position of the node in the diagram indicates roughly when the query actually occurs. In practice, they may be jittered up and down a bit, and the order in which they are executed is no longer deterministic, but if we, like Haxl, assume that queries are referentially transparent, the programs will return identical results.
Another difference is, as noted previously, that, with no explicit
batching, the core.async
approach relies on someone else to
recognize a run of similar queries, and if the someone lives on the
server, this will cause increased network chatter. At the same time,
the Haxl style has the ironic side effect of separating computation
and querying into distinct, sequential phases, in the name of
desequentializing the queries within each phase.
In both cases, similar functional expressions were converted into similar dependency trees, with queries occurring in similar temporal waves. And in both cases, the waves are ordered in the same way, with the deepest dependencies coming first, that is in topological sort order.
We infer that the mechanics of converting a functional
expression into parallelized, asynchronous form (which so far, we've
done manually) ought to be very similar to the creation of Result
trees in a Haxl approach, and that both of them are essentially topological
sorts.
That one can achieve batching and parallelism through asynchronous code transformation is not a novel observation. There's a 2014 paper from Ramachandra, Chavan, Guravannavar, and Sudarshan at IIT actually called Program Transformations for Asynchronous and Batched Query Submission. They present a set of formal transformation rules for general procedural programs, and refer to an implementation for Java in their DBridge optimizer. Additionally, about 17 minutes into Jake Donham's Stitch talk, he describes a possible batching strategy as "accumulating calls until you hit some threshold or something like that," which seems to imply that these calls are arriving asynchronously.
It's worth noting that, while the IIT optimizer allows you not just to write batching-oblivious code, but to write exactly the same code as you would if queries were instantaneous, this is not an essential feature of compile-time desequentializing. It's a convenience, a further step beyond the main goal of letting code express functional intent in a logical way, but a less important one.
(It may have occurred to the careful reader that I am not carefully distinguishing concurrency and parallelism. This is true but uninteresting.)
Gifts from the Gods
What is best in life - according to the original script of Conan the Barbarian - is to discover that there's some nominally difficult thing that might actually be easy, because you can slough off nearly all the work onto other people. Once we generalize "doing all our queries into batches" to "doing everything in parallel", we can rely on a few great tools in the Clojure tool-shed.
Gift I: Homoiconicity
The first set of tools comes from the fact that algorithms expressed
in lisp are vastly easier to transform than in most languages. Haxl
derivatives create a Result
structure at runtime; DBridge parses
Java code into data structures to which transformation rules are then
applied. Clojure programs are already a data structure, so we get to
skip a step and transform the code directly.
The transformation, as noted, is really a topological sort. Given
(foo form1 form2 ...)
we simply want
(xform `(foo form1 form2 ...))
to become
`(let [c1 (go ~(xform form1))
       c2 (go ~(xform form2))
       ... ]
   (foo (<! c1) (<! c2) ...))
It's clear how this parallelizes independent argument forms; we can also see that this depth-first recursive transformation has the effect of hoisting nested arguments earlier in the evaluation sequence. For a boring function of a function,
(foo (bar x))
we get
`(let [c1 (go ~(xform `(bar x)))]
(foo (<! c1)))
which becomes
`(let [c1 (go
(let [c2 (go x)]
(bar (<! c2))))]
(foo (<! c1)))
In building up the new structure, we traverse the call graph exactly once, so the whole procedure is O(N), where N is the total number of arguments of all functions in the program. It seems that the Haxl procedure requires re-generating the graph each time leaf nodes are extracted, making it O(N log N). This reflects the fact that the Haxl ordering is stronger than topological: not only is every query run before its result is required (which is of course a necessary ordering), no query is run before the previous batch completes (which is really not necessary).
Once you get past a few ritual incantations, it is almost trivial to produce a macro that does our transformation:
(defmacro parallelize-func-stupid [form]
  (let [[f & args] form                     ;; Extract function and arguments
        cs (repeatedly (count args) gensym) ;; Generate some channel names
        bs (interleave cs                   ;; Construct the binding block, recursively
             (map (fn [arg]                 ;; transforming each argument in a go block
                    `(a/go (parallelize-func-stupid ~arg)))
                  args))
        args (map (fn [c] `(a/<! ~c)) cs)]  ;; Deref the channels
    `(let [~@bs] (~f ~@args))))
Presto!
Well, an incomplete presto. This will actually blow up for arguments that are not themselves function applications, which means it only works on infinitely sized programs. Also, we've been acting as if go returned a promise that can be dereferenced an arbitrary number of times, which of course it really doesn't. We'll do better below.
Gift II: JVM Fibers and Quasar
So far, all my parallelization examples have used core.async, which is certainly not a bad way to go, but it might be fun to look at an interesting development in the JVM world. Quasar is an implementation of fibers in Java, with a Clojure API called Pulsar.
A fiber presents itself as very much like a normal thread, but much cheaper to create. Like other inexpensive concurrency constructs, fibers are, under the hood, essentially co-routines - multiple streams of execution handing off control to each other cooperatively, so there may be vastly more such streams than the number of threads available (though of course the thread pool does limit the number of fibers that can be actively doing anything at a given time).
In core.async, cooperation is achieved by rewriting code into state-machine form, using macros. In Quasar, it's done by rewriting at the byte-code level, so that methods marked "suspendable" can give up control when waiting on each other. An advantage of the bytecode approach is that stack traces are preserved, making debugging easier, and fibers are said to be more efficient than channels when there are vast numbers of them.
One of the available Pulsar APIs is a drop-in replacement for core.async, but the requirement to wrap channel operations in a go block now feels like an artificial inconvenience.
Additionally, what we need for haxilating is not really a CSP channel but a promise, and, while clojure.core.async implements promises on top of channels, the Pulsar promise is closer to the core fiber abstraction, and co.paralleluniverse.pulsar.async channels represent an additional layer of abstraction, so I preferred to use promises directly.
The basic element of a Pulsar program is the suspendable function, which you define with defsfn, e.g.
(q/defsfn my-suspendable [x] (other-suspendable x))
It's not turtles all the way down: at some point, one of the suspendables will actually give up control while waiting for a callback from some external event, and Pulsar provides an await macro to facilitate that:
(q/defsfn other-suspendable [x] @(fiber (q/await httpkit/request x)))
Here, the macro expands to call the workhorse httpkit/request with our argument x, plus a callback, and then faux blocks (meaning that the Quasar runtime parks us) until that callback is called. (Actually, Pulsar has already wrapped http-kit in a defreq macro, from which this example is shamelessly copied.) One thing that's cool about these suspendable functions is that they can be used in normal code if you want, but you can identify them with suspendable?, which is going to be useful in determining exactly which forms will benefit from parallelization.
The function parallelization macro in Pulsar-land is pretty much the same as in core.async country, except that we create promises using (q/promise (fn [] form-to-fulfill-promise)):
(defmacro parallelize-func-stupid2 [form]
  (let [[f & args] form
        ps (repeatedly (count args) gensym)
        bs (interleave ps
             (map (fn [arg] `(q/promise (fn [] (parallelize-func-stupid2 ~arg)))) args))
        args (map (fn [p] `(deref ~p)) ps)]
    `(let [~@bs] (~f ~@args))))
QAXL - Quasar Haxl
Per its evocative suffix, the above parallelization macro is on the dimwitted side. Fortunately, we mostly just have to train it not to do certain things, like insisting on transforming everything whether or not it needs it.
Qaxl is structured around a central parallelize dispatch function that takes a form and invokes one of several parallelize-particular-forms, which in turn call parallelize recursively on elements of the form as needed.
Each of these functions returns a hashmap of
- :par - true if any q/suspendable? functions were found within, at any level
- :form - the parallelized form itself
When :par is false, we know we can just leave that form alone.
In addition to the input form, it will also be necessary to pass a hash-map s2p of symbols to promises. Usually, s2p will be passed down unchanged when processing sub-forms, but it will be augmented in the course of processing a (let ...) form.
The fundamental building block for a number of form handlers is a utility for taking a sequence of these parallelized {:par ... :form ...} maps, and pulling out the ones where :par is true into let bindings of promises. This is a reduction over a list of forms by a function (fn [[bs args] p] ...), which accumulates bindings into a vector bs, and arguments into a vector args (just assume for now that the main parallelize dispatch function exists):
(defn- par-to-bindings [forms s2p] ;; => [ps bs args]
(let [ps (map #(parallelize % s2p) forms)
[bs args] (reduce (fn [[bs args] p]
(if (:par p)
If p is :parallelizable, then bs is augmented to bind a new promise, and args is augmented to dereference that promise,
(let [ch (gensym "p")]
[(concat bs [ch `(q/promise (fn [] ~(:form p)))])
(conj args `(deref ~ch))])
while if it's not parallelizable, bs stays the same, and we just append p's form to args:
[bs (conj args (:form p))]))
[[] []]
ps)]
[ps bs (seq args)]))
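To make that concrete, here is roughly what par-to-bindings might hand back for one suspendable call and one constant (the gensym name p1234 is invented for illustration, and the exact shape depends on parallelize's dispatch):

```clojure
;; (par-to-bindings '[(qquery "getLastName" 123) ", "] {})
;; => [ps                                                      ; the parallelized sub-form maps
;;     (p1234 (q/promise (fn [] (qquery "getLastName" 123))))  ; a binding for the :par form
;;     ((deref p1234) ", ")]                                   ; args: deref the promise; ", " kept as-is
```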
With the binding function defined, parallelize-func doesn't have to do much more than build the let if there are promise bindings, or a normal form if there aren't.
(defn- parallelize-func [forms s2p]
  (let [[ps bs forms] (par-to-bindings forms s2p)]
    (if (seq bs)
      {:form `(let [~@bs] (~@forms)) :par true}
      {:form `(~@forms)})))
In some cases, we can't just transform the way a function is called, but we need to actually change the function. For example, it would be silly to transform
(map foo [1 2 3])
into
(let [p30443 (q/promise (fn [] foo))]
  (map @p30443 [1 2 3]))
We really need a completely new map, which doesn't turn the function itself into a promise, but instead creates promises for each application of the function, derefing them in bulk after they've all been launched.
Haxl and friends have this problem too. You can see the explicit .traverse in the Scala example above, and Haxl actually implements mapM for their fetch class as traverse. In our case, since we've got the whole code form to play with, we can keep the same name, but only bother transforming it if necessary.
We start with our par-to-bindings utility, after which we'll know if there's anything parallel going on:
(defn- parallelize-map [[_ & forms] s2p]
(let [[ps bs [f & args]] (par-to-bindings forms s2p)
m1 (if-not (:par (first ps))
`(map ~f ~@args)
If we do need to parallelize, we do so by mapping twice: first to create a sequence of promises, and then to deref them:
`(map deref
(doall (map (fn [& xs#]
(q/promise (fn [] (apply ~f xs#))))
~@args ))))]
{:form (if (seq bs) `(let [~@bs] ~m1) m1) :par (some :par ps)}))
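So, assuming some-suspendable is a suspendable function of one argument, a form like (map some-suspendable ids) would come back along these lines:

```clojure
;; The inner map launches one promise per element; doall forces them all
;; to start before the outer map begins blocking on each deref in turn.
(map deref
     (doall (map (fn [& xs]
                   (q/promise (fn [] (apply some-suspendable xs))))
                 ids)))
```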
Note the doall, without which laziness will resequentialize our masterpieces.
Not surprisingly, parallelizing existing (let ...) forms is among the more complicated operations, because we need to remember which symbols in the original code correspond to which promises, causing s2p to evolve as we process. As usual, "evolve" means a reduction over something, in this case the original bindings, while accumulating in bs1 the bindings of promises to generated symbols, in bs2 the bindings of original symbols to the deref'd promises, and in s2p the new map of symbol bindings.
(defn parallelize-let [[_ bs & forms] s2p]
  (let [[bs1 bs2 s2p] (reduce
(fn [[bs1 bs2 s2p] [s form]]
We parallelize the righthand side of each original binding, using the current symbol map:
(let [{:keys [form par]} (parallelize form s2p)]
(if par
(let [ch (gensym "p")]
Bind the promises:
[(concat bs1 [ch `(q/promise (fn [] ~form))])
Bind original symbols to the deref'd promises:
(concat bs2 [s `(deref ~ch)])
Augment the symbol table:
(assoc s2p s ch)])
If the rhs wasn't parallelized, we just paste the binding as-is:
[bs1 (concat bs2 [s form]) s2p])))
[[] [] s2p]
(partition 2 bs))
ps (parallelize-forms forms s2p)]
{:form `(let [~@(concat bs1 bs2)] ~@(map :form ps)) :par (or (seq bs1) (some :par ps))}))
This relies on dispatching in recursions of parallelize to parallelize-subst, which makes the substitutions dictated by s2p
(defn- parallelize-subst [s s2p]
(if (symbol? s)
(if (contains? s2p s)
{:form `(deref ~(get s2p s)) :s2p s2p :par true}
and, while we're at it, also marks symbols as parallelizable if they represent suspendable functions:
{:form s :par (some-> s resolve var-get q/suspendable?) :s2p s2p})
{:form s :s2p s2p}))
Are we done?
If the goal is to mirror the do/mapM style of Haxl, then maybe yes.
With our query faked as a simple Quasar suspendable,
(q/defsfn qquery [s i] (println (str s "(" i ")")) (q/sleep 2000) (get aanswers [s i]))
(q/sleep is non-blocking in fiber-land), the test program is reasonably pretty:
(qaxl (map #(str (qquery "getName" (qquery "getBff" %)) (qquery "getLastName" %))
(qquery "getIds" 37)))
We get
getIds(37)
<pause>
getLastName(321) getBff(123) getBff(321) getLastName(123)
<pause>
getName(888) getName(777)
<pause>
("MuttTrump" "JeffPutin")
which is exactly what we want.
In around a hundred lines of code, we've handled function application, map and simple let (minus the fancy destructuring capabilities), but if the goal is to make arbitrary code fully parallelized simply by wrapping it in our qaxl macro, there's a long way to go. Handling the special forms, macros and higher-order functions of the standard Clojure libraries would be daunting enough. Dealing correctly with anything we might find in a library may be impossible.
Takeaways
- Launching waves of asynchronous queries is pretty much equivalent to full-on batching, and if you write non-blocking code for a living, you've probably been doing it without thinking about it.
- The transformation into waves is basically a topological sort.
- In Haxl, you sort by repeatedly pruning the leaves of a sequence of trees emitted by a monadic expression.
- In Qaxl, you sort the normal way, by DFS of the tree.
- With both, the concurrency is implicit in the structure of the computation, and there is still no fork.
1. See, for example, chapter 6 of NodeJS Design Patterns. ↩
2. The couchbase documentation, for example, explains that
Asynchronous clients inherently batch operations: because the application receives the response at a later stage in the application, it may attain batching via issuing many requests in sequence.
implying that either the server or the client-side API code is doing something clever when similar queries are made around the same time. This post makes the point more generally. ↩