I guess it is time for another look at the Azure Service Bus. My previous posts about it has covered the basics, message relaying and relaying REST. So I guess it is time to step away from the relaying and look at the other way you can work with the service bus.
When I say “the other” way, it doesn’t mean that we are actually stepping away from relaying. All messages are still relayed via the bus, but in “the other” case, we utilize the man in the middle a bit more.
“The other” way means utilizing the message bus for “storage” as well. It means that we send a message to the bus, let the bus store it for us until the service feels like picking it up and handling it.
There are several ways that this can be utilized, but in this post, I will focus on queuing.
The sample in this post requires the same things as the previous posts, that is the Azure SDK and so on. If this is the first post you are reading from me regarding the bus, I suggest going back to the first code sample and look at the prerequisites.
Azure Service Bus queues are very similar to the normal Azure Storage queues, as well as any other queue. The biggest difference in the Azure scenario is that with the service bus version, we get some things for free. We don’t have to build our own polling scheme for example, as the queues support long polling from the start.
But before we start looking at code, I want to cover a basic thing, the message. When sending a message to the queue, the message is always of the type BrokeredMessage. A brokered message has a bunch of properties and features which are outside of the scope of this blog post, but some of the main features are the possibility of sending a typed message, meta data/properties for that message, control time to live (TTL) etc.
So the basic flow for using a queue on the service side is – get a reference to the queue, listen for messages on the queue, handle the incoming message. Just as you would expect…
There are a couple of things to remember though. First of all, the queue might not exist already. So make sure it is there! And also, it is a queue, it has its own storage, so when you start listening, you might get flooded by messages straight away!
Ok, so what does it look like on the client? Well, pretty much the same, get a reference to the queue, send the required messages, and then close it… And the same gotchas exists!
So let’s look at some code then…
First we need a message to send. And by that, I mean a typed message to put inside the BrokeredMessage that we send to the queue. This message however is ridiculously easy to create, just create a new class and define what properties you need. However, you need to put it in a separate assembly that both the service and the client can reference.
I start of by creating a new class library project called something good that reflects that it will contain messages for the queue. Inside this project, I create a class called MyMessage.
The MyMessage class gets a single property of the type string. This is just to keep it simple. It could contain a lot more information. Just remember that it is not allowed to be bigger than 256 kB.
public class MyMessage
{
public MyMessage()
{
}
public MyMessage(string msg)
{
Message = msg;
}
public string Message { get; set; }
}
Next up is the client, so I create a new console application project called QueueClient and add a reference to the Microsoft.ServiceBus, System.Runtime.Serialization and System.Configuration assemblies, as well as my messages project. (Remember to change the target to the full .NET Framework)
Inside of the Program.cs file, I add a new global member of type string, called QueueName. I also make is static. I also add the get-methods for the configuration that I have used before.
class Program
{
private static string QueueName = "TheQueue";
static void Main(string[] args)
{
}
private static string GetIssuerName()
{
return ConfigurationManager.AppSettings["issuer"];
}
private static string GetSecret()
{
return ConfigurationManager.AppSettings["secret"];
}
private static string GetNamespace()
{
return ConfigurationManager.AppSettings["namespace"];
}
}
To back this up, I obviously need to add the necessary appSettings elements in the app.config file as well. These are however identical to the previous posts, so I won’t show these. But even if you haven’t read the previous posts, it should be fairly obvious what needs to be there…
Next up is the actual functionality… I have to start by getting hold of the URL to the service bus namespace. This is really easy to do using a static class called ServiceBusEnvironment. On this class, you will find a couple of methods and a couple of properties. What I want is the CreateServiceUri() method. It takes the scheme to use, the namespace and the path to the service. In this case, the scheme is “sb” for Service Bus, the namespace comes from configurations, and there is no service name. So it looks like this
var url = ServiceBusEnvironment.CreateServiceUri("sb", GetNamespace(), string.Empty);
I also need to provide the service bus with some credentials. This is done using shared secret token provider, which is not at all as complicated to create as it sounds. Just use another static method on a class provided by Microsoft called TokenProvider. The method is called CreateSharedSecretTokenProvider(), which is really descriptive! It takes an issuer name and a secret. Both of these come from configuration, so it simply looks like this
var credentials = TokenProvider.CreateSharedSecretTokenProvider(GetIssuerName(), GetSecret());
Next I need a NamespaceManager, which is newed up, by passing in the URL and the taken provider. I also need a MessagingFactory, which is created by passing in the same values to a static Create() method on the MessagingFactory class.
var nsm = new NamespaceManager(url, credentials);
var mf = MessagingFactory.Create(url, credentials);
Now that we have all the classes we need, it is time to do some actual work! …I know, that is a lot of things to create before we can even begin…
First out is to verify that the queue exists. Luckily, the NamespaceManager can help me with this. It has a method called QueueExists(), which takes the name of the queue and returns bool.
If the queue doesn’t exist, I create it using the CreateQueue() method on the NamespaceManager. See, the NamespaceManager that we created is really useful. It helps us with managing the “features” in our namespace (not only for queues).
And finally, I use the MessagingFactory to create a client for the queue… And all of that boils down to these 3 lines of code
if (!nsm.QueueExists(QueueName))
nsm.CreateQueue(QueueName);
var qClient = mf.CreateQueueClient(QueueName);
Now, I am ready to send messages to the queue. In this contrived example, I set up a loop that retrieves the message to send from console input until the input is an empty string.
string msg;
do
{
Console.Write("Message to send: ");
msg = Console.ReadLine();
if (!string.IsNullOrEmpty(msg))
qClient.Send(new BrokeredMessage(new MyMessage(msg)));
} while (!string.IsNullOrEmpty(msg));
As you can see, if you ignore all of the plumbing, the important stuff is in the qClient.Send() method. The Send() method takes a BrokeredMessage, as mentioned before, but the BrokeredMessage “wraps” my own message. So the service will receive a BrokeredMessage, but it can then get the MyMessage object as well as you will see soon.
The last thing to do is to close the client as soon as the sending is done. This is important as the service bus is charged by connections at the moment. I assume this will change ion the future to be charged per message, but for now it is per connection. So you want to keep the connections as short-lived as possible to keep the connection count down.
Closing the connection is done by calling Close() on the QueueClient.
The service end is pretty much as simple, except for some threading things… I create the service in the same way as the client, by adding a new console application project, adding the previously mentioned references and changing the target framework.
I also copy across the app.config and the config “getters” from my client…
The main difference between the service and the client is that I move the QueueClient out to a global member so that I can reach it from several places in the class. I have also changed the sending loop to a receive thingy… It looks like this
private static QueueClient _qClient;
private static string QueueName = "TheQueue";
static void Main(string[] args)
{
var credentials = TokenProvider.CreateSharedSecretTokenProvider(GetIssuerName(), GetSecret());
var url = ServiceBusEnvironment.CreateServiceUri("sb", GetNamespace(), string.Empty);
var nsm = new NamespaceManager(url, credentials);
var mf = MessagingFactory.Create(url, credentials);
if (!nsm.QueueExists(QueueName))
nsm.CreateQueue(QueueName);
_qClient = mf.CreateQueueClient(QueueName);
BeginReceive();
Console.WriteLine("Waiting for messages...");
Console.ReadKey();
_qClient.Close();
}
As you can see, it is very similar. The sending loop has been replaced with a call to BeginReceive() and a locking call to Console.ReadKey(). The BeginReceive() method is async, so we need to the Console.ReadKey() to keep the application running.
So what does the BeginReceive() method look like? Well, it is literally a one-liner. It looks like this
private static void BeginReceive()
{
_qClient.BeginReceive(TimeSpan.FromMinutes(5), MessageReceived, null);
}
It makes a single call to the BeginReceive() method on the QueueClient. In the call, it tells the QueueClient to wait for up to 5 minutes for the message, to pass the message to the MessageReceived() methods when received, and that it does not need to pass any state object to the MessageReceived() method.
It is in the MessageReceived method that all the important stuff is happening. First of all, the MessageReceived() method is an async thingy, so it needs to take an IAsyncResult parameter.
But besides that, it is pretty straight forward. It uses the QueueClient’s EndReceive() method to get hold of the potentially received message. I say “potentially” as it might not have received a message, and instead timed out, ie not received a message within 5 mins.
Next, I verify whether or not the I did receive a message. if I did, I get the BrokeredMessage’s “body” in the form of a MyMessage instance. I then Console.WriteLine() the message. Finally, I call BegindReceive() again. I also make sure to wrap it all in a try/catch for good measure.
private static void MessageReceived(IAsyncResult iar)
{
try
{
var msg = _qClient.EndReceive(iar);
if (msg != null)
{
var myMsg = msg.GetBody<MyMessage>();
Console.WriteLine("Received Message: " + myMsg.Message);
}
}
catch (Exception ex)
{
Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
}
BeginReceive();
}
That’s all there is to it! This will make sure that the service receives any message sent to the queue. It does so in a non-blocking async way, making sure the UI stays responsive. It also turns the BeginReceive() into a loop, making sure that we go back and ask the queue for a new message continuously until we press a key in the console and thus close the client’s connection.
Code is available as usual here: DarksideCookie.Azure.ServiceBusDemo - Queuing.zip
Cheers, and thanks for listening!