Out Fathers’ Faults – Actors and Concurrency

When I started this job and faced the joyful world of Scala and Akka I remember I was told that thanks to the Actor model you don’t have to worry about concurrency, since every issue was handled by the acting magic.

Some months later we discovered, to our dismay that this wasn’t true. Or better, it was true most of the time if you behave properly, but there are notable exceptions.

What can go wrong

There are two main ways to mess things up with actors and concurrency. The first is to let the actor wait idly for long operations in the receive function. Although innocent looking, this choice eats up a thread from the thread pool. When the pool is out of threads, actors starve and timeouts exceptions start popping like angry rain.
The second trouble happens when you access the actor state from another thread. This is the classical thread unsafety – two running agents read and write the same data unbeknownst of each other.

Messing with Threads

Actor model grants that only one thread is active at a time within the actor. In other words, there is no need to synchronize for accessing data in the actor. This is true unless you start another thread.

These days it is unlikely that you use the Thread class directly in Scala, for this very reason it is unlikely you place a Thread.run inside an actor. But… but threads are often disguised as futures and you may have plenty of futures if you deal with asynchronous API or with the ask pattern.

I can hear someone say “Hey, that’s FP, values never change, what’s wrong in two threads reading the same value without changing it?” That’s correct – FP helps because the chances of cutting the branch on which you are seated are indeed very low. But bear with me and we’ll see what can go wrong.

Messing with Futures

Let’s say your actor waits for a message, performs some time-consuming operation and then gets back to wait. The obviously wrong way to do it is:

class Example extends Actor {
    def receive=ready

    def ready: Receive = {
        case Msg(x) => write(x)     // please, no!
        case any => log.error( s"Unexpected message $any" )
    }
}

This is wrong because the actor must be just reactive, message processing must not wait for operations to complete. You can’t say how long it is going to take to write something (it may be a remote disk). Besides, there is no error handling.

The second attempt is to wrap the write into a future and let the actor fire and forget:

class Example extends Actor with Stash {
    def receive=ready

    def ready: Receive = {
        case Msg(x) => context.become( writeState(x) )
        case any => log.error( s"Unexpected message $any" )
    }

    def writeState( x : Data ) = {
        Future{ write(x) }.onComplete{
            case Success(_) => 
                unstashAll()
                context.become( ready )
            case Failure(_) => // error handler
        }

        // I know this is a bit forced, but you can easily imagine a case
        // for this to be required
        val receive : Receive = {
            case _ => stash()
        }

        receive
    }
}

Fine? Well no, not at all. Both unstashAll and context.become are called in the future thread and they operate on Actor data that may be manipulated in that very moment by the actor thread. Even if the state change is hidden behind the neat functions of the framework, they still modify data (breaking all the FP diktats).

You may be tempted (as our fathers were and couldn’t resist) to wait for the future result (Await.result), but in this case, you are back to step 1, i.e. you are inserting a batch operation in the actor having the actor thread wait for the other thread to complete.

In order to preserve the thread safety in Actors, you must ensure that extra threads, like the ones you fire with Futures, do not interact with Actor state (i.e. Actor fields). Akka pipe pattern (import akka.pattern.pipe) comes to the rescue:

import akka.pattern.pipe

class Example extends Actor with Stash {
    def receive=ready

    def ready: Receive = {
        case Msg(x) => context.become( writeState(x) )
        case any => log.error( s"Unexpected message $any" )
    }

    def writeState( x : Data ) = {
        Future{ write(x) }.pipeTo( self )

        // I know this is a bit forced, but you can easily imagine a case
        // for this to be required
        val receive : Receive = {
            case Success(_) =>
                // write complete, let's get back to ready
                unstashAll()
                context.become( ready )
            case Failure(_) => 
                // something wrong, handle error
            case _ => stash()
        }

        receive
    }
}

Now when the future is completed, its result is piped as a message to the actor itself. In this way the write receive function may handle write result in the actor thread.

So… Future, isn’t it?

Yes… almost. In some cases, our fathers found that futures could be used for actors. But then they need to collect the result from a sequence of future. In a blocking code this would be something like:

val r1 = Await.result( retrieveResult1() )
val r2 = Await.result( retrieveResult2( r1 ))
val r3 = Await.result( retrieveResult3( r2 ))

If you have to code this in a state machine, you need three states connected via the happy path, plus the failure paths, plus anything else your state machine is expected to do at the same time.

Quickly and easily this path leads to a messy code that only the most peaceful minded Jedi master can hope to get an understanding of. The proper way to deal with this is to use the monadic for comprehension:

val z = for {
    r1 <- retrieveResult1()
    r2 <- retrieveResult2( r1 )
    r3 <- retrieveResult3( r2 )
  }
  yield {
    r3
  }
z pipeTo self

Note that in this way you don’t have to worry about timeouts on each query, happy path is automatically linked and the resulting code is very close to what you would have written in blocking code (which is, by the way, the most intuitive one).

Lesson learned TANSTAAFL. If you are writing a concurrent system you need to be aware that it is a concurrent system and multiple execution threads are running at the same time in your code. And you also need to be aware that you are given a finite system, sloppiness will cause you to run out of threads, memory, disk and all the other finite resources you had.

  • Never, ever Await.result (use Future instead)
  • Double-check the functions you call in the receive function to ensure that no one blocks (I/O, synchronization operation) (use Future if needed)
  • If a Future is used in the actor use the pipe pattern to retrieve the future result
  • Make sure that context.become/stash/unstashAll are all called in the actor execution flow
  • If you find yourself in coding states to chain futures together use the for comprehension.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.