Custom Autoscaling of Azure App Service with a Function App

The power of cloud computing comes from its elasticity and ability to adapt to changing load. Most Azure services can be scaled up or down manually: by human interaction in the portal, or by running a command or a script.

Some services in Azure also support Autoscaling, i.e. they may change the resource allocation dynamically, based on predefined rules and current operational metrics.

Azure App Service is one example of such a service: it supports scaling based on a pre-set metric. This is a powerful option that enables websites or web jobs to react to varying load, e.g. based on CPU utilization.

At the same time, the flexibility of the built-in autoscaling is somewhat limited:

  • Only a handful of metrics are supported: for instance, Service Bus Queues are supported as a metric source, while Service Bus Subscriptions are not;

  • It's not possible to combine several metrics in one rule: e.g. scale down only if several queues are empty at the same time, not just one of them;

  • Thresholds are the same for any number of instances: I can't define a scale down rule threshold to be 60% for 8 instances but 30% for 2 instances;

  • The minimum time of reaction is limited to 5 minutes.

Other services, like SQL Database and Cosmos DB, don't have the built-in autoscaling functionality at all.

This post starts a series of articles about a custom implementation of autoscaling. The implementation will be based on Azure Functions as building blocks of scaling workflows.

Goal

To keep the task very specific for now, I want the following from my first custom autoscaling implementation:

  • Be able to scale the number of instances up and down in a given App Service Plan;

  • Do so based on the backlog of a given Service Bus Subscription (the number of messages pending to be processed);

  • Scale up, if the average backlog during any 10 minutes is above a threshold;

  • Scale down, if the maximum backlog during any 10 minutes is below another (lower) threshold;

  • After scaling up or down, take a cooldown period of 10 minutes;

  • Have a log of scaling decisions and the numbers behind them;

  • Scaling rules should be extensible to allow more complex calculation later on.

Architecture

I decided that the scaling rules should be written in a general-purpose programming language (C# for this post), instead of just picking from a limited list of configurations.

I chose Azure Functions as the mechanism to host and run this logic in Azure cloud.

Here is a diagram of Functions that I ended up creating:

Autoscaling Architecture

The components of my autoscaling app are:

  • Metric Collector function is based on a Timer trigger: it fires every minute and collects the subscription backlog metric from a given Service Bus Subscription;

  • Collector then sends this metric to the Metrics storage queue;

  • Scaling Logic function pulls the metric from the queue. It maintains the metric values for 10 minutes, calculates average/maximum value, and if they hit thresholds - issues a command to scale App Service Plan up or down;

  • The command is sent to Actions storage queue;

  • Scaler function receives the commands from the queue and executes the re-scaling action on App Service Plan using Azure Management SDK.

The implementation of this workflow is discussed below. I am using Visual Studio 2017 Version 15.3 Preview 4.0 to author pre-compiled Azure Functions with nice built-in tooling.

Metric Collector

First, let's define MetricValue class, which simply holds time and value:

public class MetricValue
{
    public MetricValue(DateTime time, int value)
    {
        this.Time = time;
        this.Value = value;
    }

    public DateTime Time { get; }

    public int Value { get; }

    // Override ToString so that the history logged by the Scaling Logic
    // function below prints readable values instead of type names
    public override string ToString() => $"{this.Time}: {this.Value}";
}

and Metric class which extends the value with resource name (e.g. App Service Plan name) and measured parameter name:

public class Metric
{
    public Metric(string resourceName, string name, MetricValue value)
    {
        this.ResourceName = resourceName;
        this.Name = name;
        this.Value = value;
    }

    public string ResourceName { get; }

    public string Name { get; }

    public MetricValue Value { get; }
}

The function definition has two associated bindings: timer trigger (runs every minute) and return binding to the storage queue:

[FunctionName("MetricCollector")]
[return: Queue("Metrics")]
public static Metric MetricCollector([TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, TraceWriter log)
{
    var connectionString = Environment.GetEnvironmentVariable("ServiceBusConnection");
    var topic = Environment.GetEnvironmentVariable("Topic");
    var subscription = Environment.GetEnvironmentVariable("Subscription");

    var nsmgr = NamespaceManager.CreateFromConnectionString(connectionString);
    var subscriptionClient = nsmgr.GetSubscription(topic, subscription);
    var backlog = subscriptionClient.MessageCountDetails.ActiveMessageCount;

    log.Info($"Collector: Current metric value is {backlog}");

    var resource = Environment.GetEnvironmentVariable("ResourceToScale");
    var value = new MetricValue(DateTime.Now, (int)backlog);
    return new Metric(resource, $"{topic}-{subscription}-backlog", value);
}

The function executes the following steps:

  • Reads configuration values for Service Bus parameters (see the settings sketch after this list);
  • Connects to Service Bus and retrieves ActiveMessageCount for the given subscription;
  • Logs the value for tracing and debugging;
  • Returns the metric value mentioning which resource it's intended for.
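For local development, these settings might live in local.settings.json. This is just a sketch: the names come from the code above, the values are placeholders.

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "ServiceBusConnection": "Endpoint=sb://<namespace>.servicebus.windows.net/;...",
    "Topic": "mytopic",
    "Subscription": "mysubscription",
    "ResourceToScale": "my-app-service-plan"
  }
}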

Scaling Logic

The core of autoscaling implementation resides in ScalingLogic function.

The function defines 4 (oh my!) bindings:

  • Queue trigger to react on messages from the collector;
  • Output queue binding to send commands with action to execute;
  • Combination of input and output bindings to the same row in Table Storage to keep the state in between function calls.

The bindings are illustrated in the following picture:

Binding of Scaling Logic Function

And here is the corresponding Function signature:

[FunctionName("ScalingLogic")]
[return: Queue("Actions")]
public static ScaleAction ScalingLogic(
    [QueueTrigger("Metrics")] Metric metric, 
    [Table("Scaling", "{ResourceName}", "{Name}")] ScalingStateEntity stateEntity, 
    [Table("Scaling", "{ResourceName}", "{Name}")] out ScalingStateEntity newStateEntity,
    TraceWriter log)

Table storage is partitioned per scalable resource, and state is stored per metric; thus multiple resources and metrics are supported out of the box.

The function implementation is relatively complex, so I'll describe it in parts.

ScaleAction is a simple message class:

public enum ScaleActionType
{
    Up,
    Down
}

public class ScaleAction
{
    public ScaleAction(string resourceName, ScaleActionType type)
    {
        this.ResourceName = resourceName;
        this.Type = type;
    }

    public string ResourceName { get; }

    public ScaleActionType Type { get; }
}

Table Storage only allows primitive types for its columns, like strings. So I had to create a separate Table Storage entity class:

public class ScalingStateEntity : TableEntity
{
    public string SerializedState { get; set; }
}

which stores serialized state, from the state class itself:

public class ScalingState
{
    public List<MetricValue> History { get; } = new List<MetricValue>();

    public DateTime LastScalingActionTime { get; set; } = DateTime.MinValue;
}

Now let's look at the function body. It consists of four blocks.

The first block retrieves the previous values of the metric and logs them too:

// 1. Deserialize state
var state = stateEntity?.SerializedState != null 
    ? JsonConvert.DeserializeObject<ScalingState>(stateEntity.SerializedState) 
    : new ScalingState();
var history = state.History;
log.Info($"Scaling logic: Received {metric.Name}, previous state is {string.Join(", ", history)}");

The second block adds the current metric value and removes all values that no longer fall within the target period of 10 minutes:

// 2. Add current metric value, remove old values
history.Add(metric.Value);
history.RemoveAll(e => e.Time < metric.Value.Time.Subtract(period));

Now the actual logic finally kicks in and produces a scaling action if the average or maximum value crosses the respective threshold. For my implementation I also chose to apply this rule only after the 5th data point. The cooldown period is also respected:

// 3. Compare the aggregates to thresholds, produce scaling action if needed
ScaleAction action = null;
if (history.Count >= 5
    && DateTime.Now - state.LastScalingActionTime > cooldownPeriod)
{
    var average = (int)history.Average(e => e.Value);
    var maximum = (int)history.Max(e => e.Value);
    if (average > thresholdUp)
    {
        log.Info($"Scaling logic: Value {average} is too high, scaling {metric.ResourceName} up...");
        state.LastScalingActionTime = DateTime.Now;
        action = new ScaleAction(metric.ResourceName, ScaleActionType.Up);
    }
    else if (maximum < thresholdDown)
    {
        log.Info($"Scaling logic: Value {maximum} is low, scaling {metric.ResourceName} down...");
        state.LastScalingActionTime = DateTime.Now;
        action = new ScaleAction(metric.ResourceName, ScaleActionType.Down);
    }
}
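The snippets above use period, cooldownPeriod and the two thresholds without showing their definitions. Here is a minimal sketch of how they might be declared: the 10-minute values follow the goals stated earlier, while the threshold numbers are pure assumptions that would normally come from configuration.

// Illustrative values only; real ones would likely come from app settings
private static readonly TimeSpan period = TimeSpan.FromMinutes(10);         // metric window
private static readonly TimeSpan cooldownPeriod = TimeSpan.FromMinutes(10); // pause after a scaling action
private const int thresholdUp = 1000;  // scale up if the average backlog exceeds this
private const int thresholdDown = 100; // scale down if the maximum backlog stays below this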

Finally, the state is serialized back to table entity and action is returned:

// 4. Serialize the state back and return the action
newStateEntity = stateEntity != null 
    ? stateEntity 
    : new ScalingStateEntity { PartitionKey = metric.ResourceName, RowKey = metric.Name };
newStateEntity.SerializedState = JsonConvert.SerializeObject(state);
return action;

Note that if no scaling action is warranted, the function simply returns null and no message gets sent to the output queue.

Scaler

The last function of the workflow is called Scaler: it listens for scaling commands and executes them. I am using Azure Management Fluent SDK to scale the App Service Plan capacity:

[FunctionName("Scaler")]
public static void Scaler([QueueTrigger("Actions")] ScaleAction action, TraceWriter log)
{
    var secrets = Environment.GetEnvironmentVariable("ServicePrincipal").Split(',');
    var credentials = SdkContext.AzureCredentialsFactory
        .FromServicePrincipal(secrets[0], secrets[1], secrets[2], AzureEnvironment.AzureGlobalCloud);
    var azure = Azure.Configure()
        .Authenticate(credentials)
        .WithDefaultSubscription();

    var plan = azure.AppServices
        .AppServicePlans
        .List()
        .First(p => p.Name.Contains(action.ResourceName));

    var newCapacity = action.Type == ScaleActionType.Down ? plan.Capacity - 1 : plan.Capacity + 1;
    log.Info($"Scaler: Switching {action.ResourceName} from {plan.Capacity} {action.Type} to {newCapacity}");

    plan.Update()
        .WithCapacity(newCapacity)
        .Apply();
}
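Judging by the Split call and the parameters of FromServicePrincipal, the ServicePrincipal application setting is assumed to hold three comma-separated values:

ServicePrincipal = <clientId>,<clientSecret>,<tenantId>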

The functionality is pretty straightforward. Here are some links where you can read more about Authentication in Azure Management libraries and Managing Web App with Fluent SDK.

Conclusion and Further Steps

This was quite a lot of code for a single blog post, but most of it was fairly straightforward. You can find the full implementation on my GitHub.

Overall, I've established an application based on Azure Functions, which watches the predefined metrics and scales the specified resource up and down based on target metric values.

The current example works only for the combination of Service Bus Subscription and App Service Plan, but it is clear how to extend it to more scenarios.

The flexibility of such an autoscaling solution exceeds the built-in functionality available in the Azure Portal.

The most complex part of my Autoscaling application is the Scaling Logic function. In the next article of the series, I will refactor it to use Durable Functions - the upcoming Orchestration framework for Function Apps.

Stay tuned, and happy scaling!

Sending Large Batches to Azure Service Bus

Azure Service Bus client supports sending messages in batches (SendBatch and SendBatchAsync methods of QueueClient and TopicClient). However, the size of a single batch must stay below 256k bytes, otherwise the whole batch will get rejected.

How do we make sure that the batch-to-be-sent is going to fit? The rest of this article will try to answer this seemingly simple question.

Problem Statement

Given a list of messages of arbitrary type T, we want to send them to Service Bus in batches. The number of batches should be close to minimal, but obviously each one of them must satisfy the restriction of 256k max size.

So, we want to implement a method with the following signature:

public Task SendBigBatchAsync<T>(IEnumerable<T> messages);

which would work for collections of any size.

To limit the scope, I will restrict the article to the following assumptions:

  • Each individual message is less than 256k serialized. If that wasn't true, we'd have to put the body into external blob storage first, and then send the reference. It's not directly related to the topic of discussion.

  • I'll use public BrokeredMessage(object serializableObject) constructor. Custom serialization could be used, but again, it's not related to batching, so I'll ignore it.

  • We won't care about transactions, i.e. if connectivity dies in the middle of sending the big batch, we might end up with partially sent batch.

Messages of Known Size

Let's start with a simple use case: the size of each message is known to us. It's defined by a hypothetical Func<T, long> getSize function. Here is a helpful extension method that will split an arbitrary collection based on a metric function and a maximum chunk size:

public static List<List<T>> ChunkBy<T>(this IEnumerable<T> source, Func<T, long> metric, long maxChunkSize)
{
    return source
        .Aggregate(
            new
            {
                Sum = 0L,
                Current = (List<T>)null,
                Result = new List<List<T>>()
            },
            (agg, item) =>
            {
                var value = metric(item);
                if (agg.Current == null || agg.Sum + value > maxChunkSize)
                {
                    var current = new List<T> { item };
                    agg.Result.Add(current);
                    return new { Sum = value, Current = current, agg.Result };
                }

                agg.Current.Add(item);
                return new { Sum = agg.Sum + value, agg.Current, agg.Result };
            })
        .Result;
}
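To illustrate how ChunkBy behaves, here is a tiny example with plain numbers (my own illustration, not part of the original solution):

// Split numbers into chunks whose sums stay at or below 10
var chunks = new long[] { 4, 7, 5, 3, 6, 1 }.ChunkBy(x => x, 10);
// Result: [4], [7], [5, 3], [6, 1]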

Now, the implementation of SendBigBatchAsync is simple:

public async Task SendBigBatchAsync<T>(IEnumerable<T> messages, Func<T, long> getSize)
{
    var chunks = messages.ChunkBy(getSize, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        var brokeredMessages = chunk.Select(m => new BrokeredMessage(m));
        await client.SendBatchAsync(brokeredMessages);
    }
}

private const long MaxServiceBusMessage = 256000;
private readonly QueueClient client;

Note that I await each chunk sequentially to preserve message ordering. Another thing to notice is that we lose the all-or-nothing guarantee: we might be able to send the first chunk, and then get an exception from subsequent parts. Some sort of retry mechanism is probably needed.

BrokeredMessage.Size

OK, how do we determine the size of each message? How do we implement getSize function?

BrokeredMessage class exposes Size property, so it might be tempting to rewrite our method the following way:

public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
    var brokeredMessages = messages.Select(m => new BrokeredMessage(m));
    var chunks = brokeredMessages.ChunkBy(bm => bm.Size, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        await client.SendBatchAsync(chunk);
    }
}

Unfortunately, this won't work properly. A quote from the documentation:

The value of Size is only accurate after the BrokeredMessage instance is sent or received.

My experiments show that Size of a draft message returns the size of the message body, ignoring headers. If the message bodies are large, and each chunk has just a handful of them, the code might work ok-ish.

But it will significantly underestimate the size of large batches of messages with small payload.

So, for the rest of this article I'll try to adjust the calculation for headers.

Fixed Header Size

It could be that the header size of each message is always the same. Quite often people will set the same headers for all their messages, or set no custom headers at all.

In this case, you might just measure this size once, and then put this fixed value inside a configuration file.

Here is how you measure the headers of a BrokeredMessage message:

var sizeBefore = message.Size;
client.Send(message);
var sizeAfter = message.Size;
var headerSize = sizeAfter - sizeBefore;

Now you just need to adjust one line from the previous version of the SendBigBatchAsync method:

var chunks = brokeredMessages.ChunkBy(bm => FixedHeaderSize + bm.Size, MaxServiceBusMessage);

FixedHeaderSize might be simply hard-coded, or taken from configuration per application.

Measuring Header Size per Message

If the size of headers varies per message, you need a way to adjust batching algorithm accordingly.

Unfortunately, I haven't found a straightforward way to accomplish that. It looks like you'd have to serialize the headers yourself, and then measure the size of resulting binary. This is not a trivial operation to do correctly, and also implies some performance penalty.

Sean Feldman came up with a way to estimate the size of headers. That might be a good way to go, though the estimation tends to err on the safe side for messages with small payload.
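For illustration only, such an estimation might look roughly like the sketch below. This is my own crude approximation, not Sean's algorithm: the minimal header size, the per-property overhead, and the padding factor are all guesses.

// Rough estimate: sum property names and values, add guessed overheads
private static long EstimateHeaderSize(BrokeredMessage message)
{
    long size = 54; // assumed minimal header size
    foreach (var property in message.Properties)
    {
        size += property.Key.Length;
        var text = property.Value as string;
        size += text?.Length ?? 8; // crude guess for non-string values
    }
    size += message.MessageId?.Length ?? 0;
    return (long)(size * 1.1); // 10% safety padding, also a guess
}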

Heuristics & Retry

The last possibility that I want to consider is to deliberately allow violating the max size of the batch, but then handle the exception, retry the send operation, and adjust future calculations based on the actual measured size of the failed messages. The size is known after trying to SendBatch, even if the operation failed, so we can use this information.

Here is a sketch of how to do that in code:

// Sender is reused across requests
public class BatchSender
{
    private readonly QueueClient queueClient;
    private long batchSizeLimit = 262000;
    private long headerSizeEstimate = 54; // start with the smallest header possible

    public BatchSender(QueueClient queueClient)
    {
        this.queueClient = queueClient;
    }

    public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
    {
        var packets = (from m in messages
                     let bm = new BrokeredMessage(m)
                     select new { Source = m, Brokered = bm, BodySize = bm.Size }).ToList();
        var chunks = packets.ChunkBy(p => this.headerSizeEstimate + p.Brokered.Size, this.batchSizeLimit);
        foreach (var chunk in chunks)
        {
            try
            {
                await this.queueClient.SendBatchAsync(chunk.Select(p => p.Brokered));
            }
            catch (MessageSizeExceededException)
            {
                // Sizes are now known even for failed messages, so we can
                // measure the actual headers of the failed chunk
                var maxHeader = chunk.Max(p => p.Brokered.Size - p.BodySize);
                if (maxHeader > this.headerSizeEstimate)
                {
                    // If failed messages had bigger headers, remember this header size 
                    // as max observed and use it in future calculations
                    this.headerSizeEstimate = maxHeader;
                }
                else
                {
                    // Reduce max batch size to 95% of current value
                    this.batchSizeLimit = (long)(this.batchSizeLimit * .95);
                }

                // Re-send just the failed chunk with the adjusted parameters
                await this.SendBigBatchAsync(chunk.Select(p => p.Source));
            }
        }
    }
}

The code example is quite involved, so here is what actually happens:

  1. Create a brokered message for each message object, but also save the corresponding source message. This is critical to be able to re-send items: there's no way to send the same BrokeredMessage instance twice.

  2. Also save the body size of the brokered message. We'll use it for retry calculation.

  3. Start with some guess of header size estimate. I start with 54 bytes, which seems to be the minimal header size possible.

  4. Split the batch into chunks the same way we did before.

  5. Try sending chunks one by one.

  6. If send operation fails with MessageSizeExceededException, iterate through failed items and find out the actual header size of the message.

  7. If that actual size is bigger than our known estimate, increase the estimate to the newly observed value. Retry sending the chunk (not the whole batch) with this new setting.

  8. If the header is small, but message size is still too big - reduce the allowed total size of the chunk. Retry again.

The combination of the checks in steps 7 and 8 should make the mechanism reliable and self-adapting to message header payloads.

Since we reuse the sender between send operations, the size parameters will also converge quite quickly and no more retries will be needed. Thus the performance overhead should be minimal.

Conclusion

It seems like there is no "one size fits all" solution for this problem at the moment. The best implementation might depend on your messaging requirements.

But if you have the silver bullet solution, please leave a comment under this post and answer my StackOverflow question!

Otherwise, let's hope that the new .NET Standard-compatible Service Bus client will solve this issue for us. Track this github issue for status updates.

Finding Lost Events in Azure Application Insights

One of the ways we use Azure Application Insights is tracking custom application-specific events. For instance, every time a data point from an IoT device comes in, we log an AppInsights event. Then we are able to aggregate the data and plot charts to derive trends and detect possible anomalies.
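Such an event might be logged along these lines. This is a sketch: the event name and measurement key are taken from the queries shown later in this post, while pointCount and the surrounding code are assumed.

var telemetry = new TelemetryClient();
// pointCount is hypothetical: the number of data points in the received message
telemetry.TrackEvent(
    "EventXReceived",
    metrics: new Dictionary<string, double> { ["EventXReceived_Count"] = pointCount });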

And recently we found such an anomaly, which looked like this:

Amount of Events on Dashboard Chart

This is a chart from our Azure dashboard, which shows the total number of events of a specific type received per day.

The first two "hills" are two weeks, so we can clearly see that we get more events on business days compared to weekends.

But then something happened on May 20: we started getting far fewer events, the hill pattern disappeared, and the days look much more alike.

We hadn't noticed any other problems in the system, but the trend looked quite worrying. Were we losing data?

I headed to the Analytics console of Application Insights to dig deeper. Here is the query that reproduces the problem:

customEvents
| where name == "EventXReceived"
| where timestamp >= ago(22d)
| project PointCount = todouble(customMeasurements["EventXReceived_Count"]), timestamp
| summarize EventXReceived = sum(PointCount) by bin(timestamp, 1d)
| render timechart

and I got the same chart as before:

Trend on Application Insights Analytics

I checked the history of our source code repository and our deployments, and figured out that we had upgraded the version of the Application Insights SDK from 2.1 to 2.3.

My guess at this point was that Application Insights started sampling our data instead of sending all events to the server. After reading Sampling in Application Insights article, I came up with the following query to see the sampling rate:

customEvents
| where name == "EventXReceived"
| where timestamp >= ago(22d)
| summarize 100/avg(itemCount) by bin(timestamp, 1d) 
| render areachart

and the result is self-explanatory:

Sampling Rate

Clearly, the sampling rate dropped from 100% down to about 30% right when the anomaly started. The sampling-adjusted query (note itemCount multiplication)

customEvents
| where name == "EventXReceived"
| where timestamp >= ago(22d)
| project PointCount = todouble(customMeasurements["EventXReceived_Count"]) * itemCount, timestamp
| summarize EventXReceived = sum(PointCount) by bin(timestamp, 1d)
| render timechart

puts us back to the point when results make sense:

Adjusted Trend on Application Insights Analytics

The third week's Thursday was bank holiday in several European countries, so we got a drop there.

Should Azure dashboard items take sampling into account, to avoid confusing people and to show more useful charts?

Mikhail.io Upgraded to HTTPS and HTTP/2

Starting today, this blog has switched to HTTPS secure protocol:

HTTPS

While there's not that much to secure on my blog, HTTPS is still considered good practice for any site in 2017. One of the benefits it enables is the use of the HTTP/2 protocol:

HTTP/2

This should be beneficial to any reader who uses a modern browser!

Thanks to CloudFlare for providing me with free HTTPS and HTTP/2 support.

Reliable Consumer of Azure Event Hubs

Azure Event Hubs is a log-based messaging system-as-a-service in Azure cloud. It's designed to handle huge amounts of data, and it naturally supports multiple consumers.

Event Hubs and Service Bus

While Event Hubs are formally part of the Azure Service Bus family of products, their model is in fact quite different.

"Traditional" Service Bus service is organized around queues (subscriptions are just queues with the topic being the source of messages). Each consumer can peek messages from the queue, do the required processing and then complete the message to remove it from the queue, or abort the processing. Abortion will leave the message at the queue, or will move it to the Dead Letter Queue. Completion/abortion are granular per message; and the status of each message is managed by the Service Bus broker.

Service Bus Processors

Event Hubs service is different. Each Hub represents a log of messages. The event producer appends data to the end of the log, and consumers can read this log, but they can't remove events or change their status there.

Each event has an offset associated with it. And the only operation that is supported for consumers is "give me some messages starting at the offset X".

Event Hub Processors

While this approach might seem simplistic, it actually makes consumers more powerful:

  • The messages do not disappear from the Hub after being processed for the first time. So, if needed, the consumer can go back and re-process older events again;

  • Multiple consumers are always supported, out of the box. They just read the same log, after all;

  • Each consumer can go at its own pace, drop and resume processing whenever needed, with no effect on other consumers.

There are some disadvantages too:

  • Consumers have to manage their own state of the processing progress, i.e. they have to save the offset of the last processed event;

  • There is no way to mark any specific event as failed to be able to reprocess it later. There's no notion of Dead Letter Queue either.

Event Processor Host

To overcome the first complication, Microsoft provides the consumer API called EventProcessorHost. This API has an implementation of consumers based on checkpointing. All you need to do is to provide a callback to process a batch of events, and then call CheckpointAsync method, which saves the current offset of the last message into Azure Blob Storage. If the consumer restarts at any point in time, it will read the last checkpoint to find the current offset, and will then continue processing from that point on.

It works great for some scenarios, but the event delivery/processing guarantees are relatively low in this case:

  • Any failures are ignored: there's no retry or Dead Letter Queue

  • There are no transactions between event hub checkpoints and the data sinks that the processor works with (i.e. data stores where processed messages end up at)

In this post I want to focus on a way to process events with higher consistency requirements, in particular:

  • Event Hub processor modifies data in a SQL Database, and such processing is transactional per batch of messages

  • Each event should be (successfully) processed exactly once

  • If event processing fails, the event should be marked as failed and kept available to be reprocessed at a later point in time

While end-to-end exactly-once processing would require changes on the producer side too, we will only focus on the consumer side in this post.

Transactional Checkpoints in SQL

If checkpoint information is stored in Azure Blobs, there is no obvious way to implement distributed transactions between SQL Database and Azure Storage.

However, we can override the default checkpointing mechanism and implement our own checkpoints based on a SQL table. This way each checkpoint update can become part of a SQL transaction and be committed or rolled back with normal guarantees provided by SQL Server.

Here is a table that I created to hold my checkpoints:

CREATE TABLE EventHubCheckpoint (
  Topic varchar(100) NOT NULL,
  PartitionID varchar(100) NOT NULL,
  SequenceNumber bigint NOT NULL,
  Offset varchar(20) NOT NULL,
  CONSTRAINT PK_EventHubCheckpoint PRIMARY KEY CLUSTERED (Topic, PartitionID)
)

For each topic and partition of Event Hubs, we store two values: sequence number and offset, which together uniquely identify the consumer position.

Conveniently, Event Processor Host provides an extensibility point to override the default checkpoint manager with a custom one. For that we need to implement the ICheckpointManager interface to work with our SQL table.

The implementation mainly consists of 3 methods: CreateCheckpointIfNotExistsAsync, GetCheckpointAsync and UpdateCheckpointAsync. The names are pretty much self-explanatory, and my Dapper-based implementation is quite trivial. You can find the code here.
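As an illustration, the update method could look roughly like this with Dapper. This is a sketch assuming the Microsoft.Azure.EventHubs.Processor flavor of the interface; connectionString and topic are hypothetical fields of my manager class, and the real code is linked above.

public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint)
{
    using (var connection = new SqlConnection(this.connectionString))
    {
        // One row per topic + partition, as defined by the table above
        await connection.ExecuteAsync(
            @"UPDATE EventHubCheckpoint
              SET SequenceNumber = @SequenceNumber, Offset = @Offset
              WHERE Topic = @Topic AND PartitionID = @PartitionID",
            new
            {
                checkpoint.SequenceNumber,
                checkpoint.Offset,
                Topic = this.topic,
                PartitionID = checkpoint.PartitionId
            });
    }
}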

For now, I'm ignoring the related topic of lease management and the corresponding ILeaseManager interface. It's quite a subject on its own; for the sake of simplicity I'll assume we have just one consumer process per partition, which makes a proper lease manager redundant.

Dead Letter Queue

Now, we want to be able to mark some messages as failed and to re-process them later. To make Dead Letters transactional, we need another SQL table to hold the failed events:

CREATE TABLE EventHubDeadLetter (
  Topic varchar(100) NOT NULL,
  PartitionID varchar(100) NOT NULL,
  SequenceNumber bigint NOT NULL,
  Offset varchar(20) NOT NULL,
  FailedAt datetime NOT NULL,
  Error nvarchar(max) NOT NULL,
  CONSTRAINT PK_EventHubDeadLetter PRIMARY KEY CLUSTERED (Topic, PartitionID, SequenceNumber)
)

This table looks very similar to EventHubCheckpoint that I defined above. That is because they are effectively storing pointers to events in a hub. Dead Letters have two additional columns to store error timestamp and text.

There is no need to store the message content, because failed events still sit in the event hub anyway. You could still log it for diagnostic purposes - just add an extra varbinary column.

There's no notion of dead letters in Event Hubs SDK, so I defined my own interface IDeadLetterManager with a single AddFailedEvents method:

public interface IDeadLetterManager
{
    Task AddFailedEvents(IEnumerable<DeadLetter<EventData>> deadLetters);
}

public class DeadLetter<T>
{
    public DeadLetter(T data, DateTime failureTime, Exception exception)
    {
        this.Data = data;
        this.FailureTime = failureTime;
        this.Exception = exception;
    }

    public T Data { get; }
    public DateTime FailureTime { get; }
    public Exception Exception { get; }
}

Dapper-based implementation is trivial again, you can find the code here.
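For a flavor of it, the insert could be done like this (again a sketch: it assumes the manager knows its topic and partition, and that EventData comes from the SDK where sequence number and offset live in SystemProperties):

public async Task AddFailedEvents(IEnumerable<DeadLetter<EventData>> deadLetters)
{
    using (var connection = new SqlConnection(this.connectionString))
    {
        // Dapper executes the INSERT once per element of the sequence
        await connection.ExecuteAsync(
            @"INSERT INTO EventHubDeadLetter (Topic, PartitionID, SequenceNumber, Offset, FailedAt, Error)
              VALUES (@Topic, @PartitionID, @SequenceNumber, @Offset, @FailedAt, @Error)",
            deadLetters.Select(d => new
            {
                Topic = this.topic,             // hypothetical field
                PartitionID = this.partitionId, // hypothetical field
                d.Data.SystemProperties.SequenceNumber,
                d.Data.SystemProperties.Offset,
                FailedAt = d.FailureTime,
                Error = d.Exception.ToString()
            }));
    }
}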

Putting It Together: Event Host

My final solution is still using EventProcessorHost. I pass SQLCheckpointManager into its constructor, and then I implement IEventProcessor's ProcessEventsAsync method in the following way:

  1. Instantiate a list of items to store failed events
  2. Start a SQL transaction
  3. Loop through all the received events in the batch
  4. Process each item inside a try-catch block
  5. If exception happens, add the current event to the list of failed events
  6. After all items are processed, save failed events to Dead Letter table
  7. Update the checkpoint pointer
  8. Commit the transaction

The code block that illustrates this workflow:

public async Task ProcessEventsAsync(
    PartitionContext context, 
    IEnumerable<EventData> eventDatas)
{
    // 1. Instantiate a list of items to store failed events
    var failedItems = new List<DeadLetter<EventData>>();

    // 2. Start a SQL transaction; async flow must be enabled
    // so that the ambient transaction survives the awaits below
    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        // 3. Loop through all the received events in the batch
        foreach (var eventData in eventDatas)
        {
            try
            {
                // 4. Process each item inside a try-catch block
                var item = this.Deserialize(eventData);
                await this.DoWork(item);
            }
            catch (Exception ex)
            {
                // 5. Add a failed event to the list
                failedItems.Add(new DeadLetter<EventData>(eventData, DateTime.UtcNow, ex));
            }
        }

        if (failedItems.Any())
        {
            // 6. Save failed items to Dead Letter table
            await this.dlq.AddFailedEvents(failedItems);
        }

        // 7. Update the checkpoint pointer
        await context.CheckpointAsync();

        // 8. Commit the transaction
        scope.Complete();
    }
}

Conclusion

My implementation of Event Hubs consumer consists of 3 parts: checkpoint manager that saves processing progress per partition into a SQL table; dead letter manager that persists information about processing errors; and an event host which uses both to provide transactional processing of events.

The transaction scope is limited to SQL Server databases, but it might be sufficient for many real world scenarios.
