Synecducers
Great computer languages, like monarchs and planets, become emblems of what surrounds them.
The greatest computer languages are barely there, as nearly everything we
file under their names could be described as a library or other customization.
It's not unusual and not even absurd to find a question about socket()
on a C language forum: Linux is arguably a morphogenic implication of C.
Clojure, too, is formally minimal, but it induces composition.
Much of what we specifically admire about it isn't the language itself so much
as an expression of it.
Clojure provides Communicating Sequential Processes (CSP) via the core.async library.
The go construct is here implemented as a macro, yet the code that uses it is at least as elegant
and natural as that written in the go language, whose inventors obviously found the
concept rather central. The internet is stuffed to the gills with core.async tutorials, so I
won't go into it much today except to inform choices made in a Scala version.
Goal
The goal is to come as close as possible, in Scala, to the following Clojure code,
(defn foo [n]
  (let [c1 (chan)
        c2 (chan)]
    (async/go-loop [n (dec n)]
      (>! c1 "Fizz")
      (<! (timeout (rand 500)))
      (when (pos? n) (recur (dec n))))
    (async/go-loop [n (dec n)]
      (>! c2 41)
      (<! (timeout (rand 500)))
      (when (pos? n) (recur (dec n))))
    (async/go-loop [n (dec (* 2 n))]
      (let [[v c] (async/alts! [c1 c2])]
        (condp = c
          c1 (println (str v "Buzz"))
          c2 (println (inc v))))
      (when (pos? n) (recur (dec n))))))
but with type safety[1], of course. What's most important is that the channel read and write operations should not block a thread, but I would also like to avoid significant addition of boilerplate.
Philosophical Rants
Development Driven Development
If you have no idea what you're doing, start typing and see what happens. If it doesn't work, fix it. If it's unfixable, rip it apart and start over. If the emerging structure suggests interesting generalizations, consider relocating the goal post. If the requirements unavoidably imply ugliness, think about changing the requirements. It's rare that anybody knows exactly what they need.
I could have called this Refactoring Driven Development, or Prototype Driven Development, but that wouldn't have been sarcastic enough to evoke and implicitly deride Test Driven Development, whose synergies of witless sloganeering and robotic compliance promise not just general dystopia, but the very specific dystopia of the former Soviet Union.
Curtain Driven Development
When writing a library or framework that you expect to be used in diverse ways, it's easy to get too hung up on the dire prohibitions of our age:
- Mutability
- Locks
- Explicit type checks and casts
The evil of these constructs lies in their potential to create code that is difficult to understand ("reason about") and for that reason may hide tricky bugs. In fact, you should probably assume that there will be bugs, and bugs are bad in inverse proportion to the amount of time you're willing to spend on eradication and the proving of eradication. That amount of time should, in turn, be proportional to the aggregate time that the code will eventually run. In an application written for a relatively narrow purpose, you should be willing to sacrifice flexibility and even performance to avoid spending time fixing things.
On the other hand, the benefit of these constructs lies in their proximity to the hardware and the freedom to tell that hardware what to do more or less directly. The real world is procedural, mutable and unityped, and, when coding in the real world, the only decision you can make is where in the abstraction stack you choose to begin hiding that fact. Beyond that point, code should pay no attention to the infernal machine behind the curtain, but that doesn't diminish our need for infernal machines.
Backpedaling
To be honest, I was hoping that there would be a lot less curtain-worthy action
than ultimately appeared necessary. It had seemed at least plausible to
address the various asynchronous dependencies and contingencies monadically, with vanilla promises
and futures, but this proved to be beyond me, especially in the implementation of alts.
For the moment, I believe that one needs extra machinery when there
are potentially both many writers and many readers on the same channel - hence,
perhaps, the rather old-school concurrency techniques used in the core.async ManyToManyChannel.
Or I'm missing something.
I should also emphasize that this code is not nearly well tested enough to
be offered for production use. There's a difference between eschewing test-driven
development and embracing inadequate testing, but I don't have a lot of time on
my hands at the moment... Appropriately, this code lives for the moment
in my scala-playground repository.
Asynchronicity in Scala
There isn't a standard implementation of CSP for Scala, but there are quite a few related tools and abstractions for concurrent programming. This is a non-exhaustive list of concepts in the space.
Futures and Promises
The simplest way of making something happen in the future in Scala is with a Future,
val f = Future {Thread.sleep(100);2}
where Thread.sleep(100) is a stand-in for some more reasonable time-consuming
activity.
So far, that's not much different from creating a Java Runnable, but the most
stupendous thing about Scala's futures is that they're functors,
val f2 = f.map {_+1}
so you can map arbitrary chains of what are essentially callbacks,
specifying them sequentially, rather than undergoing the
eponymous hell. Actually, they're monads too, with flatMap, so you
can use the full for sugar to coordinate multiple events:
val f3 = Future {Thread.sleep(1000); 3.14}
val f4 = for {i <- f2
d <- f3}
yield (d.round==i)
Generally one wants to avoid blocking on futures, but you can:
assert( Await.result(f2, 1 second)==3) // blocking; f yields 2, so f2 yields 3
There are other interesting features, but that's the gist of it.
We also get Promises, which are really souped up CountDownLatches that you use to trigger activity somewhere
else, generally via a future:
val p = Promise[Int]
p.future.onSuccess {case i => println(s"The answer is $i")}
p.success(42)
Any number of callbacks and derived futures can hang off a single promise's future, and all of them will fire when the promise is fulfilled.
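As a small illustration of that fan-out, here is a sketch in which two futures are derived from one promise and both complete when it is fulfilled. The object and method names (PromiseFanOut, demo) are made up for the example, and the blocking Await is only there to make the result observable.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object PromiseFanOut {
  // Hypothetical demo: two independent futures hang off the same promise.
  def demo(): (Int, String) = {
    val p = Promise[Int]()
    val doubled = p.future.map(_ * 2)
    val shown   = p.future.map(i => s"The answer is $i")
    p.success(21) // fulfilling the promise once fires both derived futures
    (Await.result(doubled, 1.second), Await.result(shown, 1.second))
  }
}
```

Calling PromiseFanOut.demo() blocks only for as long as it takes the two mapped futures to run on the global execution context.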
Actors
The standard suggestion for event driven programming in Scala these days seems to be Akka's Actor library. Akka has a lot to offer, but, for present purposes, we should note that the actor model is not CSP; in some ways, it's the opposite of CSP. Programming with CSP style channels is akin to using higher order functions with collections; it's a variation of a familiar FP paradigm, where the central entity is a conduit of data, and behavior is encapsulated in pure, transport-oblivious functions. Programming with actors is, essentially, writing miniature servers; it's a variation of a familiar OO paradigm, where the central entity is an object with customizable behaviors:
class MyActor extends Actor {
  override def receive = {
    case x:Int => doSomething(x)
    case _ => doSomethingElse()
  }
}
Async
As we've seen, the monadic future allows us to compose asynchronous events in
a manner that is much more intuitive than straight callbacks.
The Async Project takes this further, with
macros that enable an even more intuitive and efficient organization of
futures. In the example above, we can replace the for comprehension with:
async {
  val i = await(f2)
  val d = await(f3)
  d.round==i
}
or even
async {await(f2) == await(f3).round}
Notwithstanding its misleading name, await doesn't actually wait. Rather,
the state of execution is "parked" as it is for various ! operations
inside Clojure's go blocks.
With async we're well on our way to core.async, but we still have to
bridge the divide between futures and channels.
scala-gopher
Gopher is the most fully fledged CSP framework for Scala that I've been able to find. It may be the wave of the future, but (again) for the present (and contrived) purposes, I will find things to object to. First, the author writes
Note, which this is not an emulation of go language constructions in scala, but rather reimplementation of key ideas in 'scala-like' manner.
but what I'm looking for is in fact an emulation of Clojure language
constructions, and, with the use of async, I think I can achieve that
in a manner that is sufficiently Scala-like as at least to be legible. Second,
while Gopher is built with Async components, it introduces its own go
macro, which I'm hoping isn't necessary, since it would be nice to
mix futures and channel programming.
Akka Channels
The Akka Channel class has been deprecated in favor of the
akka.persistence.AtLeastOnceDelivery trait, which makes it somewhat
more obvious that its purpose is reliable delivery rather than CSP.
In retrospect, it seems that one shouldn't ever use the word "channel"
except to provide opportunities for going on about what we mean by the word.
Channels for Scala
Like Clojure's channel, ours is essentially a buffer with asynchronous access methods. Unlike Clojure's, ours will handle those asynchronous operations via futures and promises, so we can re-use the machinery and idioms for dealing with them. We should be able to do this:
val c = Chan[Int]
async {
  println(s"Got ${await(c.read)}")
}
async {
  await(c.write(5))
  println("Sent")
}
What must be the case in order for this sort of thing to work? First, the channel will need to support some notification mechanism, to tell parked writers that a previously full buffer can now accommodate a write, and parked readers that a previously empty buffer now has something to read. Ignoring all sorts of complexity, the top of the class has to look a bit like:
// v1
class Chan[T](val buf: ChanBuffer[T]) {
  var pReadyForWrite = Promise[Unit].success(Unit) // start out empty
  var pReadyForRead = Promise[Unit] // nothing to read yet
  def write(v : T) : Future[Unit] = {
    val p = Promise[Unit]
    pReadyForWrite.future.map { _ =>
      buf.add(v)
      pReadyForWrite = Promise[Unit]
      pReadyForRead.success(Unit)
      p.success(Unit)
    }
    p.future
  }
  ???
}
When pReadyForWrite fires, we (1) add the data,
(2) trigger pReadyForRead, and (3) signal the parked writer.
The most significant gap in this implementation is that it won't manage
multiple readers and writers on the same channel. In fact,
when pReadyForWrite goes off, we know that somebody will be able to
perform a write, but it might not be us. If it's not us, then we need to
schedule another attempt. And how do we know if we can write? Let's assume
that the buffer's take and put methods can succeed or fail, and tell
us about the state of the buffer afterwards.
abstract class ChanBuffer[T]() {
  def put(v: T) : Option[BufferResult[T]]
  def take : Option[BufferResult[T]]
}
case class BufferResult[T](v : T,
                           noLongerEmpty:Boolean=false,
                           noLongerFull:Boolean=false,
                           nowEmpty:Boolean=false,
                           nowFull:Boolean=false)
The BufferResult contains
flags rather than a simple enumeration, because combinations are possible.
For example, a put of v to an empty buffer of length 1 will return
BufferResult(v,true,false,false,true) as the buffer is now
full and is no longer empty. If this were a dropping buffer,
which simply dropped new data when there wasn't room for it, then
nowFull would never be set.
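To make the flag combinations concrete, here is a minimal sketch of two buffers that satisfy the ChanBuffer contract above. FixedBuffer and DroppingBuffer are names invented for this illustration, backed by a plain mutable queue; they are not necessarily how the playground code does it, and neither is thread-safe on its own.

```scala
import scala.collection.mutable

object BufferSketch {
  case class BufferResult[T](v: T,
                             noLongerEmpty: Boolean = false,
                             noLongerFull: Boolean = false,
                             nowEmpty: Boolean = false,
                             nowFull: Boolean = false)

  abstract class ChanBuffer[T] {
    def put(v: T): Option[BufferResult[T]]
    def take: Option[BufferResult[T]]
  }

  // Hypothetical fixed-size buffer: put fails (None) when full, take fails when empty.
  class FixedBuffer[T](capacity: Int) extends ChanBuffer[T] {
    require(capacity > 0)
    private val q = mutable.Queue.empty[T]

    def put(v: T): Option[BufferResult[T]] =
      if (q.size >= capacity) None
      else {
        val wasEmpty = q.isEmpty
        q += v
        Some(BufferResult(v, noLongerEmpty = wasEmpty, nowFull = q.size == capacity))
      }

    def take: Option[BufferResult[T]] =
      if (q.isEmpty) None
      else {
        val wasFull = q.size == capacity
        val v = q.dequeue()
        Some(BufferResult(v, noLongerFull = wasFull, nowEmpty = q.isEmpty))
      }
  }

  // Hypothetical dropping buffer: a put never fails and never reports nowFull;
  // when there is no room, the new value is silently discarded.
  class DroppingBuffer[T](capacity: Int) extends FixedBuffer[T](capacity) {
    override def put(v: T): Option[BufferResult[T]] =
      super.put(v).map(_.copy(nowFull = false)).orElse(Some(BufferResult(v)))
  }
}
```

With a FixedBuffer of length 1, the first put reports both noLongerEmpty and nowFull, and the matching take reports both noLongerFull and nowEmpty, which is why a single enumeration value would not be enough.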
The BufferResult flags will imply very specific followup
behavior from the caller:
- noLongerEmpty: Complete pReadyForRead.
- noLongerFull: Complete pReadyForWrite.
- nowEmpty: Replace pReadyForRead with an uncompleted promise.
- nowFull: Replace pReadyForWrite with an uncompleted promise.
The more complete logic now notifies the parked writer if the write succeeds but reschedules another try if it doesn't. Roughly:
def write(v : T) : Future[Unit] = {
  val p = Promise[Unit]
  pReadyForWrite.future.map { _ => tryWrite(v,p) }
  p.future
}
private[this] def tryWrite(v: T, pNotify: Promise[Unit]) : Unit = this.synchronized {
  b.put(v).
    map { br =>
      if(br.noLongerEmpty) pReadyForRead.trySuccess(Unit)  // wake parked readers
      if(br.nowFull) pReadyForWrite = Promise[Unit]        // later writers must park
      pNotify.success(Unit)
    }.
    getOrElse {
      // somebody else got there first; try again at the next opportunity
      pReadyForWrite.future.map {_ => tryWrite(v,pNotify)}
    }
}
The corresponding read is similar:
def read: Future[T] = {
  val p = Promise[T]
  pReadyForRead.future.map { _ => tryRead(p) }
  p.future
}
private[this] def tryRead(pNotify: Promise[T]) : Unit = this.synchronized {
  b.take.
    map { br =>
      if(br.noLongerFull) pReadyForWrite.trySuccess(Unit)  // wake parked writers
      if(br.nowEmpty) pReadyForRead = Promise[Unit]        // later readers must park
      pNotify.success(br.v)
    }.
    getOrElse {
      pReadyForRead.future.map {_ => tryRead(pNotify)}
    }
}
Without a tremendous amount of overhead, what we have so far handles the basic case of a single channel servicing multiple readers and writers.
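Pulling the fragments together, the following is a self-contained sketch of the same idea that compiles on its own. It is a simplification under stated assumptions rather than the playground code: SimpleChan is a made-up name, a bounded queue stands in for ChanBuffer and its flags, and the global execution context is hard-wired.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for Chan: a bounded queue guarded by two "ready" promises.
final class SimpleChan[T](capacity: Int) {
  require(capacity > 0)
  private implicit val ec: ExecutionContext = ExecutionContext.global
  private val buf = mutable.Queue.empty[T]
  private var readyForWrite: Promise[Unit] = Promise.successful(()) // starts empty, so writable
  private var readyForRead: Promise[Unit]  = Promise[Unit]()        // nothing to read yet

  def write(v: T): Future[Unit] = { val p = Promise[Unit](); tryWrite(v, p); p.future }
  def read: Future[T]           = { val p = Promise[T](); tryRead(p); p.future }

  private def tryWrite(v: T, p: Promise[Unit]): Unit = synchronized {
    if (buf.size < capacity) {
      buf += v
      if (buf.size == capacity) readyForWrite = Promise[Unit]() // now full
      readyForRead.trySuccess(())                                // no longer empty
      p.success(())
    } else readyForWrite.future.foreach(_ => tryWrite(v, p))     // park until there is room
  }

  private def tryRead(p: Promise[T]): Unit = synchronized {
    if (buf.nonEmpty) {
      val v = buf.dequeue()
      if (buf.isEmpty) readyForRead = Promise[Unit]()            // now empty
      readyForWrite.trySuccess(())                               // no longer full
      p.success(v)
    } else readyForRead.future.foreach(_ => tryRead(p))          // park until there is data
  }
}

object SimpleChanDemo {
  // A reader parks first; the later write wakes it.
  def roundTrip(): Int = {
    val c = new SimpleChan[Int](1)
    val r = c.read
    c.write(5)
    Await.result(r, 1.second)
  }
}
```

When several parked readers wake on the same promise, only one finds data; the rest see an empty queue and re-register on the replacement promise, which is the rescheduling behavior described above.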
Timeout
The timeout function in core.async simply creates a channel that delivers nil
some number of milliseconds in the future, so a delay is simply expressed by reading from
this channel. Whether you use this delay as the time limit for some operation or for some
other purpose is up to you. In Scala, the second argument of Await.result specifies
a timeout, after which the blocked call gives up with a TimeoutException. This is crucial functionality,
but it's not quite the same thing. To replicate the core.async variety, it seems to
be necessary to go back to Java Timers. The following could stand some optimization,
but it gets the job done:
object Timeout {
  val timer = new Timer()
  // A channel that delivers () after the given duration
  def timeout(d: Duration): Chan[Unit] = timeout(d.toMillis, ())
  // A channel that delivers v after d milliseconds
  def timeout[T](d: Long, v: T): Chan[T] = {
    val c = Chan[T]
    val tt = new TimerTask() { def run(): Unit = { c.write(v) } }
    timer.schedule(tt, d)
    c
  }
}
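The same Timer trick works one level down, without a channel at all. As a sketch, the hypothetical helper below (TimeoutSketch.after is a name invented here) completes a promise from a TimerTask, so the delay can be awaited like any other future without parking a thread in Thread.sleep.

```scala
import java.util.{Timer, TimerTask}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object TimeoutSketch {
  private val timer = new Timer(true) // daemon thread, so it won't keep the JVM alive

  // Hypothetical helper: a future that yields v roughly d after the call.
  def after[T](d: FiniteDuration, v: T): Future[T] = {
    val p = Promise[T]()
    val task = new TimerTask { def run(): Unit = { p.trySuccess(v); () } }
    timer.schedule(task, d.toMillis)
    p.future
  }
}
```

For example, Await.result(TimeoutSketch.after(50.millis, "ping"), 2.seconds) returns "ping" once the timer has fired; inside an async block one would await the future instead of blocking.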
Alt
Unfortunately, things get more complicated when we try to implement alts!.
The desired use case is along these lines:
val c1 = Chan[Int]
val c2 = Chan[String]
// Send integers at random intervals to c1
async {
  while(true) {
    val i = Random.nextInt(10)
    await{c1.write(i)}
    await{timeout((Random.nextInt(1000)) milliseconds).read}
  }
}
// Send strings at random intervals to c2
async {
  while(true) {
    val s = s"I am a string: ${Random.nextInt(10)}"
    await{c2.write(s)}
    await{timeout((Random.nextInt(1000)) milliseconds).read}
  }
}
// Listen for the first delivery from c1, c2 and a timeout.
var n = 100
async {
  while(n>0) {
    n=n-1
    val tout = timeout(Random.nextInt(1000) milliseconds)
    await(alts(c1, c2, tout)) match {
      case tout(_) => println("Nothing this time.")
      case c1(i) => println(s"Plus one is ${i + 1}")
      case c2(s) => println(s + "score and seven")
    }
  }
}
The first two async blocks are simple enough, but it isn't clear
how alts is going to work. What initially comes to mind is
Future.firstCompletedOf(...) which does about what it says on
the tin, but if we do the obvious thing,
await(Future.firstCompletedOf(List(c1.read,c2.read,tout.read)))
we run into several problems. First, every time this line executes,
new Futures will be created for each channel, and we will be
ignoring all but the first to complete. The other two will now vie
for delivery on their respective channels, sucking whatever they
receive into oblivion. Second, we have no way of knowing which
channel won. Solving these two problems will require writing a
function that takes Chans directly, causing a third problem: we
can't multiplex over channels of heterogeneous types. We can't even
cheat by taking Chan[Any] (as we could have with Future[Any]),
because Chan[T] is invariant in T, which it has to be as it supports mutating writes.
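For plain futures, the second problem alone has a cheap workaround, sketched here with a hypothetical helper (firstWithIndex is not part of the standard library): tag each future with its position before racing them. Note that it leans on Future being covariant, which is exactly what Chan cannot offer, and that it does nothing about the losing reads swallowing data.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FirstOfSketch {
  // Hypothetical helper: like Future.firstCompletedOf, but also reports the winner's position.
  def firstWithIndex[T](fs: Seq[Future[T]]): Future[(Int, T)] =
    Future.firstCompletedOf(fs.zipWithIndex.map { case (f, i) => f.map(v => (i, v)) })

  def demo(): (Int, Any) = {
    val ints = Promise[Int]()
    val strs = Promise[String]()
    // Covariance lets Future[Int] and Future[String] share a Seq[Future[Any]].
    val winner = firstWithIndex[Any](Seq(ints.future, strs.future))
    strs.success("hello") // only the second source ever delivers
    Await.result(winner, 1.second)
  }
}
```

The caller still gets back an Any and has to cast based on the index, which is the kind of error-prone bookkeeping the rest of this section tries to hide.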
We'll solve these problems in reverse order:
Multiplexing over heterogeneous types
To solve the last problem, we're going to do something ghastly:
type Pretender
implicit def ghastly[T](c: Chan[T]): Chan[Pretender] = c.asInstanceOf[Chan[Pretender]]
def alts(cs: Chan[Pretender]*): Future[Pretender] = ???
Now, alts will return a Future[Pretender], which would
seem to be problematic, except that we're also planning on solving
problem 2, which means we'll know exactly how to cast it back, and
hopefully we can do that in a manner that makes mistakes unlikely. I
also contend that casting something to this made up class is less
horrible than casting to Any, since we limit the number of
things it can pretend to be.
Identifying the returned channel
So that we can identify the winning channel, we're going to go back
and rewrite the internals of Chan to deal in pairs
case class CV[T](val c: Chan[T], val v: T)
We could have used plain tuples, but a named case class makes the code a little prettier, and there will be another use for having a real class a bit later.
E.g.
private[this] def tryRead(pNotify: Promise[CV[T]]) : Unit = this.synchronized {
  b.take.
    map { br =>
      if(br.noLongerFull) pReadyForWrite.trySuccess(Unit)
      if(br.nowEmpty) pReadyForRead = Promise[Unit]
      pNotify.success(CV(this,br.v))
    }.
    getOrElse {
      pReadyForRead.future.map {_ => tryRead(pNotify)}
    }
}
The regular read method will map(_.v) to return just the
value, since disambiguation won't be necessary, but alts will be
able to return CV pairs:
def alts(cs: Chan[Pretender]*): Future[CV[Pretender]] = ???
Once we can identify the channel returned by alts, we can use that information to cast
the value to its correct T. The trick is to write a class (rather than object) specific
unapply method
class Chan[T] (...) {
  ...
  def unapply(cv: CV[Chan.Pretender]): Option[T] =
    if (cv.c eq this) {
      Some(cv.v.asInstanceOf[T])
    } else None
  ...
}
such that
val c1 = Chan[TheRightType]
...
case c1(v) => ...
will only match a cv:CV[Pretender] if cv.c refers to the
same object as c1, and then cv.v will be cast to
TheRightType.
This is the equivalent of
case CV(`c1`,_v) => {val v = _v.asInstanceOf[TheRightType]; .... }
except without ugly back-quotes and even uglier, error-prone casts. (Again, per the philosophical rant, we do have a cast, it's just behind the curtain.)
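The instance-level extractor can be tried in isolation, without any channels. In this sketch Key and Tagged are invented stand-ins for Chan and CV[Pretender]; the point is only that unapply lives on the instance, matches by reference identity, and keeps the cast in one place.

```scala
object ExtractorSketch {
  // A value whose static type has been erased, paired with the key that knows it.
  final case class Tagged(key: Key[_], value: Any)

  final class Key[T] {
    def apply(v: T): Tagged = Tagged(this, v)
    // Matches only values tagged by this very instance, then restores the type.
    def unapply(t: Tagged): Option[T] =
      if (t.key eq this) Some(t.value.asInstanceOf[T]) else None
  }

  val ints = new Key[Int]
  val strs = new Key[String]

  def describe(t: Tagged): String = t match {
    case ints(i) => s"int ${i + 1}"            // i is an Int here
    case strs(s) => s"string ${s.toUpperCase}" // s is a String here
    case _       => "unknown"
  }
}
```

A Tagged value produced by some other Key[Int] falls through to the default case, even though its payload is also an Int, because the match is on identity rather than type.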
Saving the futures
The final (first) problem is the hardest.
Instead of one promise per channel client, we want just one, to be
completed with the result from whichever channel is ready first.
We also need to be sure that, once this promise is fulfilled, the
losing channels will not attempt to fulfill it again or lose any
data as a result. Previously, this was not a risk: any given
pNotify would only be fulfilled by one tryXXXX lineage
(i.e. the original one, or one rescheduled with pReadyForXXXX).
This leads us to the notion of a TentativePromise, where an
attempt to fulfill might fail in one of two distinct ways:
- The promise was as yet uncompleted, but the buffer operation failed.
- The promise was already completed, so we didn't even attempt the buffer operation.
object OfferResult extends Enumeration {
  type OfferResult = Value
  val AlreadyCompleted, DidComplete, DidNotComplete = Value
}
import OfferResult._
class TentativePromise[T] {
  val p = Promise[T]
  def future: scala.concurrent.Future[T] = p.future
  def tentativeOffer(o: => Option[T]) : OfferResult = this.synchronized {
    if(!p.isCompleted) o match {
      case Some(t) => {p.success(t); DidComplete}
      case None => DidNotComplete
    }
    else AlreadyCompleted
  }
}
Note that the argument of tentativeOffer is lazy (it is passed by name). In
pNotify.tentativeOffer(b.take)
the b.take operation won't be attempted unless the promise is
as yet uncompleted, and the promise won't be completed if
the take fails.
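The by-name behavior is easy to check with a side-effecting source. The sketch below re-creates the idea in miniature; Tentative and Offer are names chosen for this example, and a sealed trait stands in for the Enumeration used above. It counts how many times a destructive take is actually performed.

```scala
import scala.concurrent.Promise

object TentativeSketch {
  sealed trait Offer
  case object AlreadyCompleted extends Offer
  case object DidComplete extends Offer
  case object DidNotComplete extends Offer

  final class Tentative[T] {
    private val p = Promise[T]()
    def offer(o: => Option[T]): Offer = synchronized {
      if (p.isCompleted) AlreadyCompleted // o is never evaluated on this path
      else o match {
        case Some(t) => p.success(t); DidComplete
        case None    => DidNotComplete
      }
    }
  }

  // Returns the three offer results plus the number of takes actually performed.
  def demo(): (List[Offer], Int) = {
    var takes = 0
    def take(): Option[Int] = { takes += 1; Some(takes) } // pretend this removes data
    val tp = new Tentative[Int]
    val results = List(tp.offer(None), tp.offer(take()), tp.offer(take()))
    (results, takes)
  }
}
```

The third offer reports AlreadyCompleted without calling take, so the count stays at one and nothing is pulled from the buffer only to be thrown away.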
It turns out we'll need one more promisey sort of thing.
Suppose we had a channel that is used to communicate a halt from
a loop of repeated alts:
async {
  while(!halt) {
    await(alts(haltChannel, stuffChannel)) match {
      case stuffChannel(s) => doStuff(s)
      case haltChannel(_) => halt=true
    }
  }
}
There are repeated attempts to read from the halt
channel, but, by definition, never more than one write.
Were we to continue scheduling with conventional Promises, à la
pReadyForRead.future.map {_ => tryRead(pNotify)}
we'd accumulate a future on pReadyForRead with every iteration, and
they wouldn't be cleaned up until the program's end. What we want is some way
to keep track of the futures and clean them up when pNotify is completed,
regardless of who completed it.
We'll use the TentativePromise in conjunction with an IndirectPromise,
whose purpose is to fulfill TentativePromises. The pReadyForXXXX will
be IndirectPromises, on which a raft of TentativePromises may be depending,
e.g. for multiple async blocks each competitively awaiting a read from
the same channel.
We'll use a new method, futureOffer, which makes a future
fulfillment action contingent on fulfillment being necessary:
pReadyForWrite.futureOffer(pDelivery){tryWrite(v,_)}
The futureOffer accumulates the completion functions in a map, but schedules
cleanup should the delivery promise be completed prematurely:
class IndirectPromise[T,U] extends Promise[U] {
  val p = Promise[U]
  type TP = TentativePromise[T]
  val h: mutable.HashMap[TP, TP => Unit] = new mutable.HashMap()
  def futureOffer(pDeliver : TP)(f:TP=>Unit) : Unit = this.synchronized {
    if(p.isCompleted) {
      f(pDeliver)
    } else {
      h += ((pDeliver, f))
      pDeliver.future.map {_ => this.synchronized{h -= pDeliver}}
    }
  }
To extend the Promise trait, you need to implement future, isCompleted and
tryComplete. The first two are just delegated (so standard listeners are supported),
but the last will loop over any remaining promise/completion pairs and attempt to run them:
  def future = p.future
  def isCompleted: Boolean = p.isCompleted
  def tryComplete(result: scala.util.Try[U]): Boolean = this.synchronized {
    if(p.tryComplete(result)) { // fires any standard listeners
      h.foreach {case (pDeliver,f) => f(pDeliver) }
      true
    } else false
  }
}
Now, we'll rejigger tryRead and tryWrite to use our new promises.
class Chan[T](val b: ChanBuffer[T]) {
  private[this] var pReadyForWrite = IndirectPromise.successful[CV[T],Unit](Unit)
  private[this] var pReadyForRead = IndirectPromise[CV[T],Unit]
  private[this] def tryWrite(v: T, pNotify: TentativePromise[CV[T]]) : Unit = this.synchronized {
    var trigger = false
    pNotify.tentativeOffer(b.put(v).map { br =>
      if (br.noLongerEmpty) trigger = true
      if (br.nowFull) pReadyForWrite = IndirectPromise[CV[T],Unit]
      CV(this,v)
    }) match {
      case DidNotComplete => pReadyForWrite.futureOffer(pNotify){tryWrite(v,_)}
      case DidComplete => ()
      case AlreadyCompleted => ()
    }
    if(trigger) pReadyForRead.trySuccess(Unit)
  }
  private[this] def tryRead(pNotify: TentativePromise[CV[T]]): Unit = this.synchronized {
    var trigger = false
    pNotify.tentativeOffer(b.take.map {br =>
      if(br.noLongerFull) trigger = true
      if(br.nowEmpty) pReadyForRead = IndirectPromise[CV[T],Unit]
      CV(this,br.v)
    }) match {
      // reschedule through futureOffer, so the callback is cleaned up if pNotify completes elsewhere
      case DidNotComplete => pReadyForRead.futureOffer(pNotify){tryRead(_)}
      case DidComplete => ()
      case AlreadyCompleted => ()
    }
    if(trigger) pReadyForWrite.trySuccess(Unit)
  }
  ...
}
The two R's
One last item is that we'd like alts to allow both reads and writes, just as Clojure's does, e.g.
await(alts(c1,CV(c2,"output"))) match {...}
We require another redefinition of alts to take a new ChanHolder trait
and implementations for both Chan and CV.
sealed trait ChanHolder[T] {
  def chan : Chan[T]
}
case class CV[T](val c: Chan[T], val v: T) extends ChanHolder[T] {
  def chan = c
}
class Chan[T](...) extends ChanHolder[T] {
  def chan = this
  ...
}
def alts(cs: ChanHolder[Pretender]*): Future[CV[Pretender]]
alt-together now
We're finally in a position to write the new and improved Chan.alts:
def alts(cs: ChanHolder[Pretender]*): Future[CV[Pretender]] = {
  // one shared tentative promise: whichever channel is ready first completes it
  val p = new TentativePromise[CV[Pretender]]
  cs.foreach {
    case c : Chan[Pretender] => c.chan.read(p)
    case CV(c,v) => c.chan.write(v,p)
  }
  p.future
}
What's next
As previously backpedaled,
there's a bit too much if, synchronized and var for comfort.
I want to continue noodling around with different chaining techniques.
With or without stylistic improvements, the code would benefit from a
concurrency torture test, which even sounds like fun.
[1] Note that core.typed does provide a polymorphic Chan annotation, but it cannot handle heterogeneous channel types in alts!.