yak herding

Back in August, I wrote two posts about an experimental framework for distributed functional programming called Girder. The idea, in summary, was to make distributed code look as much as possible like ordinary Clojure, as opposed to structuring it explicitly around message passing (as in the actor model) or data flows (as in map/reduce, storm, et al). As I say, it was an experiment, but it was also something of a cri de coeur (a mini-manifesto, if you will) against extraneous impingements on my code. Anyway, if it sounds interesting, go back and read the posts, but you don't need to understand them in depth to understand today's post.

While it was illuminating to test this supposedly distributed computation framework on my little MacBook, I knew that, at some point, I would have to run it in the large. One of the few advantages of corporate serfdom is that the right supplications will often win you frolicking privileges on the baronial server fields, but we itinerant friars are stuck with Amazon Web Services. I know it's not fair to complain, but AWS is not really optimized for my purposes, or, in general, for situations where the following apply:

  • Your distributed system can only be tested realistically on a large number of machines.
  • Turnkey solutions do not already exist for the system. I.e. not Hadoop or a web application.
  • Full-on HA automation would be overkill, since you're still experimenting.
  • You want to pay as little as possible, which means bidding in the spot auction market and killing instances the second you're done with them.
  • You insist on working in Clojure to the greatest extent possible.

If none of these are true, my cockamamie scheme is certainly not the best approach, but, if all or most of them are, this is definitely superior to the more obvious mix of web console tools and ssh.

I previously described the necessary AWS setup. Other than a few short bash scripts, that phase didn't require any actual code, and much of the work was unintuitive clicking about on the web console. (Unavoidably. I tried boiling it down to invocations of the AWS command line utility, but, since Amazon documents everything in terms of their web interface, you would have been completely lost were anything to go wrong.)

Today is a Clojure day:

  1. We lever the previously discussed AWS setup with a Clojure framework to launch instances and spot auction requests asynchronously, run specific software on the machines and provide clean notification when they're up and ready for business.
  2. We use core.async to contain and tame the synchronous interface that Amazon gives us.
  3. We use lens-like constructs to simplify interaction with complex configuration data.
  4. We build a utility wrapper that turns most stand-alone Clojure functions into a CLI app that handles the most common arguments, logging (including an option to log to Redis) and exceptions, bundling the result into an EDN compliant structure.

Step 3 turned out to be a rather lengthy and obsessive digression for me, leading to the four posts previous to this one.

The code mentioned below can be found in one of several places on github:

Amazonica


The main route to AWS is via their Java SDK, around which amazonica provides a complete Clojure wrapper. In fact, it's better than a wrapper, because the plain SDK is mind-bogglingly tedious - a model citizen in the kingdom of nouns.

For example, to bid on an instance in the spot auction market, you call a method on the AmazonEC2 client:

RequestSpotInstancesResult
    requestSpotInstances(RequestSpotInstancesRequest requestSpotInstancesRequest)

The RequestSpotInstancesRequest class has a do-nothing constructor and lots of .setXYZ methods, the most important of which is

void setLaunchSpecification(LaunchSpecification launchSpecification)

where LaunchSpecification has yet more .set methods, including

void setNetworkInterfaces(Collection<InstanceNetworkInterfaceSpecification> networkInterfaces)

and the InstanceNetworkInterfaceSpecification is where setSubnetId(String subnetId) lives, so you really do end up needing all of these classes.

Amazonica, by contrast, turns this logically nested data into explicitly nested hash-maps. So an entire request can be constructed like this:

(def my-req
  {:spot-price 0.01
   :instance-count 1
   :type "one-time"
   :launch-specification
   {:image-id "ami-something"
    :instance-type "t1.micro"
    :placement {:availability-zone "us-east-1a"}
    :key-name "your-key"
    :user-data "THE BASH COMMAND WE WANT TO RUN, IN BASE 64"
    :network-interfaces
    [{:device-index 0
      :subnet-id "subnet-yowsa"
      :groups ["sg-hubba"]}]
    :iam-instance-profile
    {:arn "arn:aws:iam::123456789:instance-profile/name-you-chose"}}})
(apply request-spot-instances (apply concat (seq my-req)))

Note:

  • The :user-data field isn't always a bash command, but we set it up that way.
  • The (apply request-spot-instances (apply concat (seq ...))) business is necessary because amazonica functions are declared to take an arbitrary number of arguments, i.e. it really wants (request-spot-instances :spot-price 0.01 :instance-count 1 ...).
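
To see what that flattening does, here is a minimal sketch with a stand-in function. Note that fake-request is hypothetical: it only mimics the keyword-argument calling convention and is not part of amazonica.

```clojure
;; Hypothetical stand-in for an amazonica-style function that takes
;; alternating keyword/value arguments rather than a single map.
(defn fake-request [& {:as args}]
  args)

(def req {:spot-price 0.01
          :instance-count 1
          :launch-specification {:instance-type "t1.micro"}})

;; (seq req) yields [key value] pairs; (apply concat ...) splices them
;; into one flat sequence of alternating keys and values.
(def flat-args (apply concat (seq req)))

;; Only the top level is flattened; nested maps are passed through intact.
(def round-trip (apply fake-request flat-args))
```

The nested :launch-specification map arrives as a single argument value, which is why the request can stay an ordinary nested map right up to the call.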

This is a great improvement over pages of Java code, but we're not entirely free from the yoke of Amazon's SDK:

  1. Complex nested data: While nested, persistent data structures are easier and safer to deal with than nested, mutable special-purpose container classes, they're still nested in a complicated way; we would like to encapsulate the complexity in such a way that we can set and access the data we need, without distributing knowledge of the entire structure throughout our code.
  2. Synchronous interface: The Amazon SDK is in fact doubly synchronous, (a) in that calls to its methods are blocking, and (b) in that what these blocking methods do is initiate some action in an Amazon data center and return, with no simple way to track these actions except to repeatedly call other blocking methods that request status.[1]

Lenses lite, for complex nested data

It would be nice if, rather than sprinkling our code with long, hard-coded assoc-in paths, we could specify path aliases in one place, like

(def path-aliases
  {:zone    [:launch-specification :placement :availability-zone]
   :itype   [:launch-specification :instance-type]
   :subnet  [:launch-specification :network-interfaces 0 :subnet-id]
   :group   [:launch-specification :network-interfaces 0 :groups 0]
   :public? [:launch-specification :network-interfaces 0 :associate-public-ip-address]
   :price   [:spot-price]
   :n       [:instance-count]
   :key     [:launch-specification :key-name]
   ...

and even nicer if those paths could specify incoming and outgoing transformation functions when we're required to mess around with encoding:

   :udata   [:launch-specification :user-data [s->b64 b64->s]]})

The previously mentioned lens posts describe tools to do just this. To set, for example, the command we want to run on startup, we can write

  (ph-assoc path-aliases my-dict :udata "echo howdy")

instead of

  (assoc-in my-dict [:launch-specification :user-data] (s->b64 "echo howdy"))
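
For readers who skipped the lens posts, here is a rough sketch of how such alias-based access could work. It is not the real ph-assoc: alias-assoc, alias-get and split-alias are hypothetical names, the argument order is my own choice, and the encoding helpers use the JDK's java.util.Base64 so the snippet runs without extra dependencies.

```clojure
(import 'java.util.Base64)

(defn s->b64 [^String s]
  (.encodeToString (Base64/getEncoder) (.getBytes s "UTF-8")))

(defn b64->s [^String s]
  (String. (.decode (Base64/getDecoder) s) "UTF-8"))

(def path-aliases
  {:itype [:launch-specification :instance-type]
   :price [:spot-price]
   :udata [:launch-specification :user-data [s->b64 b64->s]]})

;; Split an alias entry into its path and an [in-fn out-fn] pair.
;; A trailing vector of two functions is treated as the transform pair;
;; entries without one get identity in both directions.
(defn- split-alias [entry]
  (let [l (peek entry)]
    (if (and (vector? l) (= 2 (count l)) (every? fn? l))
      [(pop entry) l]
      [entry [identity identity]])))

(defn alias-assoc
  "Hypothetical analogue of ph-assoc: set values by alias name,
  applying the incoming transform if the alias has one."
  [m aliases & kvs]
  (reduce (fn [acc [k v]]
            (let [[path [in-fn _]] (split-alias (aliases k))]
              (assoc-in acc path (in-fn v))))
          m
          (partition 2 kvs)))

(defn alias-get
  "Read a value by alias name, applying the outgoing transform."
  [m aliases k]
  (let [[path [_ out-fn]] (split-alias (aliases k))]
    (out-fn (get-in m path))))

(def r (alias-assoc {} path-aliases :itype "m3.medium" :udata "echo howdy"))
```

With this, (alias-get r path-aliases :udata) gives back the plain command, while the map itself holds the encoded form that AWS expects.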

Taming the synchronous interface


Sending notifications

Remember that an authorized user on EC2 can publish a simple-notification-service message from the command-line

aws --region us-east-1 sns publish --topic-arn  arn:aws:sns:us-east-1:yourtopic \
    --message yowsa

and that we can configure things so that these messages in turn get published to SQS, the Simple Queue Service, which in turn we can tap from inside and outside of EC2.

Remembering also that we've set up our instances so that, on boot, they'll run commands that have been stuck into user-data, you may guess the basic strategy. The command we'll really run uses the ec2-metadata utility that we set up earlier to extract information about the host

aws --region us-east-1 sns publish --topic-arn arn:aws:sns:us-east-1:12345678:instance-up \
  --message \
  `(echo "id: 549a49cd-a64c-44c9-b567-4901c373dc0b";bin/ec2-metadata -i -p -o) | base64 -w 0`

resulting in key-value pairs like

    id: 549a49cd-a64c-44c9-b567-4901c373dc0b
    instance-id: i-123456
    public-hostname: ec2-12-345-67-890.compute-1.amazonaws.com
    local-ipv4: 10.0.0.23

crammed into a base 64 encoded string. We will later decode this message with the help of clojure.data.codec.base64 and some hideous regular expressions

(defn s->b64 [s] (String. (b64/encode (.getBytes s))))
(defn b64->s [s] (String. (b64/decode (.getBytes s)))) ; inverse of s->b64, used below
(defn- matcher->pair [[_ k v]] [(keyword k) v])
(defn b64->ec2-data [s]
  (let [xs  (-> s b64->s clojure.string/split-lines)
        es  (map #(->> % 
                       (re-matches #"^([\w\-]+):\s*(.*)")
                       matcher->pair) xs)]
    (into {} es)))

into a map of keywords to strings:

{:id                "549a49cd-a64c-44c9-b567-4901c373dc0b"
 :instance-id       "i-123456"
 :public-hostname  "ec2-12-345-67-890.compute-1.amazonaws.com"
 :local-ipv4       "10.0.0.23"}

The :id is a UUID we'll create when we request the instance, allowing us to track the results of that request via notifications.
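
If you want to try the round trip at the REPL without pulling in data.codec, here is a self-contained sketch using only the JDK's java.util.Base64. The names encode-lines and decode-ec2-data are made up for this example, and unlike the version above it silently skips lines that don't look like "key: value".

```clojure
(require '[clojure.string :as string])
(import 'java.util.Base64)

;; Stand-in for what the remote shell pipeline produces: newline-joined
;; "key: value" lines, base-64 encoded into one string.
(defn encode-lines [lines]
  (.encodeToString (Base64/getEncoder)
                   (.getBytes (string/join "\n" lines) "UTF-8")))

;; Decode the string and keep only lines that match "key: value".
(defn decode-ec2-data [^String b64]
  (let [text (String. (.decode (Base64/getDecoder) b64) "UTF-8")]
    (into {}
          (keep (fn [line]
                  (when-let [[_ k v] (re-matches #"([\w\-]+):\s*(.*)" line)]
                    [(keyword k) v])))
          (string/split-lines text))))

(def sample
  (encode-lines ["id: 549a49cd-a64c-44c9-b567-4901c373dc0b"
                 "instance-id: i-123456"
                 "this line is noise"
                 "local-ipv4: 10.0.0.23"]))
```

Calling (decode-ec2-data sample) yields a map with :id, :instance-id and :local-ipv4 keys, with the noise line dropped.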

Receiving notifications


Back at the home front (i.e. the trusty laptop), we set up a single listener that transfers all messages from an SQS endpoint (as we previously set up) onto a core.async channel:

(defn sqs-listen [url]
  (let [c  (chan 100)]
    (async/go-loop []
      (if (pimpl/closed? c)
        (debug "Shutting down sqs-listen" url)
        (let [messages  (:messages (receive-message :queue-url url :wait-time-seconds 20))]
          (doseq [{r :receipt-handle b :body} messages]
            (try (>! c (get (json/read-str b) "Message"))
                 (catch Exception e (info "sqs-listen" (stack-trace e))))
            (try (delete-message :queue-url url :receipt-handle r)
                 (catch Exception e (info "sqs-listen" (stack-trace e)))))
          (recur))))
    c))

The heart of this is (receive-message :queue-url url :wait-time-seconds 20), which is a blocking call to AWS that eventually returns one or more messages, or times out (20 seconds being the maximum timeout you're allowed to specify). While we can't force Amazon to give us an asynchronous alternative, we can isolate the synchronicity in this one function and not have to worry about it elsewhere. This pattern seems to come up a lot when interfacing with asynchronous communication libraries.

We're handed messages in a rather funny form. The hash map over :receipt-handle and :body is pleasant enough, except that the :body value is a JSON-encoded string, containing the real message under the key, "Message", and of course that "Message" is itself the base-64 encoded list of name/value pairs we sent ourselves. Malformed messages occur frequently enough that it's worth doing the decoding under try/catch, for which the stack-trace function is not particularly deep, but still quite useful:

  (defn stack-trace [e] (map str (into [] (.getStackTrace e))))

Demultiplexing notifications

All notifications, from every host, come back over the sqs-listen channel in the form of base-64 encoded name/value pairs created by the remote aws sns command. One of those values is a unique :id code, which, as we'll see below, is created at the time of the original request, so we can distribute messages out to specifically interested listeners.

We maintain a map of ids to channels in an atom

(defonce ids->chs (atom {}))

and subscribe by associating a new channel to each new id:

(defn notify-chan
  [id]
  (let [c (chan)]
    (swap! ids->chs assoc id c)
    c))

Meanwhile, there's a thread listening on our SQS channel, decoding the messages and dispatching them onward:

(defn start-up-id-listener [url]
  (let [kill-switch  (chan)
        cl           (sqs-listen url)] ; sqs-listen needs the queue URL
    (async/go-loop []
      (let [[v c] (async/alts! [kill-switch cl])]
        (cond
          (= c kill-switch)
          (do (reset! ids->chs {}) (close! cl))
          (= c cl)
          (do (try (let [s   (b64->ec2-data v)
                         id  (:id s)
                         c   (get @ids->chs id)]
                     (when c (>! c s)))
                   (catch Exception e (info e {:stack-trace (stack-trace e)})))
              (recur)))))
    kill-switch))

Typically, we would use this machinery like

(let [id (.toString (java.util.UUID/randomUUID))]
  (start-up-something-on-aws-using id)
  (notify-chan id))
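
To make the subscribe/dispatch/wait cycle concrete, here is a small sketch that runs without AWS. It assumes org.clojure/core.async is on the classpath; registry, subscribe!, dispatch! and await-message are names invented for this example, and it uses the blocking >!! and alts!! variants so it can be pasted straight into a REPL.

```clojure
;; Assumes org.clojure/core.async is available.
(require '[clojure.core.async :as async :refer [chan >!! alts!! timeout]])

(def registry (atom {})) ; id -> channel

(defn subscribe!
  "Register interest in an id and return the channel to wait on."
  [id]
  (let [c (chan 1)] ; buffer of 1 so dispatch never blocks on a slow reader
    (swap! registry assoc id c)
    c))

(defn dispatch!
  "Route a decoded message to whoever registered its :id.
  Returns true if delivered, nil if nobody was listening."
  [msg]
  (when-let [c (get @registry (:id msg))]
    (>!! c msg)
    (swap! registry dissoc (:id msg))
    true))

(defn await-message
  "Wait up to ms milliseconds for a message, else return :timeout."
  [c ms]
  (let [[v port] (alts!! [c (timeout ms)])]
    (if (= port c) v :timeout)))

(def c1 (subscribe! "id-1"))
(def c2 (subscribe! "id-2"))
(def delivered (dispatch! {:id "id-1" :instance-id "i-123456"}))
(def ignored   (dispatch! {:id "nobody-asked"}))
```

Here (await-message c1 1000) returns the message for "id-1", while (await-message c2 50) gives up with :timeout because nothing was ever sent for "id-2".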

Actually running something

Let's start by bringing up a single medium-sized host and running the Redis server on it.

Here are the instructions that masochists use:

  1. Go to the list of images including the AMI we set up last time.
  2. Select "Spot Request" from the Action dropdown.
  3. Select the m3.medium instance type.
  4. Type in the maximum price we'll pay per hour.
  5. Override the default networking to the VPC we created.
  6. Override the default IAM user to the one we created.
  7. Override the default security group to the one we created.
  8. Click Launch.
  9. Repeatedly refresh the Spot Request monitor until it shows an instance id.
  10. When it does, after 1 to 10 minutes, examine the instance on the Instances Page
  11. Copy its public IP address and ssh to it from a terminal
  12. Run your program.
  13. Repeat as necessary.

What we would like instead is a simple function, to which we provide our request map (in which most of the manually entered values above are already present), a few extra stipulations about the machine, and the command we want to run

(bring-up-one-spot my-req ["bin/redis-server"] :subnet my-sub-public :itype "m3.medium" :price 0.05)

and get back a channel that will deliver information about the instance once it's up. If the instance doesn't come up properly, we want our function to eradicate any bits of it that might still be costing us money.

Let's build that function. For brevity I've stripped out configuration details and optional arguments present in the real code, so there will be some hard-coding, as well as references throughout to global variables like my-sns-topic, which you'll have to take on faith are defined somewhere.

The function will start by ph-assoc-ing any specified stipulations (such as the subnet, type and maximum hourly price, in this case) into the default request map:

(defn bring-up-one-spot
  [req cmds & opts]
  (let [req (apply ph-assoc req path-aliases opts)

Then it constructs the full command for the spot to run and uses ph-assoc to encode it as user data

        id  (.toString (UUID/randomUUID))
        cmd (str (send-up id) (clojure.string/join "\n" cmds) "\n")
        req (ph-assoc req path-aliases :udata cmd)

where send-up constructs the previously described aws sns publish command embedding id.

Next, we register for notifications on that id, make the call to initiate the spot auction request, and extract the identifier that EC2 gives to the request (it will be "sir-" something or other, which consistently makes me giggle):

        cl (notify-chan id)
        rs (apply request-spot-instances (apply concat (seq req)))
        rs (map :spot-instance-request-id  (:spot-instance-requests rs))]

Now we wait for either notification or a timeout:

   (go
     (let [to    (timeout (* 10 60 1000))
           [v c] (async/alts! [cl to])

Either way, we're going to want some information about the request, including the instance id if an instance was actually created:

           ds    (:spot-instance-requests
                   (describe-spot-instance-requests :spot-instance-request-ids rs))
           is    (remove nil? (map :instance-id ds))]

In the event of a timeout, it's still possible that the request was fulfilled, but for some reason or another our notification didn't occur, so we should cancel the requests as well as terminate the instance, if it exists:

       (condp = c
         to (do
              (cancel-spot-instance-requests :spot-instance-request-ids rs)
              (when (seq is) (terminate-instances :instance-ids (vec is)))
              nil)

If we do receive notification, we can just return the request and instance ids

         cl [rs is])))))

so they'll be delivered on the channel returned by the go block.

Refactoring and generalization

The code above makes a few compromises for clarity. I won't go over all the details, but the real bring-up-spots function is a bit fancier:

  1. As per its plural name, you can request an arbitrary number of identically configured instances, along with a minimum that you require to have come up before the timeout. If fewer than the minimum come up, we cancel the whole lot of them.
  2. All live spot request and instance ids are stored in a global atom, so we can quickly tear down the entire session with a single call if we need to go home.
  3. The API boilerplate is pulled into functions that simplify the interface and clean up the output.
  4. The channel eventually returns not just the request and instance ids, but a map from request id to maps containing more information, e.g.:
   {"sir-12345" {:instance-id "abc123"
                 :ip          "10.0.7.100" ;; private IP on the VPC
                 :host        "ec2-54-101-165-9-98-compute1.amazonaws.com" ;; public
                 :state       "running"
                 :request-id  "sir-12345"} ... }

A generic CLI runnable for Clojure functions


So far, the only thing we've run is the Redis server, whose executable we pre-loaded earlier onto the image. To run Clojure code, we need to wrap it in a class with a -main, bundle it up with lein uberjar, copy it to S3 from our external host, and copy it from S3 down to all the internal instances that need it.[2]

In the case of girder, the guts of the runnable will be the launch-worker, launch-distributor, etc. functions, surrounded by a tremendous amount of common functionality. It is useful to have a generic wrapper that is given a function to invoke and an option specification in clojure.tools.cli form, and provides several core niceties:

  1. Augments the option specification for the common functionality.
  2. Sets up timbre logging, including an option to log to Redis.
  3. Wraps the function call in a try/catch.
  4. Invokes the function, passing it a map of parsed options to values.
  5. Allows that map to be specified fully in a single EDN string (useful when constructing the command from within Clojure).
  6. Packages the return value in a map containing either the :result or an :exception.
  7. Possibly includes a vector of :log entries in the output.
  8. Adds timing information.
  9. Dumps it all to stdout in machine-readable pr-str form.
  10. Optionally hangs around for a while before exiting.

None of that is particularly difficult, but it is tedious to do over and over again, so I wrote a utility function, allowing one-line definition of -main:

(defn -main [& args] (cli/edn-app args cli-options doit))
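
The result-packaging part of that wrapper is easy to sketch on its own. What follows is not the real cli/edn-app, just a stripped-down illustration of the try/catch, timing and EDN output steps; run-wrapped, edn-main and the :msecs key are names I've invented here, and option parsing and logging are left out entirely.

```clojure
(require '[clojure.edn :as edn])

(defn run-wrapped
  "Call f on the options map; return a map holding either :result or
  :exception, plus elapsed wall-clock time under :msecs."
  [f opts]
  (let [start   (System/currentTimeMillis)
        outcome (try
                  {:result (f opts)}
                  (catch Exception e
                    {:exception   (str e)
                     :stack-trace (mapv str (.getStackTrace e))}))]
    (assoc outcome :msecs (- (System/currentTimeMillis) start))))

(defn edn-main
  "Read the whole options map from a single EDN string, run f,
  and print the outcome in machine-readable pr-str form."
  [f edn-opts]
  (let [out (run-wrapped f (edn/read-string edn-opts))]
    (println (pr-str out))
    out))

(def ok  (run-wrapped (fn [{:keys [n]}] (* 2 n)) {:n 21}))
(def bad (run-wrapped (fn [_] (throw (ex-info "boom" {}))) {}))
```

Because exceptions are stringified before printing, whatever lands on stdout can be read back with clojure.edn/read-string on the calling side, success or failure.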

For Girder, the doit starts by extracting a few options:

(defn doit [opts]
  (acyclic.girder.grid.redis/init! (:host opts) (:port opts))
  (let [{:keys [numrecjobs reclevel cmt help pool worker distributor host port
                jobmsecs jobs jobtimeout helper cleanup]} opts]

and returns a map, containing entries for one or more services we've chosen to launch in this instance. (The specific services won't make much sense if you didn't read the Girder posts, but you get the idea.)

We optionally launch one or more distributors:

    {:distributor     ;; --distributor POOLID1[,POOLID2,...]
     (when distributor
       (let [ds (clojure.string/split distributor #",")]
         (for [d ds]
           (grid/launch-distributor d pool))))

A helper thread stealing work every MSECS:

     :helper          ;; --helper MSECS
     (when helper
       (grid/launch-helper distributor helper))

One or more workers (in the same process), either identified explicitly or automatically named:

     :worker          ;; --worker (N-WORKERS | WID1[,WID2,...])
     (when worker
       (let [u  (java.util.UUID/randomUUID)
             n  (try (Integer/parseInt worker) (catch Exception e nil))
             ws (if n
                  (map #(str "w" % "-" u) (range n))
                  (clojure.string/split worker #","))]
         (for [w ws]
           (grid/launch-worker w pool))))

The results from N parallel job requests:

     :jobs           ;; --jobs N
     (when (and pool jobs (pos? jobs))
       (let [f (fn [i] (grid/enqueue pool [recbog jobmsecs i reclevel numrecjobs cmt] false "cli"))
             c (async/map vector (map f (range jobs)))
             t (async/timeout (* 1000 jobtimeout))]
         (let [[v ch] (async/alts!! [c t])]
           (or v "timeout"))))}
    ))
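
The --worker argument does double duty (a count or an explicit list), which is easier to see in isolation. This is a sketch of just that naming logic, pulled out into a hypothetical worker-ids function that launches nothing:

```clojure
(require '[clojure.string :as string])

(defn worker-ids
  "Expand a --worker argument into worker names: a number N yields N
  generated names sharing one UUID suffix; anything else is treated as
  a comma-separated list of explicit names."
  [worker]
  (let [n (try (Integer/parseInt worker)
               (catch NumberFormatException _ nil))]
    (if n
      (let [u (java.util.UUID/randomUUID)]
        (mapv #(str "w" % "-" u) (range n)))
      (string/split worker #","))))

(def explicit  (worker-ids "alpha,beta"))
(def generated (worker-ids "3"))
```

One design point worth knowing: for is lazy, so in the real doit the launches inside each for only happen when the returned map is realized, for instance when it is printed. The sketch uses mapv, which is eager.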

Putting it all together

We can run an experiment by running a "script" in the REPL.

Since all the Girder services work via Redis, we start by bringing up the Redis server and extracting some information about it. It will be handy to have the uberjar on this machine, so we download it first:

(defn cmd-getjar [] (str "aws s3 cp s3://dist-ec2/girder.jar ."))
(def c-redis (bring-up-spots my-req 1 [(cmd-getjar) "bin/redis-server"] :subnet my-sub-public :itype "m3.medium" :price 0.05 :log "debug"))

Since we need to extract some information about the instance before we can start other instances, it's necessary to wait:

(def r-redis (<!! c-redis))
(def redis (first (vals r-redis)))
(def redis-ip (:ip redis))
(def redis-host (:host redis))
(def redis-log (str "debug:" redis-ip ":6379"))

We're going to pass the private redis-ip address and the log specification in which it's embedded into the CLI invocations for the various server processes. With the public redis-host, we can, if we want, use clj-ssh and carmine to interrogate the server:

(def s-redis (ssh/session agent redis-host))
(ssh/forward-local-port s-redis 8379 6379)
(def car-redis {:pool {} :spec {:host "localhost" :port 8379}})
(wcar car-redis (car/keys "*"))

Next, we bring up a distributor and a helper in the same process, starting by downloading the uberjar:

(defn cmds-dist [pool ip msec log]
    [(cmd-getjar)
     (str "java -cp girder.jar acyclic.girder.testutils.grid --distributor " pool
          " --helper " msec
          " --host " ip " --hang 1000 --log " log)])
(def c-dist
   (bring-up-spots my-req 1 (cmds-dist "pool" redis-ip 100 redis-log)
                  :subnet my-sub-private :itype "t1.micro" :price 0.01 :key "girder" :minutes 100))

Now, the fun part. Spin up 100 hosts, running a worker on each:

(defn cmds-workers [n pool ip log]
  [(cmd-getjar)
   (str "java -cp girder.jar acyclic.girder.testutils.grid --worker " n
        " --pool " pool " --host " ip " --hang 1000 --log " log)])
(def c-workers
  (bring-up-spots my-req 100 (cmds-workers 1 "pool" redis-ip redis-log)
                  :subnet my-sub-private :itype "t1.micro" :price 0.01 :key "girder" :minutes 100))     

Wait until they're all up:

(def r-dist (<!! c-dist))
(def r-workers (<!! c-workers))

Since the log argument is of the form (str "debug:" redis-ip ":6379"), messages at :debug level will be sent to Redis, where we may peruse them at leisure:

(car-appender/query-entries car-redis :debug)

We're about ready to launch jobs. We'll do so from the Redis server, since it has a public IP, to which we can ssh.

(defn cmd-job [pool ip jobs reclevel log]
  (str  "java -cp girder.jar acyclic.girder.testutils.grid --pool " pool
        " --host " ip " --jobs " jobs " --reclevel " reclevel " --log " log))
(ssh/ssh-exec s-redis (cmd-job "pool" redis-ip 50 3 redis-log))

If all goes well (and it did), this returns the same sort of array of "Bogosity" strings that we got locally in the first Girder post, except a lot more of them. Since not much computation is going on, the job of the workers is really to harass the distributor and Redis server as relentlessly as possible, so we get an idea of what they can take. And that seems to be quite a lot. Without any time spent on network optimization (and using t1.micro instances advertised for their cruddy network performance) system latency seems to be about 4-5ms per job, which is about 50 times better than I've seen on commercial systems running on much better (and better controlled) hardware.

Teardown

Cashflow from Girder is strictly negative, in the form of payments to Amazon, and those are largely proportional to the number of machines and the time that they're up. As noted, we keep track of all-requests in an atom, so they can be canceled en masse in a hurry with a single function call:

(clean-up)

Clojure vs Bash


I know they're fairy tales, but still I am haunted by stories my grandmother used to tell me, about dirty, evil monsters who do not recognize the superiority of the Clojure REPL for nearly any purpose. If only they would listen, or read blogs, surely the surging armies of darkness would lay down their arms and turn on paredit mode.

For interactive control of many server instances, the Clojure REPL has distinct advantages over the Bourne Shell. In addition to a boundless library of tools to prepare and analyze data, we have vastly better concurrency control via core.async. Plus it's more fun.


  1. Note: AWS does make a stab at (a) with various classes ending in Async, whose methods return Futures. Unfortunately, these are the silly sort of futures, which allow you to poll and block on completion but don't support callbacks or any way of fmap-ing on followup behavior. Anyway, type (b) is much more important, as the timescales are much longer. 

  2. Technically, we could have ssh'd onto every instance separately, but that would have taken longer, cost more and required each instance to have a public IP address. 


