As a follow-up to my previous post about encrypting messages for the Service Bus, I thought I would re-use the concepts, but instead of encrypting the messages I would compress them.
As the Service Bus has limitations on how big messages are allowed to be, compressing the message body is something that can be really helpful. Not that I think sending massive messages is the best thing in all cases, but the 256kb limit can be a little low at times.
Anyhow… The basic idea is exactly the same as last time, no news there… but to be honest, I think this type of compression should be there by default, or at least be available as a feature of BrokeredMessage… However, as it isn’t, I will just make do with extension methods…
And if you haven’t read the previous post, here is a quick recap. There is no way to inherit from BrokeredMessage as it is sealed. Instead, I have decided to create extension methods, which makes it really easy to use them. I have also decided to do two versions: one that relies on reflection, which might be considered bad practice by some, and one that creates a new BrokeredMessage with the compressed version of the body.
So, let’s get started. The extension methods I want to create look like this
public static void Compress(this BrokeredMessage msg)
public static void Decompress(this BrokeredMessage msg)
public static BrokeredMessage Compress<T>(this BrokeredMessage msg)
public static BrokeredMessage Decompress<T>(this BrokeredMessage msg)
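Just to be clear about the intent: the non-generic versions modify the message in place, while the generic versions return a new BrokeredMessage. So the usage would look something like this (MyPayload is just a placeholder for whatever type the body happens to be)
// In-place versions, modifying the existing message
msg.Compress();
msg.Decompress();

// Generic versions, returning a new BrokeredMessage
var compressed = msg.Compress<MyPayload>();
var decompressed = compressed.Decompress<MyPayload>();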
Let’s start with the reflection-based ones, which means the non-generic ones…
The first thing I need to do is get hold of the internal Stream that holds the serialized object used for the body. This is done through reflection as follows
var member = typeof(BrokeredMessage)
    .GetProperty("BodyStream", BindingFlags.NonPublic | BindingFlags.Instance);
var dataStream = (Stream)member.GetValue(msg);
Next, I need to compress the serialized data, which I have decided to do using the GZipStream class from the framework.
This is really not rocket science, but it does include a little quirk. First, you create a new Stream to hold the result, that is, the compressed data. In my case, that is a MemoryStream, which is the reason for the quirk. If you write to disk, this isn’t a problem, but for a MemoryStream it is… Anyhow, once I have a target Stream, I create a new GZipStream that wraps the target Stream.
The GZipStream behaves like a Stream, but when you read/write to it, it uses the underlying Stream for the data, and compresses/decompresses the data on the way in and out… Makes sense? I hope so… Whether it compresses or decompresses, which basically means whether you write to or read from it, is defined by the second parameter to the constructor. That parameter is of type CompressionMode, which is an enum with two values, Compress and Decompress.
Once I have my GZipStream, I basically just copy my data from the source Stream into it, and then close it. And this is where the quirk I talked about before comes into play. The GZipStream won’t finish writing everything until it is closed. Flush does not help. So we need to close the GZipStream, which then also closes the underlying Stream. In most cases, this might not be a problem, but for a MemoryStream, it actually is. Luckily, the MemoryStream will still allow me to read the data in it by using the ToArray() method. Using the returned byte[], I can create a new MemoryStream, which I can use to re-set the body of the BrokeredMessage.
And after all that talk, the actual code looks like this
// Grab the internal BodyStream property through reflection
var member = typeof(BrokeredMessage).GetProperty("BodyStream", BindingFlags.NonPublic | BindingFlags.Instance);
using (var dataStream = (Stream)member.GetValue(msg))
{
    var compressedStream = new MemoryStream();
    using (var compressionStream = new GZipStream(compressedStream, CompressionMode.Compress))
    {
        // Copy the serialized body into the GZipStream, which compresses it on the way in
        dataStream.CopyTo(compressionStream);
    }
    // Closing the GZipStream also closed the MemoryStream, but ToArray() still works,
    // so the compressed bytes are wrapped in a new, readable MemoryStream
    compressedStream = new MemoryStream(compressedStream.ToArray());
    member.SetValue(msg, compressedStream);
}
Ok, so going in the other direction and decompressing the data is not much harder…
Once again, I pull out the Stream for the body using reflection. I then create a Stream to hold the decompressed data. After that, I wrap the body stream in a GZipStream set to Decompress, and copy the contents of it to the MemoryStream. After closing the GZipStream, I Seek() to the beginning of the MemoryStream before re-setting the BrokeredMessage’s Stream.
Like this…
// Once again, grab the internal BodyStream property through reflection
var member = typeof(BrokeredMessage).GetProperty("BodyStream", BindingFlags.NonPublic | BindingFlags.Instance);
using (var dataStream = (Stream)member.GetValue(msg))
{
    var decompressedStream = new MemoryStream();
    // This time the GZipStream wraps the compressed body and decompresses it on the way out
    using (var compressionStream = new GZipStream(dataStream, CompressionMode.Decompress))
    {
        compressionStream.CopyTo(decompressedStream);
    }
    // Rewind the stream so the body can be read from the beginning
    decompressedStream.Seek(0, SeekOrigin.Begin);
    member.SetValue(msg, decompressedStream);
}
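For completeness, if you wrap the two reflection-based methods up in a static class together with the namespaces they need, the whole thing would look something like this (the class name is just my own pick)
using System.IO;
using System.IO.Compression;
using System.Reflection;
using Microsoft.ServiceBus.Messaging;

public static class BrokeredMessageCompressionExtensions
{
    // Compresses the body of the message in place
    public static void Compress(this BrokeredMessage msg)
    {
        var member = typeof(BrokeredMessage).GetProperty("BodyStream", BindingFlags.NonPublic | BindingFlags.Instance);
        using (var dataStream = (Stream)member.GetValue(msg))
        {
            var compressedStream = new MemoryStream();
            using (var compressionStream = new GZipStream(compressedStream, CompressionMode.Compress))
            {
                dataStream.CopyTo(compressionStream);
            }
            compressedStream = new MemoryStream(compressedStream.ToArray());
            member.SetValue(msg, compressedStream);
        }
    }

    // Decompresses the body of the message in place
    public static void Decompress(this BrokeredMessage msg)
    {
        var member = typeof(BrokeredMessage).GetProperty("BodyStream", BindingFlags.NonPublic | BindingFlags.Instance);
        using (var dataStream = (Stream)member.GetValue(msg))
        {
            var decompressedStream = new MemoryStream();
            using (var compressionStream = new GZipStream(dataStream, CompressionMode.Decompress))
            {
                compressionStream.CopyTo(decompressedStream);
            }
            decompressedStream.Seek(0, SeekOrigin.Begin);
            member.SetValue(msg, decompressedStream);
        }
    }
}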
Ok, that’s it! At least for the reflection-based methods. Let’s have a look at the ones that don’t use reflection…
Just as in the previous post, I extract the body as the defined type, serialize it using Json.NET, and then use GZipStream to compress the resulting string.
So let’s take it step by step… First, I read the body based on the generically defined type and serialize it using Json.NET. Like this
var bodyObject = msg.GetBody<T>();
var json = JsonConvert.SerializeObject(bodyObject);
Once I have the Json, I create a MemoryStream to hold the data, and then use the same method as shown before to compress it. The main difference is that I don’t re-set the Stream on the BrokeredMessage; instead I convert the compressed data to a Base64 encoded string and use that as the body for a new message. Finally, I copy across all the properties for the message.
There is a tiny issue here in my download, which is also present in the download for the previous blog post. It only copies the custom properties, it does not include the built-in ones… But hey, it is demo code…
Ok, so it looks like this
BrokeredMessage returnMessage;
// Extract the body and serialize it using Json.NET
var bodyObject = msg.GetBody<T>();
var json = JsonConvert.SerializeObject(bodyObject);
using (var dataStream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
{
    var compressedStream = new MemoryStream();
    using (var compressionStream = new GZipStream(compressedStream, CompressionMode.Compress))
    {
        dataStream.CopyTo(compressionStream);
    }
    // Base64 encode the compressed bytes and use the string as the body of a new message
    returnMessage = new BrokeredMessage(Convert.ToBase64String(compressedStream.ToArray()));
}
CopyProperties(msg, returnMessage);
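The CopyProperties() helper isn’t really part of the compression story, and as mentioned, the version in the download only handles the custom properties. A minimal sketch of what it could look like is this
// Hypothetical helper; copies only the custom properties, not the built-in ones
private static void CopyProperties(BrokeredMessage source, BrokeredMessage target)
{
    foreach (var property in source.Properties)
    {
        target.Properties[property.Key] = property.Value;
    }
}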
And the decompression is not much more complicated. It reads the body of the message as a string. It then gets the byte[] of the string by using the Convert.FromBase64String() method. Those bytes are then wrapped in a MemoryStream and decompressed using GZipStream. The resulting byte[] is converted back to a string using Encoding.UTF8.GetString(), and then deserialized using Json.NET. The resulting object is set as the body for a new BrokeredMessage right before the rest of the properties are copied across.
BrokeredMessage returnMessage;
// Read the Base64 encoded body and convert it back to the compressed bytes
var body = msg.GetBody<string>();
var data = Convert.FromBase64String(body);
using (var dataStream = new MemoryStream(data))
{
    var decompressedStream = new MemoryStream();
    using (var compressionStream = new GZipStream(dataStream, CompressionMode.Decompress))
    {
        compressionStream.CopyTo(decompressedStream);
    }
    // Turn the decompressed bytes back into Json and deserialize it into the body of a new message
    var json = Encoding.UTF8.GetString(decompressedStream.ToArray());
    returnMessage = new BrokeredMessage(JsonConvert.DeserializeObject<T>(json));
}
CopyProperties(msg, returnMessage);
That’s it! That is all the code needed to compress and decompress BrokeredMessages.
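And to show how it would all be used, here is a minimal sketch of sending and receiving a compressed message using the in-place versions. The connection string, queue name and MyPayload type are obviously just placeholders
var client = QueueClient.CreateFromConnectionString(connectionString, "myqueue");

// Compress the message in place before sending it
var msg = new BrokeredMessage(new MyPayload { Text = someLargeString });
msg.Compress();
client.Send(msg);

// Decompress the received message before reading the body
var received = client.Receive();
received.Decompress();
var payload = received.GetBody<MyPayload>();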
To try it out, I created a little demo application. It uses a ridiculous class to break down text into paragraphs, sentences and words to generate an object to send across the wire.
I decided to create something quickly that would generate a largish object without too much work, and string manipulation seemed like a good idea, as I can just paste in lots of text to get a big object. It might not properly show the efficiency of the compression, as text compresses very well, but it was easy to build. But do remember that different objects will get different levels of compression…
The result of the compression is quite cool. Running it on a message that by default is 72,669 bytes compresses it down to 3,359 bytes. Once again, it is a lot of text and so on, but to be fair, the data before compression is binary XML, which is also text…
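If you want to check the numbers for your own messages, BrokeredMessage has a Size property that returns the size of the message in bytes, so a quick before-and-after comparison could look something like this (a sketch, assuming Size is re-evaluated after the body has been swapped out)
var msg = new BrokeredMessage(bigObject);
var sizeBefore = msg.Size; // size in bytes before compression
msg.Compress();
Console.WriteLine("{0} -> {1} bytes", sizeBefore, msg.Size);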
So the conclusion is that compressing the messages isn’t that hard, but you can potentially gain a lot from doing it. Not only when your messages break the 256kb limit, but also for smaller messages, as they will be even smaller and thus faster across the network…
As usual, there is source code for download. It is available here: DarksideCookie.Azure.Sb.Compression.zip (216.65 kb)
That’s it for this time… Cheers!