Reliable Consumer of Azure Event Hubs

Azure Event Hubs is a log-based messaging system-as-a-service in the Azure cloud. It's designed to handle huge amounts of data, and it naturally supports multiple consumers.

Event Hubs and Service Bus

While Event Hubs is formally part of the Azure Service Bus family of products, its model is in fact quite different.

"Traditional" Service Bus service is organized around queues (subscriptions are just queues with the topic being the source of messages). Each consumer can peek messages from the queue, do the required processing and then complete the message to remove it from the queue, or abort the processing. Abortion will leave the message at the queue, or will move it to the Dead Letter Queue. Completion/abortion are granular per message; and the status of each message is managed by the Service Bus broker.

Service Bus Processors
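For illustration, here is a minimal sketch of this queue-based flow, assuming the Microsoft.ServiceBus.Messaging SDK; the queue name and processing logic are placeholders:

using System;
using Microsoft.ServiceBus.Messaging;

public static class QueueConsumerSketch
{
    public static void ReceiveOne(string connectionString)
    {
        var client = QueueClient.CreateFromConnectionString(connectionString, "my-queue");

        // Peek-lock the next message (null if the queue is currently empty)
        var message = client.Receive();
        if (message == null) return;

        try
        {
            // ... do the required processing here ...

            // Completion removes the message from the queue
            message.Complete();
        }
        catch (Exception)
        {
            // Abandoning returns the message to the queue for another attempt;
            // after too many attempts the broker moves it to the Dead Letter Queue
            message.Abandon();
        }
    }
}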

The Event Hubs service is different. Each Hub represents a log of messages. Event producers append data to the end of the log, and consumers can read this log, but they can't remove events or change their status there.

Each event has an offset associated with it, and the only operation supported for consumers is "give me some messages starting at offset X".

Event Hub Processors
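Here is a minimal sketch of that single operation, again assuming the Microsoft.ServiceBus.Messaging SDK; the hub name and batch size are placeholders:

using Microsoft.ServiceBus.Messaging;

public static class HubReaderSketch
{
    public static void ReadFromOffset(string connectionString, string partitionId, string offset)
    {
        var client = EventHubClient.CreateFromConnectionString(connectionString, "my-hub");

        // "Give me some messages starting at offset X" for a single partition
        var receiver = client.GetDefaultConsumerGroup().CreateReceiver(partitionId, offset);

        foreach (var eventData in receiver.Receive(100))
        {
            // eventData.Offset and eventData.SequenceNumber identify the position in the log
        }
    }
}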

While this approach might seem simplistic, it actually makes consumers more powerful:

  • The messages do not disappear from the Hub after being processed for the first time. So, if needed, the consumer can go back and re-process older events;

  • Multiple consumers are always supported, out of the box. They just read the same log, after all;

  • Each consumer can go at its own pace, drop and resume processing whenever needed, with no effect on other consumers.

There are some disadvantages too:

  • Consumers have to manage their own state of the processing progress, i.e. they have to save the offset of the last processed event;

  • There is no way to mark any specific event as failed to be able to reprocess it later. There's no notion of Dead Letter Queue either.

Event Processor Host

To overcome the first complication, Microsoft provides a consumer API called EventProcessorHost. This API ships with an implementation of consumers based on checkpointing. All you need to do is provide a callback to process a batch of events, and then call the CheckpointAsync method, which saves the offset of the last processed message into Azure Blob Storage. If the consumer restarts at any point in time, it will read the last checkpoint to find the current offset, and will then continue processing from that point on.
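For reference, here is a minimal sketch of that default flow, assuming the older Microsoft.ServiceBus.Messaging SDK; the class name and processing logic are illustrative:

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class SimpleProcessor : IEventProcessor
{
    public Task OpenAsync(PartitionContext context) => Task.FromResult(0);

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // ... process the event ...
        }

        // Saves the offset of the last message in this batch into Azure Blob Storage
        await context.CheckpointAsync();
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason) => Task.FromResult(0);
}

The processor is registered with EventProcessorHost.RegisterEventProcessorAsync&lt;SimpleProcessor&gt;(); after a restart, the host reads the last checkpoint blob and resumes from that offset.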

It works great for some scenarios, but the event delivery/processing guarantees are relatively low in this case:

  • Any failures are ignored: there's no retry or Dead Letter Queue

  • There are no transactions between event hub checkpoints and the data sinks that the processor works with (i.e. the data stores where processed messages end up)

In this post I want to focus on a way to process events with higher consistency requirements, in particular:

  • Event Hub processor modifies data in a SQL Database, and such processing is transactional per batch of messages

  • Each event should be (successfully) processed exactly once

  • If event processing fails, the event should be marked as failed and kept available to be reprocessed at a later point in time

While end-to-end exactly-once processing would require changes on the producer side too, we will only focus on the consumer side in this post.

Transactional Checkpoints in SQL

If checkpoint information is stored in Azure Blobs, there is no obvious way to implement distributed transactions between SQL Database and Azure Storage.

However, we can override the default checkpointing mechanism and implement our own checkpoints based on a SQL table. This way each checkpoint update can become part of a SQL transaction and be committed or rolled back with normal guarantees provided by SQL Server.

Here is a table that I created to hold my checkpoints:

CREATE TABLE EventHubCheckpoint (
  Topic varchar(100) NOT NULL,
  PartitionID varchar(100) NOT NULL,
  SequenceNumber bigint NOT NULL,
  Offset varchar(20) NOT NULL,
  CONSTRAINT PK_EventHubCheckpoint PRIMARY KEY CLUSTERED (Topic, PartitionID)
)

For each topic and partition of Event Hubs, we store two values: sequence number and offset, which together uniquely identify the consumer position.

Conveniently, Event Processor Host provides an extensibility point to override the default checkpoint manager with a custom one. For that, we need to implement the ICheckpointManager interface to work with our SQL table.

The implementation mainly consists of 3 methods: CreateCheckpointIfNotExistsAsync, GetCheckpointAsync and UpdateCheckpointAsync. The names are pretty much self-explanatory, and my Dapper-based implementation is quite trivial. You can find the code here.
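To give an idea of the shape of that code, here is a simplified sketch of the checkpoint upsert; the real UpdateCheckpointAsync receives lease/checkpoint objects carrying the same values, and the connection string field is an assumption:

using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;

public partial class SQLCheckpointManager
{
    private readonly string connectionString;

    // Insert or update the checkpoint row for one partition; when called inside an
    // ambient TransactionScope, the connection enlists in that transaction
    private async Task UpsertCheckpoint(string topic, string partitionId, long sequenceNumber, string offset)
    {
        const string sql = @"
            MERGE EventHubCheckpoint AS target
            USING (SELECT @Topic AS Topic, @PartitionID AS PartitionID) AS source
            ON target.Topic = source.Topic AND target.PartitionID = source.PartitionID
            WHEN MATCHED THEN
                UPDATE SET SequenceNumber = @SequenceNumber, Offset = @Offset
            WHEN NOT MATCHED THEN
                INSERT (Topic, PartitionID, SequenceNumber, Offset)
                VALUES (@Topic, @PartitionID, @SequenceNumber, @Offset);";

        using (var connection = new SqlConnection(this.connectionString))
        {
            await connection.ExecuteAsync(sql, new
            {
                Topic = topic,
                PartitionID = partitionId,
                SequenceNumber = sequenceNumber,
                Offset = offset
            });
        }
    }
}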

For now, I'm ignoring the related topic of lease management and the corresponding ILeaseManager interface. It's quite a subject on its own; for the sake of simplicity, I'll assume we have just one consumer process per partition, which makes a proper lease manager redundant.

Dead Letter Queue

Now, we want to be able to mark some messages as failed and to re-process them later. To make Dead Letters transactional, we need another SQL table to hold the failed events:

CREATE TABLE EventHubDeadLetter (
  Topic varchar(100) NOT NULL,
  PartitionID varchar(100) NOT NULL,
  SequenceNumber bigint NOT NULL,
  Offset varchar(20) NOT NULL,
  FailedAt datetime NOT NULL,
  Error nvarchar(max) NOT NULL,
  CONSTRAINT PK_EventHubDeadLetter PRIMARY KEY CLUSTERED (Topic, PartitionID)
)

This table looks very similar to the EventHubCheckpoint table defined above. That's because both effectively store pointers to events in a hub. Dead letters have two additional columns to store the error timestamp and text.

There is no need to store the message content, because failed events still sit in the event hub anyway. You could still log it for diagnostics purposes - just add an extra varbinary column.

There's no notion of dead letters in Event Hubs SDK, so I defined my own interface IDeadLetterManager with a single AddFailedEvents method:

public interface IDeadLetterManager
{
    Task AddFailedEvents(IEnumerable<DeadLetter<EventData>> deadLetters);
}

public class DeadLetter<T>
{
    public DeadLetter(T data, DateTime failureTime, Exception exception)
    {
        this.Data = data;
        this.FailureTime = failureTime;
        this.Exception = exception;
    }

    public T Data { get; set; }
    public DateTime FailureTime { get; set; }
    public Exception Exception { get; set; }
}

The Dapper-based implementation is trivial again; you can find the code here.
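For illustration, the insert might look roughly like this; the hub path and connection string are assumed to be passed into the manager:

using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using Microsoft.ServiceBus.Messaging;

public class SqlDeadLetterManager : IDeadLetterManager
{
    private readonly string connectionString;
    private readonly string topic;
    private readonly string partitionId;

    public SqlDeadLetterManager(string connectionString, string topic, string partitionId)
    {
        this.connectionString = connectionString;
        this.topic = topic;
        this.partitionId = partitionId;
    }

    public async Task AddFailedEvents(IEnumerable<DeadLetter<EventData>> deadLetters)
    {
        const string sql = @"
            INSERT INTO EventHubDeadLetter (Topic, PartitionID, SequenceNumber, Offset, FailedAt, Error)
            VALUES (@Topic, @PartitionID, @SequenceNumber, @Offset, @FailedAt, @Error)";

        // One row per failed event, pointing back to its position in the hub
        var rows = deadLetters.Select(d => new
        {
            Topic = this.topic,
            PartitionID = this.partitionId,
            SequenceNumber = d.Data.SequenceNumber,
            Offset = d.Data.Offset,
            FailedAt = d.FailureTime,
            Error = d.Exception.ToString()
        });

        using (var connection = new SqlConnection(this.connectionString))
        {
            // Dapper executes the INSERT once per element of the sequence
            await connection.ExecuteAsync(sql, rows);
        }
    }
}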

Putting It Together: Event Host

My final solution still uses EventProcessorHost. I pass the SQLCheckpointManager into its constructor, and then implement IEventProcessor's ProcessEventsAsync method in the following way:

  1. Instantiate a list of items to store failed events
  2. Start a SQL transaction
  3. Loop through all the received events in the batch
  4. Process each item inside a try-catch block
  5. If exception happens, add the current event to the list of failed events
  6. After all items are processed, save failed events to Dead Letter table
  7. Update the checkpoint pointer
  8. Commit the transaction

Here is a code block that illustrates this workflow:

public async Task ProcessEventsAsync(
    PartitionContext context, 
    IEnumerable<EventData> eventDatas)
{
    // 1. Instantiate a list of items to store failed events
    var failedItems = new List<DeadLetter<EventData>>();

    // 2. Start a SQL transaction
    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        // 3. Loop through all the received events in the batch
        foreach (var eventData in eventDatas)
        {
            try
            {
                // 4. Process each item inside a try-catch block
                var item = this.Deserialize(eventData);
                await this.DoWork(item);
            }
            catch (Exception ex)
            {
                // 5. Add a failed event to the list
                failedItems.Add(new DeadLetter<EventData>(eventData, DateTime.UtcNow, ex));
            }
        }

        if (failedItems.Any())
        {
            // 6. Save failed items to Dead Letter table
            await this.dlq.AddFailedEvents(failedItems);
        }

        // 7. Update the checkpoint pointer
        await context.CheckpointAsync();

        // 8. Commit the transaction
        scope.Complete();
    }
}

Conclusion

My implementation of the Event Hubs consumer consists of 3 parts: a checkpoint manager that saves processing progress per partition into a SQL table; a dead letter manager that persists information about processing errors; and an event host which uses both to provide transactional processing of events.

The transaction scope is limited to SQL Server databases, but that might be sufficient for many real-world scenarios.

Why F# and Functional Programming Talk at .NET Development Nederland Meetup

On May 8th 2017 I gave a talk at the .NET Development Nederland group in Amsterdam.

Here are the slides for the people who were there and want to revisit the covered topics.

Why Learn F# and Functional Programming

Link to full-screen HTML slides: Why Learn F# and Functional Programming

Slides on SlideShare:

Useful links:

F# for Fun and Profit

F# Foundation

Real-World Functional Programming, With examples in F# and C# Book

Thanks for attending my talk! Feel free to post any feedback in the comments.

Visualizing Dependency Tree from DI Container

So you are a C# developer. And you need to read the code and understand its structure. Maybe you've just joined the project, or it's your own code you wrote a year ago. In any case, reading code is hard.

Luckily, some good thought was applied to this particular piece of code. It's all broken down into small classes (they might even be SOLID!), and all the dependencies are injected via constructors. It looks like it's your code indeed.

So, you figured out that the entry point for your current use case is the class called ThingService. It's probably doing something with Things, and that's what you need. The signature of the class constructor looks like this:

public ThingService(
    IGetThings readRepository,
    ISaveThing saveRepository,
    IParseAndValidateExcel<Thing, string> fileParser,
    IThingChangeDetector thingChangeDetector,
    IMap<Thing, ThingDTO> thingToDtoMapper,
    IMap<int, ThingDTO, Thing> dtoToThingMapper)

OK, so we clearly have 6 dependencies here, and they are all interfaces. We don't know where those interfaces are implemented, but hey - we've got the best tooling in the industry, so right click on IGetThings, then Go To Implementation.

public DapperThingRepository(
    ICRUDAdapter adapter,
    IDatabaseConnectionFactory connectionFactory,
    IMap<Thing, ThingRow> thingRowMapper,
    IMap<ThingRow, Thing> thingMapper)

Now we know that we get Thing from Dapper, so probably from a SQL database. Let's go one level deeper and check where those Mappers are implemented. Right click, Go To Implementation... But instead of navigating to another code file you see

Find Symbol Result - 28 matches found

Oh, right, looks like we use IMap<T, U> in more places. OK, we'll find the right one later, let's first check the connection factory... Right click, Go To Implementation. Nah:

The symbol has no implementation

What? But the application works! Ah, IDatabaseConnectionFactory comes from an internal library, so most probably the implementation is also inside that library.

Clearly, navigation doesn't go that well so far.

Dependency Graph

When code reading gets tricky, an image can often boost understanding. The picture below shows the graph of class dependencies from our example:

Class Dependency Graph

Each node is a class, each arrow is a dependency - an interface injected into the constructor.

Just by looking at the picture for a minute or two, you can start seeing some structure and get at least a high-level impression of the application's complexity and class relationships.

A picture is also a great way of communicating. Once you understand the structure, you can explain it to a colleague much more easily with boxes and lines on the screen in addition to a plain wall of code.

You can enrich such a picture with comments at the time of writing and leave it for your future self or anyone who will read the code in two years' time.

But now the question is - what's the easiest way to draw such dependency graph?

DI Container

The assumption of this post is that a dependency injection (DI) container of some kind is used in the project. If so, chances are that you can get such a dependency graph from the container registrations.

My example is based on the Simple Injector DI container, which is the one we use ourselves. So, further on, I will explain how to draw a dependency graph from a Simple Injector container.

My guess is that any mature DI library will provide you with such a possibility, mostly because dependency graphs are built internally by any container during its normal operation.

Implementation

The implementation idea of dependency graph visualization is quite simple, as the biggest chunk of work is done by Simple Injector itself. Here are the steps:

  1. Run all your DI registrations as you do in the actual application. This will initialize Container to the desired state.

  2. Define which class should be the root of the dependency tree under study. You can refine later, but you need to start somewhere.

  3. Call GetRegistration method of DI container for the selected type. An instance of InstanceProducer type is returned.

  4. Call GetRelationships method of the instance producer to retrieve all interface/class pairs that the given type depends on. Save each relation into your output list.

  5. Navigate through each dependency recursively to load further layers of the graph. Basically, do the depth-first search and save all found relations.

  6. Convert the list of found relations into GraphViz textual graph description.

  7. Use a tool like WebGraphviz to do the actual visualization by converting the text to a picture.

There are several potential pitfalls on the way, like cyclic graphs, decorator registrations, etc. To help you avoid those, I've created a small library to automate steps 3 to 6 from the list above. See my SimpleInjector.Visualization GitHub repo and let me know if you find it useful.
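As a rough illustration, steps 3 to 6 without the library might look like the following sketch, which assumes Simple Injector's GetRegistration/GetRelationships API and uses simplified GraphViz formatting:

using System;
using System.Collections.Generic;
using System.Text;
using SimpleInjector;

public static class DependencyGraphSketch
{
    public static string ToGraphViz(Container container, Type root)
    {
        var edges = new HashSet<Tuple<string, string>>();
        Visit(container.GetRegistration(root, throwOnFailure: true), edges);

        var text = new StringBuilder("digraph G {" + Environment.NewLine);
        foreach (var edge in edges)
        {
            text.AppendLine($"  \"{edge.Item1}\" -> \"{edge.Item2}\"");
        }
        return text.Append("}").ToString();
    }

    private static void Visit(InstanceProducer producer, HashSet<Tuple<string, string>> edges)
    {
        // GetRelationships returns one entry per injected constructor dependency
        foreach (var relationship in producer.GetRelationships())
        {
            var edge = Tuple.Create(
                relationship.ImplementationType.Name,
                relationship.Dependency.Registration.ImplementationType.Name);

            // Visit each edge only once to survive cycles and shared dependencies
            if (edges.Add(edge))
            {
                Visit(relationship.Dependency, edges);
            }
        }
    }
}

The resulting text can then be pasted into WebGraphviz to render the picture.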

Conclusion

People are good at making sense of visual representations - use that skill to improve understanding and communication within your development team.

The practice of dependency injection requires a lot of ceremony to get it up and running. Leverage this work for the best: check what kind of insights you can get from that setup. Dependency graph visualization is one example of such leverage, but there might be other gems in there.

Just keep searching!

Azure Functions as a Facade for Azure Monitoring

Azure Functions are the Function-as-a-Service offering from the Microsoft Azure cloud. Basically, an Azure Function is a piece of code which gets executed by Azure every time an event of some kind happens. The environment manages deployment, event triggers and scaling for you. This approach is often referred to as Serverless.

In this post I will describe one use case for Azure Functions: we implemented a number of functions as a proxy layer between our operations/monitoring tool and Azure metric APIs.

Problem

Automated monitoring and alerting are crucial in order to ensure 24x7 smooth operations of our business-critical applications. We host applications both on-premise and in Azure cloud, and we use a single set of tools for monitoring across this hybrid environment.

Particularly, we use PRTG Network Monitor to collect all kinds of metrics about the health of our systems and produce both real-time alerts and historic trends.

A unit of monitoring in PRTG is called "sensor". Each sensor polls a specific data source to retrieve the current value of a metric. The data source can be a performance counter, a JSON value in HTTP response, a SQL query result and so on.

The problem is that there is no PRTG sensor for Azure metrics out of the box. It might be possible to implement a sensor with custom code, e.g. in PowerShell, but it would be problematic in two ways (at least):

  1. The custom code sensors are cumbersome to develop and maintain.
  2. We would have to put sensitive information like Azure API keys and connection strings to PRTG.

Solution Overview

To overcome these problems, we introduced an intermediate layer, as shown in the following picture:

PRTG to HTTP to Azure

We use the PRTG HTTP XML/REST sensor type. This sensor polls a given HTTP endpoint, parses the response as JSON and finds a predefined field. This field is then used as the sensor value. It takes 30 seconds to set up such a sensor in PRTG.

The HTTP endpoint is hosted inside Azure. It provides a facade for metric data access. All the sensitive information needed to access Azure metrics API is stored inside Azure configuration itself. The implementation knows which Azure API to use to get a specific metric, and it hides those complications from the client code.

Azure Functions

We chose Azure Functions as the technology to implement and host such an HTTP facade.

The functions are very easy to create or modify. They are deployed independently from any other code, so we can update them at any cadence. And there's no need to provision any servers anywhere - Azure will run the code for us.

Here is how the whole setup works:

Retrieval of data from Azure to PRTG

  1. Every X minutes (configured per sensor), PRTG makes an HTTP request to a predefined URL. The request includes an Access Key as a query parameter (the key is stored in the sensor URL configuration). Each access key enables access to just one endpoint and is easily revocable.

  2. For each Metric type there is an Azure Function listening for HTTP requests from PRTG. Azure authorizes requests that contain valid access keys.

  3. Based on query parameters of the request, Azure Function retrieves a proper metric value from Azure management API. Depending on the metric type, this is accomplished with Azure .NET SDK or by sending a raw HTTP request to Azure REST API.

  4. Azure Function parses the response from Azure API and converts it to just the value which is requested by PRTG.

  5. The function returns a simple JSON object as HTTP response body. PRTG parses JSON, extracts the numeric value, and saves it into the sensor history.

At the time of writing, we have 13 sensors served by 5 Azure Functions:

Map of PRTG sensors to Functions to Azure services

I describe several functions below.

Service Bus Queue Size

The easiest function to implement is the one which gets the number of messages in the backlog of a given Azure Service Bus queue. The function.json file configures input and output HTTP bindings, including two parameters to derive from the URL: account (namespace) and queue name:

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "route": "Queue/{account}/{name}"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ],
  "disabled": false
}

The C# implementation uses the standard Service Bus API and a connection string from the App Service configuration to retrieve the required data. It then returns an anonymous object, which will be converted to JSON by the Function App runtime.

#r "Microsoft.ServiceBus"

using System.Net;
using Microsoft.ServiceBus;

public static object Run(HttpRequestMessage req, string account, string name)
{
    var connectionString = Environment.GetEnvironmentVariable("sb-" + account);
    var nsmgr = NamespaceManager.CreateFromConnectionString(connectionString);
    var queue = nsmgr.GetQueue(name);
    return new 
    {
        messageCount = queue.MessageCountDetails.ActiveMessageCount,
        dlq = queue.MessageCountDetails.DeadLetterMessageCount
    };
}

And that is all the code required to start monitoring the queues!

Service Bus Queue Statistics

In addition to queue backlog and dead letter queue size, we wanted to see some queue statistics like the number of incoming and outgoing messages per period of time. The corresponding API exists, but it's not that straightforward, so I described the whole approach in a separate post: Azure Service Bus Entity Metrics .NET APIs.

In my Azure Function I'm using the NuGet package that I mentioned in the post. This is accomplished by adding a project.json file:

{
  "frameworks": {
    "net46":{
      "dependencies": {
        "MikhailIo.ServiceBusEntityMetrics": "0.1.2"
      }
    }
   }
}

The function.json file is similar to the previous one, but with one added parameter called metric. I won't repeat the whole file here.

The Function implementation loads a certificate from the store, calls metric API and returns the last metric value available:

using System.Linq;
using System.Security.Cryptography.X509Certificates;
using MikhailIo.ServiceBusEntityMetrics;

public static DataPoint Run(HttpRequestMessage req, string account, string name, string metric)
{
    var subscription = Environment.GetEnvironmentVariable("SubscriptionID");
    var thumbprint = Environment.GetEnvironmentVariable("WEBSITE_LOAD_CERTIFICATES");

    X509Store certStore = new X509Store(StoreName.My, StoreLocation.CurrentUser);
    certStore.Open(OpenFlags.ReadOnly);

    X509Certificate2Collection certCollection = certStore.Certificates.Find(
        X509FindType.FindByThumbprint,
        thumbprint,
        false);

    var client = new QueueStatistics(certCollection[0], subscription, account, name);
    var metrics = client.GetMetricSince(metric, DateTime.UtcNow.AddMinutes(-30));
    return metrics.LastOrDefault();
}

Don't forget to set the WEBSITE_LOAD_CERTIFICATES setting to your certificate thumbprint, otherwise the Function App won't load the certificate.

Web App Instance Count

We are using Azure Web Jobs to run background data processing, e.g. for all queue message handlers. The jobs are hosted in Web Apps, and have auto-scaling enabled. When the load on the system grows, Azure spins up additional instances to increase the overall throughput.

So, the next metric to be monitored is the number of Web App instances running.

There is a REST endpoint to retrieve this information, but this time authentication and authorization are implemented with Active Directory. I created a helper class to wrap the authentication logic:

public static class RestClient
{
    public static async Task<T> Query<T>(string url)
    {
        var token = await GetAuthorizationHeader();
        var client = new HttpClient();
        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);

        var response = await client.GetAsync(url);
        var content = await response.Content.ReadAsStringAsync();
        return JsonConvert.DeserializeObject<T>(content);
    }

    private static async Task<string> GetAuthorizationHeader()
    {
        var activeDirectoryID = Environment.GetEnvironmentVariable("ActiveDirectoryID");
        var applicationID = Environment.GetEnvironmentVariable("ActiveDirectoryApplicationID");
        var secret = Environment.GetEnvironmentVariable("ActiveDirectorySecret");

        var context = new AuthenticationContext($"https://login.windows.net/{activeDirectoryID}");
        var credential = new ClientCredential(applicationID, secret);
        AuthenticationResult result = 
            await context.AcquireTokenAsync("https://management.core.windows.net/", credential);
        return result.AccessToken;
    }
}

The function then uses this REST client to query the Web App management API, converts JSON to strongly typed C# objects and extracts the number of instances into the HTTP response:

public class Instance
{
    public string id { get; set; }
    public string name { get; set; }
}

public class Response
{
    public Instance[] value { get; set; }
}

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req)
{
    var subscription = Environment.GetEnvironmentVariable("SubscriptionID");
    var resourceGroup = Environment.GetEnvironmentVariable("ResourceGroup");
    var appService = Environment.GetEnvironmentVariable("AppService");

    var url = $"https://management.azure.com/subscriptions/{subscription}/resourceGroups/{resourceGroup}" +
              $"/providers/Microsoft.Web/sites/{appService}/instances?api-version=2015-08-01";
    var response = await RestClient.Query<Response>(url);

    return req.CreateResponse(HttpStatusCode.OK, new
    {
        instanceCount = response.value.Length
    });
}

Please follow this walkthrough to set up your application in Active Directory, assign the required permissions and get the proper keys.

Azure Health

Azure has a service which reports the health of different services at any given moment, as acknowledged by Microsoft.

The handy part is that you can provide your subscription ID and then only services used by that subscription will be reported.

The exact usage of health service may depend on your use case, but the following example shows how to retrieve the basic counts of services per reported status.

public class ResourceProperties
{
    public string availabilityState { get; set; }
    public string summary { get; set; }
    public string detailedStatus { get; set; }
    public string reasonType { get; set; }
    public string occuredTime { get; set; }
    public string reasonChronicity { get; set; }
    public string reportedTime { get; set; }
}
public class Resource
{
    public string id { get; set; }
    public ResourceProperties properties { get; set; }
}

public class Response
{
    public Resource[] value { get; set; }
}

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req)
{
    var subscription = Environment.GetEnvironmentVariable("SubscriptionID");

    var url = $"https://management.azure.com/subscriptions/{subscription}/providers/Microsoft.ResourceHealth/availabilityStatuses?api-version=2015-01-01";
    var r = await RestClient.Query<Response>(url);
    var available = r.value
        .Where(v => v.properties.availabilityState == "Available")
        .Count();

    var unknown = r.value
        .Where(v => v.properties.availabilityState == "Unknown")
        .Count();

    var other = r.value.Length - available - unknown;

    return req.CreateResponse(HttpStatusCode.OK, new
    {
        available = available,
        unknown = unknown,
        other = other,
        details = r.value
    });
}

Users Online

The last example I want to share is related to Application Insights data. For instance, we inject a small tracking snippet on our front-end page and then Application Insights tracks all the page views and other user activity.

We use the number of users currently online as another metric for the monitoring solution. The Application Insights API is currently in preview, but it is nicely described at dev.applicationinsights.io. Be sure to check out the API Explorer too.

The following sample function returns the number of users online:

public class UsersCount
{
    public long unique { get; set; }
}

public class Value
{
    [JsonProperty("users/count")]
    public UsersCount UsersCount { get; set; }
}

public class Response
{
    public Value value { get; set; }
}

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req)
{
    var appID = Environment.GetEnvironmentVariable("ApplicationInsightsID");
    var key = Environment.GetEnvironmentVariable("ApplicationInsightsKey");

    var client = new HttpClient();
    client.DefaultRequestHeaders.Add("x-api-key", key);
    var url = $"https://api.applicationinsights.io/beta/apps/{appID}/metrics/users/count";

    var response = await client.GetAsync(url);
    var content = await response.Content.ReadAsStringAsync();
    var r = JsonConvert.DeserializeObject<Response>(content);

    return req.CreateResponse(HttpStatusCode.OK, new
    {
        usersCount = r.value.UsersCount.unique
    });
}

Combine Several Metrics in One Sensor

Thanks to a suggestion from Luciano Lingnau, we have migrated our PRTG sensors to HTTP Data Advanced. This sensor type allows bundling several related metrics into one sensor with multiple channels. PRTG is then able to display all those channels on a single chart.

For instance, we use the following channels for Service Bus related sensors:

  • Active message count
  • Age of the oldest message sitting inside the queue
  • Dead letter message count
  • Incoming messages per 5 minutes
  • Outgoing messages per 5 minutes
  • Scheduled message count

For each channel, we define units of measure, warning and error thresholds.

HTTP Data Advanced expects a URL which returns JSON in a predefined format. Here is sample C# code to create an anonymous object which is then converted to the proper JSON:

return new
{
    prtg = new
    {
        result = new[]
        {
            new
            {
                channel = "ActiveMessageCount",
                value = messageCountDetails.ActiveMessageCount,
                unit = "Count",
                customunit = (string)null,
                limitmaxwarning = (int?)null,
                limitmode = 0
            },
            new
            {
                channel = "DeadLetterMessageCount",
                value = messageCountDetails.DeadLetterMessageCount,
                unit = "Count",
                customunit = (string)null,
                limitmaxwarning = (int?)0,
                limitmode = 1
            },
            new
            {
                channel = "OutgoingMessageCount",
                value = outgoing,
                unit = "custom",
                customunit = "#/5min",
                limitmaxwarning = (int?)null,
                limitmode = 0
            },
            new
            {
                channel = "IncommingMessageCount",
                value = incoming,
                unit = "custom",
                customunit = "#/5min",
                limitmaxwarning = (int?)null,
                limitmode = 0
            },
            new
            {
                channel = "ScheduledMessageCount",
                value = messageCountDetails.ScheduledMessageCount,
                unit = "Count",
                customunit = (string)null,
                limitmaxwarning = (int?)null,
                limitmode = 0
            },
            new
            {
                channel = "Age",
                value = age,
                unit = "TimeSeconds",
                customunit = (string)null,
                limitmaxwarning = (int?)null,
                limitmode = 0
            } 
        }
    }
};

Conclusion

It seems that monitoring metrics retrieval is an ideal scenario to start using Azure Functions. The Functions are very easy to create and modify, they abstract away the details of hosting Web API endpoints, and at the same time give you the full power of C# (or F#) and Azure.

And because we only call those functions about once per minute, they are free to run!

Azure Service Bus Entity Metrics .NET APIs

Azure Service Bus is a key component of many background processing applications hosted in Azure, so it definitely requires monitoring and alerting. My goal for our monitoring solution was to provide an API to retrieve the following parameters for each Service Bus queue/topic in our application:

  • Message count (backlog)
  • Dead letter queue count
  • Number of incoming messages per time period
  • Number of processed messages per time period

The first two are easily retrieved from the QueueDescription object (see MSDN):

var nsmgr = NamespaceManager.CreateFromConnectionString(connectionString);
var queue = nsmgr.GetQueue(name);
var backlog = queue.MessageCountDetails.ActiveMessageCount;
var dlq = queue.MessageCountDetails.DeadLetterMessageCount;

The other two metrics are not readily available from the .NET SDK though. There are some extra metrics described in Service Bus Entity Metrics REST APIs, but the docs are really brief, vague and lack any examples.

So the rest of this post will be a walkthrough of how to consume those REST APIs from your .NET code.

Management Certificate

The API authenticates the caller by its client certificate. This authentication approach seems to be deprecated for Azure services, but for this particular API it's still the way to go.

First, you need to obtain the certificate itself, which means:

  • It's installed in the certificate store on the machine where the API call is made
  • You have a .cer file for it

If you are calling the API from your workstation, you may just Create a new self-signed certificate.

I am calling the API from an Azure Function App, so I reused the certificate that we had already uploaded to Azure for SSL support.

Once you have the certificate, you have to Upload it as a management certificate to the "Classic" Azure portal. Yes, management certificates are not supported by the new portal. If you don't have access to the old portal, ask your system administrator to grant it.

Finally, here is a code sample to load the certificate in C# code:

X509Store store = new X509Store("My", StoreLocation.CurrentUser);
store.Open(OpenFlags.ReadOnly);
var cert = store.Certificates.Find(
    X509FindType.FindBySubjectName, 
    "<certificate name of yours>", 
    false)[0];

Request Headers

Here is a helper class which adds the specified certificate to each request and sets the appropriate headers too:

internal class AzureManagementClient : WebClient
{
    private readonly X509Certificate2 certificate;

    public AzureManagementClient(X509Certificate2 certificate)
    {
        this.certificate = certificate;
    }

    protected override WebRequest GetWebRequest(Uri address)
    {
        var request = (HttpWebRequest)base.GetWebRequest(address);

        request.ClientCertificates.Add(this.certificate);
        request.Headers.Add("x-ms-version: 2013-10-01");
        request.Accept = "application/json";

        return request;
    }
}

This code is mostly copied from a very useful post by Brian Starr, so thank you, Brian.

Getting the List of Metrics

To get the list of available metrics you will need 3 string parameters:

  • Azure subscription ID
  • Service Bus namespace
  • Queue name

The following picture shows all of them on Azure Portal screen:

Service Bus Parameters

Now, format the following request URL and query it using our Azure client:

var client = new AzureManagementClient(cert);
var url = $"https://management.core.windows.net/{subscriptionId}" +
          $"/services/servicebus/namespaces/{serviceBusNamespace}" +
          $"/queues/{queueName}/Metrics";
var result = client.DownloadString(url);

If you did everything correctly, you will get the list of supported metrics in JSON. Congratulations, that's a major accomplishment :)

And here is a quick way to convert JSON to C# array:

public class Metric
{
    public string Name { get; set; }
    public string Unit { get; set; }
    public string PrimaryAggregation { get; set; }
    public string DisplayName { get; set; }
}
var metrics = JsonConvert.DeserializeObject<Metric[]>(result);

Getting the Metric Values

Now, to get the metric values themselves, you will need some extra parameters:

  • Metric name (take the value of the Name property from the Metric class above)
  • Rollup period, or aggregation period: 5 minutes, 1 hour, 1 day, or 1 week; take the Pxxx code from here
  • Start date/time (UTC) of the data period to query

Here is the sample code:

var time = DateTime.UtcNow.AddHours(-1).ToString("s");

var client = new AzureManagementClient(cert);
var url = $"https://management.core.windows.net/{subscriptionId}" +
          $"/services/servicebus/namespaces/{serviceBusNamespace}" +
          $"/queues/{queueName}/Metrics/{metric}" +
          $"/Rollups/PT5M/Values?$filter=Timestamp%20ge%20datetime'{time}Z'";

var result = client.DownloadString(url);

I am using the incoming metric to get the number of enqueued messages per period and the outgoing metric to get the number of dequeued messages.

The strongly typed version is simple:

public class DataPoint
{
    public string Timestamp { get; set; }
    public long Total { get; set; }
}
var data = JsonConvert.DeserializeObject<DataPoint[]>(result);

Working Example

I've authored a small library which wraps the HTTP requests into strongly typed .NET classes. You can see it in my GitHub repository or grab it from NuGet.
