Some time after my recent fiddles with IMDB, I read an interesting article about using a perceptron to classify words as parts of speech based on features that precede them in text. It's all done in python or some such sh*t, but whatever. Still very cool. Since I had all of this IMDB data accumulated in Mongo, I thought I would try to play with it, and the idea I had was to predict metacritic scores from the actors that appeared in each film. In retrospect, it's far from clear that such a prediction can be made, and especially that a perceptron is well suited to making it. I tried a few things, and everything sucked, so the only saving grace was that some bits were fun to implement.
The first task was to augment my database, which had been built mostly for
exploring linkages, with data that one might want to either predict or use for
predicting. Basically, I need to add a :metacritic-score field to each
record, and while I'm doing it, I should grab anything else that's easy to find
in approximately the same place. I didn't want to wait a very long time, and
imdb.com's IP throttling seems to get confused by my VPN provider, so
parallel GETs are in order, just not an unlimited number of them.
In a situation like this, I like to set up a worker pool using core.async.
If you have no idea what that is, read this, this and this.
Then look at this:
(let [wc (a/chan), n 5]
  (dotimes [i n]
    (a/go (loop []
            (when-let [rec (a/<! wc)]
              (println "Doing stuff with" rec "in thread" i)
              (recur)))
          (println "Shutting down thread" i)))
  (a/<!! (a/onto-chan wc (range 10))))
which prints out something like:
Doing stuff with 0 in thread Doing stuff with Doing stuff withDoing stuff with 4
Doing stuff with
4 1 3 in thread 3
Doing stuff with 2 Doing stuff with 5 in thread 4
in thread 2
in thread Doing stuff with0
6Doing stuff with in thread 3
in thread Shutting down thread 3
Doing stuff with
nil
8 9 in thread 4
1
in thread 7 0
Shutting down thread Shutting down thread in thread 0
Shutting down thread 4
1
2
Shutting down thread 2
Obviously stuff is happening in parallel. Let's walk through it.
First, maybe obviously, my namespace :requires [clojure.core.async :as a].
The basic idea is that anything we want done will be lobbed onto the wc
channel, where a configurable number of workers will compete to unload it.
We spawn n of these workers as identical go forms, which return immediately
while their bodies continue running in the background. You'll often see a
worker created like
(a/go (while true (println (a/<! wc))))
but this is untidily immortal.
It's better to capture the nil that indicates that the channel has been
closed, so you can clean up properly:
(a/go (loop [] (when-let [l (a/<! wc)] (do-stuff l) (recur)))
      (clean-up))
If this is one too many pairs of parentheses, core.async has some
sugar for you:
(a/go-loop [] (when-let [l (a/<! wc)] (do-stuff l) (recur))
  (clean-up))
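To see the nil convention in isolation, here is a minimal sketch; drain-demo is a name made up for illustration and not part of the project. A take from a closed channel first delivers whatever is still buffered and only then yields nil. Since core.async refuses to let you put nil on a channel, that nil can only mean "closed and drained".

```clojure
(require '[clojure.core.async :as a])

(defn drain-demo
  "Hypothetical demo: put one item on a buffered channel, close it, take twice."
  []
  (let [c (a/chan 1)]
    (a/>!! c :job)   ; succeeds at once because the buffer has room
    (a/close! c)     ; closing does not discard buffered items
    [(a/<!! c)       ; the buffered item
     (a/<!! c)]))    ; nil, because the channel is closed and empty

(drain-demo) ;=> [:job nil]
```

This is why the when-let in the worker loop is enough to tell "more work" apart from "time to clean up".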
In any case, we can now offer work by >!!ing it into wc. Because wc is
unbuffered, the put blocks whenever all the workers are busy, which is most
likely the behavior we want. Without that back-pressure, if in equilibrium
we're generating work faster than it can be consumed, something
nondeterministic and therefore bad will happen: either we drop work on the
floor, or we accumulate a boundless queue that will eventually serve us an
OutOfMemoryError.
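The trade-off is easy to poke at in the REPL. The sketch below is only an illustration (the channel names and the buffer size of 3 are arbitrary): it sets up the standard core.async buffer flavors, and then shows a dropping buffer quietly discarding work once it fills up.

```clojure
(require '[clojure.core.async :as a])

;; Unbuffered: every put waits for a taker, so producers feel back-pressure.
(def unbuffered (a/chan))

;; Fixed buffer: the first 3 puts return at once, after that puts block.
(def buffered (a/chan 3))

;; Dropping buffer: puts never block; once it is full, new items are discarded.
(def lossy (a/chan (a/dropping-buffer 3)))

;; No consumer is running, yet all ten puts return immediately.
(doseq [x (range 10)] (a/>!! lossy x))
(a/close! lossy)

;; a/into collects whatever is left on the closed channel into a vector.
(def survivors (a/<!! (a/into [] lossy)))

survivors ;=> [0 1 2]
```

Seven of the ten items vanished without any complaint, which is exactly the "drop work on the floor" outcome. (a/sliding-buffer 3) would keep the newest items instead of the oldest, and is lossy all the same.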
A straightforward way to feed in a preexisting collection is
(doseq [rec my-coll] (a/>!! wc rec))
(a/close! wc)
but there's a handy function that does a lot of this for us
(a/<!! (a/onto-chan wc my-coll))
even (optionally) closing the channel for us afterwards. The only
thing to watch out for is that onto-chan returns immediately with a
channel that closes once every item has been put onto wc.
If we want to block until then, we can explicitly wait on that channel with <!!.
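One subtlety: the channel returned by onto-chan tells us that the last item has been handed off, not that the worker holding it has finished. If that matters, we can also wait on the channels that the go-loop forms themselves return. The following is a sketch under my own assumptions; run-pool is a hypothetical helper, not something from core.async or from the project.

```clojure
(require '[clojure.core.async :as a])

(defn run-pool
  "Hypothetical helper: applies f to every item of coll using n workers
  and blocks until all of the workers have shut down."
  [n f coll]
  (let [wc      (a/chan)
        ;; Each go-loop returns a channel that closes when its body finishes.
        workers (doall
                 (for [_ (range n)]
                   (a/go-loop []
                     (when-let [item (a/<! wc)]
                       (f item)
                       (recur)))))]
    ;; Feed the work; onto-chan closes wc after the last item by default.
    (a/<!! (a/onto-chan wc coll))
    ;; Wait for every worker to notice the close and exit.
    (doseq [w workers] (a/<!! w))
    :done))

;; Usage: collect the items seen by three workers.
(def seen (atom #{}))
(run-pool 3 #(swap! seen conj %) (range 10))
```

Here f runs inside a go block, so it should not block for long; for slow, blocking I/O, a/thread is probably a better fit for the workers than go.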
Finally, since we're using async anyway, we might as well also use it to linearize our status messages by creating a worker for the purpose:
(let [wc (a/chan), lc (a/chan), n 5]
  (a/go-loop [] (when-let [l (a/<! lc)] (apply println l) (recur)))
  (dotimes [i n]
    (a/go-loop []
      (when-let [rec (a/<! wc)]
        (a/>! lc ["Doing stuff with" rec "in thread" i])
        (recur))
      (a/>! lc ["Shutting down thread" i])))
  (a/<!! (a/onto-chan wc (range 10))))
giving us the rather more organized
Doing stuff with 2 in thread 2
Doing stuff with 1 in thread 4
Doing stuff with 0 in thread 1
Doing stuff with 3 in thread 0
Doing stuff with 5 in thread 2
Doing stuff with 4 in thread 3
Doing stuff with 6 in thread 4
Doing stuff with 7 in thread 1
Doing stuff with 8 in thread 0
Doing stuff with 9 in thread 2
Shutting down thread 3
Shutting down thread 4
Shutting down thread 1
Shutting down thread 0
Shutting down thread 2
In production, you'd likely be using a real logging framework, nearly all
of which are nicely fronted by clojure.tools.logging, but
this just goes to illustrate that core.async is now so intuitive and
pleasant that it saves you time even in non-production work.
Now we have to add static typing to this motherf***r. As a vocal
booster of core.typed, I feel comfortable
admitting that the business is in this case a little unattractive. Much
of the magic of core.async is implemented in macros, and you can't add
typing annotations to macros; you'd have to annotate the underlying functions
they call, and many of those functions are, while not technically private,
considered to be internal.
As it happens, core.typed ships with an annotated shell for
core.async, providing alternate versions of key macros and functions.
For example, (chan> Number) is a channel through
which numbers pass. There's also a go> macro, which takes no special
type arguments and substitutes exactly for go. You might wonder why, if we use
it exactly the same way, we need go> at all, and the answer is ugly.
It mirrors the innards of go, except that it uses chan> instead of chan
and wraps some typologically gruesome forms in tc-ignore. There's now an
interdependence between core.typed and core.async, which is acceptable only
because of defensive 0.x versioning of both libraries and viable only because
the authors get along well.
This is not sustainable. Clearly, package annotations need to move out of
core.typed and into their own distributions, which means core.typed
needs to be stable and widely accepted enough that it becomes de rigueur to
depend on it. Essentially, it needs to get to the point where it's considered
part of the language itself. With that in mind, go back and click on the
"booster" link several paragraphs above (or equivalently
here) and support the
crowdsourcing effort to put core.typed on a professional footing.
Clicked yet?
Until you do, we'll have file headers like this:
(ns whatever
  (:require ...
            [clojure.core.typed :as t]
            [clojure.core.async :as a]
            [clojure.core.typed.async :as ta])
  ...)
(clojure.core.typed/typed-deps clojure.core.typed.async)
Kvetching aside, it really isn't so hard to do things properly, even today.
The code below uses monger and a bunch of functions defined in my
increasingly cluttered IMDB project on github:
(t/ann ^:no-check monger.operators/$exists clojure.lang.Symbol)
(t/ann ^:no-check monger.operators/$regex clojure.lang.Symbol)
(t/ann pillage-imdb (Fn [t/AnyInteger -> nil]))
(t/ann ^:no-check clojure.core.async/onto-chan
       (All [a] (Fn [(clojure.core.typed.async/Chan a) (t/Seqable a) ->
                     (clojure.core.typed.async/ReadOnlyPort Any)])))
(defn pillage-imdb [n]
  (let [recs (mc/find-maps "nodes"
                           {:metacritic-score {mo/$exists false}
                            :id {mo/$regex "^\\/title"}})
        _ (println "Found" (count recs) "nodes to repair")
        c (ta/chan> MongoRecord)]
    (t/dotimes> [i n]
      (ta/go>
        (loop [] (when-let [rec (a/<! c)]
                   (-> (dissoc rec :updated)
                       (assure-node)
                       (add-film-details)
                       (add-metacritic-score)
                       (as-> % (if (get % :updated)
                                 (do (println "Updating in" i ":" %)
                                     (mc/update "nodes" {:id (:id %)}
                                                (dissoc % :updated))))))
                   (recur)))
        (println "Leaving" i)))
    (a/<!! (a/onto-chan c recs))
    nil ; (because we promised to return nil in annotation)
    ))
You can certainly tell what this does.
Note that onto-chan wasn't annotated, so I had to do it myself. This
isn't a case of annotating for my special case; the polymorphic declaration is
quite general. Also, core.typed doesn't ship with a go-loop> macro,
so we have to type a few extra characters.
The main significance of this unremarkable code is personal. Having started
writing it without typing, I ran into bugs that would normally have required a
hunting expedition and liberal printlning. Instead, I took a step back,
added the type annotations, and my problem showed up immediately in the output
of check-ns. Static typing isn't only about virtue. Sometimes it's also the
easy way out.