Update 2015-01-12
The algorithm as it exists in HEAD is somewhat different from the below, in ways that I'll describe (eventually) in another post. In some ways, it's closer to Fork-Join, but with important differences to support reentrancy, share the results of duplicate requests and adjust for the costs of distribution.
Recap of recap
In a previous post, I introduced a framework called Girder (the code is on github), which aims to facilitate Plain Old Functional Programming on distributed systems. By POFP, I mean code that, as much as possible, consists of normal looking calls to normal looking functions, the only requirement for which is referential transparency: there are no side effects, and calls to the same function with the same arguments should always return the same value.
Functional programming, in my view, is dramatically less functional when you use a framework that requires you to cast your algorithm in terms of message passing, whether in the form of key-value pairs for map/reduce or explicitly as mailbox deliveries in the actor model. What makes me particularly uncomfortable is an emerging conventional wisdom that one should write in these styles even when the program will run comfortably on one machine, because someday you will need to scale out, and you might as well get the pain out of the way now.
To over-simplify more than a bit, I want to distinguish between algorithms that are innately about large scale data aggregation (in the case of map-reduce) or dynamic response to signals (in the case of the actor model), and those that are most elegantly expressed in terms of functions, but might benefit from running on more than one core. Put in reverse, if the problem is innately about data aggregation or dynamic response to signals, then maybe you should use one of these message-based models, even if you know in advance that scale will never be important.
In this post, I'll explain the core machinery behind Girder, showing how requests get dispatched, distributed and executed.
Recap of examples
I don't want to repeat too much, but here are a couple of the examples from earlier. The first illustrates a sort of calculation that would obviously benefit from distributed, parallel execution.
```clojure
(cdefn calc-price [deets] (calculate-something-expensive deets))

(cdefn calc-portfolio [spec]
  (let [reqs [[calc-price (munge spec)] [calc-price (diddle spec)]]]
    (reduce + (requests reqs))))

(request "pool" (calc-portfolio "xyz"))
```
Of note here is that the calculation requests look almost exactly like function calls. (With a little more macro work, I think I can get rid of the "almost.")
The second example illustrates code that might generate an error that we ought to be able to report on, irrespective of where on the grid the code executes:
```clojure
(cdefn divide [x y] (float (/ x y)))
(cdefn ratio [i] (request (divide i (dec i))))
```
And, as shown last time, the exception thrown when requesting (ratio 1) contains meaningful information about where on the "distributed stack" the error occurred.
Serialized requests
Requests, remember, are finite sequences that start with a function and whose remaining members are EDN serializable. The whole request gets turned into a string with
```clojure
(defn ->reqid [[f & args]]
  (pr-str (concat [(fname f)] args)))
```
where fname is so ghastly I sort of wince pasting it here, but it's inspired by an only slightly less gruesome post on the Clojure mailing list, so it must be OK, right?
```clojure
(def STR->OPT
  (apply hash-map
         (mapcat #(vector (second %) (str (first %))) clojure.lang.Compiler/CHAR_MAP)))

(def OPTPAT
  (re-pattern (str "\\b" (clojure.string/join "|" (vals clojure.lang.Compiler/CHAR_MAP)) "\\b")))
```
```clojure
(defn fname
  "Extract the qualified name of a clojure function as a string."
  [f]
  (-> (str f)
      (clojure.string/replace OPTPAT STR->OPT)
      (clojure.string/replace-first "$" "/")
      (clojure.string/replace #"@\w+$" "")
      (clojure.string/replace #"_" "-")))
```
All this regexing is responsible for turning a function's mangled, hash-suffixed string representation into a qualified name that can be resolved in the reverse operation,
```clojure
(defn reqid->req [reqid]
  (let [[f & args] (read-string reqid)
        f (resolve (symbol f))
        f (if (:girded (meta f)) f (cfn f))]
    (apply vector f args)))
```
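A hypothetical round trip (girder.example is a stand-in namespace):

```clojure
;; Hypothetical round trip, assuming girder.example/calc-price resolves
;; in this JVM:
(->reqid [calc-price "xyz"])
;;=> "(\"girder.example/calc-price\" \"xyz\")"
(reqid->req (->reqid [calc-price "xyz"]))
;;=> a vector of the (girded) calc-price function followed by "xyz"
```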
As you see, Clojure complicates the task by maintaining its own mangling scheme for non-alphanumeric operators, and I'm violating some sort of ethical code by extracting it from src/jvm/clojure/lang/Compiler.java.
Remember from the last post that the macro cdefn defines functions that return results via an async channel, wrapped up in {:value ...} or {:error ...}. That macro also sets metadata to indicate that the function was thus :girded. If, as in the case of +, it's not girded, the easily imaginable cfn function takes care of doing so. This way, we can distribute requests using boring old functions as well as fancy functions like calc-portfolio that themselves also make requests.
Once again, it should be emphasized that if your.namespace.here/gorgonzola cannot be resolved, or resolves to something different in different running instances, then terrible, terrible things will happen.
Redis and core.async
The central state keeper behind all the remote calculations is Redis. In theory, it could be something else, but the only implementation of the Girder-Backend protocol I've so far written is Redis-Backend.
Redis is an in-memory database with many advantages. For our purposes, the greatest two are
- extraordinary speed (generally hundreds of thousands of operations, loosely defined, per second)
- certain assurances of atomicity.
Such assurances are contingent on proper functioning of the Redis process and the platform on which it runs, and, while Redis does generate periodic snapshots, from which it can be recovered, there are many plausible failure scenarios where you'll lose all your Redis data forever.
But, it is reasonable to ask, who cares? Obviously there are cases where you might, but there are also many cases where you shouldn't. The primary assumption that we'll be writing purely functional code implies that the very worst consequence of data loss is having to run our program again. Again, while that might not always be acceptable, it very often is.
I'll discuss various implementations of methods in the Redis-Backend implementation of the Girder-Backend protocol below. Know for now that

```clojure
(defrecord Redis-Backend [redis kvl] ...)
```

holds some information about the connection to Redis and a reference to what I'll call a key-value listener that's used for publishing things internally.
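For orientation, here's roughly the shape of the protocol, inferred from the methods this post walks through; the actual definition in the repo may differ:

```clojure
;; Rough sketch of Girder-Backend, inferred from the methods discussed below;
;; the real protocol in the repo may differ and has further queue/set helpers
;; (lpush, add-member, clear-bak, etc.):
(defprotocol Girder-Backend
  (crpop [this key queue-type])
  (clpush [this key queue-type])
  (enqueue-listen [this nodeid reqid queue-type val-type
                   enqueue-pred done-pred done-extract debug-info])
  (kv-listen [this k])
  (kv-publish [this k v])
  (get-val [this key val-type])
  (set-val [this key val-type v]))
```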
In a post scheduled for a few months from now, I'll discuss integrating Girder with Datomic, with further considerations about data locality, but, for now, let's just assume that input data is provided "somehow."
Queues
To ease working with Redis, and to provide an abstraction layer that will someday allow different back-ends, we make heavy use of core.async internally. (The user of Girder won't see it.) Here's an implementation of a typical Girder-Backend method in Redis-Backend. This creates a channel, reading from which will actually be right-popping a Redis queue:
```clojure
(crpop [this key queue-type]
  (let [qkey (queue-key key queue-type)
        bkey (queue-bak-key key queue-type)
        out  (lchan (str "crpop-" key))]
    (async/go-loop []
      (let [val (wcar (:redis this) (car/brpoplpush qkey bkey 60))]
        (trace "crpop" key "got" val "from redis list" qkey)
        (if (still-open? out)
          (do (when val (>! out val))
              (recur))
          (debug "crpop" key queue-type "shutting down"))))
    out))
```
In a design decision that I know some people object to, the out channel does double duty as a control channel. (You'll notice that still-open? isn't part of the officially exposed core.async interface.)

Taking crpop from the start:
The private functions xxx-key just create strings that will be used to name Redis lists. E.g. one that came up in last post's examples ended up getting called "requests-queue-pool". Longer strings might get md5'd into something briefer, to save wire wear, but that's an implementation detail.
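They might plausibly look like this, consistent with the "requests-queue-pool" example (the actual private helpers may differ):

```clojure
;; Plausible key builders, consistent with "requests-queue-pool";
;; the actual private helpers may differ:
(defn- queue-key     [key queue-type] (str (name queue-type) "-queue-" key))
(defn- queue-bak-key [key queue-type] (str (name queue-type) "-bak-"   key))
```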
The reason for defining both qkey and bkey is that we'll be using the Redis BRPOPLPUSH command to atomically stash what we pop off of one queue in a backup queue, which can be used for recovery purposes if the process running crpop dies after popping from Redis but before whatever was popped was read from its channel. This is the recommended implementation of reliable queues in Redis. For different back-ends, the second key might be ignored.
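In carmine terms, the pattern boils down to something like this (key names illustrative):

```clojure
;; The reliable-queue pattern in raw carmine calls (key names illustrative;
;; redis is a connection spec):
(wcar redis (car/brpoplpush "requests-queue-w1" "requests-bak-w1" 60))
;; ...and once everything popped has been safely consumed downstream:
(wcar redis (car/del "requests-bak-w1"))
```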
The out channel is defined with lchan, which was written in desperation to name async channels and keep track of what's going in and out of them:
```clojure
(def lchans (atom (cache/weak-cache-factory {})))

(defn lchan [name & [buf]]
  (if-not (timbre/level-sufficient? :trace nil)
    (chan buf) ;; short-circuit if not at :trace level
    (let [name (if (fn? name) (name) name)
          c    (async/map> #(do (trace "Channel" name "receiving" %) %)
                           (async/map< #(do (trace "Channel" name "delivering" %) %)
                                       (chan buf)))]
      (trace "Channel" name "created:" c)
      (swap! lchans #(assoc % c name))
      c)))
```
At :trace level, all writes to and reads from the channel will be logged. I found this extraordinarily useful during development, since I also find it extraordinarily easy to get confused in the maze of twisty core.async channels it's so easy to create. Obviously, one motivation for Girder is to allow people not to deal with channels sometimes.
The actual logging comes from the marvelous timbre library, and the trace messages are generated by sandwiching the chan between async/map< and async/map>, which intercept the content on the way in and out, but otherwise ignore it.
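Usage is just like a plain chan; at :trace level every put and take leaves a log line:

```clojure
;; At :trace level, each put and take on an lchan gets logged:
(def c (lchan "demo" 1))
(async/>!! c 42) ; logs: Channel demo receiving 42
(async/<!! c)    ; logs: Channel demo delivering 42
```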
Note that lchans, where named channels get stashed for future examination, is an atom containing a weak-reference hash map. That means channel entries will tend to disappear quickly after all other references to the contained channels are gone.
Aside: Unfortunately, core.cache/weak-cache-factory didn't actually exist, so I had to add it in my own fork, which was an adventure in its own right, since I had to supplant the version of core.cache that core.async was pulling in. Anyway.
The main crpop action is in (wcar (:redis this) (car/brpoplpush qkey bkey 60)). To communicate with Redis, we use the carmine library (coincidentally by the same author as timbre). carmine is very cool. In addition to exposing every* standard Redis command as a car/standard-redis-command, it handles connection pooling and serialization of values, so you don't have to pretend everything's a string.
The (:redis this) extracts connection details from the Redis-Backend record; then (car/brpoplpush qkey bkey 60) blocks, trying to pop an element from the right end of qkey, which it will do atomically with pushing it onto the left end of bkey. If we get something, we push it onto the out channel and block again. If we don't, we make sure that out is still open (i.e. someone cares), and block for another 60 seconds. The synchronous, blocking nature of the Redis call is hidden from the consumer of out.
There were other options here. First, I could have specified an infinite timeout, which generally wouldn't change functionality, but would have made it harder to shut down operations if necessary. Second, I could have used Redis pub/sub capability to avoid explicit blocking, but pub/sub with exactly one consumer felt like overkill, and the blocking is only moderately evil in a scheme of things where (1) Redis itself is fully asynchronous internally, and (2) the pub/sub alternative still requires something to block, waiting for messages.
There's a corresponding clpush, which exposes pushing onto a Redis queue as writing to an async channel.
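clpush isn't shown in this post, but a minimal sketch, mirroring crpop and assuming the same key helpers, might be:

```clojure
;; Minimal sketch of clpush, mirroring crpop; the real method may differ.
;; Writes to the returned channel become LPUSHes; closing it stops the loop.
(clpush [this key queue-type]
  (let [qkey (queue-key key queue-type)
        in   (lchan (str "clpush-" key))]
    (async/go-loop []
      (when-let [val (<! in)]
        (wcar (:redis this) (car/lpush qkey val))
        (recur)))
    in))
```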
Listening for finished calculations
I haven't actually explained how you ask for a calculation, or how it gets done, but it will be helpful first to understand how we distribute the results.
Here we do actually make use of Redis pub/sub, to provide notification when a calculation has completed. As multiple running requests may request the same calculation, having multiple listeners is an important feature, but since ongoing notification of changes to the invariant return value of a pure function is by definition useless, it would be wasteful to build up and tear down a channel corresponding to every request id. Accordingly, there's only one actual Redis pub/sub channel, called "CALCS", but we locally keep an atom subs, holding a map of request ids to a collection of channels to be notified whenever an appropriate message comes in:
```clojure
(defrecord Redis-KV-Listener [subs listener])

(defn- kv-message-cb [a [etype _ val :as msg]]
  (when (and (= etype "message") (vector? val))
    (let [[k v] val]
      (swap! a (fn [cmap]
                 (doseq [c (keys (get cmap k))] (go (>! c v) (close! c)))
                 (dissoc cmap k))))))

(defn- kv-listener [redis topic]
  (let [subs           (atom {}) ;; {reqid {c1 TBD, c2 TBD}}
        redis-listener (car/with-new-pubsub-listener
                         (:spec redis)
                         {topic (partial kv-message-cb subs)}
                         (car/subscribe topic))]
    (->Redis-KV-Listener subs redis-listener)))
```
The (car/with-new-pubsub-listener ...) call creates a thread in carmine, which listens on a socket for messages on the topic "CALCS" and calls my kv-message-cb whenever it receives one.
Messages come back from Redis as tuples of [event-type channel-name value], where value for us will be a further tuple, [reqid result]. The callback just forwards results to all async channels registered for the reqid in the subs map.
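Concretely, the subs atom holds something like this at any given moment (entries illustrative):

```clojure
;; Illustrative shape of the subs atom (values are placeholders, per the
;; "TBD" comment in kv-listener):
;;   {"(\"girder.example/calc-price\" \"xyz\")" {#<chan 1> 1, #<chan 2> 1}}
```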
A corresponding method in Redis-Backend is responsible for registering interest in the results of a particular reqid into the subs atom of a Redis-KV-Listener:
```clojure
(kv-listen [this k]
  (let [{{a :subs} :kvl} this
        c (lchan (str "kv-listen" k) (async/dropping-buffer 1))]
    (swap! a (fn [cmap] (assoc-in cmap [k c] 1)))
    c))
```
Note

- Fancy destructuring here: {{a :subs} :kvl} this first extracts the :kvl element of the Redis-Backend record, from which it then extracts the :subs atom of the Redis-KV-Listener.
- Yes, the collection of channels could have been a set, but I wanted to use assoc-in 'cause it's prettier, and I have a premonition that someday, the values in the map will be useful for something.
- Use of dropping-buffer is explained about 10 cm below.
Publication is relatively normal, Redis-wise, but note that we take care of informing any local listeners ASAP, without making them wait for the message to pass through Redis:
```clojure
(kv-publish [this k v]
  (let [{{a :subs} :kvl} this]
    (go
      (doseq [c (keys (get @a k))] (>! c v) (close! c))
      (swap! a dissoc k))
    (wcar (:redis this) (car/publish "CALCS" [k v]))))
```
Eventually, kv-message-cb will receive these same messages after they pass through Redis, but this race condition is harmless, as the loser will just be whistling into /dev/null.
The one improbable disaster that might have occurred is that both the kv-message-cb and kv-publish might have interleaved their >!s and close!s, but the dropping-buffer makes sure that this couldn't result in a blocking condition.
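A toy demonstration of the property we're relying on:

```clojure
;; Why dropping-buffer 1 can't block: with no consumer, a second put
;; simply drops instead of parking the producer.
(let [c (chan (async/dropping-buffer 1))]
  (async/>!! c :winner) ; fills the buffer
  (async/>!! c :loser)  ; dropped immediately, no blocking
  (async/<!! c))        ; => :winner
```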
Requesting a calculation
Girder provides an enqueue function, which, ignoring some straightforward business with a local cache (which, by the way, uses SoftReferences, so it will hang on to its contents until we get close to the heap limit) and a bit of instrumentation, doesn't do much more than call Girder-Backend's enqueue-listen, which will hand back a channel that eventually delivers the wrapped calculation result:
```clojure
(def local-cache (atom (cache/soft-cache-factory {})))

(defn enqueue [nodeid req & [deb]]
  (let [id  (iid deb "ENQ")
        res (get @local-cache (->reqid req))]
    (if res
      (let [c (chan)]
        (debug "enqueue" id "found cached value for" req)
        (go (>! c res))
        c)
      (enqueue-listen @back-end
                      nodeid (->reqid req)
                      :requests :state
                      (fn not-started? [v] (nil? v))
                      (fn done? [[s _]] (= s :done))
                      (fn extract [[_ v]] v)
                      id))))
```
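Usage then looks something like this (the result value is, of course, illustrative):

```clojure
;; Request a calculation via the "pool" node and block for the wrapped result:
(async/<!! (enqueue "pool" [calc-price "xyz"]))
;;=> {:value 42.0}   ; or {:error {...}} if something went wrong
```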
The enqueue-listen method makes use of a global state value stored in Redis under (str "state-val-" reqid) and comprising a vector [STATE VALUE], where STATE is nil, :running or :done, and will always progress in that order. If the STATE is :done, VALUE will be the cached result.
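So, for any single reqid, the stored state evolves like this (node id and value illustrative):

```clojure
;; Life cycle of the state value for a single reqid (illustrative):
;;   nil                     ; never requested
;;   [:running "worker-17"]  ; claimed; records where it's running
;;   [:done 42.0]            ; finished; the value doubles as a cache
```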
The three function arguments deal with detecting whether a request is brand-new or finished, and extracting the result. This is a case where the optional naming of λ functions makes the code a lot easier to understand.
The operation of enqueue-listen is relatively complicated, because it's the one place in the code that explicitly handles a potential race condition: a calculation starting between the time we get the request and the time we go about handling it. Really, this is just an optimization that makes redundant calculations less likely, since, in our lovely functional world, they would at worst be wasteful, but in practice it's not uncommon for a raft of simultaneously issued requests to go on to request exactly the same thing, also simultaneously, so the optimization does get used.
```clojure
(enqueue-listen
  [this
   nodeid reqid
   queue-type val-type
   enqueue-pred done-pred done-extract debug-info]
  (let [redis (assoc (:redis this) :reqid reqid :nodeid nodeid) ; :single-conn true
        qkey  (queue-key nodeid queue-type)
        vkey  (val-key reqid val-type)
        c     (kv-listen this reqid)]
    (wcar redis
          (let [v (second (protocol/with-replies* ; wcar redis ;
                            (car/watch vkey)
                            (car/get vkey)))
                _ (trace "enqueue-listen at" nodeid "found state of" reqid "=" v c)]
            (cond
              (done-pred v)    (let [v (done-extract v)]
                                 (trace "enqueue-listen" reqid "already done, publishing" v)
                                 (go (>! c v) (close! c)))
              (enqueue-pred v) (let [r (protocol/with-replies* ; wcar redis
                                         ;; x'n will fail if vkey has been messed with.
                                         (car/multi)
                                         (car/lpush qkey reqid)
                                         (car/exec))]
                                 (trace "enqueue-listen enqueueing" reqid r))
              :else            (trace "enqueue-listen" reqid "state already" v))
            (car/unwatch)))
    c))
```
The first thing that happens is that we kv-listen, as explained above, for information about this reqid, so no matter what happens, we'll eventually find out the result.
Everything else occurs within the carmine wcar macro. Within this scope, anything Redis-related will use the same connection, which is useful because the first thing that happens is the pipelined pair of Redis commands (car/watch vkey) (car/get vkey).
The watch will cause any subsequent transaction (defined in Redis as commands sandwiched between multi and exec) to fail if the watched value changes in the interim. This "optimistic" approach can be a more efficient way of handling concurrency than true locking. Here, we are getting the state and then watching for changes in it.
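Spelled out as raw Redis commands, the pattern is (key names illustrative):

```clojure
;; The optimistic pattern in raw Redis terms (key names illustrative):
;;   WATCH  state-val-r1
;;   GET    state-val-r1            ; examine the current state
;;   MULTI
;;   LPUSH  requests-queue-pool r1
;;   EXEC                           ; nil reply if state-val-r1 changed since WATCH
```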
If we find the state to be :running, then we don't have to do anything. Someone else is calculating it somewhere, and we'll pick up the result, as we're already kv-listening for it.
If the state is :done, then it isn't going to change any further. Most likely, it became :done prior to our kv-listen, so we inform the output channel directly. On the off chance that it became :done after our kv-listen, then it will be published redundantly.
If the state is nil, then we're probably encountering the reqid for the first time in recorded history, so we lpush it onto the Redis queue; however, since the push occurs within a Redis transaction, it will fail if any watched value has changed, i.e. if the request has started :running somewhere else or has passed through that stage and is now :done. In either case, the kv-listen will take care of the result. We can let the transaction fail silently, with the same effect as if we had determined that the request was :running.
Acting on requests for calculations
Work gets done by worker nodes, which listen on a queue, plucking off work to do. Worker nodes are launched with the function below, and async/go-loop until told to stop. As this is a function, rather than a protocol method, it has no access to the Girder-Backend object, which we have therefore stashed in a local atom called back-end. I'll walk through this code below, but for now just note that no assumptions are made about Redis being involved.
```clojure
(defn unclaimed [reqid] (nil? (get-val @back-end reqid :state)))

(defn launch-worker
  [nodeid poolid]
  (add-member @back-end poolid :volunteers nodeid)
  (let [ctl     (lchan (str "launch-worker " nodeid))
        allreqs (crpop @back-end nodeid :requests)
        reqs    (async/filter< unclaimed allreqs)]
    (debug "worker" nodeid "starting")
    (async/go-loop [volunteering false]
      (trace "worker" nodeid "volunteering state=" volunteering)
      (let [[reqid ch] (if volunteering
                         (async/alts! [reqs ctl])
                         (async/alts! [reqs ctl] :default :empty))]
        (debug "Worker" nodeid "received" reqid ch)
        (cond
          ;; Apparently out of work. Time to volunteer.
          (= reqid :empty) (do
                             (debug "worker" nodeid "is bored and volunteering with" poolid)
                             (clear-bak @back-end [nodeid :requests])
                             (lpush-and-set @back-end
                                            poolid :volunteers nodeid
                                            nodeid :busy nil)
                             (recur true))
          ;; Control channel wants us to close.
          (= ch ctl) (do (debug "Closing worker" nodeid)
                         (remove-member @back-end poolid :volunteers nodeid)
                         (close-all! reqs allreqs ctl))
          ;; Looks like real work:
          :else (when reqid
                  (debug "Worker" nodeid "will now process" reqid)
                  (set-val @back-end nodeid :busy true)
                  (<! (process-reqid nodeid reqs reqid))
                  (recur false)))))
    ctl))
```
In some JVM, somewhere, we'll call (launch-worker "workername" "poolname"), where the second argument refers to a distributor node, which (as explained in the previous post) will probably be the source of requests on the worker's queue.
The worker uses crpop to get requests from its queue, filter<ing them through unclaimed, which checks the global state to see if someone else might be :running this task already. Continuing a theme that has hopefully become boring by now, it doesn't matter if we fail to detect that the task is already in progress. At worst, we'll do it twice.
The main go-loop uses async/alts! to look for work, or for a signal on ctl telling it to shut down. (get-val and set-val are boring protocol methods, which in Redis-Backend do a GET and SET of scalar values.)
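Presumably something along these lines, reusing the val-key naming helper (a sketch, not necessarily the repo's code):

```clojure
;; Plausible Redis-Backend implementations of the "boring" accessors;
;; sketches only. GETSET returns the previous value, which is consistent
;; with process-reqid logging the prior state below.
(get-val [this k val-type]
  (wcar (:redis this) (car/get (val-key k val-type))))
(set-val [this k val-type v]
  (wcar (:redis this) (car/getset (val-key k val-type) v)))
```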
Workers exist in two states. Either they are "volunteering" (as described in the previous post), in which case they expect eventually to get work on their request queue, or they are busy, which means that, as far as they know, there may still be work on their own sub-requests. If we're not volunteering, then the alts! call sets a :default, so an empty queue causes an immediate return value of :empty, indicating that it is time to push our nodeid onto the volunteer queue.
The clear-bak method of the Redis-Backend empties out the backup queue that we populated earlier with brpoplpush, because we know we've handled everything we're supposed to.
Actually doing work
We finally get to evaluating the functions in process-reqid:
```clojure
(defn- process-reqid [nodeid reqchan req-or-id & [deb]]
  (let [id          (iid deb "PRQ")
        [req reqid] (req+id req-or-id)
        c           (lchan #(str "process-reqid" id nodeid reqid))
        res         (get @local-cache reqid)]
    (go
      (if res
        (do
          (debug "process-reqid found cached value" id nodeid reqid)
          (>! c [:cached res])
          (kv-publish @back-end reqid res))
        (let [[state val] (get-val @back-end reqid :state)]
          (condp = state
            :running (do
                       (debug "process-reqid" id nodeid reqid "already running" val)
                       (>! c [:running val]))
            :done    (do
                       (debug "process-reqid" id nodeid reqid "already done" val)
                       (kv-publish @back-end reqid val)
                       (>! c [:done val]))
            nil      (let [state1     (set-val @back-end reqid :state [:running nodeid])
                           _          (debug "process-reqid" id nodeid reqid state1 "->" :running)
                           [f & args] req
                           cres       (binding [*nodeid*        nodeid
                                                *reqchan*       reqchan
                                                *current-reqid* reqid]
                                        (apply f args))
                           res        (<! cres)
                           state2     (set-val @back-end reqid :state [:done res])]
                       (>! c [:calcd res])
                       (swap! local-cache assoc reqid res)
                       (debug "process-reqid" id nodeid reqid state1 "->" state2 "->" :done res)
                       (kv-publish @back-end reqid res))))))
    c))
```
As elsewhere, we re-check the local cache and the global state, in a hail-Mary pass to avoid doing actual work.
Under the horrible circumstance where we actually have to do something:
- Call a protocol method to mark the state as :running and the location where it is doing so, in case someone is curious. (The existence of myriad tools for inspecting Redis rewards leaving breadcrumbs like this when it comes time to debug.)
- Destructure the function and its arguments from the request. Remember that the function is :girded, so it will wrap up the :value or :error as the case may be and send it to a channel that it returns immediately.
- Locally bind our current nodeid and the reqchan from which we're pulling requests, in case the function ends up making re-entrant calls.
- Locally bind the current-reqid to assist format-exception in generating an error stack if necessary.
- apply the function to the request arguments and wait.
- Mark that we're done, and store the result.
- Publish the result.
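The dynamic vars making up that thread-local context are presumably declared something like:

```clojure
;; Presumed declarations of the dynamic context vars used above:
(def ^:dynamic *nodeid* nil)        ; node processing the current request
(def ^:dynamic *reqchan* nil)       ; that node's request channel
(def ^:dynamic *current-reqid* nil) ; for building the "distributed stack"
```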
Distributing requests
Typically, one doesn't request work directly from a worker, but from a distributor that has a pool of workers at its disposal.
```clojure
(defn not-busy [nodeid] (nil? (get-val @back-end nodeid :busy)))

(defn launch-distributor
  "Listen for requests and volunteers.
   When we find one of each, add request to volunteer's queue."
  [nodeid & [poolid]]
  (let [ctl       (lchan (str "launch-distributor " nodeid))
        allreqs   (crpop @back-end nodeid :requests)
        reqs      (async/filter< unclaimed allreqs)
        allvols   (crpop @back-end nodeid :volunteers)
        vols      (async/filter< not-busy allvols)
        reqs+vols (async/map vector [reqs vols])]
    (when poolid (add-member @back-end poolid :volunteers nodeid))
    (async/go-loop [volunteering false]
      (trace "distributor" nodeid "waiting for requests and volunteers")
      (let [[v c] (if volunteering
                    (async/alts! [reqs+vols ctl])
                    (async/alts! [reqs+vols ctl] :default :empty))]
        (debug "distributor" nodeid "got" v c)
        (cond
          (= c ctl)    (do
                         (un-register nodeid poolid)
                         (close-all! reqs allreqs vols reqs+vols ctl))
          (= v :empty) (do
                         (when poolid
                           (debug "distributor" nodeid "is bored and volunteering with" poolid)
                           (lpush-and-set @back-end
                                          poolid :volunteers nodeid
                                          nodeid :busy nil))
                         (trace "distributor" nodeid "recurring")
                         (recur true))
          :else        (let [[reqid volid] v]
                         (debug "distributor" nodeid "pushing" reqid "to request queue for" volid)
                         (when poolid (set-val @back-end nodeid :busy true))
                         (lpush @back-end volid :requests reqid)
                         (clear-bak @back-end [nodeid :volunteers nodeid :requests])
                         (recur false)))))
    ctl))
```
We'll launch one of these with (launch-distributor "poolname"). Since distributors can distribute to other distributors, there can also be a second argument, for the pool that this distributor belongs to.
The main excitement here is that we're listening simultaneously on multiple channels in a complicated way. The request channel has already-claimed requests filtered out using async/filter<; stale volunteers are weeded out in a similar fashion. Finally, vetted volunteer and request pairs are async/map vectored together into a single channel that disgorges [request volunteer] tuples:
```clojure
allreqs   (crpop @back-end nodeid :requests)    ;; channel to receive all requests
reqs      (async/filter< unclaimed allreqs)     ;; keep only the ones nobody is working on yet
allvols   (crpop @back-end nodeid :volunteers)  ;; channel to receive all volunteers
vols      (async/filter< not-busy allvols)      ;; drop volunteers who became busy after volunteering
reqs+vols (async/map vector [reqs vols])        ;; bundle requests and volunteers into tuples
```
Other than the fact that it receives these tuples rather than plain requests, the async/alts! logic is about the same as it was in the worker. When we get a live tuple, we simply push the reqid onto the volid queue and move along.
Re-entrant requests
As in the examples, we might encounter requests within a cdefn function. In these cases, the macro will have been called without a nodeid argument, but we have one from the thread-local binding, so requests expands eventually into a call to

```clojure
(enqueue-reentrant reqs *nodeid* *reqchan*)
```

which was shown last week, but should make more sense now:
```clojure
(defn enqueue-reentrant [reqs nodeid reqchan]
  (let [out (chan)]
    (go
      (let [results (async/map vector (map #(enqueue nodeid %) reqs))]
        (async/go-loop []
          (let [[v c] (async/alts! [results reqchan])]
            (if (= c results)
              (do (>! out v) (close! c) (close! out))
              (do (<! (process-reqid nodeid reqchan v))
                  (recur)))))))
    out))
```
As before, it dumps the requests onto the local queue, and then begins pulling from that same queue. The crucial thing to note is that process-reqid is called both here and by launch-worker, the difference being that launch-worker will only get work that it's volunteered for, while, here, we already know that there's work on the queue, because we put it there.
Sharing the load
As illustrated so far, a worker might end up with a very long queue indeed due to nested re-entrant requests. This is supposed to be a distributed system, so we want some way to share the load. The helper process
```clojure
(defn launch-helper [nodeid cycle-msec]
  (let [ctl (lchan (str "launch-helper " nodeid))]
    (debug "helper" nodeid "starting")
    (async/go-loop []
      (let [member-nodeids   (get-members @back-end nodeid :volunteers)
            in-our-queue     (set (qall @back-end nodeid :requests))
            in-member-queues (apply clojure.set/union
                                    (map #(set (qall @back-end % :requests)) member-nodeids))
            additions        (clojure.set/difference in-member-queues in-our-queue)]
        (when (seq additions)
          (debug "Helper" nodeid "lifting requests" additions)
          (lpush-many @back-end nodeid :requests (vec additions))))
      (if (closed? ctl)
        (debug "Closing helper" nodeid)
        (do
          (trace "helper" nodeid "waiting" cycle-msec)
          (<! (timeout cycle-msec))
          (recur))))
    ctl))
```
wakes up on a configurable cycle, looks at the queues of all nodes in its pool and simply copies anything it finds there onto the end of the distributor's queue, whence, as described just above, it will dole out requests to any workers volunteering. So, basically, we're setting up a race where the original worker has a bit of a head start. Again, the worst case is that the same calculation gets done twice, and the main extra cost is in checking to see if a request is in process before acting on it.
Should the distributor belong to another pool up the hierarchy, then its queue will similarly be copied up.
The main idea here is that we generally try to do calculations locally, because
- there's some latency involved in distributing them (and we have logic to inform local listeners on an express path, so results don't need to flow through Redis),
- the next step in this project will use caching strategies such that the values Redis holds are only unique ids.
The system should be tuned such that distribution occurs roughly at the point where things are taking about as much time as the latency that distribution will introduce.
Phew!
Next time, I'll talk a little bit about testing this on 100 AWS nodes. It should be a lot shorter than this.