Back in August, I wrote two posts about an experimental framework for distributed functional programming called Girder. The idea, in summary, was to make distributed code look as much as possible like ordinary Clojure, as opposed to structuring it explicitly around message passing (as in the actor model) or data flows (as in map/reduce, Storm, et al). As I say, it was an experiment, but it was also something of a cri de coeur (a mini-manifesto, if you will) against extraneous impingements on my code. Anyway, if it sounds interesting, go back and read the posts, but you don't need to understand them in depth to understand today's post.
While it was illuminating to test this supposedly distributed computation framework on my little MacBook, I knew that, at some point, I would have to run it in the large. One of the few advantages of corporate serfdom is that the right supplications will often win you frolicking privileges on the baronial server fields, but we itinerant friars are stuck with Amazon Web Services. I know it's not fair to complain, but AWS is not really optimized for my purposes, or, in general, for situations where the following apply:
- Your distributed system can only be tested realistically on a large number of machines.
- Turnkey solutions do not already exist for the system. I.e. not Hadoop or a web application.
- Full-on HA automation would be overkill, since you're still experimenting.
- You want to pay as little as possible, which means bidding in the spot auction market and killing instances the second you're done with them.
- You insist on working in Clojure to the greatest extent possible.
If none of these are true, my cockamamie scheme is certainly not the best approach, but, if all or most of them are, this is definitely superior to the more obvious mix of web console tools and ssh.
I previously described the necessary AWS setup. Other than a few short bash scripts, that phase didn't require any actual code, and much of the work was unintuitive clicking about on the web console. (Unavoidably. I tried boiling it down to invocations of the AWS command line utility, but, since Amazon documents everything in terms of their web interface, you would have been completely lost were anything to go wrong.)
Today is a Clojure day:
- We leverage the previously discussed AWS setup with a Clojure framework to launch instances and spot auction requests asynchronously, run specific software on the machines, and provide clean notification when they're up and ready for business.
- We use core.async to contain and tame the synchronous interface that Amazon gives us.
- We use lens-like constructs to simplify interaction with complex configuration data.
- We build a utility wrapper that turns most stand-alone Clojure functions into a CLI app that handles the most common arguments, logging (including an option to log to Redis) and exceptions, bundling the result into an EDN-compliant structure.
Step 3 turned out to be a rather lengthy and obsessive digression for me, leading to the four posts previous to this one.
The code mentioned below can be found in one of several places on github:
Amazonica
The main route to AWS is via their Java SDK, around which amazonica provides a complete Clojure wrapper. In fact, it's better than a wrapper, because the plain SDK is mind-bogglingly tedious - a model citizen in the kingdom of nouns.
For example, to bid on an instance in the spot auction market, you call a method on the AmazonEC2 class:

RequestSpotInstancesResult requestSpotInstances(RequestSpotInstancesRequest requestSpotInstancesRequest)
The RequestSpotInstancesRequest class has a do-nothing constructor and lots of .setXYZ methods, the most important of which is

void setLaunchSpecification(LaunchSpecification launchSpecification)

where LaunchSpecification has yet more .set methods, including

void setNetworkInterfaces(Collection<InstanceNetworkInterfaceSpecification> networkInterfaces)

and the InstanceNetworkInterfaceSpecification is where setSubnetId(String subnetId) lives, so you really do end up needing all of these classes.
Amazonica, by contrast, turns this logically nested data into explicitly nested hash-maps. So an entire request can be constructed like this:
(def my-req
{:spot-price 0.01,
:instance-count 1,
:type "one-time",
:launch-specification
{:image-id "ami-something",
:instance-type "t1.micro",
:placement {:availability-zone "us-east-1a"},
:key-name "your-key"
:user-data "THE BASH COMMAND WE WANT TO RUN, IN BASE 64"
:network-interfaces
[{:device-index 0
:subnet-id "subnet-yowsa"
:groups ["sg-hubba"]}]
:iam-instance-profile
{:arn "arn:aws:iam::123456789:instance-profile/name-you-chose"}}})
(apply request-spot-instances (apply concat (seq my-req)))
Note:

- The :user-data field isn't always a bash command, but we set it up that way.
- The (apply concat (seq ...)) business is necessary because amazonica functions are declared to take an arbitrary number of arguments; i.e. it really wants (request-spot-instances :spot-price 0.01 :instance-count 1 ...). (See the quick REPL check below.)
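In other words, the request map just gets flattened into the keyword/value argument list that amazonica expects. A quick REPL check (illustrative, not from the original post):

(apply concat (seq {:spot-price 0.01 :instance-count 1}))
;; => (:spot-price 0.01 :instance-count 1)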
This is a great improvement over pages of Java code, but we're not entirely free from the yoke of Amazon's SDK:
- Complex nested data: While nested, persistent data structures are easier and safer to deal with than nested, mutable special-purpose container classes, they're still nested in a complicated way; we would like to encapsulate the complexity in such a way that we can set and access the data we need, without distributing knowledge of the entire structure throughout our code.
- Synchronous interface: The Amazon SDK is in fact doubly synchronous, (a) in that calls to its methods are blocking, and (b) in that what these blocking methods do is initiate some action in an Amazon data center and return, with no simple way to track these actions except to repeatedly call other blocking methods that request status.1
Lenses lite, for complex nested data
It would be nice if, rather than sprinkling our code with long, hard-coded assoc-in paths, we could specify path aliases in one place, like
(def path-aliases
{:zone [:launch-specification :placement :availability-zone]
:itype [:launch-specification :instance-type]
:subnet [:launch-specification :network-interfaces 0 :subnet-id]
:group [:launch-specification :network-interfaces 0 :groups 0]
:public? [:launch-specification :network-interfaces 0 :associate-public-ip-address]
:price [:spot-price]
:n [:instance-count]
:key [:launch-specification :key-name]
...
and even nicer if those paths could specify incoming and outgoing transformation functions when we're required to mess around with encoding:
:udata [:launch-specification :user-data [s->b64 b64->s]]})
The previously mentioned lens posts describe tools to do just this. To set, for example, the command we want to run on startup, we can write
(ph-assoc my-dict path-aliases :udata "echo howdy")
instead of
(assoc-in my-dict [:launch-specification :user-data] (s->b64 "echo howdy"))
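To give a feel for how little machinery this requires, here is a minimal sketch of what such helpers could look like, assuming the alias format shown above (a key path, optionally ending in an [in out] pair of transform functions). This is an illustration, not the actual implementation from the lens posts:

(defn- alias->path+fns [alias]
  ;; split e.g. [:a :b [in out]] into its path and its transform pair
  (if (vector? (peek alias))
    [(pop alias) (peek alias)]
    [alias [identity identity]]))

(defn ph-assoc
  "Assoc one or more values into m via path aliases, applying each
   alias's incoming transform (e.g. s->b64) if it declares one."
  [m aliases & kvs]
  (reduce (fn [m [k v]]
            (let [[path [in _]] (alias->path+fns (get aliases k))]
              (assoc-in m path (in v))))
          m
          (partition 2 kvs)))

(defn ph-get
  "Look up a value via a path alias, applying its outgoing transform."
  [m aliases k]
  (let [[path [_ out]] (alias->path+fns (get aliases k))]
    (out (get-in m path))))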
Taming the synchronous interface
Sending notifications
Remember that an authorized user on EC2 can publish a Simple Notification Service message from the command line
aws --region us-east-1 sns publish --topic-arn arn:aws:sns:us-east-1:yourtopic \
--message yowsa
and that we can configure things so that these messages in turn get published to SQS, the Simple Queue Service, which we can then tap from both inside and outside of EC2.
Remembering also that we've set up our instances so that, on boot, they'll run commands that have been stuck into user-data, you may guess the basic strategy. The command we'll really run uses the ec2-metadata utility that we set up earlier to extract information about the host:
aws --region us-east-1 sns publish --topic-arn arn:aws:sns:us-east-1:12345678:instance-up \
--message \
`(echo "id: 549a49cd-a64c-44c9-b567-4901c373dc0b"; bin/ec2-metadata -i -p -o) | base64 -w 0`
resulting in key-value pairs like
id: 549a49cd-a64c-44c9-b567-4901c373dc0b
instance-id: i-123456
public-hostname: ec2-12-345-67-890.compute-1.amazonaws.com
local-ipv4: 10.0.0.23
crammed into a base-64 encoded string. We will later decode this message with the help of clojure.data.codec.base64 and some hideous regular expressions
;; assumes [clojure.data.codec.base64 :as b64] in the namespace's :require
(defn s->b64 [s] (String. (b64/encode (.getBytes s))))
(defn b64->s [s] (String. (b64/decode (.getBytes s)))) ; the inverse decoder, used below (assumed definition)
(defn- matcher->pair [[_ k v]] [(keyword k) v])
(defn b64->ec2-data [s]
  (let [xs (-> s b64->s clojure.string/split-lines)
        es (map #(->> %
                      (re-matches #"^([\w\-]+):\s*(.*)")
                      matcher->pair) xs)]
    (into {} es)))
into a map of keywords to strings:
{:id "549a49cd-a64c-44c9-b567-4901c373dc0b"
:instance-id "i-123456"
:public-hostname "ec2-12-345-67-890.compute-1.amazonaws.com"
:local-ipv4 "10.0.0.23"}
The :id is a UUID we'll create when we request the instance, allowing us to track the results of that request via notifications.
Receiving notifications
Back at the home front (i.e. the trusty laptop), we set up a single listener that transfers all messages from an SQS endpoint (as we previously set up) onto a core.async channel:
(defn sqs-listen [url]
  ;; pimpl is assumed to alias clojure.core.async.impl.protocols (for closed?)
  (let [c (chan 100)]
(async/go-loop []
(if (pimpl/closed? c)
(debug "Shutting down sqs-listen" url)
(let [messages (:messages (receive-message :queue-url url :wait-time-seconds 20))]
(doseq [{r :receipt-handle b :body} messages]
(try (>! c (get (json/read-str b) "Message"))
(catch Exception e (info "sqs-listen" (stack-trace e))))
(try (delete-message :queue-url url :receipt-handle r)
(catch Exception e (info "sqs-listen" (stack-trace e)))))
(recur))))
c))
The heart of this is (receive-message :queue-url url :wait-time-seconds 20), which is a blocking call to AWS that eventually returns one or more messages, or times out (20 seconds being the maximum timeout you're allowed to specify).

While we can't force Amazon to give us an asynchronous alternative, we can isolate the synchronicity in this one function and not have to worry about it elsewhere. This pattern seems to come up a lot when interfacing with asynchronous communication libraries.
We're handed messages in a rather funny form. The hash map over :receipt-handle and :body is pleasant enough, except that the :body value is a JSON-encoded string containing the real message under the key "Message", and of course that "Message" is itself the base-64 encoded list of name/value pairs we sent ourselves.

Malformed messages occur frequently enough that it's worth doing the decoding under try/catch. The stack-trace function used in the catch clauses is not particularly deep, but still quite useful:
(defn stack-trace [e] (map str (into [] (.getStackTrace e))))
Demultiplexing notifications
All notifications, from every host, come back over the sqs-listen channel in the form of base-64 encoded name/value pairs created by the remote aws sns command. One of those values is a unique :id code, which, as we'll see below, is created at the time of the original request, so we can distribute messages out to specifically interested listeners.
We maintain a map of ids to channels in an atom
(defonce ids->chs (atom {}))
and subscribe by associating a new channel to each new id:
(defn notify-chan
[id]
(let [c (chan)]
(swap! ids->chs assoc id c)
c))
Meanwhile, there's a thread listening on our SQS channel, decoding the messages and dispatching them onward:
(defn start-up-id-listener []
(let [kill-switch (chan)
cl (sqs-listen sqs-url)] ; sqs-url: the SQS queue URL, assumed to be defined elsewhere
(async/go-loop []
(let [[v c] (async/alts! [kill-switch cl])]
(cond
(= c kill-switch)
(do (reset! ids->chs {}) (close! cl))
(= c cl)
(do (try (let [s (b64->ec2-data v)
id (:id s)
c (get @ids->chs id)]
(when c (>! c s)))
(catch Exception e (info e {:stack-trace (stack-trace e)})))
(recur)))))
kill-switch))
Typically, we would use this machinery like
(let [id (.toString (java.util.UUID/randomUUID))]
(start-up-something-on-aws-using id)
(notify-chan id))
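One thing the snippet above glosses over is removing entries from ids->chs once they're no longer needed. A caller could, for example, wait with a timeout and then unsubscribe; an illustrative sketch, not from the original code:

(defn await-notification [id msec]
  (let [c     (notify-chan id)
        to    (async/timeout msec)
        [v _] (async/alts!! [c to])]
    (swap! ids->chs dissoc id)   ; unsubscribe whether we got data or timed out
    v))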
Actually running something
Let's start by bringing up a single medium-sized host and running the Redis server on it.
Here are the instructions that masochists use:
- Go to the list of images including the AMI we set up last time.
- Select "Spot Request" from the Action dropdown.
- Select the m3.medium instance type.
- Type in the maximum price we'll pay per hour.
- Override the default networking to the VPC we created.
- Override the default IAM user to the one we created.
- Override the default security group to the one we created.
- Click Launch.
- Repeatedly refresh the Spot Request monitor until it shows an instance id.
- When it does, after 1 to 10 minutes, examine the instance on the Instances page.
- Copy its public IP address and ssh to it from a terminal.
- Run your program.
- Repeat as necessary.
What we would like instead is a simple function, to which we provide our request map (in which most of the manually entered values above are already present), a few extra stipulations about the machine, and the command we want to run
(bring-up-one-spot my-req ["bin/redis-server"] :subnet my-sub-public :itype "m3.medium" :price 0.05)
and get back a channel that will deliver information about the instance once it's up. If the instance doesn't come up properly, we want our function to eradicate any bits of it that might still be costing us money.
Let's build that function. For brevity I've stripped out configuration details and optional arguments present in the real code, so there will be some hard-coding, as well as references throughout to global variables like my-sns-topic, which you'll have to take on faith are defined somewhere.
The function will start by ph-assoc-ing any specified stipulations (such as the subnet, type and maximum hourly price, in this case) into the default request map:
(defn bring-up-one-spot
[req cmds & opts]
(let [req (apply ph-assoc req path-aliases opts)
Then it constructs the full command for the spot to run and uses ph-assoc to encode it as user data
id (.toString (UUID/randomUUID))
cmd (str (send-up id) (clojure.string/join "\n" cmds) "\n")
req (ph-assoc req path-aliases :udata cmd)
where send-up constructs the previously described aws sns publish command embedding id.
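send-up itself isn't shown here; a plausible sketch (assuming my-sns-topic holds the topic ARN) would simply render the publish command from earlier with our UUID spliced in:

(defn send-up [id]
  ;; produces the `aws sns publish` line shown earlier, tagged with our UUID
  (str "aws --region us-east-1 sns publish --topic-arn " my-sns-topic
       " --message `(echo \"id: " id "\"; bin/ec2-metadata -i -p -o) | base64 -w 0`\n"))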
Next, we register for notifications on that id, make the call to initiate the spot auction request, and extract the identifier that EC2 gives to the request (it will be "sir-" something or other, which consistently makes me giggle):
cl (notify-chan id)
rs (apply request-spot-instances (apply concat (seq req)))
rs (map :spot-instance-request-id (:spot-instance-requests rs))]
Now we wait for either notification or a timeout:
(go
  (let [to (timeout (* 10 60 1000))
        [v c] (async/alts! [cl to])
Either way, we're going to want some information about the request, including the instance id if an instance was actually created:
ds (:spot-instance-requests
(describe-spot-instance-requests :spot-instance-request-ids rs))
is (filter (complement nil?) (map :instance-id ds))]
In the event of a timeout, it's still possible that the request was fulfilled, but for some reason or another our notification didn't occur, so we should cancel the requests as well as terminate the instance, if it exists:
(condp = c
to (do
(cancel-spot-instance-requests :spot-instance-request-ids rs)
(when (seq is) (terminate-instances :instance-ids (vec is)))
nil)
If we do receive notification, we can just return the request and instance ids
cl [rs is])))))
so they'll be delivered on the channel returned by the go block.
Refactoring and generalization
The code above makes a few compromises for clarity. I won't go over all the details, but the real bring-up-spots function is a bit fancier:
- As per its plural name, you can request an arbitrary number of identically configured instances, along with a minimum that you require to have come up before the timeout. If fewer than the minimum come up, we cancel the whole lot of them.
- All live spot request and instance ids are stored in a global atom, so we can quickly tear down the entire session with a single call if we need to go home.
- The API boilerplate is pulled into functions that simplify the interface and clean up the output.
- The channel eventually returns not just the request and instance ids, but a map from request id to maps containing more information, e.g.:
{"sir-12345" {:instance-id "abc123"
:ip "10.0.7.100" ;; private IP on the VPC
:host "ec2-54-101-165-9-98-compute1.amazonaws.com" ;; public
:state "running"
:request-id "sir-12345"} ... }
A generic CLI runnable for Clojure functions
So far, the only thing we've run is the Redis server, whose executable we pre-loaded earlier onto the image. To run Clojure code, we need to wrap it in a class with a -main, bundle it up with lein uberjar, copy it to S3 from our external host, and copy it from S3 down to all the internal instances that need it.2
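For concreteness, a minimal project.clj sketch that would produce such an uberjar (the project name and dependency version here are illustrative, not taken from the actual repo):

(defproject girder-test "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.6.0"]]
  ;; AOT-compile the entry namespace so `java -cp girder.jar acyclic.girder.testutils.grid` works;
  ;; the namespace itself must declare (:gen-class) and define -main.
  :aot  [acyclic.girder.testutils.grid]
  :main acyclic.girder.testutils.grid)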
In the case of Girder, the guts of the runnable will be the launch-worker, launch-distributor, etc., surrounded by a tremendous amount of common functionality. It is useful to have a generic wrapper that is given a function to invoke and an option specification in clojure.tools.cli form, and provides several core niceties:
- Augments the option specification for the common functionality.
- Sets up timbre logging, including an option to log to Redis.
- Wraps the function call in a try/catch.
- Invokes the function, passing it a map of parsed options to values.
- Allows that map to be specified fully in a single EDN string (useful when constructing the command from within Clojure).
- Packages the return value in a map containing either the :result or an :exception.
- Possibly includes a vector of :log entries in the output, along with timing information.
- Dumps it all to stdout in machine-readable pr-str form.
- Optionally hangs around for a while before exiting.
None of that is particularly difficult, but it is tedious to do over and over again, so I wrote a utility function, allowing one-line definition of -main:
(defn -main [& args] (cli/edn-app args cli-options doit))
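To give a feel for what such a wrapper does, here is a rough sketch under the assumptions above (parse with clojure.tools.cli, or accept the whole option map as one EDN string, time the call, and print an EDN result map); it is not the actual utility from the repo:

(require '[clojure.tools.cli :as tcli]
         '[clojure.edn :as edn])

(defn edn-app [args option-spec f]
  (let [opts (if (and (= 1 (count args))
                      (.startsWith ^String (first args) "{"))
               (edn/read-string (first args))            ; whole option map passed as one EDN string
               (:options (tcli/parse-opts args option-spec)))
        t0   (System/currentTimeMillis)
        res  (try {:result (f opts)}
                  (catch Exception e {:exception (str e)}))]
    (println (pr-str (assoc res :msec (- (System/currentTimeMillis) t0))))))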
For Girder, the doit starts by extracting a few options:
(defn doit [opts]
(acyclic.girder.grid.redis/init! (:host opts) (:port opts))
(let [{:keys [numrecjobs reclevel cmt help pool worker distributor host port
jobmsecs jobs jobtimeout helper cleanup]} opts]
and returns a map, containing entries for one or more services we've chosen to launch in this instance. (The specific services won't make much sense if you didn't read the Girder posts, but you get the idea.)
We optionally launch one or more distributors:
{:distributor ;; --distributor POOLID1[,POOLID2,...]
(when distributor
(let [ds (clojure.string/split distributor #",")]
(for [d ds]
(grid/launch-distributor d pool))))
A helper thread stealing work every MSECS:
:helper ;; --helper MSECS
(when helper
(grid/launch-helper distributor helper))
One or more workers (in the same process), either identified explicitly or automatically named:
:worker ;; --worker (N-WORKERS | WID1[,WID2,...])
(when worker
(let [u (java.util.UUID/randomUUID)
n (try (Integer/parseInt worker) (catch Exception e nil))
ws (if n
(map #(str "w" % "-" u) (range n))
(clojure.string/split worker #","))]
(for [w ws]
(grid/launch-worker w pool))))
The results from N parallel job requests:
:jobs ;; --jobs N
(when (and pool jobs (pos? jobs))
(let [f (fn [i] (grid/enqueue pool [recbog jobmsecs i reclevel numrecjobs cmt] false "cli"))
c (async/map vector (map f (range jobs)))
t (async/timeout (* 1000 jobtimeout))]
(let [[v ch] (async/alts!! [c t])]
(or v "timeout"))))}
))
Putting it all together
We can run an experiment by running a "script" in the REPL.
Since all the Girder services work via Redis, we start by bringing up the Redis server and extracting some information about it. It will be handy to have the uberjar on this machine, so we download it first:
(defn cmd-getjar [] (str "aws s3 cp s3://dist-ec2/girder.jar ."))
(def c-redis (bring-up-spots my-req 1 [(cmd-getjar) "bin/redis"] :subnet my-sub-public :itype "m3.medium" :price 0.05 :log "debug"))
Since we need to extract some information about the instance before we can start other instances, it's necessary to wait:
(def r-redis (<!! c-redis))
(def redis (first (vals r-redis)))
(def redis-ip (:ip redis))
(def redis-host (:host redis))
(def redis-log (str "debug:" redis-ip ":6379"))
We're going to pass the private redis-ip address, and the log specification in which it's embedded, into the CLI invocations for the various server processes.
With the public redis-host, we can, if we want, use clj-ssh and carmine to interrogate the server:
(def s-redis (ssh/session agent redis-host))
(ssh/forward-local-port s-redis 8379 6379)
(def car-redis {:pool {} :spec {:host "localhost" :port 8379}})
(wcar car-redis (car/keys "*"))
Next, we bring up a distributor and a helper in the same process, starting by downloading the uberjar:
(defn cmds-dist [pool ip msec log]
[(cmd-getjar)
(str "java -cp girder.jar acyclic.girder.testutils.grid --distributor " pool
" --helper " msec
" --host " ip " --hang 1000 --log " log)])
(def c-dist
(bring-up-spots my-req 1 (cmds-dist "pool" redis-ip 100 redis-log)
:subnet my-sub-private :itype "t1.micro" :price 0.01 :key "girder" :minutes 100))
Now, the fun part. Spin up 100 hosts, running a worker on each:
(defn cmds-workers [n pool ip log]
[(cmd-getjar)
(str "java -cp girder.jar acyclic.girder.testutils.grid --worker " n
" --pool " pool " --host " ip " --hang 1000 --log " log)])
(def c-workers
(bring-up-spots my-req 100 (cmds-workers 1 "pool" redis-ip redis-log)
:subnet my-sub-private :itype "t1.micro" :price 0.01 :key "girder" :minutes 100))
Wait until they're all up:
(def r-dist (<!! c-dist))
(def r-workers (<!! c-workers))
Since the log argument is of the form (str "debug:" redis-ip ":6379"), messages at :debug level will be sent to Redis, where we may peruse them at leisure:
(car-appender/query-entries car-redis :debug)
We're about ready to launch jobs. We'll do so from the Redis server, since it has a public IP, to which we can ssh.
(defn cmd-job [pool ip jobs reclevel log]
  (str "java -cp girder.jar acyclic.girder.testutils.grid --pool " pool
       " --host " ip " --jobs " jobs " --reclevel " reclevel " --log " log))

(ssh/ssh-exec s-redis (cmd-job "pool" redis-ip 50 3 redis-log))
If all goes well (and it did), this returns the same sort of array of "Bogosity" strings that
we got locally in the first Girder post, except a lot more of them. Since not much computation is
going on, the job of the workers is really to harass the distributor and Redis server as relentlessly
as possible, so we get an idea of what they can take. And that seems to be quite a lot. Without
any time spent on network optimization (and using t1.micro
instances advertised for their
cruddy network performance) system latency seems to be about 4-5ms per job, which is about 50 times
better than I've seen on commercial systems running on much better (and better controlled) hardware.
Teardown
Cashflow from Girder is strictly negative, in the form of payments to Amazon, and those are largely proportional to the number of machines and the time that they're up. As noted, we keep track of all-requests in an atom, so they can be canceled en masse in a hurry with a single function call:
(clean-up)
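A minimal sketch of what such a clean-up could look like, assuming all-requests holds a map of the live spot-request ids and instance ids (the keys used here are illustrative; the real function lives in the repo):

(defn clean-up []
  (let [{:keys [request-ids instance-ids]} @all-requests]
    (when (seq request-ids)
      (cancel-spot-instance-requests :spot-instance-request-ids (vec request-ids)))
    (when (seq instance-ids)
      (terminate-instances :instance-ids (vec instance-ids)))
    (reset! all-requests {})))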
Clojure vs Bash
I know they're fairy tales, but still I am haunted by stories my grandmother used to tell me, about dirty, evil monsters who do not recognize the superiority of the Clojure REPL for nearly any purpose. If only they would listen, or read blogs, surely the surging armies of darkness would lay down their arms and turn on paredit mode.
For interactive control of many server instances, the Clojure REPL has distinct advantages over the Bourne Shell. In addition to a boundless library of tools to prepare and analyze data, we have vastly better concurrency control via core.async. Plus it's more fun.
1. Note: AWS does make a stab at (a) with various classes ending in Async, whose methods return Futures. Unfortunately, these are the silly sort of futures, which allow you to poll and block on completion but don't support callbacks or any way of fmap-ing on followup behavior. Anyway, type (b) is much more important, as the timescales are much longer. ↩

2. Technically, we could have ssh'd onto every instance separately, but that would have taken longer, cost more and required each instance to have a public IP address. ↩