The Lurking Horror
Deep in the darkest depths lurks an ancient horror. When the time is right it will rise forth and leave you screaming for mercy and begging for forgiveness…
OK, I have a penchant for being overdramatic, but in this post I am going to reveal some little-known caveats in a well-known and much-revered area of F#: agents, aka the MailboxProcessor. Gasp!
First let me give you a demonstration:
open System
open System.Diagnostics

type internal BadAgentMessage =
    | Message of string * int
    | Lock
    | Unlock

type BadAgent() =
    let agent = MailboxProcessor.Start(fun agent ->
        let sw = Stopwatch()
        let rec waiting () =
            agent.Scan(function
                       | Unlock -> Some(working ())
                       | _ -> None)
        and working() = async {
            let! msg = agent.Receive()
            match msg with
            | Lock -> return! waiting()
            | Unlock -> return! working()
            | Message (msg, iter) ->
                if iter = 0 then sw.Start()
                if iter % 10000 = 0
                then sw.Stop()
                     printfn "%s : %i in: %fms" msg iter sw.Elapsed.TotalMilliseconds
                     sw.Restart()
                return! working() }
        working())
    member x.Msg(msg) = agent.Post(Message msg)
    member x.Lock() = agent.Post(Lock)
    member x.Unlock() = agent.Post(Unlock)
The BadAgentMessage type defines a discriminated union that we are going to use for the agent's message interface. It comprises three cases:
- Message: This is just a simple string-based message and an int used as a counter.
- Lock: This is used to stop message processing within the agent by causing it to wait for an Unlock message to arrive.
- Unlock: This message is used to resume the processing within the agent, effectively exiting the locked state.
We have two main sections to the agent's body, which I will describe below.
working #
The purpose of the working function is to dequeue the messages from the agent and process them with pattern matching. let! msg = agent.Receive() is used to get the next message, which is then pattern matched against the three cases of BadAgentMessage. When the Lock message is encountered, return! waiting() is used to place the agent in a state where it is waiting for an Unlock message to arrive. An Unlock message simply resumes processing by calling return! working(). The only real purpose of the Unlock message is to exit from the locked state that is introduced by the Lock message. The Message case starts a Stopwatch on the first message by using the counter, and then stops it on every 10,000th message. At that point the elapsed time is printed to the console and the Stopwatch is restarted before resuming the main processing loop by calling return! working().
waiting #
This function uses the agent's Scan function to wait for an Unlock message to arrive. Once it does, it puts the agent back into normal operation by returning Some(working()) from the Scan function. If the message is not an Unlock message then None is returned and the agent simply waits for the next message before trying again.
The rest of the agent is just ancillary member functions to allow easy sending of the three message types.
Test Harness #
And here’s a very simple test harness:
let ba = BadAgent()
printfn "Press any key to start"
Console.ReadLine() |> ignore

// Post messages numbered 0 .. number to the agent
let dump number =
    for i in 0 .. number do
        ba.Msg("A message", i)

ba.Lock()
dump 200000
ba.Unlock()
Console.ReadLine() |> ignore
OK, so this is a very synthetic test but I just wanted to highlight some of the internal behaviour. If I run this code I get the following console output:

You can see that the time to process the first 10,000 messages is 3083ms, and then it steadily decreases until the last 10,000 messages are processed in 94ms. Processing 10,000 messages is about 33 times slower at the beginning than it is at the end. Why?
Opening it up #
Let’s take a look at some of the internals of the MailboxProcessor to understand what’s going on. First of all, the core functionality is actually contained within the Mailbox type, with the MailboxProcessor acting as an augmenter. TryPostAndReply, PostAndReply, PostAndTryAsyncReply, and PostAndAsyncReply all add a single piece of functionality to the Mailbox type: the ability to reply, synchronously or asynchronously, to a message once it arrives. TryPostAndReply and PostAndReply both block the caller until the reply arrives, whereas PostAndTryAsyncReply and PostAndAsyncReply both wait for the reply asynchronously. This functionality is achieved with the use of the ResultCell and AsyncReplyChannel types. For an in-depth discussion on this you might want to refer to my earlier series, which describes implementing the MailboxProcessor with TPL Dataflow (see Part 1, Part 2 and Part 3).
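To make the difference between the two reply styles concrete, here is a minimal sketch. The CounterMsg type and the counter agent are hypothetical names invented for this illustration; only Post, PostAndReply, PostAndAsyncReply and AsyncReplyChannel come from the MailboxProcessor API.

```fsharp
// Hypothetical message type for this sketch only.
type CounterMsg =
    | Add of int
    | Get of AsyncReplyChannel<int>

// A tiny agent that keeps a running total and answers Get requests.
let counter = MailboxProcessor.Start(fun inbox ->
    let rec loop total = async {
        let! msg = inbox.Receive()
        match msg with
        | Add n -> return! loop (total + n)
        | Get reply ->
            reply.Reply total
            return! loop total }
    loop 0)

counter.Post(Add 2)
counter.Post(Add 3)

// PostAndReply blocks the calling thread until the agent replies.
let viaBlocking = counter.PostAndReply(fun ch -> Get ch)

// PostAndAsyncReply returns an Async<int>; nothing blocks until it is run.
let viaAsync =
    counter.PostAndAsyncReply(fun ch -> Get ch)
    |> Async.RunSynchronously

printfn "blocking: %i, async: %i" viaBlocking viaAsync
```

Because the agent handles messages in the order they were posted, both requests see the total after the two Add messages.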
Below are some snippets of code from the Mailbox type. You might want to take a peek yourself at the FSharp repository over at GitHub for a closer inspection; be warned though, there is a lot of code in there!
Here’s the initial type definition for the Mailbox. You can see that there are two mutable fields:
type Mailbox<'Msg>() =
    let mutable inboxStore = null
    let mutable arrivals = new Queue<'Msg>()
inboxStore is a generic list of type System.Collections.Generic.List<T> and arrivals is a System.Collections.Generic.Queue<T>.
For now the inboxStore is null. It is only ever assigned via Scan or TryScan, and this is done indirectly via the inbox member shown here:
member x.inbox =
    match inboxStore with
    | null -> inboxStore <- new System.Collections.Generic.List<'Msg>(1) // ResizeArray
    | _ -> ()
    inboxStore
Understanding the code in the Mailbox can be difficult given the amount of it, so I’ll highlight the key functions in the sections below to make it a little easier.
Scan / TryScan #
Scan is just an async wrapper around TryScan. If TryScan returns None an exception is raised; otherwise the result from TryScan is returned.
So now let’s take a look at the source of TryScan.
member x.TryScan ((f: 'Msg -> (Async<'T>) option), timeout) : Async<'T option> =
    let rec scan() =
        async { match x.scanArrivals(f) with
                | None -> // Deschedule and wait for a message. When it comes, rescan the arrivals
                          let! ok = waitOne(timeout)
                          if ok then return! scan() else return None
                | Some resP -> let! res = resP
                               return Some(res) }
    // Look in the inbox first
    async { match x.scanInbox(f,0) with
            | None -> return! scan()
            | Some resP -> let! res = resP
                           return Some(res) }
You can see here that an async workflow is declared that first pattern matches on x.scanInbox, passing in the predicate scan function f and the literal 0. If None is returned then there is no match and the recursive function scan is invoked. This time the function x.scanArrivals is called, again passing in the predicate function f.
- An interesting point to note is that each message that arrives and doesn’t match the predicate f resets the timer in let! ok = waitOne(timeout). This means that any number of trivial messages arriving will keep the TryScan function running. This was also mentioned by Jon Harrop in a Stack Overflow question entitled How to use TryScan in F# properly. Jon also mentions locking, which I will address in the scanArrivals section below.
So what’s the difference between scanArrivals and scanInbox?
scanInbox operates on the inboxStore, which you might recall is a List<T> type, whereas scanArrivals operates on arrivals, which is a Queue<T> type. The big difference between these two is that messages arriving in the Mailbox end up in the arrivals queue first, and when messages are not matched by the predicate function f they are added to the inboxStore. Hence the need to always check the inboxStore before the arrivals queue; otherwise previously unmatched scan messages would not be processed correctly. You might be asking yourself why not use a Queue<T> for both the inbox and the arrivals? It comes down to the fact that it’s not possible to easily use a Queue<T> for the inbox because of the way that Scan works. At any point in the inbox there could be a potential match, so with a queue each item would have to be dequeued and processed separately. An indexed List<T> type is the best fit for this situation.
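The retention behaviour described above can be observed from the outside with a small sketch. The demo agent and the seen completion source are hypothetical names used only for this illustration. Two messages are posted before the one the scan is waiting for, and both are picked up by later calls to Receive.

```fsharp
open System.Threading.Tasks

// Hypothetical plumbing: hands the received messages back to the caller.
let seen = TaskCompletionSource<string list>()

let demo = MailboxProcessor<string>.Start(fun inbox -> async {
    // Wait for "go". Every other message fails the predicate and is retained.
    do! inbox.Scan(fun m -> if m = "go" then Some(async.Return ()) else None)
    // The retained messages are now delivered by Receive.
    let! a = inbox.Receive()
    let! b = inbox.Receive()
    seen.SetResult([a; b]) })

demo.Post "first"
demo.Post "second"
demo.Post "go"

// Block until the agent has drained the two retained messages.
let received = seen.Task.Result
printfn "%A" received
```

The two unmatched messages are not lost: they wait in the inbox until a later Receive (or a matching Scan) collects them, which is exactly why the inbox can grow without limit.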
scanArrivals / scanArrivalsUnsafe #
Let’s look at the scanArrivals function. It is just a lock construct around the scanArrivalsUnsafe function. This leads to an important point: the scan function operates under a lock, which effectively means that end-user code is also executed under the lock. If you hold onto the lock for any length of time, there will be significant blocking of the normal receive mechanism, because it uses the same lock when receiving.
member x.scanArrivalsUnsafe(f) =
    if arrivals.Count = 0 then None
    else let msg = arrivals.Dequeue()
         match f msg with
         | None ->
             x.inbox.Add(msg);
             x.scanArrivalsUnsafe(f)
         | res -> res
// Lock the arrivals queue while we scan that
member x.scanArrivals(f) = lock syncRoot (fun () -> x.scanArrivalsUnsafe(f))
If we pause for a second and review the MailboxProcessor documentation on MSDN:
For each agent, at most one concurrent reader may be active, so no more than one concurrent call to Receive, TryReceive, Scan or TryScan may be active.
Obeying this rule should ensure that no deadlock situations arise, but lock contention can still occur because messages are still being posted to the mailbox, and each post attempts to acquire the same syncRoot lock.
Let’s move on to the next function. I have saved this one for last as it’s the most interesting.
scanInbox #
A quick glance at scanInbox reveals another function which, to my eye, could have heavyweight performance implications. The inbox is a List<T> type, and the RemoveAt function does an internal Array.Copy for each removal. This is an O(n) operation where n is (Count - index), so as soon as the list gets to a reasonable size this is going to really start chewing into your processing time.
member x.scanInbox(f,n) =
    match inboxStore with
    | null -> None
    | inbox ->
        if n >= inbox.Count
        then None
        else
            let msg = inbox.[n]
            match f msg with
            | None -> x.scanInbox (f,n+1)
            | res -> inbox.RemoveAt(n); res
In order to check this theory, let’s do some quick profiling of the console test that we showed earlier:

This screenshot was taken using JetBrains dotTrace 5.1. This is one of my favourite performance profilers because it captures results to line level and maps back to the F# source code relatively easily.
Yeah, there it is: a whopping 44.41% of the time is spent in RemoveAt. Also notice that there were 200,000 calls, which mirrors the number we placed in the queue before using the Lock/Unlock message types.
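The cost of removing from the front of a list can also be seen in isolation, away from the agent. The following is a rough sketch rather than a benchmark: timeMs, drainList and drainQueue are hypothetical helpers written for this illustration, the element count is an arbitrary choice, and the absolute timings will vary from machine to machine.

```fsharp
open System.Collections.Generic
open System.Diagnostics

// Hypothetical helper: run f and return its result with the elapsed milliseconds.
let timeMs f =
    let sw = Stopwatch.StartNew()
    let result = f ()
    result, sw.Elapsed.TotalMilliseconds

// Drain a ResizeArray (List<T>) from the front; returns the number of removals.
let drainList n =
    let inbox = ResizeArray<int>(Seq.init n id)
    let mutable removed = 0
    while inbox.Count > 0 do
        inbox.RemoveAt(0)          // copies every remaining element down one slot
        removed <- removed + 1
    removed

// Drain a Queue<T>; returns the number of removals.
let drainQueue n =
    let arrivals = Queue<int>(Seq.init n id)
    let mutable removed = 0
    while arrivals.Count > 0 do
        arrivals.Dequeue() |> ignore   // constant time, no copying
        removed <- removed + 1
    removed

let n = 50000   // arbitrary size chosen for this sketch
let listCount, listMs = timeMs (fun () -> drainList n)
let queueCount, queueMs = timeMs (fun () -> drainQueue n)
printfn "List.RemoveAt(0): %i removals in %.1fms" listCount listMs
printfn "Queue.Dequeue:    %i removals in %.1fms" queueCount queueMs
```

Both loops remove the same number of items; the difference in elapsed time comes from the copying that RemoveAt performs on each call.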
One of the things that really stands out for me is that the inbox is a simple list and completely unbounded. In a high-throughput situation where the scan function is being used, it’s perfectly feasible to get into a runaway memory or CPU condition where the unmatched messages sit in the inbox, taking longer and longer to process due to the O(n) operation that takes place in the RemoveAt function. Given a consistent throughput, eventually you are going to either run out of memory, or the processing time will make throughput drop to dire levels, which in turn will back up the inbox even further. Effectively this is a death spiral.
Conclusion #
So what conclusion can we draw from all of this?
- Firstly, be careful with usage of Scan and TryScan. In certain situations the internal queue could back up to a size where you will be constantly struggling against the O(n) operation cost.
- Agents are not a silver-bullet solution. They cannot solve every problem. Although it’s possible to use agent-based techniques to solve various problems like blocking collections and suchlike, you have to use care and diligence in the solution to avoid introducing other problems into the mix. I have seen several implementations that I have been able to break relatively easily.
- Do I still use agents? Absolutely! Agents are a fabulous tool to have in our toolbox, and some extremely elegant solutions exist to solve very complex problems.
- Do I use Scan or TryScan? Not in its current form in the MailboxProcessor. I chose to implement a destructive scan in my TDF agent for the reasons discussed here.
Before we finish, I’d like to briefly cover TryScan from my TDF-based agent to complete the picture.
Destructive TryScan #
member x.TryScan((scanner: 'Msg -> Async<_> option), timeout): Async<_ option> =
    let ts = TimeSpan.FromMilliseconds(float timeout)
    let rec loopForMsg = async {
        let! msg = Async.AwaitTask <| incomingMessages.ReceiveAsync(ts)
                                        .ContinueWith(fun (tt:Task<_>) ->
                                            if tt.IsCanceled || tt.IsFaulted then None
                                            else Some tt.Result)
        match msg with
        | Some m -> let res = scanner m
                    match res with
                    | None -> return! loopForMsg
                    | Some res -> return! res
        | None -> return None}
    loopForMsg
A message is dequeued on line 4 with let! msg = Async.AwaitTask .... It is then processed by the pattern-matching expression on line 9, | Some m -> let res = scanner m. If the scanner function returns None, the message is discarded and the loop continues with another call to loopForMsg; otherwise the result of the scanner's async workflow is returned with | Some res -> return! res.
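The destructive behaviour is easier to see in a simplified, synchronous form. The sketch below is not the TDF agent's code; destructiveScan is a hypothetical function over a plain Queue<T> that mirrors only the discard-on-no-match logic.

```fsharp
open System.Collections.Generic

// Hypothetical, synchronous model of a destructive scan:
// unmatched messages are dropped rather than kept for later.
let rec destructiveScan (scanner: 'Msg -> 'T option) (messages: Queue<'Msg>) =
    if messages.Count = 0 then None
    else
        match scanner (messages.Dequeue()) with
        | Some result -> Some result                    // match found, stop here
        | None -> destructiveScan scanner messages      // message discarded, keep going

let pending = Queue<int>([1; 2; 3; 4])
let found = destructiveScan (fun m -> if m = 3 then Some m else None) pending
let remaining = pending.ToArray()
printfn "found=%A remaining=%A" found remaining
```

Messages 1 and 2 are gone for good once the scan has passed over them, so nothing accumulates behind the scenes. The trade-off is that the caller must be happy to lose anything the scanner does not match.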
One of the areas where I have a lot of experience is using pipelined operations based on input from network I/O. One of the things that always causes a problem is unbounded situations, such as having a queue with no absolute limit. There comes a time when you have to protect yourself from what is effectively a denial of service: you have to either destructively terminate messages or connections, or route the overflowed data for processing later.
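As a minimal sketch of that kind of protection, a bounded buffer can refuse new messages once it is full and leave the caller to decide what to do with the overflow. This uses BlockingCollection<T> from the base class library purely for illustration; the capacity, the offer helper and the partitioning into accepted and rejected messages are assumptions of this example, not part of my agent.

```fsharp
open System.Collections.Concurrent

// Arbitrary limit chosen for this sketch.
let capacity = 3
let buffer = new BlockingCollection<int>(capacity)

// Hypothetical helper: TryAdd returns false immediately when the buffer is full.
let offer msg = buffer.TryAdd msg

// Offer five messages; anything beyond the capacity is rejected.
let accepted, rejected = [1 .. 5] |> List.partition offer
printfn "accepted=%A rejected=%A" accepted rejected
```

The rejected messages could be dropped, logged, or routed elsewhere for later processing; the important part is that the buffer itself can never grow past its limit.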
Until next time…