Synecducers

Great computer languages, like monarchs and planets, become emblems of what surrounds them. The greatest computer languages are barely there, as nearly everything we file under their names could be described as a library or other customization. It's not unusual and not even absurd to find a question about socket() on a C language forum: Linux is arguably a morphogenic implication of C. Clojure, too, is formally minimal, but it induces composition. Much of what we specifically admire about it isn't the language itself so much as an expression of it.

Clojure provides Communicating Sequential Processes (CSP) via the core.async library. The go construct is here implemented as a macro, yet the code that uses it is at least as elegant and natural as that written in the Go language, whose inventors obviously found the concept rather central. The internet is stuffed to the gills with core.async tutorials, so I won't go into it much today except to inform choices made in a Scala version.

Goal

The goal is to come as close as possible, in Scala, to the following Clojure code,

(defn foo [n]
  (let [c1 (chan)
        c2 (chan)]
    (async/go-loop [n (dec n)]
      (>! c1 "Fizz")
      (<! (timeout (rand 500)))
      (when (pos? n) (recur (dec n))))
    (async/go-loop [n (dec n)]
      (>! c2 41)
      (<! (timeout (rand 500)))
      (when (pos? n) (recur (dec n))))
    (async/go-loop [n (dec (* 2 n))]
      (let [[v c] (async/alts! [c1 c2])]
        (condp = c
          c1 (println (str v "Buzz"))
          c2 (println (inc v))))
      (when (pos? n) (recur (dec n))))))

but with type safety [1], of course. What's most important is that the channel read and write operations should not block a thread, but I would also like to avoid adding significant boilerplate.

Philosophical Rants

Development Driven Development

If you have no idea what you're doing, start typing and see what happens. If it doesn't work, fix it. If it's unfixable, rip it apart and start over. If the emerging structure suggests interesting generalizations, consider relocating the goal post. If the requirements unavoidably imply ugliness, think about changing the requirements. It's rare that anybody knows exactly what they need.

I could have called this Refactoring Driven Development, or Prototype Driven Development, but that wouldn't have been sarcastic enough to evoke and implicitly deride Test Driven Development, whose synergies of witless sloganeering and robotic compliance promise not just general dystopia, but the very specific dystopia of the former Soviet Union.

Curtain Driven Development

When writing a library or framework that you expect to be used in diverse ways, it's easy to get too hung up on the dire prohibitions of our age:

  1. Mutability
  2. Locks
  3. Explicit type checks and casts

The evil of these constructs lies in their potential to create code that is difficult to understand ("reason about") and for that reason may hide tricky bugs. In fact, you should probably assume that there will be bugs, and bugs are bad in inverse proportion to the amount of time you're willing to spend on eradication and the proving of eradication. That amount of time should, in turn, be proportional to the aggregate time that the code will eventually run. In an application written for a relatively narrow purpose, you should be willing to sacrifice flexibility and even performance to avoid spending time fixing things.

On the other hand, the benefit of these constructs lies in their proximity to the hardware and the freedom to tell that hardware what to do more or less directly. The real world is procedural, mutable and unityped, and, when coding in the real world, the only decision you can make is where in the abstraction stack you choose to begin hiding that fact. Beyond that point, code should pay no attention to the infernal machine behind the curtain, but that doesn't diminish our need for infernal machines.

Backpedaling

To be honest, I was hoping that there would be a lot less curtain-worthy action than ultimately appeared necessary. It had seemed at least plausible to address the various asynchronous dependencies and contingencies monadically, with vanilla promises and futures, but this proved to be beyond me, especially in the implementation of alts. For the moment, I believe that one needs extra machinery when there are potentially both many writers and many readers on the same channel - hence, perhaps, the rather old-school concurrency techniques used in the core.async ManyToManyChannel. Or I'm missing something.

I should also emphasize that this code is not nearly well tested enough to be offered for production use. There's a difference between eschewing test-driven development and embracing inadequate testing, but I don't have a lot of time on my hands at the moment... Appropriately, this code lives for the moment in my scala-playground repository.

Asynchronicity in Scala

There isn't a standard implementation of CSP for Scala, but there are quite a few related tools and abstractions for concurrent programming. This is a non-exhaustive list of concepts in the space.

Futures and Promises

The simplest way of making something happen in the future in Scala is with a Future,

  val f = Future {Thread.sleep(100);2}

where Thread.sleep(100) is a stand-in for some more reasonable time-consuming activity. So far, that's not much different from creating a Java Runnable, but the most stupendous thing about Scala's futures is that they're functors,

  val f2 = f.map {_+1}

so you can map arbitrary chains of what are essentially callbacks, specifying them sequentially, rather than undergoing the eponymous hell. Actually, they're monads too, with flatMap, so you can use the full for sugar to coordinate multiple events:

val f3 = Future {Thread.sleep(1000); 3.14}
val f4 = for {i <- f2
              d <- f3}
       yield (d.round==i)

Generally one wants to avoid blocking on futures, but you can:

  assert(Await.result(f2, 1 second) == 3)  // blocking; f2 is 2 + 1

There are other interesting features, but that's the gist of it.

We also get Promises, which are really souped-up CountDownLatches that you use to trigger activity somewhere else, generally via a future:

val p = Promise[Int]()
p.future.foreach { i => println(s"The answer is $i") }  // was onSuccess, which Scala 2.13 removed
p.success(42)

A single promise can feed an arbitrary number of callbacks and derived futures, all of which will fire when the promise is fulfilled.
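To make that concrete, here is a minimal sketch (the object and value names are mine, purely for illustration) in which two derived futures hang off one promise and both see the value once it is fulfilled. The blocking Await is there only so the result can be inspected.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseFanOut {
  def run(): (Int, String) = {
    // One promise...
    val p = Promise[Int]()

    // ...with two independent futures derived from it.
    val doubled: Future[Int]     = p.future.map(_ * 2)
    val labelled: Future[String] = p.future.map(i => s"The answer is $i")

    // Fulfilling the promise fires both.
    p.success(42)
    (Await.result(doubled, 1.second), Await.result(labelled, 1.second))
  }
}
```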

Actors

The standard suggestion for event driven programming in Scala these days seems to be Akka's Actor library. Akka has a lot to offer, but, for present purposes, we should note that the actor model is not CSP; in some ways, it's the opposite of CSP. Programming with CSP style channels is akin to using higher order functions with collections; it's a variation of a familiar FP paradigm, where the central entity is a conduit of data, and behavior is encapsulated in pure, transport-oblivious functions. Programming with actors is, essentially, writing miniature servers; it's a variation of a familiar OO paradigm, where the central entity is an object with customizable behaviors:

class MyActor extends Actor {
  override def receive = {
    case x:Int => doSomething(x)
    case _     => doSomethingElse()
  }
}

Async

As we've seen, the monadic future allows us to compose asynchronous events in a manner that is much more intuitive than straight callbacks. The Async Project takes this further, with macros that enable an even more intuitive and efficient organization of futures. In the example above, we can replace the for comprehension with:

async {
  val i = await(f2)
  val d = await(f3)
  d.round==i
}

or even

async {await(f2) == await(f3).round}

Notwithstanding its misleading name, await doesn't actually wait. Rather, the state of execution is "parked" as it is for various ! operations inside Clojure's go blocks.

With async we're well on our way to core.async, but we still have to bridge the divide between futures and channels.

scala-gopher

Gopher is the most fully fledged CSP framework for Scala that I've been able to find. It may be the wave of the future, but (again) for the present (and contrived) purposes, I will find things to object to. First, the author writes

Note, which this is not an emulation of go language constructions in scala, but rather reimplementation of key ideas in 'scala-like' manner.

but what I'm looking for is in fact an emulation of Clojure language constructions, and, with the use of async, I think I can achieve that in a manner that is sufficiently Scala-like as at least to be legible. Second, while Gopher is built with Async components, it introduces its own go macro, which I'm hoping isn't necessary, since it would be nice to mix futures and channel programming.

Akka Channels

The Akka Channel class has been deprecated in favor of the akka.persistence.AtLeastOnceDelivery trait, which makes it somewhat more obvious that its purpose is reliable delivery rather than CSP. In retrospect, it seems that one shouldn't ever use the word "channel" except to provide opportunities for going on about what we mean by the word.

Channels for Scala

Like Clojure's channel, ours is essentially a buffer with asynchronous access methods. Unlike Clojure's, ours will handle those asynchronous operations via futures and promises, so we can re-use the machinery and idioms for dealing with them. We should be able to do this:

val c = Chan[Int]
async {
  println(s"Got ${await(c.read)}")
}
async {
  await(c.write(5))
  println("Sent")
}

What must be the case in order for this sort of thing to work? First, the channel will need to support some notification mechanism, to tell parked writers that a previously full buffer can now accommodate a write, and parked readers that a previously empty buffer now has something to read. Ignoring all sorts of complexity, the top of the class has to look a bit like:

// v1
class Chan[T](val buf: ChanBuffer[T]) {
  var pReadyForWrite = Promise[Unit].success(Unit) // buffer starts out empty, so a write can proceed
  var pReadyForRead =  Promise[Unit] // nothing to read yet

  def write(v : T) : Future[Unit] = {
    val p = Promise[Unit]
    pReadyForWrite.future.map { _ =>
       buf.add(v)
       pReadyForWrite = Promise[Unit]
       pReadyForRead.success(Unit)
       p.success(Unit)
    }
    p.future
  }

  ???
}

When pReadyForWrite fires, we (1) add the data, (2) trigger pReadyForRead, and (3) signal the parked writer.

The most significant gap in this implementation is that it won't manage multiple readers and writers on the same channel. In fact, when pReadyForWrite goes off, we know that somebody will be able to perform a write, but it might not be us. If it's not us, then we need to schedule another attempt. And how do we know if we can write? Let's assume that the buffer's take and put methods can succeed or fail, and tell us about the state of the buffer afterwards.

  abstract class ChanBuffer[T]() {
    def put(v: T) : Option[BufferResult[T]]
    def take : Option[BufferResult[T]]
  }
  case class BufferResult[T](v : T,
        noLongerEmpty:Boolean=false,
        noLongerFull:Boolean=false,
        nowEmpty:Boolean=false,
        nowFull:Boolean=false)

The BufferResult contains flags rather than a simple enumeration, because combinations are possible. For example, a put of v to an empty buffer of capacity 1 will return BufferResult(v,true,false,false,true) as the buffer is now full and is no longer empty. If this were a dropping buffer, which simply dropped new data when there wasn't room for it, then nowFull would never be set.
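For illustration only, a bounded FIFO buffer along these lines might look like the sketch below. FixedBuffer is a hypothetical name of my own, the two definitions from above are repeated so the block stands alone, and the class is not thread-safe by itself: it assumes the caller holds a lock.

```scala
import scala.collection.mutable

object BufferSketch {
  case class BufferResult[T](v: T,
                             noLongerEmpty: Boolean = false,
                             noLongerFull: Boolean = false,
                             nowEmpty: Boolean = false,
                             nowFull: Boolean = false)

  abstract class ChanBuffer[T] {
    def put(v: T): Option[BufferResult[T]]
    def take: Option[BufferResult[T]]
  }

  // Hypothetical fixed-capacity FIFO buffer (assumes external locking).
  class FixedBuffer[T](capacity: Int) extends ChanBuffer[T] {
    require(capacity > 0, "capacity must be positive")
    private val q = mutable.Queue.empty[T]

    def put(v: T): Option[BufferResult[T]] =
      if (q.size >= capacity) None               // full: the put fails
      else {
        val wasEmpty = q.isEmpty
        q += v
        Some(BufferResult(v, noLongerEmpty = wasEmpty, nowFull = q.size == capacity))
      }

    def take: Option[BufferResult[T]] =
      if (q.isEmpty) None                        // empty: the take fails
      else {
        val wasFull = q.size == capacity
        val v = q.dequeue()
        Some(BufferResult(v, noLongerFull = wasFull, nowEmpty = q.isEmpty))
      }
  }
}
```

With a capacity of 1, a successful put reports both noLongerEmpty and nowFull, which is the combination described above.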

The BufferResult flags will imply very specific followup behavior from the caller:

  1. noLongerEmpty - Complete pReadyForRead
  2. noLongerFull - Complete pReadyForWrite
  3. nowEmpty - Replace pReadyForRead with an uncompleted promise.
  4. nowFull - Replace pReadyForWrite with an uncompleted promise.

The more complete logic now notifies the parked writer if the write succeeds but reschedules another try if it doesn't. Roughly:

  def write(v : T) : Future[Unit] = {
    val p = Promise[Unit]
    pReadyForWrite.future.map { _ => tryWrite(v,p) }
    p.future
  }
  private[this] def tryWrite(v: T, pNotify: Promise[Unit]) : Unit = this.synchronized {
    b.put(v) match {
      case Some(br) =>
        if(br.noLongerEmpty) pReadyForRead.trySuccess(Unit)  // wake parked readers
        if(br.nowFull) pReadyForWrite = Promise[Unit]        // later writers must wait
        pNotify.success(Unit)                                 // signal the parked writer
      case None =>
        // Somebody else got there first; try again when there is room.
        pReadyForWrite.future.map {_ => tryWrite(v,pNotify)}
    }
  }

The corresponding read is similar:

  def read: Future[T] = {
    val p = Promise[T]
    pReadyForRead.future.map { _ => tryRead(p) }
    p.future
  }
  private[this] def tryRead(pNotify: Promise[T]) : Unit = this.synchronized {
    b.take match {
      case Some(br) =>
        if(br.noLongerFull) pReadyForWrite.trySuccess(Unit)  // wake parked writers
        if(br.nowEmpty) pReadyForRead = Promise[Unit]        // later readers must wait
        pNotify.success(br.v)                                 // deliver to the parked reader
      case None =>
        // Somebody else got there first; try again when there is data.
        pReadyForRead.future.map {_ => tryRead(pNotify)}
    }
  }

Without a tremendous amount of overhead, what we have so far handles the basic case of a single channel servicing multiple readers and writers.

Timeout

The timeout function in core.async simply creates a channel that delivers nil some number of milliseconds in the future, so a delay is simply expressed by reading from this channel. Whether you use this delay as the time limit for some operation or for some other purpose is up to you. In Scala, the second argument of Await.result specifies a timeout, after which the blocked call gives up and throws a TimeoutException. This is crucial functionality, but it's not quite the same thing. To replicate the core.async variety, it seems to be necessary to go back to Java Timers. The following could stand some optimization, but it gets the job done:

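  import java.util.{Timer, TimerTask}
  import scala.concurrent.duration.Duration

  object Timeout {
    val timer = new Timer()
    // A channel that delivers a single Unit after the given delay.
    def timeout(d: Duration): Chan[Unit] = timeout(d.toMillis, ())
    def timeout[T](d: Long, v: T): Chan[T] = {
      val c = Chan[T]
      val tt = new TimerTask() { def run(): Unit = { c.write(v) } }
      timer.schedule(tt, d)
      c
    }
  }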
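The same Timer trick can be tried in isolation, without any channel, by completing a plain Promise from the timer thread. This is only a sketch: the after helper and the object name are made up for the purpose, and the daemon flag on the Timer is my own choice so that the timer thread doesn't keep the JVM alive.

```scala
import java.util.{Timer, TimerTask}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object TimerSketch {
  // Daemon timer thread, so it won't block JVM shutdown.
  private val timer = new Timer(true)

  // Hypothetical helper: a future completed with v roughly d from now.
  def after[T](d: FiniteDuration, v: T): Future[T] = {
    val p = Promise[T]()
    val task = new TimerTask { def run(): Unit = { p.trySuccess(v); () } }
    timer.schedule(task, d.toMillis)
    p.future
  }

  // Blocks only to demonstrate that the value does arrive.
  def demo(): String = Await.result(after(50.millis, "done"), 2.seconds)
}
```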

Alt

Unfortunately, things get more complicated when we try to implement alts!. The desired use case is along these lines:

  val c1 = Chan[Int]
  val c2 = Chan[String]

  // Send integers at random intervals to c1
  async {
    while(true) {
      val i = Random.nextInt(10)
      await{c1.write(i)}
      await{timeout((Random.nextInt(1000)) milliseconds).read}
    }
  }
  // Send strings at random intervals to c2
  async {
    while(true) {
      val s = s"I am a string: ${Random.nextInt(10)}"
      await{c2.write(s)}
      await{timeout((Random.nextInt(1000)) milliseconds).read}
    }
  }
  // Listen for the first delivery from c1, c2 and a timeout.
  var n = 100
  async {
    while(n>0) {
      n=n-1
      val tout = timeout(Random.nextInt(1000) milliseconds)
      await(alts(c1, c2, tout)) match {
        case tout(_) => println("Nothing this time.")
        case c1(i) => println(s"Plus one is ${i + 1}")
        case c2(s) => println(s + "score and seven")
      }
    }
  }

The first two async blocks are simple enough, but it isn't clear how alts is going to work. What initially comes to mind is Future.firstCompletedOf(...) which does about what it says on the tin, but if we do the obvious thing,

   await(Future.firstCompletedOf(List(c1.read,c2.read,tout.read)))

we run into several problems. First, every time this line executes, new Futures will be created for each channel, and we will be ignoring all but the first to complete. The other two will now vie for delivery on their respective channels, sucking whatever they receive into oblivion. Second, we have no way of knowing which channel won. Solving these two problems will require writing a function that takes Chans directly, causing a third problem: we can't multiplex over channels of heterogeneous types. We can't even cheat by taking Chan[Any] (as we could have with Future[Any]), because Chan[T] is invariant in T, which it has to be, as it supports mutating writes.
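The first two problems show up even without channels. In the sketch below, two plain promises stand in for c1.read and c2.read (that substitution, and all the names, are assumptions of mine). The winner comes back typed as Any with no indication of its source, and the loser is left pending.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedDemo {
  def run(): (Any, Boolean) = {
    val ints    = Promise[Int]()     // stands in for c1.read
    val strings = Promise[String]()  // stands in for c2.read

    // The element type widens to Any, so the result says nothing
    // about which source produced it.
    val first: Future[Any] =
      Future.firstCompletedOf[Any](List(ints.future, strings.future))

    strings.success("hello")
    val winner = Await.result(first, 1.second)

    // The losing future is still pending, waiting to swallow a value.
    (winner, ints.isCompleted)
  }
}
```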

We'll solve these problems in reverse order:

Multiplexing over heterogeneous types

To solve the last problem, we're going to do something ghastly:

    type Pretender
    implicit def ghastly[T](c: Chan[T]): Chan[Pretender] = c.asInstanceOf[Chan[Pretender]]
    def alts(cs: Chan[Pretender]*): Future[Pretender] = ???

Now, alts will return a Future[Pretender], which would seem to be problematic, except that we're also planning on solving problem 2, which means we'll know exactly how to cast it back, and hopefully we can do that in a manner that makes mistakes unlikely. I also contend that casting something to this made-up type is less horrible than casting to Any, since we limit the number of things it can pretend to be.

Identifying the returned channel

So that we can identify the winning channel, we're going to go back and rewrite the internals of Chan to deal in pairs

  case class CV[T](val c: Chan[T], val v: T)

We could have used plain tuples, but a named case class makes the code a little prettier, and there will be another use for having a real class a bit later.

E.g.

  private[this] def tryRead(pNotify: Promise[CV[T]]) : Unit = this.synchronized {
    b.take match {
      case Some(br) =>
        if(br.noLongerFull) pReadyForWrite.trySuccess(Unit)
        if(br.nowEmpty) pReadyForRead = Promise[Unit]
        pNotify.success(CV(this,br.v))
      case None =>
        pReadyForRead.future.map {_ => tryRead(pNotify)}
    }
  }

The regular read method will map(_.v) to return just the value, since disambiguation won't be necessary, but alts will be able to return CV pairs:

    def alts(cs: Chan[Pretender]*): Future[CV[Pretender]] = ???

Once we can identify the channel returned by alts, we can use that information to cast the value to its correct T. The trick is to write a class (rather than object) specific unapply method

 class Chan[T] (...) { 
    ...
    def unapply(cv: CV[Chan.Pretender]): Option[T] =
      if (cv.c eq this) {
        Some(cv.v.asInstanceOf[T])
      } else None
    ...
}

such that

   val c1 = Chan[TheRightType]
   ...
     case c1(v) => ...

will only match a cv:CV[Pretender] if cv.c refers to the same object as c1, and then cv.v will be cast to TheRightType. This is the equivalent of

   case CV(`c1`,_v) => {val v = _v.asInstanceOf[TheRightType]; .... }

except without ugly back-quotes and even uglier, error-prone casts. (Again, per the philosophical rant, we do have a cast, it's just behind the curtain.)
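The instance-level extractor trick is easy to try on its own. The following is a stripped-down sketch with hypothetical stand-ins (Key for Chan, Tagged for CV) and no concurrency at all; it only shows that a pattern like intKey(i) matches on identity and hands back a correctly typed value.

```scala
object ExtractorDemo {
  // Hypothetical stand-in for CV: a key plus a type-erased value.
  final case class Tagged(key: Key[_], value: Any)

  // Hypothetical stand-in for Chan: unapply lives on the instance,
  // so each key only matches pairs that carry that very key.
  final class Key[T] {
    def unapply(t: Tagged): Option[T] =
      if (t.key eq this) Some(t.value.asInstanceOf[T]) // the one hidden cast
      else None
  }

  val intKey = new Key[Int]
  val strKey = new Key[String]

  def describe(t: Tagged): String = t match {
    case intKey(i) => s"Plus one is ${i + 1}"  // i is an Int here
    case strKey(s) => s.toUpperCase            // s is a String here
    case _         => "unknown"
  }
}
```

A Tagged built with some other Key[Int] falls through to the default case, because the match is on object identity rather than on type.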

Saving the futures

The final (first) problem is the hardest.

Instead of one promise per channel client, we want just one, to be completed with the result from whichever channel is ready first. We also need to be sure that, once this promise is fulfilled, the losing channels will not attempt to fulfill it again or lose any data as a result. Previously, this was not a risk: any given pNotify would only be fulfilled by one tryXXXX lineage (i.e. the original one, or one rescheduled with pReadyForXXXX).

This leads us to the notion of a TentativePromise, where an attempt to fulfill might fail in one of two distinct ways:

  1. The promise was as yet uncompleted, but the buffer operation failed.
  2. The promise was already completed, so we didn't even attempt the buffer operation.

  object OfferResult extends Enumeration {
    type OfferResult = Value
    val AlreadyCompleted, DidComplete, DidNotComplete = Value
  }
  import OfferResult._
  class TentativePromise[T] {
    val p = Promise[T]
    def future: scala.concurrent.Future[T] = p.future
    def tentativeOffer(o: => Option[T]) :  OfferResult = this.synchronized {
      if(!p.isCompleted) o match {
        case Some(t) => {p.success(t); DidComplete}
        case None    => DidNotComplete
      }
      else AlreadyCompleted
     }
   }

Note that the argument of tentativeOffer is passed by name, so it is evaluated lazily. In

  pNotify.tentativeOffer(b.take)

the b.take operation won't be attempted unless the promise is as yet uncompleted, and the promise won't be completed if the take fails.
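The by-name behaviour can be checked with a trimmed-down variant of the class above. In this sketch (names are mine, and a Boolean replaces OfferResult to keep it short), a counter stands in for the buffer and records how many takes actually happen.

```scala
import scala.concurrent.Promise

object TentativeDemo {
  // Simplified variant of TentativePromise, for illustration only.
  class Tentative[T] {
    private val p = Promise[T]()
    def offer(o: => Option[T]): Boolean = this.synchronized {
      if (p.isCompleted) false            // o is never evaluated
      else o match {
        case Some(t) => p.success(t); true
        case None    => false             // evaluated, but nothing to deliver
      }
    }
  }

  def run(): (Boolean, Boolean, Int) = {
    var takes = 0                         // counts simulated buffer takes
    def take(): Option[String] = { takes += 1; Some("data") }

    val tp = new Tentative[String]
    val first  = tp.offer(take())         // completes the promise: one take
    val second = tp.offer(take())         // already completed: take is skipped
    (first, second, takes)
  }
}
```

Only one take is counted, so a losing channel never pulls data out of its buffer on behalf of a promise that somebody else has already fulfilled.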

It turns out we'll need one more promisey sort of thing. Suppose we had a channel that is used to communicate a halt from a loop of repeated alts:

   async {
    while(!halt) {
      await(alts(haltChannel, stuffChannel)) match {
        case stuffChannel(s) => doStuff(s)
        case haltChannel(_) => halt=true
      }}}

There are repeated attempts to read from the channel, but, by definition, never more than one write. Were we to continue scheduling with conventional Promises, à la

   pReadyForRead.future.map {_ => tryRead(pNotify)}

we'd accumulate a future on pReadyForRead with every iteration, and they wouldn't be cleaned up until the program's end. What we want is some way to keep track of the futures and clean them up when pNotify is completed, regardless of who completed it.

We'll use the TentativePromise in conjunction with an IndirectPromise, whose purpose is to fulfill TentativePromises. The pReadyForXXXX will be IndirectPromises, on which a raft of TentativePromises may be depending, e.g. for multiple async blocks each competitively awaiting a read from the same channel. We'll use a new method, futureOffer, which makes a future fulfillment action contingent on fulfillment being necessary

   pReadyForWrite.futureOffer(pDelivery){tryWrite(v,_)}

The futureOffer accumulates the completion functions in a map, but schedules cleanup should the delivery promise be completed prematurely:

  class IndirectPromise[T,U] extends Promise[U] {
    val p = Promise[U]
    type TP = TentativePromise[T]
    val h: mutable.HashMap[TP, TP => Unit] = new mutable.HashMap()
    def futureOffer(pDeliver : TP)(f:TP=>Unit) : Unit = this.synchronized {
      if(p.isCompleted) {
        f(pDeliver)
      } else {
        h += ((pDeliver, f))
        pDeliver.future.map {_ => this.synchronized{h -= pDeliver}}
      }
    }

To extend the Promise trait, you need to implement future, isCompleted and tryComplete. The first two are just delegated (so standard listeners are supported), but the last will loop over any remaining promise/completion pairs and attempt to run them:

    def future  = p.future
    def isCompleted: Boolean = p.isCompleted
    def tryComplete(result: scala.util.Try[U]): Boolean = this.synchronized {
       if(p.tryComplete(result)) {  // fires any standard listeners
          h.foreach {case (pDeliver,f) => f(pDeliver) }
          true
       } else false}

Now, we'll rejigger tryRead and tryWrite to use our new promises.

 class Chan[T](val b: ChanBuffer[T]) {
    private[this] var pReadyForWrite = IndirectPromise.successful[CV[T],Unit](Unit)
    private[this] var pReadyForRead  = IndirectPromise[CV[T],Unit]

    private[this] def tryWrite(v: T, pNotify: TentativePromise[CV[T]]) : Unit = this.synchronized {
      var trigger = false
      pNotify.tentativeOffer(b.put(v).map { br => 
         if (br.noLongerEmpty) trigger = true
         if (br.nowFull)       pReadyForWrite = IndirectPromise[CV[T],Unit]
         CV(this,v)
      }) match {
        case DidNotComplete => pReadyForWrite.futureOffer(pNotify){tryWrite(v,_)}
        case DidComplete =>      ()
        case AlreadyCompleted => ()
      }
      if(trigger) pReadyForRead.trySuccess(Unit)
    }

    private[this] def tryRead(pNotify: TentativePromise[CV[T]]): Unit = this.synchronized {
      var trigger = false
      pNotify.tentativeOffer(b.take.map {br =>
         if(br.noLongerFull) trigger = true
         if(br.nowEmpty)     pReadyForRead = IndirectPromise[CV[T],Unit]
         CV(this,br.v)
      }) match {
        case DidNotComplete => pReadyForRead.futureOffer(pNotify){tryRead(_)}
        case DidComplete       => ()
        case AlreadyCompleted  => ()
      }
      if(trigger) pReadyForWrite.trySuccess(Unit)
    }
  ...
  }

The two R's

One last item is that we'd like alts to allow both reads and writes, just as Clojure's does, e.g.

   alts(c1,CV(c2,"output")) match {...}

We require another redefinition of alts to take a new ChanHolder trait and implementations for both Chan and CV.

  sealed trait ChanHolder[T] {
    def chan : Chan[T]
  }

  case class CV[T](val c: Chan[T], val v: T) extends ChanHolder[T] {
    def chan = c
  }

  class Chan[T](...) extends ChanHolder[T] {
    def chan = this
    ...
  }

  def alts(cs: ChanHolder[Pretender]*): Future[CV[Pretender]]

alt-together now

We're finally in a position to write the new and improved Chan.alts:

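    def alts(cs: ChanHolder[Pretender]*): Future[CV[Pretender]] = {
      // One shared promise; whichever channel is ready first completes it.
      val p = new TentativePromise[CV[Pretender]]
      // read(p) and write(v,p) are overloads taking the shared promise (not shown above).
      cs.foreach {
        case c  : Chan[Pretender] => c.chan.read(p)
        case CV(c,v) => c.chan.write(v,p)
      }

      p.future
    }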

What's next

As previously backpedaled, there's a bit too much if, synchronized and var for comfort. I want to continue noodling around with different chaining techniques. With or without stylistic improvements, the code would benefit from a concurrency torture test, which even sounds like fun.


  1. Note that core.typed does provide a polymorphic Chan annotation, but it cannot handle heterogeneous channel types in alts!


