I guess it is time for another Azure Service Bus post. The previous ones have been relatively popular, so I thought I would do one more post to cover one last feature in the bus. (I say one last now, but I am pretty sure I will be back…)
Topics and subscribers are the basic units behind the Service Bus implementation of the pub/sub pattern. And as expected from a “simple” pattern like this, it should be simple to implement, and it is. The basics would be, create a topic, add subscribers that subscribe to messages from the topic, and finally push some messages to the topic, which are then relayed to the subscribers. Simple as…
Ok, so let’s look at some code already…and yes we will, but first I just want to mention that this post builds upon some of the previous ones. For example, it assumes that there is a Service Bus namespace up and running and so on. All information about getting that set up is available here. It also contains the information about getting the required SDK installed and so on. So if you haven’t read my post about message relaying, do it.
With that information out of the way, let’s start coding. I start off by creating a class library project, which I call Messages. In this project, I will place the messages that will be sent and received from the topic.
Just remember, like previously, we have to change the target framework to .NET 4. As soon as that is out of the way, I create 2 new classes called “MyFirstMessage” and “MySecondMessage”.
I know that is lacking a little bit of imagination, but that will have to do…
The messages are simple classes, and actually have the same single string property called Message. So they look like this
public class MyFirstMessage
{
    public MyFirstMessage() {}

    public MyFirstMessage(string msg)
    {
        Message = msg;
    }

    public string Message { get; set; }
}

public class MySecondMessage
{
    public MySecondMessage() {}

    public MySecondMessage(string msg)
    {
        Message = msg;
    }

    public string Message { get; set; }
}
Yes, they are pretty much identical. But they will do for this simple demo. The important part is to have 2 messages.
Now that we have the messages it is time to create a publisher, and in good old style, I am creating a new Console application. I change the target framework and add a reference to the Microsoft.ServiceBus assembly. And just as in previous samples, I add in an app.config file and place the namespace, issuer and secret in the appsettings.
As soon as that is done, I can start messing with the actual program. I start by adding in the configuration properties and setting up a new NamespaceManager and MessagingFactory, making the application more or less a copy of the one used in the previous post about queueing. The only difference so far, is that I also create a string constant to hold the name of the Topic to interact with. It looks like this
using System.Configuration;          // ConfigurationManager (needs a reference to System.Configuration)
using Microsoft.ServiceBus;           // ServiceBusEnvironment, TokenProvider, NamespaceManager
using Microsoft.ServiceBus.Messaging; // MessagingFactory

class Program
{
    private const string Topic = "TheTopic";

    static void Main(string[] args)
    {
        var url = ServiceBusEnvironment.CreateServiceUri("sb", GetNamespace(), string.Empty);
        var credentials = TokenProvider.CreateSharedSecretTokenProvider(GetIssuerName(), GetSecret());
        var nsc = new NamespaceManager(url, credentials);
        var mf = MessagingFactory.Create(url, credentials);
        ...
    }

    private static string GetIssuerName()
    {
        return ConfigurationManager.AppSettings["issuer"];
    }

    private static string GetSecret()
    {
        return ConfigurationManager.AppSettings["secret"];
    }

    private static string GetNamespace()
    {
        return ConfigurationManager.AppSettings["namespace"];
    }
}
Ok, so far there is nothing really new, and what comes next might be new, but still looks very familiar. I start by using the NamespaceManager to see if there is a Topic with the specified name. If there isn’t, I create one. After that is done, I create a TopicClient that will be responsible for communicating with the topic.
As soon as I get my hands on that TopicClient, I start a for-loop that sends 10 messages to the Topic. Five of type MyFirstMessage, and 5 of type MySecondMessage. But before sending the messages, I use another feature of the BrokeredMessage class, the ability to add metadata to the message. This is done by putting the data into an IDictionary<string,object> called Properties. This metadata is then passed along with the message to the client, adding some extra features that you will see later.
For now, all you need to know is that together with the message, I am passing the full name of the message type as a metadata property called “Type”. I also make sure to replace any “.” in the full name with “_” as it fails if there are “.” characters in the data.
The loop looks like this
// "client" is the TopicClient created for the Topic, as described above
for (int i = 0; i < 10; i++) // 10 messages: five of each type
{
    object msg;
    msg = i % 2 == 0 ? (object)new MyFirstMessage("First message " + i) : (object)new MySecondMessage("Second message " + i);
    var brokeredMessage = new BrokeredMessage(msg);
    // Pass the message type along as metadata, with "." swapped for "_"
    brokeredMessage.Properties["Type"] = msg.GetType().FullName.Replace('.','_');
    Console.Write("Sending message " + i + "...");
    client.Send(brokeredMessage);
    Console.WriteLine("Done!");
}
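A quick side note on that “Type” property. The publisher swaps “.” for “_”, and the subscriber later swaps them back. Here is a minimal sketch of that round trip, written as a hypothetical TypeTag helper (the class and its method names are my own invention for illustration, they are not part of the demo or the SDK):

```csharp
using System;

// Hypothetical helper, not part of the original sample or the Service Bus SDK.
public static class TypeTag
{
    // Turns "Messages.MyFirstMessage" into "Messages_MyFirstMessage".
    public static string Encode(string fullName)
    {
        return fullName.Replace('.', '_');
    }

    // Reverses Encode by turning every "_" back into ".".
    public static string Decode(string tag)
    {
        return tag.Replace('_', '.');
    }

    // True when decoding the encoded name gives back the original name.
    public static bool RoundTrips(string fullName)
    {
        return Decode(Encode(fullName)) == fullName;
    }
}
```

The thing to watch out for is that the round trip only works when the full name contains no underscores of its own. A name like “My_App.MyFirstMessage” comes back as “My.App.MyFirstMessage”. If that is a risk, comparing the encoded values on both sides, instead of decoding, sidesteps the problem.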
And finally I close the client and add a Console.ReadKey() to keep the window open. This is really not necessary, but it feels wrong to have the window just close…
That was the publisher! Recap: create a Topic, then a TopicClient, then a BrokeredMessage, and finally send the message to the Topic.
Next up is a subscriber. Once again, it is a Console application, with changed framework and a new reference. And once again, I add the app.config and the config retrieving properties. I also add the string constant that I used in the publisher.
I start off the same way as in the publisher, by creating the NamespaceManager and MessagingFactory and making sure that the Topic is there. Normally, the Topic should be created before the subscriber arrives. But to make it foolproof in this case, I add the creation logic in here as well.
But that’s where the similarities end. As soon as I know the Topic exists, I carry on with the Subscription. I once again use the NamespaceManager to check if there is a Subscription with a name that I have defined, in this case “Subscriber”. If there isn’t, I create one.
So the way that Topics and Subscribers work is that you have a Topic that messages are sent through. The listening clients connect to a so-called Subscription, which is pretty natural. The important thing to understand, though, is that each Subscription has a name, and each message in it is handed to only one client. If you connect more than one client to the same Subscription, they will compete for the messages, and any given message will be received by only one of them.
The way it works, at least conceptually, is that each Subscription has a queue. When you send a message to the Topic, it puts that message into the queue of every registered Subscription. And as you connect a client to a Subscription, it looks for messages in that queue. (It might be a little different in the actual implementation, I don’t know, but it basically works like that at least…)
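To make that mental model a bit more concrete, here is a tiny in-memory sketch of it. This is purely conceptual: the InMemoryTopic class and its members are hypothetical names of my own, and it says nothing about how the bus is actually implemented.

```csharp
using System;
using System.Collections.Generic;

// Conceptual model only: an in-memory stand-in, not the Service Bus implementation.
public class InMemoryTopic
{
    // One queue per named subscription.
    private readonly Dictionary<string, Queue<string>> _subscriptions =
        new Dictionary<string, Queue<string>>();

    public void CreateSubscription(string name)
    {
        if (!_subscriptions.ContainsKey(name))
            _subscriptions[name] = new Queue<string>();
    }

    // Sending copies the message into the queue of every subscription
    // that exists at this moment.
    public void Send(string message)
    {
        foreach (var queue in _subscriptions.Values)
            queue.Enqueue(message);
    }

    // Receiving takes the next message from one subscription's queue,
    // or returns null when that queue is empty.
    public string Receive(string subscriptionName)
    {
        var queue = _subscriptions[subscriptionName];
        return queue.Count > 0 ? queue.Dequeue() : null;
    }
}
```

With two subscriptions, “A” and “B”, a single Send() puts one copy in each queue. Two clients both calling Receive("A") compete for the same copy, so the first one gets the message and the second gets null, while Receive("B") still returns its own copy.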
Ok, back to the code. As soon as the Subscription’s existence has been confirmed, I use the MessagingFactory to create a new SubscriptionClient. Creating one takes the name of the Topic and the name of the Subscription.
I then start a loop that receives messages asynchronously. And since it is async, I use a Console.ReadKey() call to keep the application running. After the ReadKey() call, I close the client, and use the NamespaceManager to delete the Subscription. Like this
// Shared with BeginReceive() and MessageReceived()
private static SubscriptionClient _client;

static void Main(string[] args)
{
    var url = ServiceBusEnvironment.CreateServiceUri("sb", GetNamespace(), string.Empty);
    var credentials = TokenProvider.CreateSharedSecretTokenProvider(GetIssuerName(), GetSecret());
    var nsc = new NamespaceManager(url, credentials);
    var mf = MessagingFactory.Create(url, credentials);

    if (!nsc.TopicExists(Topic))
        nsc.CreateTopic(Topic);

    var name = "Subscriber";
    if (!nsc.SubscriptionExists(Topic, name))
        nsc.CreateSubscription(Topic, name);

    _client = mf.CreateSubscriptionClient(Topic, name);
    BeginReceive();

    Console.WriteLine("Waiting for messages...");
    Console.ReadKey();

    _client.Close();
    nsc.DeleteSubscription(Topic, name);
}
As each Subscription is an “entity” in the bus, removing it when you are done is a good idea… Messages will not go into the Subscription and stay there forever, but they will be there for a while and use up space. I also assume that having Subscriptions around will slow down the message sending a tiny amount… Anyhow, in this case it makes sense to remove it at least…
Ok, so that was the main part of the subscriber. The part that wasn’t in that code was the message receive loop that was started by the call to BeginReceive(). The BeginReceive() method is very simple. It checks to make sure that the client isn’t closed, and then calls BeginReceive() on it. The BeginReceive() method takes a TimeSpan that defines how long it will wait for a message, a callback method for when a message has been received, or the time has run out, and finally a user state object if needed.
private static void BeginReceive()
{
    if (!_client.IsClosed)
        _client.BeginReceive(TimeSpan.FromMinutes(5), MessageReceived, null);
}
The actual work is in the MessageReceived() method. It takes an IAsyncResult like all async callbacks. This is then used when calling the EndReceive() method on the client. If something has gone wrong, an exception might be thrown, so I wrap it in a try/catch and Console.WriteLine the exception.
If everything goes ok, I get a message back, or potentially nothing. If a timeout is the reason for the ending of the call, the EndReceive() will return null. So I start by checking whether or not I got a message.
If I did, I check the type by looking at the metadata property called “Type”. Remember, the one I added above… I make sure to replace my “_” with “.” and then do an if-statement to handle different message types that were sent.
If the type is one that I recognize, I get the message from the BrokeredMessage by calling GetBody<T>(), and output the message to the console. If it is a type I don’t recognize, I output that to the console.
And finally, at the end of the method, I call BeginReceive() again to receive more messages.
private static void MessageReceived(IAsyncResult iar)
{
    try
    {
        var msg = _client.EndReceive(iar);
        if (msg != null) // null means the receive timed out
        {
            var type = (string)msg.Properties["Type"];
            if (!string.IsNullOrEmpty(type))
                type = type.Replace('_', '.');

            if (type == typeof(MyFirstMessage).FullName)
            {
                var myMsg = msg.GetBody<MyFirstMessage>();
                Console.WriteLine("Received a MyFirstMessage: " + myMsg.Message);
            }
            else if (type == typeof(MySecondMessage).FullName)
            {
                var myMsg = msg.GetBody<MySecondMessage>();
                Console.WriteLine("Received a MySecondMessage: " + myMsg.Message);
            }
            else
            {
                Console.WriteLine("Received a message I don't understand...");
            }

            // The client uses the default PeekLock receive mode, so the message
            // has to be completed or it is redelivered once its lock expires.
            msg.Complete();
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
    }
    BeginReceive();
}
Ok, that works fine! But there has to be a better way to handle the message type…and there is. And my second subscriber will use it. The second subscriber is identical to the first one, except for 2 things. First of all, it obviously uses another name for its Subscription, and secondly, it only cares about messages of type MyFirstMessage.
I actually create the second subscriber by creating a new Console project as usual, and then copying across the Main() method and the app.config.
So how do I handle this in a good way? Well, when you create a Subscription, you can pass along a filter. This filter makes it easy to filter messages based on their metadata using a T-SQL-like language.
The filter class is called SqlFilter, and is very simple to add, like this:
if (!nsc.SubscriptionExists(Topic, name))
{
    var filter = new SqlFilter("Type = '" + typeof(MyFirstMessage).FullName.Replace('.', '_') + "'");
    nsc.CreateSubscription(Topic, name, filter);
}
This filter will make sure that the Subscription only gets messages where the metadata property “Type” has the correct value.
You can add as many filters as you want to a Subscription. However, they are calculated using “OR”. So if any of the filters match, the message will be received.
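The “OR” behaviour is easy to picture with a small in-memory sketch. Note that this is only a conceptual model: FilteredSubscription, AddFilter and Accepts are hypothetical names of my own and not SDK types, and plain delegates stand in for the real SQL-like filters.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual model only; the names here are hypothetical and not SDK types.
public class FilteredSubscription
{
    // Each filter inspects the message metadata and answers yes or no.
    private readonly List<Func<IDictionary<string, object>, bool>> _filters =
        new List<Func<IDictionary<string, object>, bool>>();

    public void AddFilter(Func<IDictionary<string, object>, bool> filter)
    {
        _filters.Add(filter);
    }

    // A message is accepted as soon as any one filter matches (logical OR).
    // This sketch does not model what a subscription without filters does.
    public bool Accepts(IDictionary<string, object> properties)
    {
        return _filters.Any(f => f(properties));
    }
}
```

So if one filter matches “Messages_MyFirstMessage” and another matches “Messages_MySecondMessage”, a message of either type gets through, while a message with any other “Type” value is rejected.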
After that little change in the Subscription creation, I also change the MessageReceived() method to just accept MyFirstMessage messages.
private static void MessageReceived(IAsyncResult iar)
{
    try
    {
        var msg = _client.EndReceive(iar);
        if (msg != null) // null means the receive timed out
        {
            var type = (string)msg.Properties["Type"];
            if (!string.IsNullOrEmpty(type))
                type = type.Replace('_', '.');

            if (type == typeof(MyFirstMessage).FullName)
            {
                var myMsg = msg.GetBody<MyFirstMessage>();
                Console.WriteLine("Received a MyFirstMessage: " + myMsg.Message);
            }
            else
            {
                Console.WriteLine("Received a message I don't understand...");
            }

            // The client uses the default PeekLock receive mode, so the message
            // has to be completed or it is redelivered once its lock expires.
            msg.Complete();
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
    }
    BeginReceive();
}
I still have the else statement in there, just as a failsafe, but it will never be called…
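That is it! If you start off by starting the subscribers one at a time, and then the publisher, you should see the messages flowing through.
You can also start the publisher first, and then the subscribers. They will still get the messages when they start up, provided their Subscriptions already existed when the messages were sent. A Topic only copies a message to the Subscriptions registered at that moment, and this sample deletes its Subscription on exit.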
The only thing is that if you start them at exactly the same time, you might get an exception because two of the apps are both trying to create the Topic at once…
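One way to live with that race is to treat an “already exists” failure as success. Below is a minimal sketch of that idea. EntitySetup and EnsureCreated are hypothetical names of my own, and the exception type is left as a type parameter, since which exception the SDK throws in this situation is an assumption on my part (I believe it is MessagingEntityAlreadyExistsException).

```csharp
using System;

// Hypothetical helper, not part of the original sample or the SDK.
public static class EntitySetup
{
    // Runs create() when exists() reports false, and treats an
    // "already exists" failure (TConflict) as success, because it means
    // another process created the entity between the check and the create.
    public static void EnsureCreated<TConflict>(Func<bool> exists, Action create)
        where TConflict : Exception
    {
        if (exists())
            return;

        try
        {
            create();
        }
        catch (TConflict)
        {
            // Someone else won the race; the entity is there, which is all we need.
        }
    }
}
```

In the demo apps, the exists check would be nsc.TopicExists(Topic) and the create step would be nsc.CreateTopic(Topic), each wrapped in a lambda. Any other exception type still bubbles up as before.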
That’s it for this time!
The code is available as usual: DarksideCookie.Azure.ServiceBusDemo.zip
Cheers!