WSO2 Message Broker supports the AMQP (Advanced Message Queuing Protocol) version 0-9-1 standard, so it can interoperate with AMQP clients on different platforms, written in different languages. This blog post shows how to connect to WSO2 MB 2.1.0 from a C# .Net client and perform Queue/Topic based operations on the Message Broker.
To connect to MB, we first need a C# .Net client library that supports the AMQP standard. I have used the RabbitMQ .Net client library for this purpose.
1. How to Publish/Subscribe to Queues via .Net client
The following tutorial in WSO2 Library provides a complete guide on how to perform Queue operations by connecting to WSO2 MB via the .Net client. I am therefore not going to repeat it here; instead, let's see how to work with Topics from a C# .Net client.
2. How to Publish/Subscribe to Topics via .Net client
The main difference between connecting to a Topic and connecting to a Queue is that, by default, WSO2 MB uses a different exchange for each. To connect to a Queue or Topic, you must therefore first bind it to the appropriate exchange in the broker by specifying the exchange name. WSO2 Message Broker provides two default exchanges:
- 'amq.direct' : The pre-defined direct exchange, used with Queues
- 'amq.topic' : The pre-defined topic exchange, used with Topics
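To make the difference concrete, here is a minimal sketch of the Queue case. It assumes the same RabbitMQ .Net client v2.8.4 API and the same connection settings as the Topic samples in this post. The queue name 'test-queue' and the namespace/class names are placeholders chosen for illustration, and the code has not been verified against a running broker.

```csharp
using System.Text;
using RabbitMQ.Client;

namespace MB_Queue_Sketch
{
    class QueuePublisherSketch
    {
        static void Main(string[] args)
        {
            // Connection settings mirror the Topic samples in this post (assumed defaults)
            ConnectionFactory factory = new ConnectionFactory();
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = Protocols.AMQP_0_8_QPID;

            using (IConnection conn = factory.CreateConnection())
            using (IModel ch = conn.CreateModel())
            {
                // Queues go through the default direct exchange rather than 'amq.topic'
                ch.ExchangeDeclare("amq.direct", "direct");

                // 'test-queue' is a hypothetical queue name used only in this sketch
                ch.QueueDeclare("test-queue", false, false, false, null);

                // Bind the queue to the direct exchange, using the queue name as the routing key
                ch.QueueBind("test-queue", "amq.direct", "test-queue");

                // Publish a message to the queue through the direct exchange
                ch.BasicPublish("amq.direct", "test-queue", null,
                                Encoding.UTF8.GetBytes("Test Message"));
            }
        }
    }
}
```

Compared with the Topic case, only the exchange name and exchange type change; the declare, bind and publish calls keep the same shape.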
You can still define and create new exchanges if needed, but they must first be added to the <MB_Home>/Repository/conf/advanced/qpid-virtualhosts.xml file under the <exchanges> section, as in the example below. It is usually recommended, however, to use the default direct/topic exchanges.
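    <exchanges>
        <exchange>
            <type>direct</type>
            <name>carbon.direct</name>
            <durable>true</durable>
        </exchange>
        <exchange>
            <type>topic</type>
            <name>carbon.topic</name>
        </exchange>
    </exchanges>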
Now let's get back to writing our sample code.
I. The following sample client code publishes a message to a Topic in the broker. The comments in the code describe what each step does.
    using System;
    using System.Text;
    using RabbitMQ.Client;

    namespace MB_Topic_Publisher
    {
        class TopicPublisher
        {
            static void Main(string[] args)
            {
                TopicPublisher topicPublisher = new TopicPublisher();
                topicPublisher.PublishMessage("Test Message");
                Console.WriteLine("Message Sent..");
                Console.ReadLine();
            }

            public void PublishMessage(string message)
            {
                // Set up the connection with the message broker
                ConnectionFactory factory = new ConnectionFactory();
                IProtocol protocol = Protocols.AMQP_0_8_QPID;
                factory.VirtualHost = "/carbon";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.Protocol = protocol;

                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel ch = conn.CreateModel())
                    {
                        // Declare a topic exchange to publish messages to; here we use the default topic exchange of WSO2 MB
                        ch.ExchangeDeclare("amq.topic", "topic");

                        // Publish the message to the exchange; it is routed using the routing key, which is our topic name 'test-topic'.
                        // The syntax is ch.BasicPublish(exchange_name, topic_name, message_properties, message_body)
                        ch.BasicPublish("amq.topic", "test-topic", null, Encoding.UTF8.GetBytes(message));
                    }
                }
            }
        }
    }
II. Next, let's write the sample client code that consumes messages from a Topic in the broker.
    using System;
    using System.IO;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    namespace MB_TopicClient
    {
        class TopicConsumer
        {
            static void Main(string[] args)
            {
                TopicConsumer topicConsumer = new TopicConsumer();
                topicConsumer.getMessages();
            }

            public void getMessages()
            {
                // Set up the connection with the message broker
                ConnectionFactory factory = new ConnectionFactory();
                IProtocol protocol = Protocols.AMQP_0_8_QPID;
                factory.VirtualHost = "/carbon";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.Protocol = protocol;

                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel ch = conn.CreateModel())
                    {
                        // Declare the topic exchange to bind to; here we use the default topic exchange of WSO2 MB
                        ch.ExchangeDeclare("amq.topic", "topic");

                        // Declare a topic name; here we use a non-durable topic. To make it durable, pass 'true' as the 2nd parameter
                        ch.QueueDeclare("test-topic", false, false, false, null);

                        // Bind the topic to the exchange
                        ch.QueueBind("test-topic", "amq.topic", "test-topic");

                        // Declare a consumer that listens for messages published to the 'test-topic' topic.
                        // We need to declare an exclusive subscriber in order to get this to work.
                        // The syntax is BasicConsume(queueName, noAck, consumerTag, noLocal, exclusive, arguments, consumer)
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                        ch.BasicConsume("test-topic", false, "1", false, true, null, consumer);

                        while (true)
                        {
                            try
                            {
                                BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                                byte[] body = e.Body;
                                string message = Encoding.UTF8.GetString(body);
                                Console.WriteLine(message);

                                // Acknowledge the message, since noAck was set to false above
                                ch.BasicAck(e.DeliveryTag, false);
                            }
                            catch (EndOfStreamException ex)
                            {
                                // Dequeue() throws this once the consumer's queue is closed,
                                // e.g. when the channel or connection goes away
                                Console.WriteLine(ex);
                                break;
                            }
                        }
                    }
                }
            }
        }
    }
3. Downloading and Running the sample
To run the clients, first run the sample Topic Consumer, which creates a new topic subscription for 'test-topic'. When you then run the sample Topic Publisher client, the consumer receives the published message and displays it in the console as "Test Message".
The complete source code, along with the RabbitMQ.Client.dll library, can also be downloaded from here.
4. A guide to RabbitMQ .Net library API
If you want to learn how to use the API to customize the sample Publisher and Consumer before using them with WSO2 Message Broker, refer to the guides given below.
- RabbitMQ .Net client v2.8.4 API Guide : find here
- RabbitMQ .Net client v2.8.4 User Guide : find here