Update 2015-01-12

The algorithm as it exists in HEAD is somewhat different from the below, in ways that I'll describe (eventually) in another post. In some ways, it's closer to Fork-Join, but with important differences to support reentrancy, share results of duplicate requests and adjust for the costs of distribution.

Recap of recap

In a previous post, I introduced a framework called Girder (the code is on github), which aims to facilitate Plain Old Functional Programming on distributed systems. By POFP, I mean code that, as much as possible, consists of normal looking calls to normal looking functions, the only requirement for which is referential transparency: there are no side effects, and calls to the same function with the same arguments should always return the same value.

Functional programming, in my view, is dramatically less functional when you use a framework that requires you to cast your algorithm in terms of message passing, whether in the form of key-value pairs for map/reduce or explicitly as mailbox deliveries in the actor model. What makes me particularly uncomfortable is an emerging conventional wisdom that one should write in these styles even when the program will run comfortably on one machine, because someday you will need to scale out, and you might as well get the pain out of the way now.

To over-simplify more than a bit, I want to distinguish between algorithms that are innately about large scale data aggregation (in the case of map-reduce) or dynamic response to signals (in the case of the actor model), and those that are most elegantly expressed in terms of functions, but might benefit from running on more than one core. Put in reverse, if the problem is innately about data aggregation or dynamic response to signals, then maybe you should use one of these message-based models, even if you know in advance that scale will never be important.

In this post, I'll explain the core machinery behind Girder, showing how requests get dispatched, distributed and executed.

Recap of examples

I don't want to repeat too much, but here are a couple of the examples from earlier. The first illustrates a sort of calculation that would obviously benefit from distributed, parallel execution.

(cdefn calc-price [details] (calculate-something-expensive details))
(cdefn calc-portfolio [spec]
   (let [reqs [ [calc-price (munge spec)] [calc-price (diddle spec)]]]
      (reduce + (requests reqs))))
(request "pool" (calc-portfolio "xyz"))

Of note here is that the calculation requests look almost exactly like function calls. (With a little more macro work, I think I can get rid of the "almost.")

The second example illustrates code that might generate an error that we ought to be able to report on, irrespective of where on the grid the code executes:

    (cdefn divide [x y] (float (/ x y)))
    (cdefn ratio [i] (request (divide i (dec i))))

And, as shown last time, the exception thrown when requesting (ratio 1) contains meaningful information about where on the "distributed stack" the error occurred.

Serialized requests

Requests, remember, are finite sequences that start with a function and all of whose successive members are EDN serializable. The whole request gets turned into a string with

  (defn ->reqid [ [f & args] ]
      (pr-str (concat [(fname f)] args)))

where fname is so ghastly I sort of wince pasting it here, but it's inspired by an only slightly less gruesome post on the Clojure mailing list, so it must be OK, right?

(def STR->OPT
  (apply hash-map (mapcat #(vector (second %) (str (first %))) clojure.lang.Compiler/CHAR_MAP)))
(def OPTPAT
  (re-pattern (str "\\b" (clojure.string/join "|" (vals clojure.lang.Compiler/CHAR_MAP)) "\\b" )))
(defn fname
  "Extract the qualified name of a clojure function as a string."
  [f]
  (-> (str f)
      (clojure.string/replace OPTPAT STR->OPT)
      (clojure.string/replace-first "$" "/")
      (clojure.string/replace #"@\w+$" "")
      (clojure.string/replace #"_" "-")))

All this regexing is responsible for turning a string like "clojure.core$_PLUS_@4955aabe" (which is what (str +) returns) into "clojure.core/+", which can be resolved in the reverse operation,

(defn reqid->req [reqid]
  (let [[f & args]  (read-string reqid)
        f           (resolve (symbol f))
        f           (if (:girded (meta f)) f (cfn f))]
    (apply vector f args)))

As you see, Clojure complicates the task by maintaining its own mangling scheme for non-alphanumeric operators, and I'm violating some sort of ethical code by extracting it from src/jvm/clojure/lang/Compiler.java.
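For comparison, here is a minimal sketch of the same round trip that leans on clojure.main/demunge, which ships with Clojure, instead of the hand-rolled regexes. The starred names (fname*, ->reqid*, reqid->req*) are hypothetical stand-ins rather than Girder's code, and the sketch skips the :girded check entirely.

```clojure
(require '[clojure.main :as main]
         '[clojure.string :as string])

(defn fname*
  "Qualified name of a top-level function, e.g. \"clojure.core/+\"."
  [f]
  (-> (str f)                           ; e.g. "clojure.core$_PLUS_@4955aabe"
      (string/replace #"@\w+$" "")      ; drop the identity hash
      main/demunge))                    ; "$" -> "/", "_PLUS_" -> "+", "_" -> "-"

(defn ->reqid* [[f & args]]
  (pr-str (concat [(fname* f)] args)))

(defn reqid->req* [reqid]
  (let [[f & args] (read-string reqid)]
    (apply vector (resolve (symbol f)) args)))

(def reqid (->reqid* [+ 1 2]))          ; "(\"clojure.core/+\" 1 2)"
(def req   (reqid->req* reqid))         ; [#'clojure.core/+ 1 2]
(apply (first req) (rest req))          ; the var is callable, so this adds 1 and 2
```

Note that demunge turns every "$" into "/", so, like the original, this is only meant for top-level functions.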

Remember from the last post the macro cdefn, which defines functions that return results via an async channel, wrapped up in {:value ...} or {:error ...}. That macro also set metadata to indicate that the function was thus :girded. If, as in the case of +, it's not girded, the easily imaginable cfn function takes care of doing so. This way, we can distribute requests using boring old functions as well as fancy functions like calc-portfolio that themselves also make requests.
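To make "easily imaginable" a little more concrete, here is a rough sketch of what such a wrapper could look like. It is not Girder's cfn: the name cfn* is made up, and a promise stands in for the async channel so that the snippet runs on bare Clojure.

```clojure
(defn cfn*
  "Wrap plain function f so that it returns a container holding
   {:value ...} or {:error ...} instead of returning or throwing."
  [f]
  (with-meta
    (fn [& args]
      (let [out (promise)]              ; stand-in for an async channel
        (deliver out (try
                       {:value (apply f args)}
                       (catch Throwable t
                         {:error (.getMessage t)})))
        out))
    {:girded true}))

(def safe-divide (cfn* /))
@(safe-divide 6 3)                      ; {:value 2}
@(safe-divide 1 0)                      ; an {:error ...} map instead of an exception
(:girded (meta safe-divide))            ; true
```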

Once again, it should be emphasized that if your.namespace.here/gorgonzola cannot be resolved, or resolves to something different in different running instances, then terrible, terrible things will happen.

Redis and core.async

The central state keeper behind all the remote calculations is Redis. In theory, it could be something else, but the only implementation of Girder-Backend I've so far written is Redis-Backend.

Redis is an in-memory database with many advantages. For our purposes, the greatest two are

  1. extraordinary speed (generally hundreds of thousands of operations, loosely defined, per second)
  2. certain assurances of atomicity.

Such assurances are contingent on proper functioning of the Redis process and the platform on which it runs, and, while Redis does generate periodic snapshots, from which it can be recovered, there are many plausible failure scenarios where you'll lose all your Redis data forever.

But, it is reasonable to ask, who cares? Obviously there are cases you might, but there are also many cases where you shouldn't. The primary assumption that we'll be writing purely functional code implies that the very worst consequence of data loss is having to run our program again. Again, while that might not always be acceptable, it very often is.

I'll discuss various implementations of methods in the Redis-Backend implementation of the Girder-Backend protocol below. Know for now that (defrecord Redis-Backend [redis kvl]...) holds some information about the connection to Redis and a reference to what I'll call a key-value listener that's used for publishing things internally.

In a post scheduled for a few months from now, I'll discuss integrating Girder with Datomic, with further considerations about data locality, but, for now, let's just assume that input data is provided "somehow."

Queues

To ease working with Redis, and to provide an abstraction layer that will someday allow different back-ends, we make heavy use of core.async internally. (The user of Girder won't see it.) Here's an implementation of a typical Girder-Backend method in Redis-Backend. This creates a channel, reading from which will actually be right-popping a Redis queue:

  (crpop [this key queue-type]
    (let [qkey   (queue-key key queue-type)      ; Redis list we pop from
          bkey   (queue-bak-key key queue-type)  ; backup list for recovery
          out    (lchan (str "crpop-" key))]
      (async/go-loop []
        ;; Block for up to 60 seconds waiting for an element.
        (let [val (wcar (:redis this) (car/brpoplpush qkey bkey 60))]
          (trace "crpop" key "got" val "from redis list" qkey)
          (if (still-open? out)
            (do (when val (>! out val))
                (recur))
            (debug "crpop" key queue-type "shutting down"))))
      out))

In a design decision that I know some people object to, the out channel does double duty as a control channel. (You'll notice that still-open? isn't part of the officially exposed core.async interface.)

Taking crpop from the start:

The private functions xxx-key just create strings that will be used to name Redis lists. E.g. one that came up in last post's examples ended up getting called "requests-queue-pool". Longer strings might get md5'd into something briefer, to save wire wear, but that's an implementation detail.

The reason for defining both qkey and bkey is that we'll be using the Redis RPOPLPUSH command (here in its blocking form, BRPOPLPUSH) to atomically stash what we pop off of one queue in a backup queue, which can be used for recovery purposes if the process running crpop dies after popping from Redis but before whatever was popped was read from its channel. This is the recommended implementation of reliable queues in Redis. For different back-ends, the second key might be ignored.
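The pop-and-stash idea can be illustrated without Redis at all. The following is only an in-memory sketch, with an atom of vectors standing in for Redis lists; rpoplpush! and the key "requests-bak-pool" are hypothetical names chosen for the example.

```clojure
(defn rpoplpush!
  "Atomically move the rightmost element of list src to the left end
   of list dst inside the atom `lists`; returns the element or nil."
  [lists src dst]
  (let [[before _] (swap-vals! lists
                               (fn [m]
                                 (if-let [v (peek (get m src))]
                                   (-> m
                                       (update src pop)
                                       (update dst #(into [v] %)))
                                   m)))]
    (peek (get before src))))

(def lists (atom {"requests-queue-pool" ["req-a" "req-b"]}))

(rpoplpush! lists "requests-queue-pool" "requests-bak-pool")  ; pops "req-b"
;; If the consumer dies here, "req-b" is still sitting in the backup list.
@lists
```

Because the move happens inside a single swap, there is no moment at which the element is in neither list, which is the property the real command provides on the Redis side.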

The out channel is defined with lchan, which was written in desperation to name async channels and keep track of what's going in and out of them:

(def lchans (atom (cache/weak-cache-factory {})))
(defn lchan [name & [buf]]
  (if-not (timbre/level-sufficient? :trace nil) (chan buf)  ;; short-circuit if not at :trace level
          (let [name (if (fn? name) (name) name)
                c    (async/map> #(do (trace "Channel" name "receiving" %) %)
                                 (async/map< #(do (trace "Channel" name "delivering" %) %) (chan buf)))]
            (trace "Channel" name "created:" c)
            (swap! lchans #(assoc % c name))
            c)))

At :trace level, all writes to and reads from the channel will be logged.

I found this extraordinarily useful during development, since I also find it extraordinarily easy to get confused in the maze of twisty core.async channels it's so easy to create. Obviously, one motivation for Girder is to allow people not to deal with channels sometimes.

The actual logging comes from the marvelous timbre library, and the trace messages are generated by sandwiching the chan between async/map< and async/map>, which intercept the content on the way in and out, but otherwise ignore it.

Note that lchans, where named channels get stashed for future examination, is an atom containing a weak-reference hash map. That means channel entries will tend to disappear quickly after all other references to the contained channels are gone.

Aside: Unfortunately, core.cache/weak-cache-factory didn't actually exist, so I had to add it in my own fork. That was an adventure in its own right, since I had to supplant the version of core.cache that core.async was pulling in.

Anyway.

The main crpop action is in (wcar (:redis this) (car/brpoplpush qkey bkey 60)). To communicate with Redis, we use the carmine library (coincidentally by the same author as timbre). carmine is very cool. In addition to exposing every* standard Redis command as a car/standard-redis-command, it handles connection pooling and serialization of values, so you don't have to pretend everything's a string.

The (:redis this) extracts connection details from the Redis-Backend record; then (car/brpoplpush qkey bkey 60) blocks, trying to pop an element from the right end of qkey, which it will do atomically with pushing it onto the left end of bkey. If we get something, we push it onto the out channel and block again. If we don't, we make sure that out is still open (i.e. someone cares), and block for another 60 seconds. The synchronous, blocking nature of the Redis call is hidden from the consumer of out.

There were other options here. First, I could have specified an infinite timeout, which generally wouldn't change functionality, but would have made it harder to shut down operations if necessary. Second, I could have used Redis's pub/sub capability to avoid explicit blocking, but pub/sub with exactly one consumer felt like overkill, and the blocking is only moderately evil in a scheme of things where (1) Redis itself is fully asynchronous internally, (2) the pub/sub alternative still requires something to block, waiting for messages.

There's a corresponding clpush, which exposes pushing onto a Redis queue as writing to an async channel.

Listening for finished calculations

I haven't actually explained how you ask for a calculation, or how it gets done, but it will be helpful first to understand how we distribute the results.

Here we do actually make use of Redis pub/sub, to provide notification when a calculation has completed. As multiple running requests may request the same calculation, having multiple listeners is an important feature, but since ongoing notification of changes to the invariant return value of a pure function is by definition useless, it would be wasteful to build up and tear down a channel corresponding to every request id. Accordingly, there's only one actual Redis pub/sub channel called "CALCS", but we locally keep an atom subs, holding a map of request ids to a collection of channels to be notified whenever an appropriate message comes in:

(defrecord Redis-KV-Listener [subs listener])
(defn- kv-message-cb [a [etype _ val :as msg]]
  (when (and (= etype "message") (vector? val))
    (let [[k v] val]
      (swap! a (fn [cmap]
                 ;; notify and close every channel registered for k, then forget k
                 (doseq [c (keys (get cmap k))] (go (>! c v) (close! c)))
                 (dissoc cmap k))))))
(defn- kv-listener [redis]
  (let [subs               (atom {}) ;; {reqid {c1 TBD, c2 TBD}}
        redis-listener     (car/with-new-pubsub-listener
                                 (:spec redis)
                                 {"CALCS" (partial kv-message-cb subs)}
                                 (car/subscribe "CALCS"))]
        (->Redis-KV-Listener subs redis-listener)))

The (car/with-new-pubsub-listener ...) call creates a thread in carmine, which listens on a socket for messages on the topic "CALCS" and calls my kv-message-cb whenever it receives one.

Messages come back from Redis as tuples of [event-type channel-name value], where value for us will be a further tuple, [reqid, result]. The callback just forwards results to all async channels registered for the reqid in the subs map.

A corresponding method in Redis-Backend is responsible for registering interest in the results of a particular reqid into the subs atom of a Redis-KV-Listener:

  (kv-listen [this k]
    (let [{{a :subs} :kvl}  this
          c                 (lchan (str "kv-listen" k) (async/dropping-buffer 1))]
      (swap! a (fn [cmap] (assoc-in cmap [k c] 1)))
      c))

Note

  1. fancy destructuring here: {{a :subs} :kvl} this first extracts the :kvl element of the Redis-Backend record, from which it then extracts the :subs field of the Redis-KV-Listener.
  2. Yes, the collection of channels could have been a set, but I wanted to use assoc-in 'cause it's prettier, and I have a premonition that someday, the values in the map will be useful for something.
  3. Use of dropping-buffer is explained about 10 cm below.

Publication is relatively normal, Redis-wise, but note that we take care of informing any local listeners ASAP, without making them wait for the message to pass through Redis:

  (kv-publish [this k v]
    (let [{{a :subs} :kvl} this]
      ;; express path: tell local listeners right away
      (go (doseq [c (keys (get @a k))] (>! c v) (close! c))
          (swap! a dissoc k))
      (wcar (:redis this) (car/publish "CALCS" [k v]))))

Eventually, kv-message-cb will receive these same messages after they pass through Redis, but this race condition is harmless, as the loser will just be whistling into /dev/null. The one improbable disaster that might have occurred is that both the kv-message-cb and kv-publish might have interleaved their >!s and close!s, but the dropping-buffer makes sure that this couldn't result in a blocking condition.
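The bookkeeping behind this can be sketched in a few lines. This is a simplification under stated assumptions, not the code above: promises stand in for async channels, and subs*, listen! and notify! are hypothetical names.

```clojure
(def subs* (atom {}))                   ; {reqid {listener 1, ...}}

(defn listen!
  "Register interest in reqid; returns a promise for its result."
  [reqid]
  (let [p (promise)]
    (swap! subs* assoc-in [reqid p] 1)
    p))

(defn notify!
  "Deliver v to every listener registered for reqid and forget them.
   Returns the number of listeners notified."
  [reqid v]
  (let [[before _] (swap-vals! subs* dissoc reqid)
        listeners  (keys (get before reqid))]
    (doseq [p listeners] (deliver p v))
    (count listeners)))

(def a (listen! "(\"clojure.core/+\" 1 2)"))
(def b (listen! "(\"clojure.core/+\" 1 2)"))
(notify! "(\"clojure.core/+\" 1 2)" {:value 3})   ; express local path: 2 listeners
(notify! "(\"clojure.core/+\" 1 2)" {:value 3})   ; same message via Redis later: 0 listeners
```

Whichever notification arrives second finds nobody registered, which is the harmless outcome described above.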

Requesting a calculation

Girder provides an enqueue function, which, ignoring some straightforward business with a local cache (which, by the way, uses SoftReferences, so it will hang on to its contents until we get close to the heap limit) and a bit of instrumentation, doesn't do much more than call Girder-Backend's enqueue-listen, which will hand back a channel that eventually delivers the wrapped calculation result:

 (def local-cache (atom (cache/soft-cache-factory {})))
 (defn enqueue [nodeid req & [deb]]
  (let [id  (iid deb "ENQ")
        res (get @local-cache (->reqid req))]
    (if res
      (let [c   (chan)]
        (debug "enqueue" id " found cached value for" req)
        (go (>! c res))
        c)
      (enqueue-listen @back-end
                      nodeid (->reqid req)
                      :requests :state
                      (fn not-started? [v] (nil? v))
                      (fn done?        [ [s _] ] (= s :done))
                      (fn extract      [ [_ v] ] v)
                      id))))

The enqueue-listen method makes use of global state values stored in Redis under (str "state-val-" reqid), each comprising a vector [STATE VALUE], where STATE is nil, :running or :done, and will always progress in that order. If the STATE is :done, VALUE will be the cached result. The three function arguments deal with detecting whether a request is brand-new or finished, and extracting the result. This is a case where the optional naming of λ functions makes the code a lot easier to understand.

The operation of enqueue-listen is relatively complicated, because it's the one place in the code that explicitly handles a potential race condition: a calculation starting between the time we get the request and the time we go about handling it. Really, this is just an optimization that makes redundant calculations less likely, since, in our lovely functional world, they would at worst be wasteful, but in practice it's not uncommon for a raft of simultaneously issued requests to go on to request exactly the same thing, also simultaneously, so the optimization does get used.

(enqueue-listen
    [this
     nodeid reqid
     queue-type val-type
     enqueue-pred done-pred done-extract debug-info]
  (let [redis (assoc (:redis this) :reqid reqid :nodeid nodeid)  ;  :single-conn true
      qkey (queue-key nodeid queue-type)
      vkey (val-key reqid val-type)
      c    (kv-listen this reqid debug-info)]
      (wcar redis
       (let [v  (second (protocol/with-replies* ; wcar redis  ;
                          (car/watch vkey)
                          (car/get vkey)))
             _     (trace "enqueue-listen at" nodeid "found state of" reqid "=" v c)]
         (cond
          (done-pred v)  (let [v (done-extract v)]
                           (trace "enqueue-listen" reqid "already done, publishing" v)
                           (go (>! c v) (close! c)))
          (enqueue-pred v) (let [r (protocol/with-replies* ; wcar redis
                                         ;;  x'n will fail if vkey has been messed with.
                                         (car/multi)
                                         (car/lpush qkey reqid)
                                         (car/exec))]
                             (trace "enqueue-listen enqueueing" reqid r))
          :else           (trace "enqueue-listen" reqid "state already" v))
         (car/unwatch)))
      c))

The first thing that happens is we kv-listen as explained above for information about this reqid, so no matter what happens, we'll eventually find out the result.

Everything else occurs within the carmine wcar macro. Within this scope, anything Redis-related will use the same connection, which is useful because the first thing that happens is the pipelined pair of Redis commands (car/watch vkey) (car/get vkey). The watch will cause any subsequent transaction (defined in Redis as commands sandwiched between multi and exec) to fail if the watched key has changed in the meantime. This "optimistic" approach can be a more efficient way of handling concurrency than true locking. Here, we start watching the state and then get its current value.

If we find the state to be :running, then we don't have to do anything. Someone else is calculating it somewhere, and we'll pick up the result, as we're already kv-listening for it.

If the state is :done, then it isn't going to change any further. Most likely, it became :done prior to our kv-listen, so we inform the output channel directly. On the off chance that it became :done after our kv-listen, then it will be published redundantly.

If the state is nil, then we're probably encountering the reqid for the first time in recorded history, so we lpush it onto the Redis queue; however, since the push occurs within a Redis transaction, it will fail if any watched value has changed, i.e. if the request has started :running somewhere else or has passed through that stage and is now :done. In either case, the kv-listen will take care of the result. We can let the transaction fail silently, with the same effect as if we had determined that the request was :running.
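The three cases can be mimicked in memory. In this sketch an atom plays the part of Redis and compare-and-set! plays the part of WATCH/MULTI/EXEC; it is deliberately coarser than the real thing, since it "watches" the whole store rather than one key. The name enqueue-once! and the shape of the store are assumptions made for the example.

```clojure
(defn enqueue-once!
  "store is an atom holding {:state {reqid [STATE VALUE]} :queue [...]}.
   Returns a keyword describing what happened."
  [store reqid]
  (let [snapshot  @store                       ; plays the role of WATCH + GET
        [state _] (get-in snapshot [:state reqid])]
    (cond
      (= state :done)    :already-done
      (= state :running) :already-running
      ;; Plays the role of MULTI / LPUSH / EXEC: the push only lands
      ;; if nothing in the store changed since we took the snapshot.
      (compare-and-set! store snapshot
                        (update snapshot :queue (fnil conj []) reqid))
                         :enqueued
      :else              :lost-race)))

(def store (atom {:state {"r-done" [:done {:value 42}]
                          "r-busy" [:running "worker-1"]}
                  :queue []}))

(enqueue-once! store "r-new")    ; :enqueued
(enqueue-once! store "r-busy")   ; :already-running
(enqueue-once! store "r-done")   ; :already-done
```

As in the real method, every outcome other than :enqueued requires no further action, because the listener registered up front will receive the result either way.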

Acting on requests for calculations

Work gets done by worker nodes, which listen on a queue, plucking off work to do. Worker nodes are launched with the function below, and async/go-loop until told to stop. As this is a function, rather than a protocol method, it has no access to the Girder-Backend object, which we have therefore stashed in a local atom called back-end. I'll walk through this code below, but for now just note that no assumptions are made about Redis being involved.

(defn unclaimed [reqid] (nil? (get-val  @back-end reqid :state)))
(defn launch-worker
  [nodeid poolid]
  (add-member @back-end poolid :volunteers nodeid)
  (let [ctl       (lchan (str "launch-worker " nodeid))
        allreqs   (crpop @back-end nodeid :requests)
        reqs      (async/filter< unclaimed allreqs)]
    (debug "worker" nodeid "starting")
    (async/go-loop [volunteering false]
      (trace "worker" nodeid "volunteering state=" volunteering)
      (let [[reqid ch]   (if volunteering
                         (async/alts! [reqs ctl])
                         (async/alts! [reqs ctl] :default :empty))]
        (debug "Worker" nodeid "received" reqid ch)
        (cond
         ;; Apparently out of work.  Time to volunteer
         (= reqid :empty) (do
                          (debug "worker" nodeid "is bored and volunteering with" poolid)
                          (clear-bak @back-end [nodeid :requests])
                          (lpush-and-set @back-end
                                         poolid :volunteers nodeid
                                         nodeid :busy nil)
                          (recur true))
         ;; Control channel wants us to close.
         (= ch ctl)  (do (debug "Closing worker" nodeid)
                         (remove-member @back-end poolid :volunteers nodeid)
                         (close-all! reqs allreqs ctl))
         ;; Looks like real work:                        
         :else        (when reqid
                        (debug "Worker" nodeid "will now process" reqid)
                        (set-val @back-end nodeid :busy true)
                        (<! (process-reqid nodeid reqs reqid))
                        (recur false)))))
    ctl))

In some JVM, somewhere, we'll call (launch-worker "workername" "poolname"), where the second argument refers to a distributor node, which (as explained in the previous post) will probably be the source of requests on the worker's queue.

The worker uses crpop to get requests from its queue, filter<ing them through unclaimed, which checks the global state to see if someone else might be :running this task already. Continuing a theme that has hopefully become boring by now, it doesn't matter if we fail to detect that the task is already in progress. At worst, we'll do it twice.

The main go-loop uses async/alts! to look for work, or for a signal on ctl telling it to shut down. (get-val and set-val are boring protocol methods, which in Redis-Backend do a GET and SET of scalar values).

Workers exist in two states. Either they are "volunteering" (as described in the previous post) in which case they expect eventually to get work on their request queue, or they are busy, which means that, as far as they know, there may still be work on their own sub-requests. If we're not volunteering, then the alts! call sets a :default, so an empty queue causes an immediate return value of :empty, indicating that it is time to push our nodeid onto the volunteer queue.

The clear-bak method of the Redis-Backend empties out the backup queue that we populated earlier with brpoplpush, because we know we've handled everything we're supposed to.

Actually doing work

We finally get to evaluating the functions in process-reqid:

 (defn- process-reqid [nodeid reqchan req-or-id & [deb]]
   (let [id          (iid deb "PRQ")
         [req reqid] (req+id req-or-id)
         c           (lchan #(str "process-reqid" id nodeid reqid))
         res         (get @local-cache reqid)]
     (go 
       (if res
         (do  
           (debug "process-reqid found cached value" id nodeid reqid)
           (>! c [:cached res])
           (kv-publish @back-end reqid res))
         (let [ [state val] (get-val  @back-end reqid :state)]
           (condp = state
             :running (do 
                       (debug "process-reqid" id nodeid reqid "already running" val)
                       (>! c [:running val]))
             :done    (do (debug "process-reqid" id nodeid reqid "already done" val)
                         (kv-publish @back-end reqid val)
                         (>! c [:done val]))
             nil      (let [state1     (set-val @back-end reqid :state [:running nodeid])
                           _          (debug "process-reqid" id nodeid reqid state1 "->" :running)
                           [f & args] req
                           cres       (binding [*nodeid*        nodeid
                                                *reqchan*       reqchan
                                                *current-reqid* (->reqid reqid)]
                                        (apply f args))
                           res        (<! cres)
                           state2     (set-val @back-end reqid :state [:done res])]
                       (>! c [:calcd res])
                       (swap! local-cache assoc reqid res)
                       (debug "process-reqid" id nodeid reqid state1 "->" state2 "->" :done res)
                       (kv-publish @back-end reqid res))))))

     c))

As elsewhere, we re-check the local cache and the global state, in a hail-Mary pass to avoid doing actual work.

Under the horrible circumstance where we actually have to do something:

  1. Call a protocol method to mark the state as :running and the location where it is doing so, in case someone is curious. (The existence of myriad tools for inspecting Redis rewards leaving breadcrumbs like this when it comes time to debug.)
  2. Destructure the function and its arguments from the request. Remember that the function is :girded, so it will wrap up the :value or :error as the case may be and send it to a channel that it returns immediately.
  3. Locally bind our current nodeid and the reqchan from which we're pulling requests, in case the function ends up making re-entrant calls.
  4. Locally bind the current-reqid to assist format-exception in generating an error stack if necessary.
  5. apply the function to the request arguments and wait.
  6. Mark that we're done, and store the result.
  7. Publish the result.
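Stripped of channels, bindings and logging, the state transitions in those steps look roughly like the sketch below. It is a simplification under stated assumptions: the states atom stands in for Redis, process! is a hypothetical name, and the call is synchronous rather than going through a :girded function.

```clojure
(def states (atom {}))                  ; {reqid [STATE VALUE]}, standing in for Redis

(defn process!
  "Evaluate (apply f args) for reqid unless it is already claimed."
  [nodeid reqid f args]
  (let [[state v] (get @states reqid)]
    (case state
      :done    v                                          ; reuse the stored result
      :running nil                                        ; someone else has it
      (do (swap! states assoc reqid [:running nodeid])    ; step 1: leave a breadcrumb
          (let [res (try {:value (apply f args)}          ; steps 2-5: call and wrap
                         (catch Exception e {:error (.getMessage e)}))]
            (swap! states assoc reqid [:done res])        ; step 6: store the result
            res)))))                                      ; step 7 would publish it

(def calls (atom 0))
(defn slow-add [x y] (swap! calls inc) (+ x y))

(process! "worker-1" "add-1-2" slow-add [1 2])   ; runs slow-add
(process! "worker-2" "add-1-2" slow-add [1 2])   ; finds [:done ...] and skips the work
```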

Distributing requests

Typically, one doesn't request work directly from a worker, but from a distributor that has a pool of workers at its disposal.

(defn not-busy [nodeid] (nil? (get-val @back-end nodeid :busy)))
(defn launch-distributor
  "Listen for requests and volunteers.
   When we find one of each, add request to volunteer's queue."
  [nodeid & [poolid]]
  (let [ctl        (lchan (str  "launch-distributor " nodeid))
        allreqs    (crpop @back-end nodeid :requests)
        reqs       (async/filter< unclaimed allreqs)
        allvols    (crpop @back-end nodeid :volunteers)
        vols       (async/filter< not-busy allvols)
        reqs+vols  (async/map vector [reqs vols])]
    (when poolid  (add-member @back-end poolid :volunteers nodeid))
    (async/go-loop [volunteering false]
      (trace "distributor" nodeid "waiting for requests and volunteers")
      (let [ [v c]       (if volunteering
                          (async/alts! [reqs+vols ctl])
                          (async/alts! [reqs+vols ctl] :default :empty))]
        (debug "distributor" nodeid "got" v c)
        (cond
         (= c ctl)    (do 
                        (un-register nodeid poolid)
                        (close-all! reqs allreqs vols reqs+vols ctl))
         (= v :empty) (do (when poolid
                            (debug "distributor" nodeid "is bored and volunteering with" poolid)
                            (lpush-and-set @back-end
                                           poolid :volunteers nodeid
                                           nodeid :busy nil))
                          (trace "distributor" nodeid "recurring")
                          (recur true))
         :else        (let [ [reqid volid] v]
                        (debug "distributor" nodeid "pushing" reqid "to request queue for" volid)
                        (when poolid (set-val @back-end nodeid :busy true))
                        (lpush @back-end volid :requests reqid)
                        (clear-bak @back-end [nodeid :volunteers nodeid :requests])
                        (recur false)))))
    ctl))

We'll launch one of these with (launch-distributor "poolname"). Since distributors can distribute to other distributors, there can also be a second argument, for the pool that this distributor belongs to.

The main excitement here is that we're listening simultaneously on multiple channels in a complicated way. The request channel has already-claimed requests filtered out using async/filter<; stale volunteers are weeded out in a similar fashion. Finally, vetted volunteer and request pairs are async/map vectored together into a single channel that disgorges [request volunteer] tuples:

allreqs   (crpop @back-end nodeid :requests)   ;; channel to receive all requests
reqs      (async/filter< unclaimed allreqs)    ;; keep only the ones that nobody is working on yet
allvols   (crpop @back-end nodeid :volunteers) ;; channel to receive all volunteers
vols      (async/filter< not-busy allvols)     ;; drop volunteers who became busy after volunteering
reqs+vols (async/map vector [reqs vols])       ;; bundle together request and volunteer tuples

Other than the fact that it receives these tuples rather than plain requests, the async/alts! logic is about the same as it was in the worker. When we get a live tuple, we simply push the reqid onto the volid queue and move along.
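The filter-then-pair logic is easier to see on ordinary sequences than on channels. A minimal sketch, with pair-up as a hypothetical name and plain sets standing in for the unclaimed and not-busy lookups:

```clojure
(defn pair-up
  "Pair each still-unclaimed request with a still-idle volunteer, in
   arrival order. claimed? and busy? are predicates (sets work)."
  [claimed? busy? reqs vols]
  (map vector (remove claimed? reqs) (remove busy? vols)))

(pair-up #{"r2"} #{"w1"} ["r1" "r2" "r3"] ["w1" "w2" "w3"])
;; => (["r1" "w2"] ["r3" "w3"])
```

As with the channel version, a pair is only produced when there is both a request and a volunteer; leftovers on either side simply wait.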

Re-entrant requests

As in the examples, we might encounter requests within a cdefn function. In these cases, the macro will have been called without a nodeid argument, but we have one from the thread-local binding, so, requests expands eventually into a call to

(enqueue-reentrant reqs *nodeid* *reqchan*)

which was shown last week, but should make more sense now:

(defn enqueue-reentrant [reqs nodeid reqchan]
  (let [out (chan)]
    (go 
      (let [results   (async/map vector (map #(enqueue nodeid %) reqs))]
        (async/go-loop []
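          (let [[v c] (async/alts! [results reqchan])]
            (if (= c results)
              ;; all sub-requests are finished: hand back the results
              (do (>! out v) (close! c) (close! out))
              ;; otherwise work through whatever is on our own queue
              (do (<! (process-reqid nodeid reqchan v))
                  (recur)))))))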
    out))

As before, it dumps the requests onto the local queue, and then begins pulling from that same queue. The crucial thing to note is that process-reqid is called both here and by launch-worker, the difference being that launch-worker will only get work that it's volunteered for, while, here, we already know that there's work on the queue, because we put it there.

Sharing the load

As illustrated so far, a worker might end up with a very long queue indeed due to nested re-entrant requests. This is supposed to be a distributed system, so we want some way to share the load. The helper process

(defn launch-helper [nodeid cycle-msec]
  (let [ctl                (lchan (str "launch-helper " nodeid))]
    (debug "helper" nodeid "starting")
    (async/go-loop []
      (let [member-nodeids   (get-members @back-end  nodeid :volunteers)
            in-our-queue     (set (qall @back-end nodeid :requests))
            in-member-queues (apply clojure.set/union 
                               (map #(set (qall @back-end % :requests)) member-nodeids))
            additions        (clojure.set/difference in-member-queues in-our-queue)]
        (when (seq additions)
          (debug "Helper" nodeid "lifting requests" additions)
          (lpush-many @back-end nodeid :requests (vec additions))))
      (if (closed? ctl)
        (debug "Closing helper" nodeid)
        (do 
          (trace "helper" nodeid "waiting" cycle-msec)
          (<! (timeout cycle-msec))
          (recur))))
    ctl))

wakes up on a configurable cycle, looks at the queues of all nodes in its pool and simply copies anything it finds there onto the end of the distributor's queue, whence, as described just above, it will dole out requests to any workers volunteering. So, basically, we're setting up a race where the original worker has a bit of a head start. Again, the worst case is that the same calculation gets done twice, and the main extra cost is in checking to see if a request is in process before acting on it.

Should the distributor belong to another pool up the hierarchy, then its queue will similarly be copied up.
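The set arithmetic at the heart of the helper can be tried in isolation. In this sketch, requests-to-lift is a hypothetical name and the queues are plain vectors rather than Redis lists:

```clojure
(require '[clojure.set :as set])

(defn requests-to-lift
  "Requests sitting in any member's queue that are not yet in ours."
  [our-queue member-queues]
  (set/difference (apply set/union #{} (map set member-queues))
                  (set our-queue)))

(requests-to-lift ["r1"] [["r1" "r2"] ["r3"] []])
;; => #{"r2" "r3"}
```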

The main idea here is that we generally try to do calculations locally, because

  1. there's some latency involved in distributing them (and we have logic to inform local listeners on an express path, so results don't need to flow through Redis),
  2. the next step in this project will use caching strategies such that the values Redis holds are only unique ids.

The system should be tuned such that distribution occurs roughly at the point where things are taking about as much time as the latency that distribution will introduce.

Phew!

Next time, I'll talk a little bit about testing this on 100 AWS nodes. It should be a lot shorter than this.


