Coding Puzzle in F#: Find the Number of Islands

Here's a programming puzzle. Given a 2D matrix of 0's and 1's, find the number of islands. A group of connected 1's forms an island. For example, the matrix below contains 5 islands:

Input : mat = {{1, 1, 0, 0, 0},
               {0, 1, 0, 0, 1},
               {1, 0, 0, 1, 1},
               {0, 0, 0, 0, 0},
               {1, 0, 1, 0, 1}}
Output : 5

A typical solution to this problem is implemented in C++, Java or C#, and involves a loop to iterate through the matrix and another loop or recursion to traverse islands. The traversal progress is tracked in an auxiliary mutable array denoting the visited nodes. An example of such a solution (and the definition of the problem above) can be found here.

I want to give an example of a solution done in F#, with generic immutable data structures and pure functions.

Graph Traversal

First of all, this puzzle is a variation of a standard problem: counting the number of connected components in a graph.

Connected Graph Components

I will start with a generic graph traversal implementation, and then we will apply it to the 2D matrix at hand.

The graph is defined by the following type:

type Graph<'a> = {
  Nodes: seq<'a>
  Neighbours: 'a -> seq<'a>
}

It is a record type with two fields: a sequence of all nodes, and a function to get the neighbour nodes of a given node. The type of the node is generic: I'll use numbers for our example, but the Graph type doesn't care.
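
For illustration (this tiny graph is my own example, not part of the puzzle), a hand-wired instance over integers might look like this:

let sampleGraph = {
  Nodes = seq [1; 2; 3; 4]
  Neighbours = function
    | 1 -> seq [2]
    | 2 -> seq [1]
    | _ -> Seq.empty
}

This graph has three connected components: {1, 2}, {3} and {4}.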

The traversal plan is the following:

  1. Go through the sequence of graph nodes.

  2. Keep two accumulator data structures: the list of disjoint sub-graphs (sets of nodes connected to each other) and the set of visited nodes. Both are empty at the beginning.

  3. If the current node is not in the visited set, recursively traverse all neighbours to find the current connected component.

  4. The connected component traversal is a depth-first search; each node is added to both the current component set and the overall visited set.

Let's start the implementation from the inside out. The following recursive function adds a node to the accumulated sets and calls itself for all non-visited neighbours:

let rec visitNode accumulator visited node =
  let newAccumulator = Set.add node accumulator
  let newVisited = Set.add node visited

  graph.Neighbours node
  |> Seq.filter (fun n -> Set.contains n newVisited |> not)
  |> Seq.fold (fun (acc, vis) n -> visitNode acc vis n) (newAccumulator, newVisited)

The type of this function is Set<'a> -> Set<'a> -> 'a -> Set<'a> * Set<'a>.

Step 3 is implemented with the visitComponent function:

let visitComponent (sets, visited) node =
  if Set.contains node visited 
  then sets, visited
  else
    let newIsland, newVisited = visitNode Set.empty visited node
    newIsland :: sets, newVisited

Now, the graph traversal is just a fold over the graph nodes with the visitComponent function.

module Graph =
  let findConnectedComponents graph = 
    graph.Nodes
    |> Seq.fold visitComponent ([], Set.empty)
    |> fst

This is the only public function of our graph API, available to client applications. visitNode and visitComponent are defined as local functions nested inside it (and they close over the graph value).
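
To make that nesting explicit, here is a sketch of how the module can be assembled (the bodies are the same as above; only the placement of the helpers changes):

module Graph =
  let findConnectedComponents graph =
    // both helpers are local, so they can close over the graph value
    let rec visitNode accumulator visited node =
      let newAccumulator = Set.add node accumulator
      let newVisited = Set.add node visited

      graph.Neighbours node
      |> Seq.filter (fun n -> Set.contains n newVisited |> not)
      |> Seq.fold (fun (acc, vis) n -> visitNode acc vis n) (newAccumulator, newVisited)

    let visitComponent (sets, visited) node =
      if Set.contains node visited
      then sets, visited
      else
        let newIsland, newVisited = visitNode Set.empty visited node
        newIsland :: sets, newVisited

    graph.Nodes
    |> Seq.fold visitComponent ([], Set.empty)
    |> fst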

2D Matrix

Now, let's forget about graphs for a second and model the 2D matrix of integers. The type definition is simple; it's just an alias for a 2D array:

type Matrix2D = int[,]

Now, we need to be able to traverse the matrix, i.e. iterate through all elements and find the neighbours of each element.

The implementation below is mostly busy validating the boundaries of the array. The neighbours of a cell are up to 8 cells around it, diagonal elements included.

module Matrix2D =
  let allCells (mx: Matrix2D) = seq {
    for x in [0 .. Array2D.length1 mx - 1] do
      for y in [0 .. Array2D.length2 mx - 1] -> x, y
  }

  let neighbours (mx: Matrix2D) (x,y) =
    Seq.crossproduct [x-1 .. x+1] [y-1 .. y+1]
    |> Seq.filter (fun (i, j) -> i >= 0 && j >= 0 
                              && i < Array2D.length1 mx 
                              && j < Array2D.length2 mx)
    |> Seq.filter (fun (i, j) -> i <> x || j <> y)
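
Note that Seq.crossproduct is not part of the standard F# library; I assume a small helper along these lines (my own sketch):

module Seq =
  // all pairs (x, y) from the two input sequences
  let crossproduct xs ys = seq {
    for x in xs do
      for y in ys -> x, y
  }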

Putting It All Together

Now we are all set to solve the puzzle. Here is our input array:

let mat = array2D
            [| [|1; 1; 0; 0; 0|];
               [|0; 1; 0; 0; 1|];
               [|1; 0; 0; 1; 1|];
               [|0; 0; 0; 0; 0|];
               [|1; 0; 1; 0; 1|]
            |]

We need a function to determine whether a given cell is a piece of an island:

let isNode (x, y) = mat.[x, y] = 1

And here is the essence of the solution - our graph definition. Both Nodes and Neighbours are matrix cells filtered down to those that contain 1's.

let graph = {
  Nodes = Matrix2D.allCells mat |> Seq.filter isNode
  Neighbours = Matrix2D.neighbours mat >> Seq.filter isNode
}

The result is calculated with a one-liner:

graph |> Graph.findConnectedComponents |> List.length

Conclusion

The implementation above is my attempt to solve, in a functional way, a puzzle which is normally solved in imperative style. I took a step back and tried to model the underlying concepts with separate data structures. The types and functions might be reused for similar problems in the same domain.

While not rocket science, the Connected Islands puzzle is a good exercise and provides a nice example of functional concepts, which I'm planning to use when discussing FP and F#.

The full code can be found on my GitHub.

Event Sourcing: Optimizing NEventStore SQL read performance

In my previous post about Event Store read complexity I described how the number of reads from the event database might grow quadratically with respect to the amount of events per aggregate.

At a higher level, the conclusion was that an event-sourced database should be optimized for reads rather than writes, which is not always obvious from the definition of an "append-only store".

NEventStore

In this post I want to look at NEventStore on top of Azure SQL Database, which is the combination we currently use for event sourcing in our Azure-based web application.

The NEventStore library provides a C# abstraction over an event store, with providers for several database backends. We use the Persistence.SQL provider. When you initialize it with a connection string to an empty database, the provider will create two tables with their schema, indexes etc. The most important table is Commits, and it gets the following schema:

CREATE TABLE dbo.Commits
(
  BucketId          varchar(40),
  StreamId          char(40),
  StreamRevision    int,
  Items             tinyint,
  CommitId          uniqueidentifier,
  CommitSequence    int,
  CheckpointNumber  bigint IDENTITY(1, 1),
  Payload           varbinary(max),
  CommitStamp       datetime2
)
GO
ALTER TABLE dbo.Commits 
ADD CONSTRAINT PK_Commits 
PRIMARY KEY CLUSTERED (CheckpointNumber)

I removed several columns and most of the indexes and constraints to make the script more readable.

The primary key is based upon CheckpointNumber - an IDENTITY column, which means the new events (commits) are appended to the end of the clustered index. Clearly, this is good for INSERT performance.

There are a number of secondary non-clustered indexes that support the rich API of the NEventStore library, e.g. dispatching events to observers, searching for streams, time-based queries etc.

Our Use Case

It turns out that we don't need that extended API provided by NEventStore. Effectively, we only need two operations to be supported:

  • Add a new event to a stream
  • Read all events of a stream

Our experience of running production-like workloads showed that the read operation performance suffers a lot when the size of a stream grows. Here is a sample query plan for the read query with the default schema:

Query Plan with default primary key

SQL Server uses a non-clustered index to find all events of a given stream, and then does key lookups, which can get very expensive for large streams with hundreds or thousands of events.

Tuning for Reads

After seeing this, I decided to re-think the primary key of the Commits table. Here is what I came up with (the existing primary key constraint has to be dropped first):

ALTER TABLE dbo.Commits 
ADD CONSTRAINT PK_Commits 
PRIMARY KEY CLUSTERED (BucketId, StreamId, CommitSequence)

Now, all the commits of one stream are physically located together in the clustered index.

The change makes INSERTs less efficient: it's no longer a simple append to the end of the clustered index.

But at this price, the reads just got much faster. Here is the plan for the same query over the new schema:

Query Plan with the new primary key

Simple, beautiful and fast!

Our Results

The results look great for us. We are able to run our 50 GB Commits table on a 100-DTU SQL Database instance, with a typical load of 10 to 25 percent. Reads still take the biggest chunk of the load, with writes far behind.

Your mileage may vary, so be sure to test your NEventStore schema against your workload.

Further Improvements

Here are some further steps that we might want to take to make the Commits table even faster:

  • The table comes with 5 non-clustered indexes. One of them became our clustered index. Two indexes are unique, so they might be useful for duplicate prevention (e.g. in concurrency scenarios). The remaining two are non-unique, so they can probably be safely deleted unless we start using other queries that they are intended for.

  • There are several columns which are not used in our implementation: StreamIdOriginal, Dispatched and Headers to name a few. We could replace the table with a view of the same name, and always return defaults for those columns in any SELECT, ignoring the values in any INSERT.

But I expect these changes to have a moderate impact on performance compared to the primary key change discussed above.

My Praise of Advent of Code 2016

During the last days of December I satisfied my inner need for solving puzzles and tricky tasks by going through the Advent of Code 2016 challenge.

The idea is simple: every day from December 1st to 25th, the site publishes a new brain teaser. They are all aligned into one story: the bad Easter Bunny has stolen all the Christmas gifts from Santa, and now you are the hero who must break into the Bunny's headquarters and save the gifts for the kids.

That said, each challenge is independent of the others, so you can solve them in any order you want.

Advent Calendar in dark ASCII

A puzzle consists of a description and an input data set associated with it. The solution is typically represented as a number or a short string, so it can be easily typed into the textbox. However, to get this solution you need to implement a program: computing it manually is not feasible.

I started a bit late and solved just the first 11 puzzles. Each puzzle is doable in one sitting, usually half an hour to a couple of hours of work, which is very nice.

Some problems are purely about the correctness of your solution. The most engaging tasks were also computationally intensive, such that a straightforward solution took too much time to run to completion. You need to find a shortcut to make it faster, which is always fun.

You collect stars for providing the correct answers

Apart from the generic joy and satisfaction that one gets from solving programming challenges like these, I also consider it a good opportunity to try a new programming language or paradigm.

As I said, the tasks are relatively small, so you feel a sense of accomplishment quite often, even if you are not very familiar with the programming language of choice.

There are many other people solving the same puzzles and sharing their solutions online. You can go and find other implementations of a task that you just solved and compare them to your approach. That's a great way to learn from other people, broaden your view and expose yourself to new tricks, data structures and APIs.

I picked F# as my programming language for Advent of Code 2016, and I chose to restrict myself to immutable data structures and pure functions. It played out really nicely: I am quite happy with the speed of development, readability and performance of the code.

Solution to one of the puzzles

You can find my code for the first 11 puzzles in my GitHub account. Full sets of F# solutions are available from Mark Heath and Yan Cui.

I included one of the solutions in The Taste of F# talk that I gave at a user group earlier this month.

Next year I'll pick another language and will start on December 1st. I invite you to join me in solving Advent of Code 2017.

Kudos to Eric Wastl for creating and maintaining the Advent of Code web site.

My Functional Programming & F# Talks at Webscale Architecture Meetup

On January 10th of 2017 I gave two talks at the Webscale Architecture NL meetup group in Utrecht.

Here are the slides for those who were there and want to revisit the topics covered.

Introduction of Functional Programming

Link to full-screen HTML slides: Introduction of Functional Programming

Slides on SlideShare:

The Taste of F#

Link to full-screen HTML slides: The Taste of F#

Example problem that I was solving: Advent of Code and Day 8: Two-Factor Authentication

Slides on SlideShare:

Thanks for attending my talks! Feel free to post any feedback in the comments.

Introducing Stream Processing in F#

This post was published for the F# Advent Calendar 2016, thus the examples are themed around Christmas gifts.

This post opens a series of articles about the data processing discipline called Stream Processing.

I describe stream processing from a developer's perspective, using the following (rather unusual) angle:

  • F# as the primary language
  • Concepts and principles are more important than frameworks and tools
  • Start with modeling the domain problem and then solve just the problems that your domain has

We begin the journey by introducing what stream processing is all about.

What's and Why's

There are several techniques to process data flowing into your applications, and stream processing is one of them.

Stream processing is focused on processing data in real time: continuously, concurrently, and in a record-by-record fashion. It is designed to analyze and act on live data flows, using "continuous queries" expressed in user code. Data is structured as a continuous stream of events over time:

Flow of events

In contrast to some other approaches to reasoning about application structure, stream processing concepts are built around data structures, flows and transformations rather than services or remote calls.

Although the approach is nothing new, it has gained much more traction in recent years, especially in the big data community. Products like Storm, Kafka, Flink, Samza (all under the Apache Foundation), Google Cloud Dataflow and Akka Streams are popularizing the programming model and bringing the tools to make it reliable and scalable.

These products were born from the need to run data processing in massively distributed environments. They are all about scaling out and solving or mitigating the issues of distributed systems, which are inherently unreliable.

While this is a noble and mission-critical goal for internet-scale companies, most applications do not require such massive performance and scale.

There is something to be said for the domain-driven approach, where an application is built around the main asset and burden of enterprise systems: the core business logic. It may happen that you don't need a general-purpose framework with the lowest processing latency. Instead, your choice of tools might lean towards the cleanest code possible, tailored to your own needs, and maintainable over time.

Knowing the landscape can help you make the right trade-off.

The recent Stream Processing boom comes from the Apache / JVM world. Unfortunately, stream processing frameworks and their underlying concepts are mostly unfamiliar to .NET developers.

While the Azure cloud provides a managed service called Azure Stream Analytics, the product is built around a SQL-like language and is rather limited in extensibility.

We will have a look at other options in the .NET space in further posts of the series.

For now, I want to start filling the gap and introduce the basic concepts with F#. As a bonus, we are not limited by particular tools and implementations, but can start from the ground up.

Elements of a Stream

As I already mentioned above, people have been doing stream processing for a long time. In fact, if you receive events and then apply transformation logic structured around a single event at a time, you are already doing stream processing.

Here is a simple picture which illustrates the elements of processing:

Stream Processing Stage

The data Source is responsible for injecting events into the pipeline. Sources are the input integration points: typically persistent message queues, logs or subscription feeds.

A sequence of events from the same Source is called a Stream (thus Stream Processing). Streams are unbounded in nature, which means that the amount of data points is not limited in size or time. There is no "end" of data: events will potentially keep coming for as long as the processing application is alive.

The high-level purpose of a Transformation is to extract value from the events. That's where the business logic resides, and that's where the development effort goes. Transformations may also be referred to as Stages, Flows, Tasks, Jobs and so on, depending on the context.

The simplest transformations, like format conversion, can be stateless. However, other transformations will often use some kind of State Store, as a means to:

  • Aggregate data from multiple events of the same stream
  • Correlate events from several streams
  • Enrich event data with external lookup data

The Data Sink represents the output of the pipeline, the place where the transformed, aggregated and enriched events end up.

A Sink can be a database of any kind, storing the processed data ready to be consumed by user queries and reports.

Another Sink can become a Source for another stream of events. This way, series of transformations are sequenced together into Processing Pipelines (or Topologies).

At a high level, pipelines can usually be represented as directed graphs, with data streams as nodes and transformations as edges:

Stream Processing Pipeline

In real-world applications, the pipelines can have lots of interconnected elements and flow branches. We will start with a simplistic example.

Gift Count Pipeline

Word Count is the Hello World and TODO app of the data processing world. Here are the reference implementations for Dataflow, Flink and Storm.

To make it a bit more fun, we'll make a Gift Count pipeline out of it. The following image summarizes our Gift Count topology:

Gift Count Pipeline

The pipeline consists of one source, one sink and two transformations. The input of the pipeline is a source of gift lists (each list is a comma-separated line of text).

The purpose of the processing is to tokenize gift lists into separate gifts, and then count the occurrences of each gift in the stream. The output is written to a database sink, e.g. a key-value store with gifts as keys and counts as values.

Note that while the Split stage is stateless, the Count stage needs to keep some internal state to be able to aggregate data over multiple entries.

Let's start thinking about the implementation. How do we model transformations and pipelines in code?

Transformations

Here's how a transformation is typically represented in Apache Storm, the granddaddy of Big Data stream processing systems (transformations are called Bolts in Storm, and the code is Java):

public class TokenizerBolt implements IRichBolt {
    public void execute(Tuple input) {
        String wishlist = input.getString(0);
        for(String gift: wishlist.split(", ")) {
            collector.emit(new Values(gift));
        }
        collector.ack(input);
    }
    // Other code is omitted
}

The Tokenizer is a class which implements a predefined interface with an execute method. The method accepts a container class Tuple, from which we can extract the real data using position-based indexing - something like a DataRow from ADO.NET. The method does not return anything; instead it calls the effectful method emit, passing a tuple to the next transformation in the pipeline.

Clearly, there is some room for improvement here. We don't want to put our important domain logic into amorphous functions of type Collector -> Tuple -> unit. Here is what we can do:

  • Use strong typing to see what function does based on its signature
  • Use pure functions to make them easy to test and reason about
  • Use domain-specific types with F# records and ADTs

Our domain is very simple in the Gift Count example. Still, we could define the Gift type to restrict it to be lowercase, non-empty, etc. But for the sake of simplicity I'll limit it to a one-liner:

type Gift = string

Now, the type of the first transformation should be string -> Gift list. So, our transformation is based on the following function:

let tokenize (wishlist: string) =
  wishlist.ToLowerInvariant().Split(',')
  |> List.ofArray
  |> List.map (fun x -> x.Trim())
  |> List.filter (fun x -> x.Length > 0)

The counting transformation is modeled in a similar way. The base function is of type Gift list -> (Gift * int) list and is actually implemented as

let count xs = List.countBy id xs
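
Just to illustrate these two functions, here is what I'd expect in F# Interactive (results shown as comments):

tokenize "Ball, Train, doll, ball"
// ["ball"; "train"; "doll"; "ball"]

tokenize "Ball, Train, doll, ball" |> count
// [("ball", 2); ("train", 1); ("doll", 1)]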

Instead of using a real database, we will just print the counts to the console. So the last, optional step of our examples will be to print out the counts one by one. Here is a helper function to format a single count:

let print (gift, count) = sprintf "%i %s" count gift

Now, we can tokenize and count the gifts in a single list. But how do we aggregate data over time? Let's leave this to the pipelines.

Pipelines

Let's have a look at a definition of a Storm pipeline (in Java):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader", new LineReaderSpout());
builder.setBolt("gifts-spitter", new GiftSpitterBolt()).shuffleGrouping("line-reader");
builder.setBolt("gift-counter", new GiftCounterBolt()).shuffleGrouping("gifts-spitter");

There is a Builder class, which is capable of adding Sources (Spouts) and Transformations (Bolts) to the pipeline. But again, there's no type story here: the stages are linked by name, and the types are just implementations of predefined interfaces.

Here is another example of a pipeline, now from the Google Dataflow SDK (still Java):

Pipeline
    .create(options)
    .apply(TextIO.Read.from("..."))
    .apply(ParDo.named("ExtractGifts").of(new DoFn<String, String>() {
         public void processElement(ProcessContext c) { /* Implements tokenizer */ }
    }))    
    .apply(Count.<String>perElement())
    .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         public String apply(KV<String, Long> element) { /* Formats results */ }
    }))
    .apply(TextIO.Write.to("..."));

I consider this to be more descriptive. There is a clear flow of operations chained together. The types are visible and checked at compile time.

How would we like to see this pipeline in F#? Our motivating example is the same processing applied to a normal F# list of wishlists (strings). The following code snippet counts the gifts in the wishlists and prints the result:

wishlists
|> List.collect tokenize
|> List.countBy id
|> List.map print
|> List.iter (Console.WriteLine)

My goal for the rest of the article will be to define a Flow module which would enable me to write a Stream Processing pipeline in the same fashion. Here is the target code:

sourceOfWishlists
|> Flow.collect tokenize
|> Flow.countBy id
|> Flow.map print
|> Flow.connectTo sinkForCounts

So, how do we implement something like this? Let's start by clarifying how a Source and a Sink can be defined.

Source and Sink

We declared the source of a stream to be unbounded, not limited in time or event count.

However, modern stream processing systems like Flink and Dataflow (both with ideas from Apache Beam) try to have it both ways by declaring that bounded data sources are just a special case of unbounded streams. If your goal is to process a fixed batch of data, you can represent it as a one-by-one sequence of events.

The Big Data world has a well-known approach in which batch processing and real-time stream processing are done in parallel, with separate tools and separate code bases. This approach is called the Lambda Architecture. Beam declares it outdated, offering a way to reuse streaming code and capacity for bounded data workloads as well.

To follow this modern path, we will declare our Source as follows:

type BoundedSource<'T> = unit -> 'T seq
type UnboundedSource<'T> = ('T -> unit) -> unit

type Source<'T> = 
  | Bounded of BoundedSource<'T>
  | Unbounded of UnboundedSource<'T>

Source is a generic discriminated union with two cases. The Bounded case represents a side-effectful function which returns a sequence of elements when called. The unit argument is there to delay the execution: we need to declare sources long before they start emitting values.

The Unbounded case is a bit harder to understand. It accepts an action to be executed for each event as its argument, and returns nothing meaningful (unit again). You will see a usage example later.

The Sink represents an action to happen at the end of the pipeline. I've made it a discriminated union too, but with just one case:

type Sink<'T> = | Action of ('T -> unit)
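
For example, a sink that appends each record to a text file (the helper and the file name are just a sketch of mine) could look like:

open System.IO

// wraps a file-append action into the Sink type
let fileSink path =
  Action (fun (line: string) -> File.AppendAllLines(path, [line]))

let textFileSink = fileSink "gift-counts.txt"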

Now, we should be able to simulate an empty processing pipeline: directly connect a source to a sink. Let's start with bounded data:

open System

let copyPipeline source sink =
  match source, sink with
  | Bounded b, Action a -> b() |> Seq.iter a
  | _ -> failwith "Not supported yet"

let gifts = seq ["Ball"; "Train"; "Doll"]
let giftBoundedSource = (fun() -> gifts) |> Bounded

let consoleSink = (fun (s: string) -> Console.WriteLine s) |> Action

copyPipeline giftBoundedSource consoleSink

This code will print out all the gift names from the sequence. Now, let's extend it to stream unbounded data. Before we can do that, let's introduce a helper class:

open System.Collections.Generic

type Triggered<'T>() = 
  let subscribers = new List<'T -> unit>()
  member this.DoNext x =
    subscribers.ForEach(fun s -> s x)
  member this.Subscribe = subscribers.Add

An instance of this class keeps a mutable list of subscribers. Subscribers are added by calling the Subscribe method. Someone else can then call the DoNext method, and every subscriber will receive each item.

Here's how we can use it for unbounded data processing:

let copyPipeline source sink =
  match source, sink with
  | Bounded b, Action a -> b() |> Seq.iter a
  | Unbounded ub, Action a -> ub a

let consoleSource = new Triggered<string>()
let unboundedSource = consoleSource.Subscribe |> Unbounded
copyPipeline unboundedSource consoleSink

Seq.initInfinite (fun _ -> Console.ReadLine())
|> Seq.takeWhile ((<>) "q")
|> Seq.iter consoleSource.DoNext

This little program will echo whatever you enter into the console until you type q to quit. That is an example of unbounded data: you can type as long as you want, there is no hard limit.

Here's how it works:

  1. A Triggered source is created.
  2. An Unbounded source is declared by subscribing to the trigger.
  3. Our dummy pipeline links the source to the action of writing to the console.
  4. Every time a new line is entered, the DoNext method of the trigger is called and the data flows to the sink.

Stop here and make sure you understand the example before going further.

Flow

Now it's time to implement the contract for the flow that we defined in the Gift Count example. The contract consists of two parts. The first part is a generic interface which defines all the operations that we need:

type Runnable = unit -> unit

type IFlow<'a> =
  abstract member Map<'b> : ('a -> 'b) -> IFlow<'b>
  abstract member Collect<'b> : ('a -> 'b list) -> IFlow<'b>
  abstract member CountBy<'b when 'b: equality> : ('a -> 'b) -> IFlow<'b * int>
  abstract member To: Sink<'a> -> Runnable

Then we define a module which is just a wrapper around the interface to enable an F#-style API:

module Flow =
  let map<'TI, 'TO> (f: 'TI -> 'TO) (flow: IFlow<'TI>) = flow.Map f
  let collect<'TI, 'TO> (f: 'TI -> 'TO list) (flow: IFlow<'TI>) = flow.Collect f
  let countBy<'T, 'TK when 'TK: equality> (f: 'T -> 'TK) (flow: IFlow<'T>) = flow.CountBy f
  let connectTo<'T> sink (flow: IFlow<'T>) = flow.To sink
  let run (r: Runnable) = r()

Then, we just need an implementation of IFlow and a factory method to create an initial instance of flow given a data source.

Now I'd like to emphasize that there are multiple possible implementations of IFlow depending on the required properties for the pipeline. They might make use of different libraries or frameworks, or be a naive simple implementation like the one below, suitable for modeling and testing.

In fact, one of my implementations doesn't run the pipeline, but instead uses reflection to build a visual graph of processing stages, to be used for documentation and discussion purposes.

We will have a look at more advanced implementations in further articles, but for now here is a naive version:

module Runner =
  let private mapTransform map = function
    | Bounded bs -> bs >> Seq.map map |> Bounded
    | Unbounded us ->
      fun subscriber -> map >> subscriber |> us
      |> Unbounded

  let private collectTransform mapToMany = function
    | Bounded bs -> bs >> Seq.map mapToMany >> Seq.concat |> Bounded
    | Unbounded us ->
      fun subscriber -> mapToMany >> Seq.iter subscriber |> us
      |> Unbounded

  let private countByTransform<'a, 'b when 'b: equality> (getKey: 'a -> 'b) source =
    let state = new Dictionary<'b, int>()
    let addItem i = 
      let key = getKey i
      let value = if state.ContainsKey key then state.[key] else 0
      let newValue = value + 1
      state.[key] <- newValue
      (key, newValue)

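    // Note: the Bounded branch below computes final counts with Seq.countBy,
    // while the Unbounded branch uses addItem to emit a running count per incoming event.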
    match source with
    | Bounded bs -> bs >> Seq.countBy getKey |> Bounded
    | Unbounded us -> (fun s -> addItem >> s) >> us |> Unbounded

  let private stage source transform sink () =
    match transform source, sink with
    | Bounded bs, Action a -> bs() |> Seq.iter a
    | Unbounded us, Action a -> us a

  type private Flow<'a>(source: Source<'a>, connect: Sink<'a> -> Runnable) =
    member this.Apply<'b> t = new Flow<'b>(t source, stage source t) :> IFlow<'b>

    interface IFlow<'a> with
      member this.Map<'b> map = this.Apply<'b> (mapTransform map)

      member this.Collect<'b> map = this.Apply<'b> (collectTransform map)

      member this.CountBy<'b when 'b: equality>(getKey) = 
        this.Apply<'b * int> (countByTransform<'a, 'b> getKey)

      member this.To(sink) = connect(sink)

  let from<'a> source =
    new Flow<'a>(source, stage source (mapTransform id)) :> IFlow<'a>

The implementation details are not that important at the moment (even though it's just 37 lines of code), so I'll just proceed to the pipeline definition:

unboundedSource                // e.g. same console source as before
|> Runner.from                 // create Runner implementation of IFlow
|> Flow.collect tokenize
|> Flow.countBy id
|> Flow.map print
|> Flow.connectTo consoleSink  // connect to the Sink
|> Flow.run                    // start listening for events
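
The same pipeline also runs over bounded data without any changes. Here is a hypothetical bounded source of wishlists (the sample values are mine) plugged into the same flow:

let wishlistSource =
  Bounded (fun () -> seq ["Ball, Train, Doll"; "Train, Bike"])

wishlistSource
|> Runner.from
|> Flow.collect tokenize
|> Flow.countBy id
|> Flow.map print
|> Flow.connectTo consoleSink
|> Flow.run

Because this source is bounded, the countBy stage produces final totals (via Seq.countBy) instead of a running count per event.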

Here you can find the full code of the Gift Count example.

Conclusion

In this article, we started reasoning about low-latency processing pipelines from the domain logic point of view. We reused well-known F# idioms like ADTs and HOFs to show that stream processing is not much different from other types of applications.

Although this post is quite long already, we have just scratched the surface of stream processing. Here are some focus areas for the follow-ups:

  • More complex pipeline topologies
  • State management
  • Concepts of time, windowing and out-of-order events
  • Reliability, retries and guarantees
  • Scaling out
  • Using 3rd parties for all of that
  • Documentation and formal analysis
