Skip to main content

Pipeline processing 3

Ok so I have been offline for a while now, what with starting a new financial contract in London and not having any broadband access for a while.  I have been working on something, honest!

Since the last post I have been reflecting on the pipeline design and it had a distinct object orientated feel to it that I wasnt happy with, so I have amended the structure of the code and come up with the following which simplifies in some areas and expands in others…

module Pipeline
  open System.Collections.Concurrent

  type IPipelineInput<'a> =
    abstract Insert: 'a -> unit
  type IPipelineConnection<'a> =
    abstract Attach: IPipelineInput<'a> -> unit
    abstract Detach: IPipelineInput<'a> -> unit
  type IPipeline<'a,'b> =
    inherit IPipelineConnection<'b>
    inherit IPipelineInput<'a>

  type PipelineStage<'a,'b>(processor, router: seq<IPipelineInput<'b>> * 'b -> seq<IPipelineInput<'b>>, ?overflow, ?capacity, ?blockingTime) =
    let processor = processor
    let router = router  
    let createBlockingCollection x =
        match x with
        | Some c -> new BlockingCollection<'a>(c:int)
        | None -> new BlockingCollection<'a>()  
    let buffer = createBlockingCollection capacity
    let routes = ref List.empty<IPipelineInput<'b>>
    let queuedOrRunning = ref false  
    let blocktime =
      match blockingTime with
      | Some b -> b
      | None -> 250  
    let consumerLoop = async {
        let rec loop()=
          let item = ref Unchecked.defaultof<_>
          let taken = buffer.TryTake(item, blocktime)
          if taken then
              do !item
              |> processor
              |> Seq.iter (fun z ->
              (match !routes with
               | [] -> ()(*we cant route with no routes*)
               | _ -> do router (!routes, z) |> Seq.iter (fun r -> (r.Insert z ))) )
          else ()(*exit nothing to consume in time limit*)
      with e -> raise e
    member this.ClearRoutes = routes := []  
    interface IPipelineInput<'a> with
      member this.Insert payload =
        let added = buffer.TryAdd(payload, blocktime)
        if added then
          //begin consumer loop
          if not !queuedOrRunning then
            lock consumerLoop (fun() ->
            Async.Start(async {do! consumerLoop })
            queuedOrRunning := true)
          //overflow here if function passed
          match overflow with
          | Some t ->  payload |> overflow.Value
          | None -> ()  
    interface IPipelineConnection<'b> with
      member this.Attach (stage) =
        let current = !routes
        routes := stage :: current  
      member this.Detach (stage) =
        let current = !routes
        routes := List.filter (fun el -> el <> stage) current  
    static member Attach (a:IPipelineConnection<_>) (b) =
      a.Attach b ;b  
    static member Detach (a: IPipelineConnection<_>) (b) =
      a.Detach b ;a  
    static member (++>) (a:IPipelineConnection<_>, b) =
      a.Attach (b) ;b  
    static member (-->) (a:IPipelineConnection<_>, b) =
      a.Detach b ;a  
    static member (<<--) (a:IPipelineInput<_>, b:'b) =
      a.Insert b  
    static member (-->>) (b,a:IPipelineInput<_>) =
      a.Insert b

Summary. #

I only want to summarise the code as I think its fairly straight forward to see whats going on.

Interfaces #

We have two main interfaces defined IPipelineInput<‘a> and **IPipelineConnection<‘a>, **as you can tell by the names they are involved with connecting the pipeline together and getting information into the pipeline.  Those two interfaces are merged together in the IPipeline<‘a, ‘b> interface, this keeps a nice separation between connecting and inserting into the pipeline, it also makes implementation easier and allows the interfaces to be implemented in other areas of code that need to talk to or connect to a pipeline.

Internals #

Inside the pipeline we have the bounded blocking queue which is implemented by the BlockingCollection from TPL. This is used to store the pipeline payloads that are waiting to be processed.

The consumerLoop function is recursive and continually tries to take items from the blocking collection processing and routing each one to the next pipeline stage.

The processor is a function that transforms from type ‘a to type ‘b.

The router is a function that takes a sequence of IPipelineInput<‘b> and also the payload ‘b it returns a sequence of IPipelineInput<‘b>.  What this effectively means is that we can route by the connected stages (i.e. round robin routing, multi-cast routing.)   Or we could route by payload contents (i.e. if the payload contains a certain bytes sequence we could choose a certain IPipelineInput<‘b>.)

Each item taken is passed to the processor and router via pipeline (|>) and Seq operations, recursively calling itself until an item can no longer be retrieved from the buffer.

The implementation of IPipelineInput<‘a>.Insert is the counterpart to the previous function. It first tries to inset the item into the bounded blocking queue, if this cannot be done then the overflow function is called if one is present. Next the async consumer loop is started if it is not already running. The idea behind this is that by keeping the payload processing running on the thread pool while there is work to do it will cut down on the number of context switches between threads.  Once an item cannot be taken from the bounding blocking queue the loop will exit.

The rest of the code is pretty standard stuff and should be pretty easy to follow.

I also define some symbolic operations to simply constructing and using the pipeline:

++> Attaches the pipeline stage on the right hand side to the one on the left. –> Detaches the pipelinestage on the right from the one on the left. «– Inserts a payload on the right into the pipeline stage on the left. –» Inserts a payload on the left hand side into the pipeline stage on the right.
These help to keep a nice terse description of the pipeline, once things get a little more complex other operators may be required, the now discontinued Axiom had a whole host of these, its a pity Microsoft dropped the language.

Example #

Heres a quick sample pipeline showing the pipeline in use:

  • Stage 1 takes a string and splits it based on the ‘,’.
  • Stage 2 reverses each string.
  • Stage 3 reverses the string back to the original.
module program
  open System
  open Pipeline  
  let consoleLock = new obj()  
  let split del n (s:string) =
    lock consoleLock (fun() ->
    do printfn "%A:before split %A" n s
    let split = s.Split([|del|])
    do printfn "%A:after: split into: %A" n split
    split |> Array.toSeq)  
  let reverse (s:string) =
    new string(s |> Seq.toArray |> Array.rev)  
  let oneToSingleton a b f=
    lock consoleLock (fun() ->
      printfn "%A:before reverse %A" a b
      let result = b |> f
      printfn "%A:after reverse %A" a result
      result|> Seq.singleton)  
  let OneToSeqRev a b = oneToSingleton a b reverse   
  ///Simply picks the first route
  let basicRouter( r, i) =
    let head = Seq.head r
    Seq.singleton head  
  let p1 = PipelineStage( split ',' "1", basicRouter)
  let p2 = PipelineStage( OneToSeqRev "2", basicRouter)
  let p3 = PipelineStage( OneToSeqRev "3", basicRouter)  
  p1 ++> p2 ++> p3 |> ignore  
  let generateCircularSeq (lst:'a list) =
    let rec next () =
      seq {
        for element in lst do
          yield element
        yield! next()
  for str in ["John,Paul,George,Ringo"]
  |> generateCircularSeq
  |> Seq.take 10
    do  str -->> p1  
  let x = Console.ReadKey()

As you can see the assignment of the pipeline stages is pretty simple as is the composition of multiple stages.  This was often one of the most difficult areas while developing a similar pipelines in C# you could often find yourself with a few hundred lines of setup code which was a often a nightmare to debug a few weeks later.

Hopefully I have whet your appetite with pipelines, in a future article I will be combining socket operations with pipeline stages to produce a flexible framework to deal with high throughput network applications.

As always I appreciate any comments, until next time…