Some time after my recent fiddles with IMDB, I read an interesting article about using a perceptron to classify words as parts of speech based on features that precede them in text. It's all done in Python or some such sh*t, but whatever. Still very cool. Since I had all of this IMDB data accumulated in Mongo, I thought I would try to play with it, and the idea I had was to predict metacritic scores from the actors that appeared in each film. In retrospect, it's far from clear that such a prediction can be made, and especially that a perceptron is well suited to making it. I tried a few things, and everything sucked, so the only saving grace was that some bits were fun to implement.

The first task was to augment my database, which had been built mostly for exploring linkages, with data that one might want to either predict or use for predicting. Basically, I needed to add a :metacritic-score field to each record, and while I was at it, grab anything else that's easy to find in approximately the same place. I didn't want to wait a very long time, and imdb.com's IP throttling seems to get confused by my VPN provider, so parallel GETs are in order, but not an unlimited number of them. In a situation like this, I like to set up a worker pool using core.async. If you have no idea what that is, read this, this and this. Then look at this:

(let [wc (a/chan), n 5]
    (dotimes [i n]
        (a/go
            (loop []
                (when-let [rec (a/<! wc)]
                    (println "Doing stuff with" rec "in thread" i)
                    (recur)))
            ;; <! returned nil: wc is closed and the loop has exited
            (println "Shutting down thread" i)))
    (a/<!! (a/onto-chan wc (range 10))))

which prints out something like:

Doing stuff with 0 in thread Doing stuff with Doing stuff withDoing stuff with 4
Doing stuff with 
4  1 3 in thread 3
Doing stuff with 2 Doing stuff with 5 in thread 4
in thread 2
in thread Doing stuff with0
6Doing stuff with in thread 3
in thread Shutting down thread 3
Doing stuff with 
nil
  8 9 in thread 4
1
in thread 7 0
Shutting down thread Shutting down thread in thread 0
Shutting down thread 4
1
2
Shutting down thread 2

Obviously stuff is happening in parallel. Let's walk through it.

First, maybe obviously, my namespace :requires [clojure.core.async :as a]. The basic idea is that anything we want done will be lobbed onto the wc channel, where a configurable number of workers will compete to unload it. We spawn n of these workers as identical go forms, which return immediately while their bodies continue running in the background. You'll often see a worker created like

(a/go (while true (println (a/<! wc))))

but this is untidily immortal. It's better to capture the nil that indicates that the channel has been closed, so you can clean up properly:

(a/go (loop [] (when-let [l (a/<! wc)] (do-stuff l) (recur)))
      (clean-up))

If this is one too many pairs of parentheses, core.async has some sugar for you:

(a/go-loop [] (if-let [l (a/<! wc)] (do (do-stuff l) (recur))
                  (clean-up)))

In any case, we can now offer work by >!!ing it into wc. If all the workers are busy, the operation will block, which is most likely the behavior we want. Without that back-pressure, if in equilibrium we're generating work faster than it can be consumed, something nondeterministic and therefore bad has to happen: either we drop work on the floor, or we accumulate an unbounded queue that will eventually serve us an OutOfMemoryError.
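To see what "dropping work on the floor" looks like in practice, here is a minimal sketch using the buffer constructors that ship with core.async. The helper retained is a made-up name for illustration, not something from my project; it stuffs a collection into a channel nobody is reading and reports what survived.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: put every item on ch while nobody is reading,
;; then close ch and return whatever the channel's buffer retained.
(defn retained [ch items]
  (doseq [x items] (a/>!! ch x)) ; never blocks on dropping/sliding buffers
  (a/close! ch)                  ; buffered items stay readable after close
  (a/<!! (a/into [] ch)))

;; dropping-buffer keeps the first n items and silently discards the rest
(retained (a/chan (a/dropping-buffer 3)) (range 10)) ; => [0 1 2]

;; sliding-buffer keeps only the most recent n items
(retained (a/chan (a/sliding-buffer 3)) (range 10))  ; => [7 8 9]
```

A plain (a/chan 3) has a fixed buffer instead: the fourth >!! would simply block until a consumer shows up, which is the back-pressure described above (and would hang this particular helper, since it has no consumer).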

A straightforward way to feed in a preexisting collection is

(doseq [rec my-coll] (a/>!! wc rec)) (a/close! wc)

but there's a handy function that does a lot of this for us

(a/<!! (a/onto-chan wc my-coll))

even closing the channel for us afterwards (that's the default; pass false as a third argument to leave it open). The only thing to watch out for is that onto-chan returns immediately, handing back a channel that closes once everything in the collection has been handed off to wc. If we want to block until then, we can explicitly wait on that channel with <!!.
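Here is a small sketch of both modes of onto-chan, under the assumption of a single worker so that the ordering is deterministic. The names results and squares are invented for the example.

```clojure
(require '[clojure.core.async :as a])

(def squares
  (let [wc      (a/chan)
        results (a/chan 10)]            ; buffered, so the worker never parks on it
    ;; a single worker: square each item, close results when wc is closed
    (a/go-loop []
      (if-let [x (a/<! wc)]
        (do (a/>! results (* x x)) (recur))
        (a/close! results)))
    ;; third argument false: wc stays open once this batch is handed off
    (a/<!! (a/onto-chan wc [1 2 3] false))
    ;; default behaviour: wc is closed after the last item
    (a/<!! (a/onto-chan wc [4 5]))
    ;; collect everything the worker produced before it shut down
    (a/<!! (a/into [] results))))

squares ; => [1 4 9 16 25]
```

Note that waiting on the channel returned by onto-chan only tells us the items have been handed off, which is why the sketch waits on results for the finished work. Newer core.async releases, as far as I know, deprecate onto-chan in favour of onto-chan! and onto-chan!!, so adjust to taste.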

Finally, since we're using async anyway, we might as well also use it to linearize our status messages by creating a worker for the purpose:

(let [wc (a/chan), lc (a/chan), n 5]
    ;; a single logger worker serializes all output
    (a/go-loop [] (when-let [l (a/<! lc)] (apply println l) (recur)))
    (dotimes [i n]
        (a/go-loop []
            (if-let [rec (a/<! wc)]
                (do (a/>! lc ["Doing stuff with" rec "in thread" i])
                    (recur))
                (a/>! lc ["Shutting down thread" i]))))
    (a/<!! (a/onto-chan wc (range 10))))

giving us the rather more organized

Doing stuff with 2 in thread 2
Doing stuff with 1 in thread 4
Doing stuff with 0 in thread 1
Doing stuff with 3 in thread 0
Doing stuff with 5 in thread 2
Doing stuff with 4 in thread 3
Doing stuff with 6 in thread 4
Doing stuff with 7 in thread 1
Doing stuff with 8 in thread 0
Doing stuff with 9 in thread 2
Shutting down thread 3
Shutting down thread 4
Shutting down thread 1
Shutting down thread 0
Shutting down thread 2

In production, you'd likely be using a real logging framework, nearly all of which are nicely fronted by clojure.tools.logging, but this just goes to illustrate that core.async is now so intuitive and pleasant that it saves you time even in non-production work.
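One loose end in the snippets above: waiting on onto-chan tells us the work has been handed out, not that the workers have finished with it. A minimal sketch of one way to wait for the workers themselves, relying on the fact that every go block returns a channel that closes when its body completes. The names run-pool and seen are hypothetical, purely for illustration.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: run f over coll on n go-block workers and block
;; until every worker has seen the closed channel and exited.
(defn run-pool [n f coll]
  (let [wc      (a/chan)
        workers (doall
                 (for [_ (range n)]
                   (a/go-loop []
                     (when-let [x (a/<! wc)]
                       (f x)
                       (recur)))))]
    (a/onto-chan wc coll)                 ; feeds wc, then closes it
    (a/<!! (a/into [] (a/merge workers))) ; returns once all worker channels close
    nil))

(def seen (atom #{}))
(run-pool 5 #(swap! seen conj %) (range 10))
(= @seen (set (range 10))) ; => true
```

The merge of the worker channels closes only after all of them have closed, so by the time run-pool returns every item has actually been processed.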

Now we have to add static typing to this motherf***r. As a vocal booster of core.typed, I feel comfortable admitting that the business is in this case a little unattractive. Much of the magic of core.async is implemented in macros, and you can't add typing annotations to macros - you'd have to annotate the underlying functions they call, and many of those functions are, while not technically private, considered to be internal.

As it happens, core.typed ships with an annotated shell for core.async, providing alternate versions of key macros and functions. For example, (chan> Number) is a channel through which numbers pass. There's also a go> macro, which takes no special type arguments and substitutes exactly for go. You might wonder why, if we use it exactly the same way, we need go> at all; the answer is ugly. It mirrors the innards of go, except that it uses chan> instead of chan and wraps some typologically gruesome forms in tc-ignore. There's now an interdependence between core.typed and core.async, which is acceptable only because of defensive 0.x versioning of both libraries and viable only because the authors get along well.

This is not sustainable. Clearly, package annotations need to move out of core.typed and into their own distributions, which means core.typed needs to be stable and widely accepted enough that it becomes de rigueur to depend on it. Essentially, it needs to get to the point where it's considered part of the language itself. With that in mind, go back and click on the "booster" link several paragraphs above (or equivalently here) and support the crowdsourcing effort to put core.typed on a professional footing.

Clicked yet?

Until you do, we'll have file headers like this:

(ns whatever
  (:require ...
             [clojure.core.typed :as t]
             [clojure.core.async :as a]
             [clojure.core.typed.async :as ta])
  ...)
(clojure.core.typed/typed-deps clojure.core.typed.async)

Kvetching aside, it really isn't so hard to do things properly, even today. The code below uses monger and a bunch of functions defined in my increasingly cluttered IMDB project on github:

(t/ann ^:no-check monger.operators/$exists clojure.lang.Symbol)
(t/ann ^:no-check monger.operators/$regex clojure.lang.Symbol)
(t/ann pillage-imdb (Fn [t/AnyInteger -> nil]))
(t/ann ^:no-check clojure.core.async/onto-chan
       (All [a] (Fn [(clojure.core.typed.async/Chan a) (t/Seqable a) ->
                     (clojure.core.typed.async/ReadOnlyPort Any)]) ))
(defn pillage-imdb [n]
  (let [recs (mc/find-maps "nodes"
                           {:metacritic-score {mo/$exists false}
                            :id {mo/$regex "^\\/title"}})
        _     (println "Found" (count recs) "nodes to repair")
        c     (ta/chan> MongoRecord)]
    (t/dotimes> [i n] 
      (ta/go>
       (loop [] (when-let [rec (a/<! c)] 
                  (-> (dissoc rec :updated)
                      (assure-node)
                      (add-film-details)
                      (add-metacritic-score)
                      (as-> % (if (get % :updated)
                                (do (println "Updating in" i ":" %)
                                    (mc/update "nodes" {:id (:id %)} 
                                               (dissoc % :updated))))))
                  (recur)))
       (println "Leaving" i)))
    (a/<!! (a/onto-chan c recs))
    nil ; (because we promised to return nil in annotation)
    ))

You can certainly tell what this does. Note that onto-chan wasn't annotated, so I had to do it myself. This isn't a case of annotating for my special case; the polymorphic declaration is quite general. Also, core.typed doesn't ship with a go-loop> macro, so we have to type a few extra characters.

The main significance of this unremarkable code is personal. Having started writing it without typing, I ran into bugs that would normally have required a hunting expedition and liberal printlning. Instead, I took a step back, added the type annotations, and my problem showed up immediately in the output of check-ns. Static typing isn't only about virtue. Sometimes it's also the easy way out.


