My Praise of Advent of Code 2016

During the last days of December, I satisfied my inner need for solving puzzles and tricky tasks by going through the Advent of Code 2016 challenge.

The idea is simple: every day from December 1st to 25th, the site publishes a new brain teaser. They are all tied into one story: the Bad Easter Bunny has stolen all the Christmas gifts from Santa, and now you are the hero who must break into the Bunny's headquarters and save the gifts for the kids.

Having said that, each challenge is independent of the others, so you can solve them in arbitrary order if you want.

Advent Calendar in dark ASCII

A puzzle consists of a description and an input data set associated with it. The solution is typically represented as a number or a short string, so it can be easily typed into the textbox. However, to get this solution you need to implement a program: computing it manually is not feasible.

I started a bit late and solved just the first 11 puzzles. Each puzzle is doable in one sitting, usually half an hour to a couple of hours of work, which is very nice.

Some problems are purely about the correctness of your solution. The most engaging tasks were also computationally intensive, such that a straightforward solution took too much time to run to completion. You need to find a shortcut to make it faster, which is always fun.

Problem Solved! You collect stars for providing the correct answers

Apart from the generic joy and satisfaction that one gets from solving programming challenges like these, I also consider it a good opportunity to try a new programming language or paradigm.

As I said, the tasks are relatively small, so you feel a sense of accomplishment quite often, even if you are not very familiar with the programming language of your choice.

There are many other people solving the same puzzles and sharing their solutions online. You can go and find other implementations of a task that you just solved and compare them to your own approach. That's a great way to learn from other people, broaden your view and expose yourself to new tricks, data structures and APIs.

I picked F# as my programming language for Advent of Code 2016. I chose to restrict myself to immutable data structures and pure functions. It played out really nicely: I am quite happy with the speed of development, readability and performance of the code.

Solution to one of the puzzles (Day 8)

You can find my code for the first 11 puzzles in my GitHub account. Full sets of F# solutions are available from Mark Heath and Yan Cui.

I included one of the solutions in The Taste of F# talk that I gave at a user group earlier this month.

Next year I'll pick another language and will start on December 1st. I invite you to join me in solving Advent of Code 2017.

Kudos to Eric Wastl for creating and maintaining the Advent of Code web site.

My Functional Programming & F# Talks at Webscale Architecture Meetup

On January 10th of 2017 I gave two talks at the Webscale Architecture NL meetup group in Utrecht.

Here are the slides for the people who were there and want to revisit the covered topics.

Introduction to Functional Programming

Link to full-screen HTML slides: Introduction to Functional Programming

Slides on SlideShare:

The Taste of F#

Link to full-screen HTML slides: The Taste of F#

Example problem that I was solving: Advent of Code and Day 8: Two-Factor Authentication

Slides on SlideShare:

Thanks for attending my talks! Feel free to post any feedback in the comments.

Introducing Stream Processing in F#

The post was published for the F# Advent Calendar 2016, thus the examples are themed around Christmas gifts.

This post opens a series of articles about a data processing discipline called Stream Processing.

I describe stream processing from a developer's perspective, using the following (rather unusual) angle:

  • F# as the primary language
  • Concepts and principles are more important than frameworks and tools
  • Start with modeling the domain problem and then solve just the problems that your domain has

We begin the journey by introducing what stream processing is all about.

What's and Why's

There are several techniques to process data that are flowing into your applications, and stream processing is one of them.

Stream processing is focused on the real-time processing of data continuously, concurrently, and in a record-by-record fashion. Stream processing is designed to analyze and act on live data flow, using "continuous queries" expressed in user code. Data is structured as a continuous stream of events over time:

Flow of events

In contrast to some other approaches to reasoning about application structure, stream processing concepts are drawn around data structures, flows and transformations rather than services or remote calls.

Although the approach is nothing new, it has gained much more traction in recent years, especially in the big data community. Products like Storm, Kafka, Flink, Samza (all under the Apache Foundation), Google Cloud Dataflow and Akka Streams are popularizing the programming model and bringing the tools to make it reliable and scalable.

These products were born from the need to run data processing in massively distributed environments. They are all about scaling out and solving or mitigating the issues of distributed systems, which are inherently not reliable.

While this is a noble and mission-critical goal for internet-scale companies, most applications do not require such massive performance and scale.

There is something to be said for the domain-driven approach, where an application is built around the main asset and burden of enterprise systems: the core business logic. It may happen that you don't need a general-purpose framework with the lowest processing latency. Instead, your choice of tools might lean towards the cleanest code possible, tailored for your own needs and maintainable over time.

Knowing the landscape can help you make the right trade-off.

The recent Stream Processing boom comes from the Apache / JVM world. Unfortunately, stream processing frameworks and the underlying concepts are mostly unfamiliar to .NET developers.

While the Azure cloud provides a managed service called Azure Stream Analytics, the product is built around a SQL-like language and is rather limited in extensibility.

We will have a look at other options in the .NET space in further posts of this series.

For now, I want to start filling the gap and introduce the basic concepts with F#. As a bonus, we are not limited by particular tools and implementations, but can start from the ground up.

Elements of a Stream

As I already mentioned above, people have been doing stream processing for a long time. In fact, if you receive events and then apply transformation logic structured around a single event at a time, you are already doing stream processing.

Here is a simple picture which illustrates the elements of processing:

Stream Processing Stage

The data Source is responsible for injecting events into the pipeline. Sources are the input integration points; typically they are persistent message queues, logs or subscription feeds.

A sequence of events in the same Source is called a Stream (thus Stream Processing). Streams are unbounded in nature, which means that the number of data points is not limited in size or time. There is no "end" of data: events will potentially keep coming as long as the processing application is alive.

The high-level purpose of the Transformation is to extract value from the events. That's where the business logic resides, and that's where the development effort goes. Transformations can also be referred to as Stages, Flows, Tasks, Jobs and so on, depending on the context.

The simplest transformations, like format conversion, can be stateless. However, other transformations will often use some kind of State Store (see the type sketch after this list), as a means to

  • Aggregate data from multiple events of the same stream
  • Correlate events from several streams
  • Enrich event data with external lookup data
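
To make the distinction concrete, here is a small type sketch (illustrative only, not the contract we end up using later in this post): a stateless transformation is a plain function from an input event to output events, while a stateful one also threads a state value through:

type StatelessTransform<'In, 'Out> = 'In -> 'Out list
type StatefulTransform<'State, 'In, 'Out> = 'State -> 'In -> 'State * 'Out list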

The Data Sink represents the output of the pipeline: the place where the transformed, aggregated and enriched events end up.

A Sink can be a database of any kind, which stores the processed data, ready to be consumed by user queries and reports.

A Sink can in turn become a Source for another stream of events. This way, series of transformations are sequenced together into Processing Pipelines (or Topologies).

At a high level, the pipelines can usually be represented as directed graphs, with data streams in nodes and transformations in edges:

Stream Processing Pipeline

In real-world applications, the pipelines can have lots of interconnected elements and flow branches. We will start with a simplistic example.

Gift Count Pipeline

Word Count is the Hello World and TODO app of the data processing world. Here are the reference implementations for Dataflow, Flink and Storm.

To make it a bit more fun, we'll make a Gift Count pipeline out of it. The following image summarizes our Gift Count topology:

Gift Count Pipeline

The pipeline consists of one source, one sink and two transformations. The input of the pipeline is the source of gift lists (each list is a comma separated line of text).

The purpose of the processing is to tokenize gift lists into separate gifts, and then count the occurrences of each gift in the stream. The output is written into a database sink, e.g. a key-value store with gifts as keys and counts as values.

Note that while the Split stage is stateless, the Count stage needs to keep some internal state to be able to aggregate data over multiple entries.

Let's start thinking about the implementation. How do we model transformations and pipelines in code?

Transformations

Here's how a transformation is typically represented in Apache Storm, the granddaddy of Big Data Stream Processing systems (transformations are called Bolts in Storm, and the code is Java):

public class TokenizerBolt implements IRichBolt {
    public void execute(Tuple input) {
        String wishlist = input.getString(0);
        for(String gift: wishlist.split(", ")) {
            collector.emit(new Values(gift));
        }
        collector.ack(input);
    }
    // Other code is omitted
}

Tokenizer is a class which implements a predefined interface with an execute method. The method accepts a container class Tuple, from which we can extract the actual data using position-based indexing, something like a DataRow from ADO.NET. The method does not return anything; instead it calls the effectful method emit, passing a tuple to the next transformation in the pipeline.

Clearly, there is some room for improvement here. We don't want to put our important domain logic into amorphous functions of type Collector -> Tuple -> unit. Here is what we can do:

  • Use strong typing to see what function does based on its signature
  • Use pure functions to make them easy to test and reason about
  • Use domain-specific types with F# records and ADTs

Our domain is very simple in the Gift Count example. Still, we could define a Gift type that restricts it to be lowercase, non-empty, etc. But for the sake of simplicity I'll limit it to a one-liner:

type Gift = string
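
For illustration, such a restrictive version could be a single-case union with a validating constructor; this is just a hypothetical sketch and is not used in the rest of the post:

type ValidatedGift = ValidatedGift of string

let createGift (s: string) =
  let normalized = s.Trim().ToLowerInvariant()
  if normalized = "" then None else Some (ValidatedGift normalized)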

Now, the type of the first transformation should be string -> Gift list. So, our transformation is based on the following function:

let tokenize (wishlist: string) =
  wishlist.ToLowerInvariant().Split(',')
  |> List.ofArray
  |> List.map (fun x -> x.Trim())
  |> List.filter (fun x -> x.Length > 0)

The counting transformation is modeled in a similar way. The base function is of type Gift list -> (Gift * int) list and is actually implemented as

let count xs = List.countBy id xs
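
As a quick sanity check, here is what these two functions produce for a sample input (values worked out by hand):

tokenize "Ball, Train, ball"   // ["ball"; "train"; "ball"]
|> count                       // [("ball", 2); ("train", 1)]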

Instead of using a real database, we will just print the counts to the console. So the last, optional step of our example is to format the counts one by one. Here is a helper function:

let print (gift, count) = sprintf "%i %s" count gift

Now, we can tokenize and count the gifts in a single list. But how do we aggregate data over time? Let's leave this to the pipelines.

Pipelines

Let's have a look at a definition of a Storm pipeline (in Java):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader", new LineReaderSpout());
builder.setBolt("gifts-spitter", new GiftSpitterBolt()).shuffleGrouping("line-reader");
builder.setBolt("gift-counter", new GiftCounterBolt()).shuffleGrouping("gifts-spitter");

There is a Builder class, which is capable of adding Sources (Spouts) and Transformations (Bolts) to the pipeline. But again, there's no type story here: the stages are linked by name, and the types are just implementations of predefined interfaces.

Here is another example of the pipeline, now from Google Dataflow SDK (still Java):

Pipeline
    .create(options)
    .apply(TextIO.Read.from("..."))
    .apply(ParDo.named("ExtractGifts").of(new DoFn<String, String>() {
         public void processElement(ProcessContext c) { /* Implements tokenizer */ }
    }))    
    .apply(Count.<String>perElement())
    .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         public String apply(KV<String, Long> element) { /* Formats results */ }
    }))
    .apply(TextIO.Write.to("..."));

I consider this to be more descriptive. There is a clear flow of operations chained together. The types are visible and checked at compile time.

How would we like to see this pipeline in F#? Our motivating example is going to be the same processing applied to a normal F# list of wishlists (strings). The following code snippet counts the gifts in the wishlists and prints the result:

wishlists
|> List.collect tokenize
|> List.countBy id
|> List.map print
|> List.iter (Console.WriteLine)

My goal for the rest of the article will be to define a Flow module which would enable me to write a Stream Processing pipeline in the same fashion. Here is the target code:

sourceOfWishlists
|> Flow.collect tokenize
|> Flow.countBy id
|> Flow.map print
|> Flow.connectTo sinkForCounts

So, how do we implement something like this? Let's start with clarifying how Source and Sink can be defined.

Source and Sink

We declared the source of a stream to be unbounded, not limited in time or event count.

However, modern stream processing systems like Flink and Dataflow (both with ideas from Apache Beam) are trying to have it both ways by declaring that bounded data sources are just a special case of unbounded streams. If your goal is to process a fixed batch of data, you can represent it as a one-by-one sequence of events.

The Big Data world has a well-known approach where batch processing and real-time stream processing are done in parallel, with separate tools and separate code bases. This approach is called Lambda Architecture. Beam declares this approach outdated, offering a way to reuse streaming code and capacity for bounded data workloads as well.

To follow this modern path, we will declare our Source as follows:

type BoundedSource<'T> = unit -> 'T seq
type UnboundedSource<'T> = ('T -> unit) -> unit

type Source<'T> = 
  | Bounded of BoundedSource<'T>
  | Unbounded of UnboundedSource<'T>

Source is a generic discriminated union with two cases. The Bounded case represents a side-effectful function which returns a sequence of elements when called. The unit argument is there to delay the processing: we need to declare sources long before they start to emit values.

The Unbounded case is a bit harder to understand. It accepts an action to be executed as the argument and returns nothing meaningful (unit again). You will see a usage example later.

The Sink represents an action to happen at the end of the pipeline. I've made it a discriminated union too, but with just one case:

type Sink<'T> = | Action of ('T -> unit)

Now, we should be able to simulate an empty processing pipeline: directly connect a source to a sink. Let's start with bounded data:

let copyPipeline source sink =
  match source, sink with
  | Bounded b, Action a -> b() |> Seq.iter a
  | _ -> failwith "Not supported yet"

let gifts = seq ["Ball"; "Train"; "Doll"]
let giftBoundedSource = (fun() -> gifts) |> Bounded

let consoleSink = (fun (s: string) -> Console.WriteLine s) |> Action

copyPipeline giftBoundedSource consoleSink

This code will print out all the gift names from the sequence. Now, let's extend it to stream unbounded data. Before we can do that, let's introduce a helper class:

type Triggered<'T>() = 
  let subscribers = new List<'T -> unit>()
  member this.DoNext x =
    subscribers.ForEach(fun s -> s x)
  member this.Subscribe = subscribers.Add

An instance of this class keeps a mutable list of subscribers. Subscribers are added by calling the Subscribe method. Someone else can then call the DoNext method, and each subscriber will receive every item.
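
For example, a standalone usage of this helper could look like this (a tiny sketch, independent of the pipeline):

let trigger = Triggered<string>()
trigger.Subscribe (printfn "received: %s")
trigger.DoNext "Ball"    // prints "received: Ball"
trigger.DoNext "Train"   // prints "received: Train"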

Here's how we can use it for unbounded data processing:

let copyPipeline source sink =
  match source, sink with
  | Bounded b, Action a -> b() |> Seq.iter a
  | Unbounded ub, Action a -> ub a

let consoleSource = new Triggered<string>()
let unboundedSource = consoleSource.Subscribe |> Unbounded
copyPipeline unboundedSource consoleSink

Seq.initInfinite (fun _ -> Console.ReadLine())
|> Seq.takeWhile ((<>) "q")
|> Seq.iter consoleSource.DoNext

This little program will echo whatever you enter into the console until you type q to quit. That is an example of unbounded data: you can type as long as you want, there is no hard limit.

Here's how it works:

  1. Triggered source is created.
  2. Unbounded source is declared by subscribing to the trigger.
  3. Our dummy pipeline links the source to the action of writing to console.
  4. Every time a new line is entered, DoNext method of the trigger is called and the data flows to the sink.

Stop here and make sure you understand the example before going further.

Flow

Now it's time to implement the contract for the flow that we defined in the Gift Count example. The contract consists of two parts. The first part is a generic interface which defines all the operations that we need:

type Runnable = unit -> unit

type IFlow<'a> =
  abstract member Map<'b> : ('a -> 'b) -> IFlow<'b>
  abstract member Collect<'b> : ('a -> 'b list) -> IFlow<'b>
  abstract member CountBy<'b when 'b: equality> : ('a -> 'b) -> IFlow<'b * int>
  abstract member To: Sink<'a> -> Runnable

Then we define a module which is just a wrapper around the interface to enable F#-style API:

module Flow =
  let map<'TI, 'TO> (f: 'TI -> 'TO) (flow: IFlow<'TI>) = flow.Map f
  let collect<'TI, 'TO> (f: 'TI -> 'TO list) (flow: IFlow<'TI>) = flow.Collect f
  let countBy<'T, 'TK when 'TK: equality> (f: 'T -> 'TK) (flow: IFlow<'T>) = flow.CountBy f
  let connectTo<'T> sink (flow: IFlow<'T>) = flow.To sink
  let run (r: Runnable) = r()

Then, we just need an implementation of IFlow and a factory method to create an initial instance of flow given a data source.

Now I'd like to emphasize that there are multiple possible implementations of IFlow depending on the required properties for the pipeline. They might make use of different libraries or frameworks, or be a naive simple implementation like the one below, suitable for modeling and testing.

In fact, one of my implementations doesn't run the pipeline, but instead uses reflection to build a visual graph of processing stages, to be used for documentation and discussion purposes.

We will have a look at more advanced implementations in further articles, but for now here is a naive version:

module Runner =
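  // Lifts a one-to-one mapping function over a Source, for both the bounded and unbounded cases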
  let private mapTransform map = function
    | Bounded bs -> bs >> Seq.map map |> Bounded
    | Unbounded us ->
      fun subscriber -> map >> subscriber |> us
      |> Unbounded

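  // Lifts a one-to-many mapping (flat map) over a Source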
  let private collectTransform mapToMany = function
    | Bounded bs -> bs >> Seq.map mapToMany >> Seq.concat |> Bounded
    | Unbounded us ->
      fun subscriber -> mapToMany >> Seq.iter subscriber |> us
      |> Unbounded

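  // Counting is stateful: bounded sources are counted in one pass, unbounded ones emit a running count per key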
  let private countByTransform<'a, 'b when 'b: equality> (getKey: 'a -> 'b) source =
    let state = new Dictionary<'b, int>()
    let addItem i = 
      let key = getKey i
      let value = if state.ContainsKey key then state.[key] else 0
      let newValue = value + 1
      state.[key] <- newValue
      (key, newValue)

    match source with
    | Bounded bs -> bs >> Seq.countBy getKey |> Bounded
    | Unbounded us -> (fun s -> addItem >> s) >> us |> Unbounded

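  // Builds a Runnable that applies the transformation to the source and feeds the result into the sink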
  let private stage source transform sink () =
    match transform source, sink with
    | Bounded bs, Action a -> bs() |> Seq.iter a
    | Unbounded us, Action a -> us a

  type private Flow<'a>(source: Source<'a>, connect: Sink<'a> -> Runnable) =
    member this.Apply<'b> t = new Flow<'b>(t source, stage source t) :> IFlow<'b>

    interface IFlow<'a> with
      member this.Map<'b> map = this.Apply<'b> (mapTransform map)

      member this.Collect<'b> map = this.Apply<'b> (collectTransform map)

      member this.CountBy<'b when 'b: equality>(getKey) = 
        this.Apply<'b * int> (countByTransform<'a, 'b> getKey)

      member this.To(sink) = connect(sink)

  let from<'a> source =
    new Flow<'a>(source, stage source (mapTransform id)) :> IFlow<'a>

The implementation details are not that important at the moment (even though it's just 37 lines of code), so I'll just proceed to the pipeline definition:

unboundedSource                // e.g. same console source as before
|> Runner.from                 // create Runner implementation of IFlow
|> Flow.collect tokenize
|> Flow.countBy id
|> Flow.map print
|> Flow.connectTo consoleSink  // connect to the Sink
|> Flow.run                    // start listening for events

Here you can find the full code of the Gift Count example.

Conclusion

In this article, we started reasoning about low-latency processing pipelines from the domain logic point of view. We tried to reuse well-known F# idioms like ADTs and HOFs to show that stream processing is not much different from other types of applications.

Although this post is quite long by now, we have just scratched the surface of stream processing. Here are some focus areas for the follow-ups:

  • More complex pipeline topologies
  • State management
  • Concepts of time, windowing and out-of-order events
  • Reliability, retries and guarantees
  • Scaling out
  • Using 3rd parties for all of that
  • Documentation and formal analysis

Event Sourcing and IO Complexity

Event Sourcing is an approach in which an append-only store is used to record the full series of events that describe actions taken on a particular domain entity. This event store becomes the main source of truth for reconstructing the current state of the entity and its complete history.

In essence, that means that we store the log of all business events that occurred in the system, and then we use them to make new decisions and produce new events.

How Event Sourcing Works

Event Sourcing is usually used in combination with Command-Query Responsibility Segregation (CQRS), where all writes to the event store are initiated by commands.

The following picture illustrates the storage and command handling:

Event Store Command Handler

Every time a new command comes in (1), the command handler determines which entity is affected and retrieves all of that entity's previous events from the store (2).

The handler aggregates the events and derives the current state of the entity (3). If the command is valid given that state, the command handler produces one or more new events (4) and writes them back to the event store (5).
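
Here is a minimal F# sketch of that command-handling loop. The store functions (readEvents, appendEvents) and the domain functions (apply, decide) are hypothetical placeholders, used only to show the shape of the flow:

let handleCommand readEvents appendEvents apply decide initial entityId command =
  let state =
    readEvents entityId          // (2) fetch all previous events of the entity
    |> Seq.fold apply initial    // (3) aggregate them into the current state
  let newEvents = decide state command   // (4) produce new events, if the command is valid
  appendEvents entityId newEvents        // (5) write them back to the event store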

Disk Space Usage

It's quite obvious that Event Sourcing requires more storage space than the traditional approach of storing only the current state. The storage size is proportional to the total number of events in the system, i.e. it's O(n), or O(e * l) where e is the number of entities in the system and l is the average number of events per entity.

Here is the chart of disk space usage in a simplified situation of events of equal size:

Disk space simulation

We saved 1000 events and consumed 1000 storage units. The disk space is cheap, so we are willing to take the trade-off of extra storage for the benefits that Event Sourcing provides.

Disk IO Usage

Let's look at how many IO operations we are going to perform over time. Let's say that reading or writing one event consumes one unit of IO capacity.

Every time a new event is received, we consume one write operation: it's still linear. The storage is append-only, so it makes sense that disk space and writes are essentially the same thing.

Reads are a different beast. Every time we receive a command, we need to perform i reads, where i is the number of events stored so far for the affected entity. Let's have a look at several examples, each one a simulation of saving a thousand new events.

In the first scenario we have a steady flow of incoming events. Events belong to different entities (aggregates) with 10 events per entity on average:

Reads for low amount of events per entity

We can see that we do 5x more reads than writes. That is because for each event written we have to read all the previous events for the same entity, and on average there are 5 of them.

In the second scenario we receive the same total number of events. While most entities still have 10 events on average, there is one outlier entity which received 100 events, all within this time period.

Reads with an outlier entity

Hey, the amount of reads almost doubled! The line also doesn't look linear anymore...

Let's look at the third extreme scenario when all 1000 events were generated by the same entity:

Reads from single entity

The number of reads skyrockets to 100 times more than in the first scenario. It's clearly quadratic! Each new command has to read O(l) previous events, so the total number of reads needed to store l events of a single entity is O(l²).
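
A quick back-of-the-envelope calculation confirms the simulated numbers (assuming one read per previously stored event of the entity):

// Appending the k-th event of an entity costs k-1 reads, so storing l events costs 0 + 1 + ... + (l-1) reads
let readsPerEntity l = l * (l - 1) / 2

readsPerEntity 10 * 100   // scenario 1: 100 entities with 10 events each ->   4,500 reads
readsPerEntity 1000       // scenario 3: 1 entity with 1000 events        -> 499,500 reads, ~100x more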

Real-Life Scenario

In many use cases it's unlikely that you get outlier entities with orders of magnitude more events than the rest. E.g. if your entity is an order in a webshop, there are just a few events that humans can generate about it.

However, if the events are generated from telemetry data or IoT devices, or if the entities tend to live for a very long time (like bank accounts), that's a good sign you should not ignore the potential problem. A handful of anomalous devices can bring the whole storage to its knees if protection is not carefully designed.

If your domain has a chance to belong to the second group, you better get prepared.

Capacity Planning and Monitoring

It's not enough to know just the total number of events in your store, nor is the incoming rate of new events descriptive enough.

Start by modeling your Event Store against real data. Put some monitoring in place to see the distribution of event density per entity. The average number is not descriptive enough, so you need to build percentiles and know the maximum too.

Monitor the amount of reads on your data store. Set the baseline based on the real data pattern, not imaginary numbers.

Throttling / Sampling

In IoT scenarios the easiest way out could be to discard events if they arrive too frequently from the same device, or to apply some sampling/aggregation at the ingress point. Only your business domain can define what kind of data loss is acceptable, if any.

Snapshots

The Event Sourcing concept provides a solution to the reads problem in the form of Snapshots. Once every x events, you produce a snapshot of the entity state. The next time the entity needs to be loaded, you read just the snapshot and the events which happened after it (fewer than x of them).
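
Here is a rough sketch of the read path with snapshots; readSnapshot and readEventsAfter are hypothetical store functions, shown only to illustrate the idea:

type Snapshot<'State> = { State: 'State; Version: int }

let loadState readSnapshot readEventsAfter apply initial entityId =
  match readSnapshot entityId with
  | Some (snapshot: Snapshot<_>) ->
      // Read only the events recorded after the snapshot (fewer than x of them)
      readEventsAfter entityId snapshot.Version
      |> Seq.fold apply snapshot.State
  | None ->
      // No snapshot yet: fall back to replaying the full event stream
      readEventsAfter entityId 0
      |> Seq.fold apply initial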

It might be tricky to come up with a good snapshot strategy in some cases, especially when the business domain requires multiple projections to be built.

The snapshot size might also grow over time if the entity keeps some internal event-based lists. But snapshots seem to be the only real solution when the number of events gets out of control. Choose your Event Store technology with this consideration in mind.

Happy Event Sourcing!

Leaflet plugin to render geographic corridors

Yesterday I published a simple Leaflet plugin called leaflet-corridor. The plugin defines a new Leaflet primitive, L.Corridor.

When initialized with an array of geo points and a width, it renders a polyline whose width is fixed in meters, not in pixels. That means the rendered line width changes whenever the zoom level changes.

Leaflet-corridor animation

The plugin is handy for denoting geographic corridors: ranges of specified width around a polyline. In our project we used it to show a predefined vehicle route from origin to destination, with only a limited allowed deviation from this route. Whenever the vehicle's position falls outside this corridor, an Out-of-corridor violation event is recorded and shown on the map.

Here are all the links for the corridor plugin:
