Authoring a Custom Binding for Azure Functions

In my previous post I described how I used the Durable Functions extension in an Azure Function App. Durable Functions use several binding types that are not part of the standard suite: OrchestrationClient, OrchestrationTrigger and ActivityTrigger. These custom bindings are installed by copying the corresponding assemblies to a special Extensions folder.

Although the Bring-Your-Own-Binding (BYOB) feature hasn't been released yet, I decided to follow the path of Durable Functions and create my own custom binding.

Configuration Binding

I've picked a really simple use case for my first experiments with custom bindings: reading configuration values.

Azure Functions store their configuration values in App Settings (local runtime uses local.settings.json file for that).

That means, when you need a configuration value inside your C# code, you normally do

string setting = ConfigurationManager.AppSettings["MySetting"];

Alternatively, the Environment.GetEnvironmentVariable() method can be used for the same purpose:
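
string setting = Environment.GetEnvironmentVariable("MySetting");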

When I needed to collect service bus subscription metrics, I wrote this kind of bulky code:

var resourceToScale = ConfigurationManager.AppSettings["ResourceToScale"];

var connectionString = ConfigurationManager.AppSettings["ServiceBusConnection"];
var topic = ConfigurationManager.AppSettings["Topic"];
var subscription = ConfigurationManager.AppSettings["Subscription"];

The code is no rocket science, but it's tedious to write, so instead I came up with the idea of defining Functions like this:

public static void MyFunction(
    [TimerTrigger("0 */1 * * * *")] TimerInfo timer,
    [Configuration(Key = "ResourceToScale")] string resource,
    [Configuration] ServiceBusSubscriptionConfig config)

Note the two usages of the Configuration attribute. The first one specifies a configuration key and binds its value to a string parameter. The other one binds multiple configuration values to a POCO parameter. I defined the config class as

public class ServiceBusSubscriptionConfig
{
    public ServiceBusSubscriptionConfig(string serviceBusConnection, string topic, string subscription)
    {
        ServiceBusConnection = serviceBusConnection;
        Topic = topic;
        Subscription = subscription;
    }

    public string ServiceBusConnection { get; }
    public string Topic { get; }
    public string Subscription { get; }
}

The immutable class is a bit verbose, but I still prefer it over a get-set container in this scenario.

The binding behavior is convention-based in this case: the binding engine should load configuration values based on the names of class properties.
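
For the ServiceBusSubscriptionConfig class above, that means the engine would look up App Settings named ServiceBusConnection, Topic and Subscription. Locally, these would sit in the Values section of local.settings.json, for example (the values below are placeholders):

"Values": {
    "ServiceBusConnection": "<Service Bus connection string>",
    "Topic": "mytopic",
    "Subscription": "mysubscription"
}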

Motivation

So, why do I need such a binding?

As I said, it's a simple use case to play with the BYOB feature and, overall, to understand the internals of Function Apps a bit better.

But apart from that, I removed 4 lines of garbage from the function body (at the cost of two extra parameters). Less noise means more readable code, especially when I put this code on a webpage.

As a bonus, the testability of the function immediately increased. It's so much easier for a test to just pass the configuration as an input parameter, instead of fine-tuning configuration files inside test projects or hiding ConfigurationManager usage behind a mockable facade.

Such an approach seems to be a strength of Azure Functions code in general: it's often possible to reduce imperative IO-related code to attribute-decorated function parameters.

Implementing a Custom Binding

The actual implementation process of a custom non-trigger binding is quite simple:

Create a class library with the word "Extension" in its name. Import the Microsoft.Azure.WebJobs and Microsoft.Azure.WebJobs.Extensions NuGet packages (at the time of writing I used version 2.1.0-beta1).

Define a class for the binding attribute:

[AttributeUsage(AttributeTargets.Parameter)]
[Binding]
public class ConfigurationAttribute : Attribute
{
    [AutoResolve]
    public string Key { get; set; }
}

The attribute class is marked with [Binding], and the Key property is marked as resolvable from function.json with [AutoResolve].

Implement IExtensionConfigProvider which will tell the function runtime how to use your binding correctly.

The interface has just one method to implement:

public class ConfigurationExtensionConfigProvider : IExtensionConfigProvider
{
    public void Initialize(ExtensionConfigContext context)
    {
        // ... see below
    }
}

The first step of the implementation is to define a rule for our new ConfigurationAttribute and tell this rule how to get a string value out of any attribute instance:

var rule = context.AddBindingRule<ConfigurationAttribute>();
rule.BindToInput<string>(a => ConfigurationManager.AppSettings[a.Key]);

That's really all that needs to happen to bind string parameters.

To make our binding work with any POCO, we need a more elaborate construct:

rule.BindToInput<Env>(_ => new Env());
var cm = context.Config.GetService<IConverterManager>();
cm.AddConverter<Env, OpenType, ConfigurationAttribute>(typeof(PocoConverter<>));

I instruct the rule to bind to my custom class Env, and then I say that Env is convertible to any type (denoted by the special OpenType type argument) with a generic converter called PocoConverter.

The Env class is a bit of a dummy (it exists just because I need some class):

private class Env
{
    public string GetValue(string key) => ConfigurationManager.AppSettings[key];
}

And PocoConverter is a piece of reflection that loops through the property names of the target type and reads configuration values for each of them. It then calls a constructor whose parameters match those values:

private class PocoConverter<T> : IConverter<Env, T>
{
    public T Convert(Env env)
    {
        var values = typeof(T)
            .GetProperties()
            .Select(p => p.Name)
            .Select(env.GetValue)
            .Cast<object>()
            .ToArray();

        var constructor = typeof(T).GetConstructor(values.Select(v => v.GetType()).ToArray());
        if (constructor == null)
        {
            throw new Exception("We tried to bind to your C# class, but it looks like there's no constructor which accepts all property values");
        }

        return (T)constructor.Invoke(values);
    }
}

This piece of code is not particularly robust, but it is good enough to illustrate the concept.
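
To make the convention concrete: for the ServiceBusSubscriptionConfig parameter shown earlier, and assuming GetProperties() returns the properties in declaration order, the converter effectively does the equivalent of this hand-written code:

var config = new ServiceBusSubscriptionConfig(
    ConfigurationManager.AppSettings["ServiceBusConnection"],
    ConfigurationManager.AppSettings["Topic"],
    ConfigurationManager.AppSettings["Subscription"]);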

And that's it, the binding is ready! You can find the complete example in my github repo.

Deploying Custom Bindings

Since the BYOB feature is in early preview, there is no tooling for automated deployment, so we have to do everything manually. But the process is not too sophisticated:

  1. Create a folder for custom bindings, e.g. D:\BindingExtensions.

  2. Set the AzureWebJobs_ExtensionsPath parameter in your app settings to that folder's path. For local development, add a line to local.settings.json:

     "AzureWebJobs_ExtensionsPath": "D:\\BindingExtensions",
    
  3. Create a subfolder for your extension, e.g. D:\BindingExtensions\ConfigurationExtension.

  4. Copy the contents of bin\Debug\ of your extension's class library to that folder.

  5. Reference your extension library from your Function App.

You are good to go! Decorate your function parameters with the new attribute.

Run the function app locally to try it out. In the console output you should be able to see something like

Loaded custom extension: ConfigurationExtensionConfigProvider from 
'D:\BindingExtensions\ConfigurationExtension\MyExtensions.dll'

You will be able to debug your extension if needed.

More examples, walkthroughs and fresh updates about custom bindings are available online.

Have a good binding!

Custom Autoscaling with Durable Functions

In my previous post, Custom Autoscaling of Azure App Service with a Function App, I created a Function App which watches a Service Bus Subscription backlog and adjusts the scale of an App Service based on the observed load.

It works fine but there are two minor issues that I would like to address in this article:

  • The Scaling Logic function from that workflow needs to preserve state between calls. I used Table Storage bindings for that, which proved to be a bit verbose and low level: I had to manage the conversion to a table entity and the JSON serialization myself;

  • There is no feedback from the Scaler function (which executes the change) back to the Scaling Logic function. Thus, if a scaling operation is slow or fails, there is no easy way to notify the logic about that.

Let's see how these issues can be solved with Azure Durable Functions.

Meet Durable Functions

Microsoft has recently announced the preview of Durable Functions:

Durable Functions is an Azure Functions extension for building long-running, stateful function orchestrations in code using C# in a serverless environment.

The library is built on top of the Durable Task Framework and introduces several patterns for Function coordination and stateful processing. Please go read the documentation; it's great and has some very useful examples.

I decided to give Durable Functions a try for my autoscaling workflow. Feel free to refer to the first part to understand my goals and the previous implementation.

Architecture

The flow of metric collection, scaling logic and scaling action stays the same. The state and cross-function communication aspects are now delegated to Durable Functions, so the diagram becomes somewhat simpler:

Autoscaling Architecture

The blue sign on the Scaling Logic function denotes its statefulness.

Let's walk through the functions implementation to see how the workflow plays out.

This time I'll start with the Scaler function and then flow from right to left to make the explanation clearer.

Scaler

The Scaler function applies the scaling decisions to the Azure resource, an App Service Plan in my case. I've extracted the App Service related code into a helper to keep the function minimal and clean. You can see the full code in my github repo.

The Scaler function is triggered by Durable Functions' ActivityTrigger. That basically means it's ready to be called from other functions. Here is the code:

[FunctionName(nameof(Scaler))]
public static int Scaler([ActivityTrigger] DurableActivityContext context)
{
    var action = context.GetInput<ScaleAction>();

    var newCapacity = ScalingHelper.ChangeAppServiceInstanceCount(
        action.ResourceName,
        action.Type == ScaleActionType.Down ? -1 : +1);

    return newCapacity;
}

In order to receive an input value, I use the context.GetInput<T>() method. I believe the team is working on support for custom classes (ScaleAction in my case) directly as function parameters.

The function then executes the scale change and returns the new capacity of the App Service Plan. Note that this is new: we were not able to return values in the previous implementation.

Scaling Logic

Scaling Logic uses the Stateful Actor pattern. One instance of such an actor is created for each scalable resource (I only use one for now). Here is the implementation (again, simplified for readability):

[FunctionName(nameof(ScalingLogic))]
public static async Task<ScalingState> ScalingLogic(
    [OrchestrationTrigger] DurableOrchestrationContext context, 
    TraceWriter log)
{
    var state = context.GetInput<ScalingState>();

    var metric = await context.WaitForExternalEvent<Metric>(nameof(Metric));

    UpdateHistory(state.History, metric.Value);
    ScaleAction action = CalculateScalingAction(state);

    if (action != null)
    {
        var result = await context.CallFunctionAsync<int>(nameof(Scaler), action);
        log.Info($"Scaling logic: Scaled to {result} instances.");
        state.LastScalingActionTime = context.CurrentUtcDateTime;
    }

    context.ContinueAsNew(state);
    return state;
}

Here is how it works:

  • Function is bound to OrchestrationTrigger, yet another trigger type from Durable Functions;

  • It loads durable state from the received context;

  • It then waits for an external event called Metric (to be sent by Collector function, see the next section);

  • When an event is received, the function updates its state and calculates if a scaling action is warranted;

  • If yes, it calls the Scaler function and sends the scale action. It expects an integer result, denoting the new number of instances;

  • It then calls ContinueAsNew method to start a new iteration of the actor loop, providing the updated state.

One important note: the orchestrator function has to be deterministic. That means, for example, that DateTime.Now can't be used; I use context.CurrentUtcDateTime instead for time-related calculations.
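
A minimal sketch of what a time-based check inside the orchestrator might look like (cooldownPeriod here is a hypothetical TimeSpan constant):

// var now = DateTime.Now;              // not allowed: gives a different value on every replay
var now = context.CurrentUtcDateTime;   // deterministic: the same value is replayed every time
var coolingDown = now - state.LastScalingActionTime < cooldownPeriod;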

The implementation of this function solves both problems that I mentioned in the introduction. We do not manage state storage and serialization manually, and we now have the ability to get feedback from the Scaler function.

Metric Collector

I've extracted Service Bus related code to a helper to keep the code sample minimal and clean. You can see the full code in my github repo.

Here is the remaining implementation of Metric Collector:

[FunctionName(nameof(MetricCollector))]
public static async Task MetricCollector(
    [TimerTrigger("0 */1 * * * *")] TimerInfo myTimer,
    [OrchestrationClient] DurableOrchestrationClient client,
    TraceWriter log)
{
    var resource = Environment.GetEnvironmentVariable("ResourceToScale");

    var status = await client.GetStatusAsync(resource);
    if (status == null)
    {
        await client.StartNewAsync(nameof(ScalingLogic), resource, new ScalingState());
    }
    else
    {
        var metric = ServiceBusHelper.GetSubscriptionMetric(resource);
        log.Info($"Collector: Current metric value is {metric.Value.Value} at {DateTime.Now}");
        await client.RaiseEventAsync(resource, nameof(Metric), metric);
    }
}

It's still a timer-triggered "normal" (non-durable) function, but now it also has an additional binding to OrchestrationClient. This client is used to communicate metric data to the Scaling Logic.

With the current implementation, the Metric Collector also has a second responsibility: actor instance management. At every iteration, it queries the current status of the corresponding actor. If that status is null, the Collector creates a new instance with an initial empty state.

To my taste, this aspect is a bit unfortunate, but it seems to be required by the current implementation of the Durable Functions framework. See my related question on github.

Conclusions

I adjusted the initial flow of the autoscaling functions to use the Durable Functions library. It made the state management more straightforward, and also allowed direct communication between two functions in a strongly-typed, request-response manner.

The resulting code is relatively clear and resembles the typical structure of async-await code that C# developers are used to.

There are also some downsides to Durable Functions that I found:

  • This is a very early preview, so there are some implementation issues. A couple of times I managed to put my functions into a state where they were stuck and no calls could be made anymore. The only way out was to clear some blobs in the Storage Account;

  • The actor instance management story feels raw. The function which needs to send events to actors has to manage their lifecycle and instance IDs. I would need to add some more checks to make the code production ready, e.g. to restart actors if they end up in a faulty state;

  • There are some concurrency issues in function-to-function communication to be resolved;

  • Some discipline is required to keep Durable functions side-effect free and deterministic. The multiple executions caused by awaits and replays are counter-intuitive (at least for novice devs), and thus error-prone.

Having said that, I believe Durable Functions can be a very useful abstraction to simplify some of the more advanced scenarios and workflows. I look forward to further iterations of the library, and I will keep trying it out for more scenarios.

Custom Autoscaling of Azure App Service with a Function App

The power of cloud computing comes from its elasticity and ability to adapt to changing load. Most Azure services can be scaled up or down manually: by human interaction in the portal, or by running a command or a script.

Some services in Azure also support Autoscaling, i.e. they may change the resource allocation dynamically, based on predefined rules and current operational metrics.

Azure App Service is one example of such a service: it supports scaling based on a pre-set metric. This is a powerful option that enables websites or webjobs to react to varying load, e.g. based on CPU utilization.

At the same time, the flexibility of the built-in autoscaling is somewhat limited:

  • Only a handful of metrics are supported: for instance, Service Bus Queues are supported as a metric source, while Service Bus Subscriptions are not;

  • It's not possible to combine several metrics in one rule: e.g. scale down only if several queues are empty at the same time, not just one of them;

  • Thresholds are the same for any number of instances: I can't define a scale down rule threshold to be 60% for 8 instances but 30% for 2 instances;

  • The minimum time of reaction is limited to 5 minutes.

Other services, like SQL Database and Cosmos DB, don't have the built-in autoscaling functionality at all.

This post starts a series of articles about a custom implementation of autoscaling. The implementation will be based on Azure Functions as the building blocks of scaling workflows.

Goal

To keep the task very specific for now, I want the following from my first custom autoscaling implementation:

  • Be able to scale the number of instances up and down in a given App Service Plan;

  • Do so based on the given Service Bus Subscription backlog (the number of messages pending to be processed);

  • Scale up, if the average backlog during any 10 minutes is above a threshold;

  • Scale down, if the maximum backlog during any 10 minutes is below another (lower) threshold;

  • After scaling up or down, take a cooldown period of 10 minutes;

  • Have a log of scaling decisions and the numbers behind them;

  • Scaling rules should be extensible to allow more complex calculation later on.

Architecture

I decided that the scaling rules should be written in a general-purpose programming language (C# for this post), instead of just picking from a limited list of configurations.

I chose Azure Functions as the mechanism to host and run this logic in Azure cloud.

Here is a diagram of Functions that I ended up creating:

Autoscaling Architecture

The components of my autoscaling app are:

  • Metric Collector function is based on Timer trigger: it fires every minute and collects the subscription backlog metric from a given Service Bus Subscription;

  • Collector then sends this metric to the Metrics storage queue;

  • Scaling Logic function pulls the metric from the queue. It maintains the metric values for 10 minutes, calculates average/maximum value, and if they hit thresholds - issues a command to scale App Service Plan up or down;

  • The command is sent to Actions storage queue;

  • Scaler function receives the commands from the queue and executes the re-scaling action on App Service Plan using Azure Management SDK.

The implementation of this workflow is discussed below. I am using Visual Studio 2017 Version 15.3 Preview 4.0 to author pre-compiled Azure Functions with nice built-in tooling.

Metric Collector

First, let's define MetricValue class, which simply holds time and value:

public class MetricValue
{
    public MetricValue(DateTime time, int value)
    {
        this.Time = time;
        this.Value = value;
    }

    public DateTime Time { get; }

    public int Value { get; }
}

and the Metric class, which extends the value with a resource name (e.g. the App Service Plan name) and the name of the measured parameter:

public class Metric
{
    public Metric(string resourceName, string name, MetricValue value)
    {
        this.ResourceName = resourceName;
        this.Name = name;
        this.Value = value;
    }

    public string ResourceName { get; }

    public string Name { get; }

    public MetricValue Value { get; }
}

The function definition has two associated bindings: timer trigger (runs every minute) and return binding to the storage queue:

[FunctionName("MetricCollector")]
[return: Queue("Metrics")]
public static Metric MetricCollector([TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, TraceWriter log)
{
    var connectionString = Environment.GetEnvironmentVariable("ServiceBusConnection");
    var topic = Environment.GetEnvironmentVariable("Topic");
    var subscription = Environment.GetEnvironmentVariable("Subscription");

    var nsmgr = NamespaceManager.CreateFromConnectionString(connectionString);
    var subscriptionClient = nsmgr.GetSubscription(topic, subscription);
    var backlog = subscriptionClient.MessageCountDetails.ActiveMessageCount;

    log.Info($"Collector: Current metric value is {backlog}");

    var resource = Environment.GetEnvironmentVariable("ResourceToScale");
    var value = new MetricValue(DateTime.Now, (int)backlog);
    return new Metric(resource, $"{topic}-{subscription}-backlog", value);
}

The function executes the following steps:

  • Reads configuration values for Service Bus parameters;
  • Connects to Service Bus and retrieves ActiveMessageCount for the given subscription;
  • Logs the value for tracing and debugging;
  • Returns the metric value mentioning which resource it's intended for.

Scaling Logic

The core of autoscaling implementation resides in ScalingLogic function.

The function defines 4 (oh my!) bindings:

  • Queue trigger to react on messages from the collector;
  • Output queue binding to send commands with action to execute;
  • Combination of input and output bindings to the same row in Table Storage to keep the state in between function calls.

The bindings are illustrated in the following picture:

Binding of Scaling Logic Function

And here is the corresponding Function signature:

[FunctionName("ScalingLogic")]
[return: Queue("Actions")]
public static ScaleAction ScalingLogic(
    [QueueTrigger("Metrics")] Metric metric, 
    [Table("Scaling", "{ResourceName}", "{Name}")] ScalingStateEntity stateEntity, 
    [Table("Scaling", "{ResourceName}", "{Name}")] out ScalingStateEntity newStateEntity,
    TraceWriter log)

Table storage is partitioned per scalable resource, and state is stored per metric; thus multiple resources and metrics are supported out of the box.

The function implementation is relatively complex, so I'll describe it in parts.

ScaleAction is a simple message class:

public enum ScaleActionType
{
    Up,
    Down
}

public class ScaleAction
{
    public ScaleAction(string resourceName, ScaleActionType type)
    {
        this.ResourceName = resourceName;
        this.Type = type;
    }

    public string ResourceName { get; }

    public ScaleActionType Type { get; }
}

Table Storage only allows primitive types for its columns, like strings. So I had to create a separate Table Storage entity class:

public class ScalingStateEntity : TableEntity
{
    public string SerializedState { get; set; }
}

which stores the serialized state. The state class itself looks like this:

public class ScalingState
{
    public List<MetricValue> History { get; } = new List<MetricValue>();

    public DateTime LastScalingActionTime { get; set; } = DateTime.MinValue;
}

Now let's look at the function body. It consists of four blocks.

The first block retrieves the previous values of the metric and logs them too:

// 1. Deserialize state
var state = stateEntity?.SerializedState != null 
    ? JsonConvert.DeserializeObject<ScalingState>(stateEntity.SerializedState) 
    : new ScalingState();
var history = state.History;
log.Info($"Scaling logic: Received {metric.Name}, previous state is {string.Join(", ", history)}");

The second block adds the current metric value and removes all metrics which are not in the target period of 10 minutes anymore:

// 2. Add current metric value, remove old values
history.Add(metric.Value);
history.RemoveAll(e => e.Time < metric.Value.Time.Subtract(period));

Now, the actual logic finally kicks in and produces a scaling action if the average or maximum value is above or below the respective threshold. For my implementation I also chose to apply this rule only after the 5th data point. The cooldown period is also respected:

// 3. Compare the aggregates to thresholds, produce scaling action if needed
ScaleAction action = null;
if (history.Count >= 5
    && DateTime.Now - state.LastScalingActionTime > cooldownPeriod)
{
    var average = (int)history.Average(e => e.Value);
    var maximum = (int)history.Max(e => e.Value);
    if (average > thresholdUp)
    {
        log.Info($"Scaling logic: Value {average} is too high, scaling {metric.ResourceName} up...");
        state.LastScalingActionTime = DateTime.Now;
        action = new ScaleAction(metric.ResourceName, ScaleActionType.Up);
    }
    else if (maximum < thresholdDown)
    {
        log.Info($"Scaling logic: Value {maximum} is low, scaling {metric.ResourceName} down...");
        state.LastScalingActionTime = DateTime.Now;
        action = new ScaleAction(metric.ResourceName, ScaleActionType.Down);
    }
}
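
For reference, the constants used in the snippets above are not shown in the post; based on the goals stated earlier, they might look roughly like this (the threshold values here are placeholders, not the ones I actually used):

private static readonly TimeSpan period = TimeSpan.FromMinutes(10);
private static readonly TimeSpan cooldownPeriod = TimeSpan.FromMinutes(10);
private const int thresholdUp = 1000;  // hypothetical: average backlog above this triggers scale up
private const int thresholdDown = 100; // hypothetical: maximum backlog below this triggers scale down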

Finally, the state is serialized back to the table entity and the action is returned:

// 4. Serialize the state back and return the action
newStateEntity = stateEntity != null 
    ? stateEntity 
    : new ScalingStateEntity { PartitionKey = metric.ResourceName, RowKey = metric.Name };
newStateEntity.SerializedState = JsonConvert.SerializeObject(state);
return action;

Note that if no scaling action is warranted, the function simply returns null and no message gets sent to the output queue.

Scaler

The last function of the workflow is called Scaler: it listens for scaling commands and executes them. I am using the Azure Management Fluent SDK to change the App Service Plan capacity:

[FunctionName("Scaler")]
public static void Scaler([QueueTrigger("Actions")] ScaleAction action, TraceWriter log)
{
    var secrets = Environment.GetEnvironmentVariable("ServicePrincipal").Split(',');
    var credentials = SdkContext.AzureCredentialsFactory
        .FromServicePrincipal(secrets[0], secrets[1], secrets[2], AzureEnvironment.AzureGlobalCloud);
    var azure = Azure.Configure()
        .Authenticate(credentials)
        .WithDefaultSubscription();

    var plan = azure.AppServices
        .AppServicePlans
        .List()
        .First(p => p.Name.Contains(action.ResourceName));

    var newCapacity = action.Type == ScaleActionType.Down ? plan.Capacity - 1 : plan.Capacity + 1;
    log.Info($"Scaler: Switching {action.ResourceName} from {plan.Capacity} {action.Type} to {newCapacity}");

    plan.Update()
        .WithCapacity(newCapacity)
        .Apply();
}

The functionality is pretty straightforward. Here are some links where you can read more about Authentication in Azure Management libraries and Managing Web App with Fluent SDK.

Conclusion and Further Steps

This was quite a lot of code for a single blog post, but most of it was fairly straightforward. You can find the full implementation in my github repo.

Overall, I've built an application based on Azure Functions which watches predefined metrics and scales the specified resource up and down based on target metric values.

The current example works only for the combination of Service Bus Subscription and App Service Plan, but it is clear how to extend it to more scenarios.

The flexibility of such an autoscaling solution exceeds the built-in functionality available in the Azure Portal.

The most complex part of my Autoscaling application is the Scaling Logic function. In the next article of the series, I will refactor it to use Durable Functions - the upcoming Orchestration framework for Function Apps.

Stay tuned, and happy scaling!

Sending Large Batches to Azure Service Bus

The Azure Service Bus client supports sending messages in batches (the SendBatch and SendBatchAsync methods of QueueClient and TopicClient). However, the size of a single batch must stay below 256k bytes, otherwise the whole batch will get rejected.

How do we make sure that the batch-to-be-sent is going to fit? The rest of this article will try to answer this seemingly simple question.

Problem Statement

Given a list of messages of arbitrary type T, we want to send them to Service Bus in batches. The number of batches should be close to minimal, but obviously each one of them must satisfy the restriction of 256k max size.

So, we want to implement a method with the following signature:

public Task SendBigBatchAsync<T>(IEnumerable<T> messages);

which would work for collections of any size.

To limit the scope, I will restrict the article to the following assumptions:

  • Each individual message is less than 256k serialized. If that wasn't true, we'd have to put the body into external blob storage first, and then send the reference. It's not directly related to the topic of discussion.

  • I'll use public BrokeredMessage(object serializableObject) constructor. Custom serialization could be used, but again, it's not related to batching, so I'll ignore it.

  • We won't care about transactions, i.e. if connectivity dies in the middle of sending the big batch, we might end up with a partially sent batch.

Messages of Known Size

Let's start with a simple use case: the size of each message is known to us. It's defined by hypothetical Func<T, long> getSize function. Here is a helpful extension method that will split an arbitrary collection based on a metric function and maximum chunk size:

public static List<List<T>> ChunkBy<T>(this IEnumerable<T> source, Func<T, long> metric, long maxChunkSize)
{
    return source
        .Aggregate(
            new
            {
                Sum = 0L,
                Current = (List<T>)null,
                Result = new List<List<T>>()
            },
            (agg, item) =>
            {
                var value = metric(item);
                if (agg.Current == null || agg.Sum + value > maxChunkSize)
                {
                    var current = new List<T> { item };
                    agg.Result.Add(current);
                    return new { Sum = value, Current = current, agg.Result };
                }

                agg.Current.Add(item);
                return new { Sum = agg.Sum + value, agg.Current, agg.Result };
            })
        .Result;
}
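
As a quick sanity check of how ChunkBy behaves (this example is mine, not from the original code), chunking a few words by their length with a limit of 10 characters per chunk gives:

var words = new List<string> { "alpha", "beta", "gamma", "delta" };
var chunks = words.ChunkBy(w => w.Length, maxChunkSize: 10);
// chunks: [ ["alpha", "beta"], ["gamma", "delta"] ]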

Now, the implementation of SendBigBatchAsync is simple:

public async Task SendBigBatchAsync(IEnumerable<T> messages, Func<T, long> getSize)
{
    var chunks = messages.ChunkBy(getSize, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        var brokeredMessages = chunk.Select(m => new BrokeredMessage(m));
        await client.SendBatchAsync(brokeredMessages);
    }
}

private const long MaxServiceBusMessage = 256000;
private readonly QueueClient client;

Note that I await each chunk sequentially to preserve message ordering. Another thing to notice is that we lost the all-or-nothing guarantee: we might be able to send the first chunk and then get an exception from subsequent parts. Some sort of retry mechanism is probably needed.

BrokeredMessage.Size

OK, how do we determine the size of each message? How do we implement the getSize function?

The BrokeredMessage class exposes a Size property, so it might be tempting to rewrite our method the following way:

public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
    var brokeredMessages = messages.Select(m => new BrokeredMessage(m));
    var chunks = brokeredMessages.ChunkBy(bm => bm.Size, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        await client.SendBatchAsync(chunk);
    }
}

Unfortunately, this won't work properly. A quote from documentation:

The value of Size is only accurate after the BrokeredMessage instance is sent or received.

My experiments show that the Size of an unsent message returns the size of the message body, ignoring the headers. If the message bodies are large and each chunk has just a handful of them, the code might work ok-ish.

But it will significantly underestimate the size of large batches of messages with small payload.

So, for the rest of this article I'll try to adjust the calculation for headers.

Fixed Header Size

It could be that the header size of each message is always the same. Quite often people will set the same headers for all their messages, or set no custom headers at all.

In this case, you might just measure this size once, and then put this fixed value inside a configuration file.

Here is how you can measure the header size of a BrokeredMessage:

var sizeBefore = message.Size;
client.Send(message);
var sizeAfter = message.Size;
var headerSize = sizeAfter - sizeBefore;

Now you just need to adjust one line from the previous version of the SendBigBatchAsync method:

var chunks = brokeredMessages.ChunkBy(bm => FixedHeaderSize + bm.Size, MaxServiceBusMessage);

FixedHeaderSize might be simply hard-coded, or taken from configuration per application.
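
For instance, a minimal sketch of reading it from App Settings (the FixedHeaderSize setting name and the fallback value are just examples):

private static readonly long FixedHeaderSize =
    long.Parse(ConfigurationManager.AppSettings["FixedHeaderSize"] ?? "150"); // fallback value is arbitrary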

Measuring of Header Size per Message

If the size of the headers varies per message, you need a way to adjust the batching algorithm accordingly.

Unfortunately, I haven't found a straightforward way to accomplish that. It looks like you'd have to serialize the headers yourself, and then measure the size of resulting binary. This is not a trivial operation to do correctly, and also implies some performance penalty.

Sean Feldman came up with a way to estimate the size of headers. That might be a good way to go, though the estimation tends to err on the safe side for messages with small payload.

Heuristics & Retry

The last possibility that I want to consider is to actually allow ourselves to violate the max size of the batch, but then handle the exception, retry the send operation and adjust future calculations based on the actual measured size of the failed messages. The size is known after a SendBatch attempt, even if the operation failed, so we can use this information.

Here is a sketch of how to do that in code:

// Sender is reused across requests
public class BatchSender
{
    private readonly QueueClient queueClient;
    private long batchSizeLimit = 262000;
    private long headerSizeEstimate = 54; // start with the smallest header possible

    public BatchSender(QueueClient queueClient)
    {
        this.queueClient = queueClient;
    }

    public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
    {
        var packets = (from m in messages
                     let bm = new BrokeredMessage(m)
                     select new { Source = m, Brokered = bm, BodySize = bm.Size }).ToList();
        var chunks = packets.ChunkBy(p => this.headerSizeEstimate + p.Brokered.Size, this.batchSizeLimit);
        foreach (var chunk in chunks)
        {
            try
            {
                await this.queueClient.SendBatchAsync(chunk.Select(p => p.Brokered));
            }
            catch (MessageSizeExceededException)
            {
                var maxHeader = chunk.Max(p => p.Brokered.Size - p.BodySize);
                if (maxHeader > this.headerSizeEstimate)
                {
                    // If failed messages had bigger headers, remember this header size 
                    // as max observed and use it in future calculations
                    this.headerSizeEstimate = maxHeader;
                }
                else
                {
                    // Reduce max batch size to 95% of current value
                    this.batchSizeLimit = (long)(this.batchSizeLimit * .95);
                }

                // Re-send the failed chunk
                await this.SendBigBatchAsync(chunk.Select(p => p.Source));
            }

        }
    }
}

The code example is quite involved, so here is what actually happens:

  1. Create a brokered message for each message object, but also save the corresponding source message. This is critical to be able to re-send items: there's no way to send the same BrokeredMessage instance twice.

  2. Also save the body size of the brokered message. We'll use it for retry calculation.

  3. Start with some guess of header size estimate. I start with 54 bytes, which seems to be the minimal header size possible.

  4. Split the batch into chunks the same way we did before.

  5. Try sending chunks one by one.

  6. If the send operation fails with MessageSizeExceededException, iterate through the failed items and find out the actual header size of each message.

  7. If that actual size is bigger than our known estimate, increase the estimate to the newly observed value. Retry sending the chunk (not the whole batch) with this new setting.

  8. If the header is small but the chunk is still too big, reduce the allowed total size of the chunk. Retry again.

The combination of the checks in steps 7 and 8 should make the mechanism reliable and self-adapting to message header payloads.

Since we reuse the sender between send operations, the size parameters will also converge quite quickly and no more retries will be needed. Thus the performance overhead should be minimal.

Conclusion

It seems like there is no "one size fits all" solution for this problem at the moment. The best implementation might depend on your messaging requirements.

But if you have a silver-bullet solution, please leave a comment under this post and answer my StackOverflow question!

Otherwise, let's hope that the new .NET Standard-compatible Service Bus client will solve this issue for us. Track this github issue for status updates.

Finding Lost Events in Azure Application Insights

One of the ways we use Azure Application Insights is tracking custom application-specific events. For instance, every time a data point from an IoT device comes in, we log an AppInsights event. Then we are able to aggregate the data and plot charts to derive trends and detect possible anomalies.
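
Our logging code isn't shown in this post, but with the Application Insights SDK an event with a custom measurement is typically tracked like this (the event and measurement names match the queries below; dataPointCount is an illustrative variable):

var telemetry = new TelemetryClient();
telemetry.TrackEvent(
    "EventXReceived",
    properties: null,
    metrics: new Dictionary<string, double> { ["EventXReceived_Count"] = dataPointCount });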

And recently we found such an anomaly, which looked like this:

Amount of Events on Dashboard Chart

This is a chart from our Azure dashboard, which shows the total amount of events of specific type received per day.

The first two "hills" are two weeks, so we can clearly see that we get more events on business days compared to weekends.

But then something happened on May 20: we started getting far fewer events, the hill pattern disappeared, and the days started to look much more alike.

We hadn't noticed any other problems in the system, but the trend looked quite worrying. Were we losing data?

I headed to the Analytics console of Application Insights to dig deeper. Here is the query that reproduces the problem:

customEvents
| where name == "EventXReceived"
| where timestamp >= ago(22d)
| project PointCount = todouble(customMeasurements["EventXReceived_Count"]), timestamp
| summarize EventXReceived = sum(PointCount) by bin(timestamp, 1d)
| render timechart

and I got the same chart as before:

Trend on Application Insights Analytics

I checked the history of our source code repository and deployments, and figured out that we had upgraded the Application Insights SDK from version 2.1 to version 2.3.

My guess at this point was that Application Insights had started sampling our data instead of sending all events to the server. After reading the Sampling in Application Insights article, I came up with the following query to see the sampling rate:

customEvents
| where name == "EventXReceived"
| where timestamp >= ago(22d)
| summarize 100/avg(itemCount) by bin(timestamp, 1d) 
| render areachart

and the result is self-explanatory:

Sampling Rate

Clearly, the sampling rate dropped from 100% down to about 30% right when the anomaly started. The sampling-adjusted query (note the itemCount multiplication)

customEvents
| where name == "EventXReceived"
| where timestamp >= ago(22d)
| project PointCount = todouble(customMeasurements["EventXReceived_Count"]) * itemCount, timestamp
| summarize EventXReceived = sum(PointCount) by bin(timestamp, 1d)
| render timechart

puts us back to the point where the results make sense:

Adjusted Trend on Application Insights Analytics

The third week's Thursday was a bank holiday in several European countries, so we got a drop there.

Should Azure dashboard items take sampling into account - to avoid confusing people and to show more useful charts?
