Monster Zero - Revisited

This creature is capable of tremendous destruction due to its size, its flight (the creature’s wings also generate hurricane-strength winds) and its several breath weapons (e.g., heat and energy).

What am I talking about here? Maybe it’s Monster Zero, or King Ghidorah as it’s sometimes known. No, it’s TPL Dataflow!

Yeah, yeah, I have a penchant for being overdramatic and writing quirky intros. This post is about TPL Dataflow, otherwise known as TDF. I have blogged about this before in my TDF agent series, but I thought it might be worthwhile returning to it while on the subject of monsters.

The purpose of these posts is not to put you off using these libraries but to give you a feel for how they might be used from an F# viewpoint. It’s not always easy to use these libraries even from C#, so jumping to another paradigm can sometimes leave you feeling bewildered, frustrated and lost.

What is TPL Dataflow

According to MSDN here’s the description of TPL dataflow:

The Task Parallel Library (TPL) provides dataflow components to help increase the robustness of concurrency-enabled applications. These dataflow components are collectively referred to as the TPL Dataflow Library. This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks.

Check out MSDN if you want to read a more in-depth outline of TDF, or refer to my earlier posts.

Sample problem

Here’s a sample problem: we have two documents, and we want to count how often certain words occur in each of them. We then want to collect the results from both documents and print out the combined results.

Although we’re using TDF to solve this problem, we could have also used F# agents, Reactive Extensions, LINQ, TPL, or a mix of all of those.

We will stick to using the F# REPL for this post as it’s a fairly simple example and allows for a bit of interactivity. Let’s start by adding a reference, opening a few namespaces and reading in a couple of text files. In this instance we’re going to use Jane Eyre by Charlotte Bronte, and The Wendigo by Algernon Blackwood, for no reason other than that they were free to download and use as samples.

#r "System.Threading.Tasks.Dataflow"
open System
open System.IO
open System.Threading.Tasks.Dataflow

let janeEyre = File.ReadAllText(@"Jane Eyre [Charlotte Bronte].txt")
let theWendigo = File.ReadAllText(@"The Wendigo [Algernon Blackwood].txt")

The next thing we need to think about is how to count the words. Let’s create a recursive function that counts each occurrence of a passed-in word in the full text. The wordCount function recurses until IndexOf returns -1. Before you argue about memory allocation, laziness, etc., we’re not interested in that at the moment; we just want to solve the problem at hand. It would be fairly easy to split the input into a lazy sequence and iterate over it so that we only keep a single line in memory.

let wordCount (text: String) word =
   let rec loop position count =
      match text.IndexOf(word, position, StringComparison.InvariantCultureIgnoreCase) with
      | -1 -> count
      | i ->  loop (i + word.Length) (count + 1)
   loop 0 0
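
Two things are worth checking before wiring this into blocks. The sketch below is not part of the original sample: it repeats wordCount so that it runs on its own, shows that the function counts substring matches rather than whole words, and adds a hypothetical wordCountLines helper as one possible take on the line-at-a-time variant mentioned above, assuming a match never spans a line break.

```fsharp
open System

// Same function as above, repeated so this snippet runs on its own.
let wordCount (text: String) (word: string) =
   let rec loop position count =
      match text.IndexOf(word, position, StringComparison.InvariantCultureIgnoreCase) with
      | -1 -> count
      | i -> loop (i + word.Length) (count + 1)
   loop 0 0

// Substring matches are counted, so "catch" is also a hit for "cat".
let sample = "The cat will catch the Cat"
printfn "%d" (wordCount sample "cat")   // prints 3

// Hypothetical helper (not in the original post): sum the per-line counts
// of a sequence of lines, which can be lazily evaluated.
let wordCountLines (lines: seq<string>) (word: string) =
   lines |> Seq.sumBy (fun line -> wordCount line word)

let lines = seq { yield "a cat"; yield "a dog"; yield "Cat, cat" }
printfn "%d" (wordCountLines lines "cat")   // prints 3
```

With a real file, something like File.ReadLines (from System.IO), which reads lazily, could supply the lines argument instead of the in-memory sequence used here.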

Now we can start to create some TDF blocks. We shall create a BroadcastBlock first.

A BroadcastBlock provides a buffer for storing at most one element at a time, overwriting each message with the next as it arrives.

Messages are broadcast to all linked targets, all of which may consume a clone of the message.

The lambda expression passed into the BroadcastBlock is its clone function; in this case we will just use the string that is passed in: fun s -> s. We will be using the BroadcastBlock to send the same message to multiple destinations later on.

let broadcast = BroadcastBlock(fun s -> s)
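
To see the broadcast behaviour in isolation, here is a minimal sketch that is separate from the word-count pipeline. The demoBroadcast, left and right names are illustrative and do not appear in the sample; each BufferBlock should end up with its own copy of the posted message.

```fsharp
#r "System.Threading.Tasks.Dataflow"
open System.Threading.Tasks.Dataflow

// Identity clone function: strings are immutable, so sharing the same instance is safe.
let demoBroadcast = BroadcastBlock<string>(fun s -> s)

// Two illustrative targets that simply buffer whatever they are offered.
let left = BufferBlock<string>()
let right = BufferBlock<string>()
demoBroadcast.LinkTo(left) |> ignore
demoBroadcast.LinkTo(right) |> ignore

demoBroadcast.Post("monster") |> ignore

// Receive blocks until a message is available on each target.
let fromLeft = left.Receive()
let fromRight = right.Receive()
printfn "%s / %s" fromLeft fromRight   // prints monster / monster
```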

Now we will create a couple of TransformBlocks.

A TransformBlock provides a dataflow block that invokes a provided Func(T, TResult) delegate for every data element received.

The TransformBlock will accept a data element and transform it by invoking its transform function. Here we will partially apply the wordCount function by passing in the first parameter, text. This means that whenever the TransformBlock is passed data, the wordCount function will already have the text parameter applied and will return the number of matches from the document. The data passed in will be the word that we want to find. We’ll create one for Jane Eyre and one for The Wendigo:

let transformJaneEyre = TransformBlock(wordCount janeEyre)
let transformTheWendigo  = TransformBlock(wordCount theWendigo)
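
The partial application trick can be tried on something smaller than a whole novel. In this sketch, countChar and countInBanana are hypothetical names invented for the illustration; the point is only that fixing the first argument leaves a one-argument function that the block can use as its transform.

```fsharp
#r "System.Threading.Tasks.Dataflow"
open System.Threading.Tasks.Dataflow

// Hypothetical curried function: the text comes first, the item to look for second.
let countChar (text: string) (c: char) =
    text |> Seq.filter (fun x -> x = c) |> Seq.length

// Supplying only the text leaves a char -> int function,
// which F# converts to the delegate the block expects.
let countInBanana = TransformBlock<char, int>(countChar "banana")

countInBanana.Post('a') |> ignore
countInBanana.Post('n') |> ignore

// Results come out in the order the inputs were posted.
let countA = countInBanana.Receive()
let countN = countInBanana.Receive()
printfn "%d %d" countA countN   // prints 3 2
```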

Now that we have created three blocks, we need to think about linking them together. You can do this with the LinkTo method that source blocks provide. We could do that by calling LinkTo as usual, like this:

broadcast.LinkTo(transformJaneEyre) |> ignore
broadcast.LinkTo(transformTheWendigo) |> ignore

That seems a little awkward, especially as we’re not interested in the return value in this example, so instead we’re going to create a small function to make this easier for ourselves:

let (-->) source target = DataflowBlock.LinkTo(source, target) |> ignore

The LinkTo method returns an IDisposable that can be used to sever the link between the blocks that have just been joined. There are also overloads of LinkTo that allow you to specify a predicate, and an overload that takes a DataflowLinkOptions, which lets you specify whether the new link is appended to or prepended before any existing links (Append), the maximum number of messages that can be passed before the block is unlinked (MaxMessages), and whether or not the completion of the source block is propagated to the target (PropagateCompletion).

We can now use the --> symbolic operator and use infix notation to link the blocks together. Infix operators are expected to be placed between the two operands, which means we can define the links between the blocks like this:
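
As a rough illustration of those overloads, the sketch below links a source to a target with both a predicate and PropagateCompletion. None of these blocks belong to the word-count sample; the names (numbers, evens, seenEvens, evenLink) are made up for the example, and the extra link to a NullTarget is there on the assumption that rejected messages would otherwise stay stuck at the head of the buffer.

```fsharp
#r "System.Threading.Tasks.Dataflow"
open System
open System.Threading.Tasks.Dataflow

let numbers = BufferBlock<int>()
let seenEvens = ResizeArray<int>()
let evens = ActionBlock<int>(fun n -> seenEvens.Add n)

// Link with a predicate, and pass completion on to the target.
let options = DataflowLinkOptions(PropagateCompletion = true)
let evenLink = numbers.LinkTo(evens, options, Predicate<int>(fun n -> n % 2 = 0))

// Messages the predicate rejects are offered to the next link, which discards them.
numbers.LinkTo(DataflowBlock.NullTarget<int>()) |> ignore

for n in 1 .. 5 do
    numbers.Post(n) |> ignore

// Completing the source completes 'evens' once everything has been processed.
numbers.Complete()
evens.Completion.Wait()
printfn "%A" (List.ofSeq seenEvens)   // prints [2; 4]

// The IDisposable returned by LinkTo severs the link when disposed.
evenLink.Dispose()
```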

broadcast --> transformJaneEyre
broadcast --> transformTheWendigo

Next we create a JoinBlock.

A JoinBlock provides a dataflow block that joins across multiple dataflow sources, which are not necessarily of the same type, waiting for one item to arrive for each type before they’re all released together as a tuple that contains one item per type.

let join = JoinBlock<_,_,_>()

broadcast           --> join.Target1
transformJaneEyre   --> join.Target2
transformTheWendigo --> join.Target3

We create the JoinBlock then link the broadcast, transformJaneEyre, and transformTheWendigo to it. This means that the JoinBlock will wait for data from all three blocks before sending the data on as a tuple of the three values.
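
The waiting behaviour is easy to try by posting directly to the targets of a standalone JoinBlock. This is a small sketch with a made-up demoJoin block rather than the join used in the pipeline; nothing is released until all three targets have received a value.

```fsharp
#r "System.Threading.Tasks.Dataflow"
open System.Threading.Tasks.Dataflow

let demoJoin = JoinBlock<string, int, int>()

// Two of the three targets have data, so the join is still waiting.
demoJoin.Target2.Post(1) |> ignore
demoJoin.Target3.Post(2) |> ignore

// The third value completes the set and the tuple is released.
demoJoin.Target1.Post("cat") |> ignore

let (word, first, second) = demoJoin.Receive()
printfn "%s %d %d" word first second   // prints cat 1 2
```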

Finally, we create the last block, which is an ActionBlock.

An ActionBlock provides a dataflow block that invokes a provided Action(T) delegate for every data element received.

let writeOutput = 
    ActionBlock(fun(word:String, count1, count2) -> 
       // The word fills {0}; the padding keeps the columns aligned.
       Console.WriteLine("Word: {0}, Jane Eyre: {1}, The Wendigo: {2}", 
                         word.PadRight(10), 
                         (string count1).PadLeft(3), 
                         (string count2).PadLeft(3) ) )
join --> writeOutput

Right, that completes the hook-up. Now all that’s left is to test it.

let words = [|"cat";"cake";"anything";"laugh";"breeze";"hysterical";"ball";"them";"home";"bird"|]
for word in words do 
  broadcast.Post(word) |> ignore

If we execute this then we get the following output:

Word: cat       , Jane Eyre: 212, The Wendigo:  73
Word: cake      , Jane Eyre:  15, The Wendigo:   0
Word: anything  , Jane Eyre:  60, The Wendigo:  19
Word: laugh     , Jane Eyre:  68, The Wendigo:  17
Word: breeze    , Jane Eyre:  11, The Wendigo:   0
Word: hysterical, Jane Eyre:   1, The Wendigo:   1
Word: ball      , Jane Eyre:  11, The Wendigo:   1
Word: them      , Jane Eyre: 432, The Wendigo:  72
Word: home      , Jane Eyre:  90, The Wendigo:  11
Word: bird      , Jane Eyre:  35, The Wendigo:   0

Summary

From a conceptual point of view, we’re creating a BroadcastBlock which connects to two TransformBlocks. The two TransformBlocks are then connected to the JoinBlock along with the BroadcastBlock. Finally, the JoinBlock is connected to the ActionBlock.

This creates a mini network where you can simply post a message to the input of the dataflow network and it will propagate through the network. As with Reactive Extensions, marble diagrams and pipeline diagrams are a great way to visualise the process flow.

I hope that sheds a little bit of light on how a dataflow network can be created with TDF. Extremely complex behaviours can be created by connecting up the simple dataflow building blocks, especially as the different blocks can run with multiple degrees of parallelism and also run asynchronously using Task<T>.
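
As a taste of the parallelism option, here is a minimal sketch using ExecutionDataflowBlockOptions. The square block and the choice of 4 as the degree of parallelism are arbitrary assumptions for the illustration; by default a TransformBlock still hands results back in the order the inputs arrived, even when several are processed at once.

```fsharp
#r "System.Threading.Tasks.Dataflow"
open System.Threading.Tasks.Dataflow

// Allow up to four messages to be transformed at the same time (arbitrary choice).
let options = ExecutionDataflowBlockOptions(MaxDegreeOfParallelism = 4)
let square = TransformBlock<int, int>((fun n -> n * n), options)

for n in 1 .. 5 do
    square.Post(n) |> ignore

// Output order follows input order by default.
let results = [ for _ in 1 .. 5 -> square.Receive() ]
printfn "%A" results   // prints [1; 4; 9; 16; 25]
```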

Until next time!