Asynchronous concurrency with vert.x – Part 2

CoffeeScript

Vert.x supports JavaScript through the Rhino JavaScript engine. Although JavaScript is a decent language once you get to know it, I prefer CoffeeScript, a language that compiles to JavaScript. Luckily, vert.x has built-in support for CoffeeScript, so I can use it nearly transparently. You will only notice the JavaScript under the hood when reading stack traces, which will refer to the compiled JavaScript file.

For the examples in this blog post, you only need to know a little CoffeeScript:

foo = (a, b) -> a + b

translates to the JavaScript code

var foo = function (a, b) {
    return a + b; // the value of the last expression is returned implicitly
};

Also, the parentheses around the arguments of a function call are optional:

  foo a, b, c
  # same as
  foo(a, b, c)

Translated to CoffeeScript, the example described in the last post looks like this:

vertx = require 'vertx'
address = 'example.address'
handler = (message, replier) ->
  stdout.println "sender sent " + message
  replier "pong 1", (message, replier) ->
    # and so on
vertx.eventBus.registerHandler address, handler
vertx.eventBus.send address, "ping 1", (message, replier) ->
  stdout.println "handler sent " + message
  replier "ping 2", (message, replier) ->
    # and so on

The shorter function declaration notation is a huge improvement, especially for the kind of callback-heavy code that is prevalent in asynchronous concurrency.

The Sleeping Barber Problem

To challenge vert.x with something more exciting than ping-pong, I decided to model a basic concurrency problem that mirrors some of the challenges that our new application will face – the famous Sleeping Barber Problem:

The analogy is based upon a hypothetical barber shop with one barber. The barber has one barber chair and a waiting room with a number of chairs in it. When the barber finishes cutting a customer’s hair, he dismisses the customer and then goes to the waiting room to see if there are other customers waiting. If there are, he brings one of them back to the chair and cuts his hair. If there are no other customers waiting, he returns to his chair and sleeps in it.

Each customer, when he arrives, looks to see what the barber is doing. If the barber is sleeping, then the customer wakes him up and sits in the chair. If the barber is cutting hair, then the customer goes to the waiting room. If there is a free chair in the waiting room, the customer sits in it and waits his turn. If there is no free chair, then the customer leaves. Based on a naïve analysis, the above description should ensure that the shop functions correctly, with the barber cutting the hair of anyone who arrives until there are no more customers, and then sleeping until the next customer arrives. In practice, there are a number of problems that can occur that are illustrative of general scheduling problems.
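The rules quoted above can be written down as two small transition functions. This is only a sketch in plain JavaScript, not vert.x code: the names arrive and finish and the shape of the state object are made up for illustration. It models the shop as if events happened one at a time, so it deliberately leaves out the timing problems that make the real problem interesting.

```javascript
// Hypothetical model of the shop rules; all names are illustrative.
// A state is { busy: boolean, waiting: number, seats: number }.

// A customer arrives: returns what happens to him and the new state.
function arrive(state) {
  if (!state.busy) {
    // barber is asleep: wake him and take the barber chair
    return { outcome: 'served', state: { ...state, busy: true } };
  }
  if (state.waiting < state.seats) {
    // barber is busy but a waiting-room chair is free
    return { outcome: 'waits', state: { ...state, waiting: state.waiting + 1 } };
  }
  // no free chair: the customer leaves and nothing changes
  return { outcome: 'leaves', state };
}

// The barber finishes a haircut: returns the new state.
function finish(state) {
  if (state.waiting > 0) {
    // fetch the next customer; the barber stays busy
    return { ...state, waiting: state.waiting - 1 };
  }
  // nobody is waiting: the barber goes back to sleep
  return { ...state, busy: false };
}

// Usage: a shop with a single waiting-room chair.
let shop = { busy: false, waiting: 0, seats: 1 };
const outcomes = [];
for (let i = 0; i < 3; i++) {
  const result = arrive(shop);
  outcomes.push(result.outcome);
  shop = result.state;
}
shop = finish(shop); // the waiting customer moves to the barber chair
shop = finish(shop); // the shop is empty, the barber sleeps
```

The first customer is served, the second waits and the third leaves. After two finished haircuts the barber is asleep again.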

I’ve previously solved this problem using Software Transactional Memory and was interested in how the message-passing style of vert.x compares.

barber.coffee

The barber shop problem separates nicely into two systems: a barber message handler that keeps track of incoming customers and manages the queue, and a set of callback methods representing the customer, which initiate a communication sequence with the message handler. The following code defines the barber message handler.

vertx = require 'vertx'
addr = 'barber'
# a random service time makes the system a little nondeterministic
waitTime = -> Math.random() * 100
barber = ->
  # the state of the message handler lives
  # in this closure
  busy = false
  queue = []
  freeSeats = 20
  log = (message) ->
    stdout.println "barber: #{message}"
  # the following methods define the core behavior
  checkQueue = ->
    if queue.length > 0
      serveCustomer queue.shift()
      freeSeats += 1
      return true
    else
      return false
  serveCustomer = ({customer, replier}) ->
    log "serving #{customer}"
    busy = true
    replier 'serve', (message, replier) ->
      vertx.setTimer waitTime(), ->
        log "done serving #{customer}"
        busy = checkQueue()
        replier 'done'
  # this is the handler's callback method that
  # is being returned by the barber function
  (message, replier) ->
    customer = message
    if busy
      # there is an intermediate state where we know that we
      # have to queue the customer because the barber is busy
      # and a seat is free, but the customer must first
      # acknowledge the waiting state before we can actually
      # put him in the queue.
      if freeSeats > 0
        freeSeats -= 1
        log "sending #{customer} to queue"
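        replier 'busy', (message, replier) ->
          # customer waiting ack
          if busy
            queue.push {customer, replier}
            log "queued #{customer} - " +
                "length: #{queue.length} - free seats: #{freeSeats}"
          else
            # the barber went idle while the ack was in flight:
            # give the seat back and serve the customer directly
            freeSeats += 1
            serveCustomer {customer, replier}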
      else
        replier 'full'
    else
      serveCustomer {customer, replier}
exports.start = ->
  vertx.eventBus.registerHandler addr, barber()

The state of the barber is encoded by two things: the callback method that will be called for the next event, and the values of the variables defined in the closure. Because repliers can be stored and called later, you can trigger remote state changes atomically, exactly when they should occur.
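To isolate the idea of storing repliers, here is a minimal sketch in plain JavaScript rather than vert.x code. makeWaitingRoom and its methods are hypothetical names, and a replier is modelled as an ordinary function that takes a message. Both are assumptions made so that the snippet runs on its own.

```javascript
// Hypothetical waiting room that stores repliers; not the vert.x API.
function makeWaitingRoom(seats) {
  // state lives in the closure, invisible to callers
  const queue = []; // stored { customer, replier } pairs, oldest first

  return {
    // Called when a customer arrives while the barber is busy.
    arrive(customer, replier) {
      if (queue.length >= seats) {
        replier('full'); // reply right away: the customer leaves
        return false;
      }
      queue.push({ customer, replier }); // hold the reply for later
      return true;
    },
    // Called when the barber becomes free.
    next() {
      const entry = queue.shift();
      if (entry === undefined) return null; // nobody is waiting
      entry.replier('serve'); // the stored replier fires only now
      return entry.customer;
    },
  };
}

// Usage: record what each customer is told, and in which order.
const told = [];
const room = makeWaitingRoom(2);
for (const name of ['a', 'b', 'c']) {
  room.arrive(name, (msg) => told.push(`${name}:${msg}`));
}
const first = room.next();
const second = room.next();
const third = room.next();
```

Customer c is told 'full' immediately, while a and b only hear 'serve' when next() is called. The remote state change happens at the moment the holder of the replier chooses.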

customer.coffee

Let’s define the behavior of the customer in a separate file:

vertx = require 'vertx'
addr = 'barber'
waitTime = -> Math.random() * 100
sendCustomer = (i) ->
  # As with the barber, the customer's state is
  # defined in this closure. The variables will
  # be modified by the callback methods that are
  # triggered by the message handler's replies.
  waiting = false
  beingServed = false
  log = (message) ->
    stdout.println "customer #{i}: #{message}"
  # just a shorthand
  send = (message, callback) ->
    vertx.eventBus.send addr, message, callback
  # factor out the exit method:
  # a customer can exit after having been served
  # or when the queue is full
  exit = (message) ->
    log message + " - exiting"
    # this method doesn't send a response
    # via the replier
  getHaircut = (message, replier) ->
    waiting = false
    beingServed = true
    log "being served"
    replier 'being-served', exit
  log "enters"
  send "customer #{i}", (message, replier) ->
    switch message
      when 'busy'
        waiting = true
        log 'waiting'
        replier 'waiting', getHaircut
      when 'serve'
        getHaircut message, replier
      when 'full'
        exit message
# a loop that continuously sends customers
# to the barber
sendCustomerLoop = (i) ->
  sendCustomer i
  vertx.setTimer waitTime(), -> sendCustomerLoop i + 1
exports.start = ->
  sendCustomerLoop 1

barbershop.coffee

This time, we want to run both handler and sender in the same process, for easier testing.

barber = require 'barber'
customer = require 'customer'
barber.start()
customer.start()

Running the shop

When we start the barbershop.coffee script, we can see in the log that the shop is running as it is supposed to:

customer 1: enters
barber: serving customer 1
customer 1: being served
barber: done serving customer 1
customer 1: done - exiting
customer 2: enters
barber: serving customer 2
customer 2: being served
barber: done serving customer 2
customer 2: done - exiting
customer 3: enters
[...]

This is what the output looks like when there is no congestion at all. By chance, these customers came in just after the previous customer was served. If we wait a little longer, we can see a customer entering while the barber is busy:

barber: serving customer 3
customer 3: being served
customer 4: enters
barber: sending customer 4 to queue
customer 4: waiting
barber: queued customer 4 - length: 1 - free seats: 19
customer 5: enters
barber: sending customer 5 to queue
customer 5: waiting
barber: queued customer 5 - length: 2 - free seats: 18
barber: done serving customer 3
barber: serving customer 4
customer 3: done - exiting
customer 4: being served

As you can see, customer 4 was added to the queue and is being served right after customer 3 is done. But what happens if the queue is full? Let’s set waitTime = -> Math.random() * 80 in customer.coffee so that there are a few more customers entering than leaving.

customer 34: enters
barber: sending customer 34 to queue
customer 34: waiting
barber: queued customer 34 - length: 20 - free seats: 0
customer 35: enters
customer 35: full - exiting

New customers are being turned away, as expected. The important thing is that there are no deadlocks and no invalid states, which can be easily checked by reading the log output. Knowing that there is just one callback method being executed at any point in time is a great help when reasoning about the program.
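Reading the log by eye works for a short run. For longer runs, the same checks could be automated. The following is a sketch in plain JavaScript that assumes the exact log format shown above. checkBarberLog is a hypothetical helper, and the default of 20 seats mirrors the initial freeSeats value of the barber handler.

```javascript
// Hypothetical log checker; returns a list of violations (empty if none).
function checkBarberLog(lines, totalSeats = 20) {
  const violations = [];
  let serving = null; // customer currently in the barber chair, if any
  for (const line of lines) {
    let m;
    if ((m = /^barber: serving (.+)$/.exec(line))) {
      // the barber must never start a haircut while another is running
      if (serving !== null) {
        violations.push(`${m[1]} served while ${serving} is in the chair`);
      }
      serving = m[1];
    } else if ((m = /^barber: done serving (.+)$/.exec(line))) {
      // only the customer in the chair can be finished
      if (serving !== m[1]) {
        violations.push(`done with ${m[1]} but serving ${serving}`);
      }
      serving = null;
    } else if ((m = /length: (\d+) - free seats: (-?\d+)$/.exec(line))) {
      // seats can never be negative or exceed the room's capacity
      const length = Number(m[1]);
      const free = Number(m[2]);
      if (free < 0 || length + free > totalSeats) {
        violations.push(`inconsistent seat count: ${line}`);
      }
    }
  }
  return violations;
}

// Usage with lines taken from the log excerpt above.
const ok = checkBarberLog([
  'barber: serving customer 3',
  'customer 3: being served',
  'customer 4: enters',
  'barber: sending customer 4 to queue',
  'customer 4: waiting',
  'barber: queued customer 4 - length: 1 - free seats: 19',
  'barber: done serving customer 3',
  'barber: serving customer 4',
]);
// A made-up invalid log: two haircuts at once.
const bad = checkBarberLog([
  'barber: serving customer 1',
  'barber: serving customer 2',
]);
```

The excerpt from the real run produces no violations, while the made-up log with two overlapping haircuts produces one.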

Conclusion

The central primitive is the construct replier(send_message, next_state). The replier triggers a state transition in the remote system through send_message and defines the local next_state.
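To make the shape of this primitive concrete, here is a toy version in plain JavaScript. It is a sketch and not how vert.x implements its event bus: the send function below is a hypothetical stand-in that delivers messages synchronously in the same process, whereas vert.x delivers them asynchronously.

```javascript
// Hypothetical stand-in for request/reply messaging; not the vert.x API.
// send delivers `message` to `handler` together with a replier.
// Calling replier(reply, next) sends `reply` back to the sender's
// callback and installs `next` as the handler for the sender's answer.
function send(handler, message, onReply) {
  const replier = (reply, next) => {
    if (onReply) send(onReply, reply, next); // drop the reply if nobody listens
  };
  handler(message, replier);
}

const trace = [];

// Receiver side: each state is one callback.
const awaitingFirstPing = (message, replier) => {
  trace.push(`receiver got ${message}`);
  replier('pong 1', awaitingSecondPing); // send a message, pick the next state
};
const awaitingSecondPing = (message, replier) => {
  trace.push(`receiver got ${message}`);
  replier('pong 2'); // no next state: the conversation ends here
};

// Sender side: the same pattern, mirrored.
send(awaitingFirstPing, 'ping 1', (message, replier) => {
  trace.push(`sender got ${message}`);
  replier('ping 2', (message) => {
    trace.push(`sender got ${message}`);
  });
});
```

After the call, trace holds the four messages in strict alternation, starting with the receiver getting 'ping 1' and ending with the sender getting 'pong 2'. Each side only ever names its own next state, which is what makes the two sides behave like linked state machines.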

If you can model your system as something similar to linked state machines, this concurrency approach is easy to implement and very powerful.