Sending Large Batches to Azure Service Bus
Azure Service Bus client supports sending messages in batches (the SendBatch and SendBatchAsync methods of QueueClient and TopicClient). However, the size of a single batch must stay below 256k bytes, otherwise the whole batch will get rejected.
How do we make sure that the batch-to-be-sent is going to fit? The rest of this article will try to answer this seemingly simple question.
Problem Statement
Given a list of messages of arbitrary type T, we want to send them to Service Bus in batches. The number of batches should be close to minimal, but each one of them must satisfy the restriction of 256k max size.
So, we want to implement a method with the following signature:
public Task SendBigBatchAsync<T>(IEnumerable<T> messages);
which would work for collections of any size.
To limit the scope, I will restrict the article to the following assumptions:
- Each individual message is less than 256k serialized. If that weren’t true, we’d have to put the body into external blob storage first, and then send the reference. It’s not directly related to the topic of discussion.
- I’ll use the public BrokeredMessage(object serializableObject) constructor. Custom serialization could be used, but again, it’s not related to batching, so I’ll ignore it.
- We won’t care about transactions, i.e. if connectivity dies in the middle of sending the big batch, we might end up with a partially sent batch.
Messages of Known Size
Let’s start with a simple use case: the size of each message is known to us.
It’s defined by a hypothetical Func<T, long> getSize function. Here is a helpful extension method that will split an arbitrary collection based on a metric function and maximum chunk size:
// Extension method: it has to be declared inside a static class.
public static List<List<T>> ChunkBy<T>(this IEnumerable<T> source, Func<T, long> metric, long maxChunkSize)
{
    return source
        .Aggregate(
            new
            {
                Sum = 0L,                        // total metric of the current chunk
                Current = (List<T>)null,         // chunk being filled
                Result = new List<List<T>>()     // all chunks so far
            },
            (agg, item) =>
            {
                var value = metric(item);
                if (agg.Current == null || agg.Sum + value > maxChunkSize)
                {
                    // The item doesn't fit: start a new chunk with it
                    var current = new List<T> { item };
                    agg.Result.Add(current);
                    return new { Sum = value, Current = current, agg.Result };
                }
                agg.Current.Add(item);
                return new { Sum = agg.Sum + value, agg.Current, agg.Result };
            })
        .Result;
}
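The Aggregate call above is compact but not the easiest thing to read. As a minimal sketch, here is a loop-based version that should split the collection the same way, plus a tiny worked example. The names ChunkingSketch and ChunkByLoop and the sample sizes are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class ChunkingSketch
{
    // Start a new chunk whenever adding the next item would push the
    // running total above maxChunkSize.
    public static List<List<T>> ChunkByLoop<T>(IEnumerable<T> source, Func<T, long> metric, long maxChunkSize)
    {
        var result = new List<List<T>>();
        List<T> current = null;
        long sum = 0;
        foreach (var item in source)
        {
            var value = metric(item);
            if (current == null || sum + value > maxChunkSize)
            {
                current = new List<T>();
                result.Add(current);
                sum = 0;
            }
            current.Add(item);
            sum += value;
        }
        return result;
    }

    public static void Main()
    {
        // Hypothetical message sizes with a limit of 10 per chunk.
        var sizes = new long[] { 4, 4, 4, 7, 3, 12 };
        foreach (var chunk in ChunkByLoop(sizes, s => s, 10))
        {
            Console.WriteLine(string.Join(", ", chunk));
        }
        // Resulting chunks: [4, 4], [4], [7, 3], [12]
    }
}
```

Note the last chunk: an item that is bigger than the limit still ends up in a chunk of its own. That is why the article assumes every individual message is smaller than the batch limit.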
Now, the implementation of SendBigBatchAsync is simple:
public async Task SendBigBatchAsync<T>(IEnumerable<T> messages, Func<T, long> getSize)
{
    var chunks = messages.ChunkBy(getSize, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        var brokeredMessages = chunk.Select(m => new BrokeredMessage(m));
        // Chunks are awaited one at a time, never sent in parallel
        await client.SendBatchAsync(brokeredMessages);
    }
}
private const long MaxServiceBusMessage = 256000;
private readonly QueueClient client;
Note that I await each chunk sequentially to preserve message ordering.
Another thing to notice is that we lost the all-or-nothing guarantee: we might be able to send the first chunk, and then get an exception from subsequent parts. Some sort of retry mechanism is probably needed.
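What such a retry mechanism could look like is outside the scope of this article, but here is a minimal sketch under simple assumptions: a fixed number of attempts with a doubling delay. RetrySketch, RetryAsync and the parameter names are hypothetical, and catching every exception type is a simplification.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Runs sendAttempt up to maxAttempts times, doubling the delay after
    // each failure. The failure of the last attempt propagates to the caller.
    public static async Task RetryAsync(Func<Task> sendAttempt, int maxAttempts, TimeSpan initialDelay)
    {
        var delay = initialDelay;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await sendAttempt();
                return;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                await Task.Delay(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}

// Possible usage inside the foreach loop (assumes the client field from above):
//   await RetrySketch.RetryAsync(
//       () => client.SendBatchAsync(chunk.Select(m => new BrokeredMessage(m))),
//       3, TimeSpan.FromMilliseconds(200));
```

The brokered messages are created inside the delegate on purpose, so that every attempt gets fresh BrokeredMessage instances. Keep in mind that a retry after an ambiguous failure may deliver some messages twice.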
BrokeredMessage.Size
OK, how do we determine the size of each message? How do we implement the getSize function?
The BrokeredMessage class exposes a Size property, so it might be tempting to rewrite our method the following way:
public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
    var brokeredMessages = messages.Select(m => new BrokeredMessage(m));
    var chunks = brokeredMessages.ChunkBy(bm => bm.Size, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        await client.SendBatchAsync(chunk);
    }
}
Unfortunately, this won’t work properly. A quote from the documentation:
The value of Size is only accurate after the BrokeredMessage instance is sent or received.
My experiments show that Size of a draft message returns the size of the message body, ignoring headers. If the message bodies are large, and each chunk has just a handful of them, the code might work ok-ish.
But it will significantly underestimate the size of large batches of messages with small payloads.
So, for the rest of this article I’ll try to adjust the calculation for headers.
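To get a feeling for how far off the body-only estimate can be, here is a small back-of-the-envelope calculation. The numbers are assumptions for illustration: a 50-byte body and a 54-byte header per message (54 bytes is the smallest header mentioned later in this article), with the 256000-byte limit used above. UnderestimateSketch and ActualBatchSize are made-up names.

```csharp
using System;

public static class UnderestimateSketch
{
    // Size of a chunk that was filled up to the limit by counting bodies
    // only, once the per-message headers are added on top.
    public static long ActualBatchSize(long limit, long bodySize, long headerSize)
    {
        long messagesPerChunk = limit / bodySize;
        return messagesPerChunk * (bodySize + headerSize);
    }

    public static void Main()
    {
        const long limit = 256000;   // batch limit used in this article
        const long bodySize = 50;    // hypothetical small payload, in bytes
        const long headerSize = 54;  // hypothetical header, in bytes

        Console.WriteLine(limit / bodySize);                               // 5120 messages per chunk
        Console.WriteLine(ActualBatchSize(limit, bodySize, headerSize));   // 532480 bytes
    }
}
```

Under these assumptions the chunk that looked like exactly 256000 bytes is more than twice the allowed size, so it would be rejected.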
Fixed Header Size
It could be that the header size of each message is always the same. Quite often people will set the same headers for all their messages, or set no custom headers at all.
In this case, you might just measure this size once, and then put this fixed value inside a configuration file.
Here is how you measure the headers of a BrokeredMessage:
var sizeBefore = message.Size;
client.Send(message);
var sizeAfter = message.Size;
var headerSize = sizeAfter - sizeBefore;
Now you just need to adjust one line from the previous version of the SendBigBatchAsync method:
var chunks = brokeredMessages.ChunkBy(bm => FixedHeaderSize + bm.Size, MaxServiceBusMessage);
FixedHeaderSize might be simply hard-coded, or taken from configuration per application.
Measuring of Header Size per Message
If the size of headers varies per message, you need a way to adjust the batching algorithm accordingly.
Unfortunately, I haven’t found a straightforward way to accomplish that. It looks like you’d have to serialize the headers yourself, and then measure the size of resulting binary. This is not a trivial operation to do correctly, and also implies some performance penalty.
Sean Feldman came up with a way to estimate the size of headers. That might be a good way to go, though the estimation tends to err on the safe side for messages with small payload.
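To illustrate the general idea of estimating (this is not Sean Feldman's method, just a rough sketch of my own), one could add up the UTF-8 size of every custom property name and value, start from a fixed base, and inflate the result by a safety margin. The real wire format is different, and system properties such as MessageId contribute too, so treat baseHeaderSize and safetyFactor as knobs to tune rather than known values. HeaderEstimateSketch and EstimateHeaderSize are hypothetical names; the dictionary stands in for the Properties collection of a BrokeredMessage.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class HeaderEstimateSketch
{
    // Rough guess: a fixed base plus the UTF-8 size of every property name
    // and of the string form of every value, multiplied by a safety factor.
    public static long EstimateHeaderSize(IDictionary<string, object> properties, long baseHeaderSize, double safetyFactor)
    {
        long total = baseHeaderSize;
        foreach (var pair in properties)
        {
            total += Encoding.UTF8.GetByteCount(pair.Key);
            total += Encoding.UTF8.GetByteCount(Convert.ToString(pair.Value) ?? string.Empty);
        }
        return (long)Math.Ceiling(total * safetyFactor);
    }
}

// Possible usage when chunking:
//   brokeredMessages.ChunkBy(
//       bm => bm.Size + HeaderEstimateSketch.EstimateHeaderSize(bm.Properties, 54, 1.5),
//       MaxServiceBusMessage);
```

A generous safety factor wastes some batch capacity, which matters most for messages with small payloads, but it keeps the estimate on the safe side.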
Heuristics & Retry
The last possibility that I want to consider is to allow yourself to violate the max size of the batch, then handle the exception, retry the send operation, and adjust future calculations based on the actual measured size of the failed messages. The size is known after trying to SendBatch, even if the operation failed, so we can use this information.
Here is a sketch of how to do that in code:
// Sender is reused across requests
public class BatchSender
{
    private readonly QueueClient queueClient;
    private long batchSizeLimit = 262000;
    private long headerSizeEstimate = 54; // start with the smallest header possible

    public BatchSender(QueueClient queueClient)
    {
        this.queueClient = queueClient;
    }

    public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
    {
        // Keep the source object and the body size next to each brokered message
        var packets = (from m in messages
                       let bm = new BrokeredMessage(m)
                       select new { Source = m, Brokered = bm, BodySize = bm.Size }).ToList();
        var chunks = packets.ChunkBy(p => this.headerSizeEstimate + p.Brokered.Size, this.batchSizeLimit);
        foreach (var chunk in chunks)
        {
            try
            {
                await this.queueClient.SendBatchAsync(chunk.Select(p => p.Brokered));
            }
            catch (MessageSizeExceededException)
            {
                // After the send attempt Size includes headers, so the difference
                // to the saved body size is the header size of a failed message
                var maxHeader = chunk.Max(p => p.Brokered.Size - p.BodySize);
                if (maxHeader > this.headerSizeEstimate)
                {
                    // If failed messages had bigger headers, remember this header size
                    // as max observed and use it in future calculations
                    this.headerSizeEstimate = maxHeader;
                }
                else
                {
                    // Reduce max batch size to 95% of current value
                    this.batchSizeLimit = (long)(this.batchSizeLimit * .95);
                }
                // Re-send only the failed chunk, starting from the source messages
                await this.SendBigBatchAsync(chunk.Select(p => p.Source));
            }
        }
    }
}
The code example is quite involved, so here is what actually happens:
1. Create a brokered message for each message object, but also save the corresponding source message. This is critical to be able to re-send items: there’s no way to send the same BrokeredMessage instance twice.
2. Also save the body size of the brokered message. We’ll use it for retry calculation.
3. Start with some guess of header size estimate. I start with 54 bytes, which seems to be the minimal header size possible.
4. Split the batch into chunks the same way we did before.
5. Try sending chunks one by one.
6. If the send operation fails with MessageSizeExceededException, iterate through failed items and find out the actual header size of the message.
7. If that actual size is bigger than our known estimate, increase the estimate to the newly observed value. Retry sending the chunk (not the whole batch) with this new setting.
8. If the header is small, but message size is still too big, reduce the allowed total size of the chunk. Retry again.
The combination of checks of steps 7 and 8 should make the mechanism reliable and self-adapting to message header payloads.
Since we reuse the sender between send operations, the size parameters will also converge quite quickly and no more retries will be needed. Thus the performance overhead should be minimal.
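To see the convergence in isolation, the adjustment rule from steps 7 and 8 can be pulled out into a pure function and fed a couple of hypothetical rejections. This is only a sketch: SizeTuningSketch and Adjust are made-up names, the observed header of 120 bytes is invented, and the 95% reduction uses integer arithmetic instead of the floating-point multiplication in the sender above.

```csharp
using System;

public static class SizeTuningSketch
{
    // Returns the (headerSizeEstimate, batchSizeLimit) pair to use after a
    // chunk was rejected, given the largest header observed in that chunk.
    public static Tuple<long, long> Adjust(long headerSizeEstimate, long batchSizeLimit, long maxObservedHeader)
    {
        if (maxObservedHeader > headerSizeEstimate)
        {
            // Headers were bigger than assumed: raise the estimate, keep the limit
            return Tuple.Create(maxObservedHeader, batchSizeLimit);
        }
        // Headers were as expected: shrink the limit to 95% of its value
        return Tuple.Create(headerSizeEstimate, batchSizeLimit * 95 / 100);
    }

    public static void Main()
    {
        var state = Tuple.Create(54L, 262000L);
        state = Adjust(state.Item1, state.Item2, 120); // a 120-byte header was observed
        Console.WriteLine(state);                      // (120, 262000)
        state = Adjust(state.Item1, state.Item2, 120); // same header, rejected again
        Console.WriteLine(state);                      // (120, 248900)
    }
}
```

Each rejection either raises the header estimate or lowers the limit, so the parameters only ever move in the safe direction.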
Conclusion
It seems like there is no “one size fits all” solution for this problem at the moment. The best implementation might depend on your messaging requirements.
But if you have the silver bullet solution, please leave a comment under this post and answer my StackOverflow question!
Otherwise, let’s hope that the new .NET Standard-compatible Service Bus client will solve this issue for us. Track this GitHub issue for status updates.