Sunday, July 28, 2013

Writing a Node.js Client to Send/Receive Messages with WSO2 Message Broker

WSO2 Message Broker supports AMQP 0-9-1. To publish or subscribe to Message Broker from Node.js, you can use any Node.js client library that is compatible with AMQP 0-9-1. Some of the examples I found are:
  1. amqp.node : https://github.com/squaremo/amqp.node
  2. node-amqp : https://github.com/postwait/node-amqp
First of all, we need to install Node.js if we haven't done so yet. The following commands install Node.js on Ubuntu.

$ sudo apt-get install python-software-properties python g++ make 
$ sudo add-apt-repository ppa:chris-lea/node.js 
$ sudo apt-get update
$ sudo apt-get install nodejs

Then install the 'amqplib' module, which provides the functions of the 'amqp.node' library.

$ npm install amqplib

The following sample code, written using the amqp.node library, can be used as a Node.js client to publish messages to or receive messages from WSO2 Message Broker. Use the format amqp://{username}:{password}@{hostname}:{port} to establish a connection with Message Broker. All messages are sent as byte messages but can be received as text.
The 'amqp.node' library provides a rich API which can be used for other queue operations in MB too.
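
As a small illustration of the connection URL format mentioned above, here is a minimal sketch of a helper that assembles the URL from its parts. The function name buildAmqpUrl is my own (it is not part of amqplib), and percent-encoding the credentials is an assumption based on the usual AMQP URI convention, so check it against the library version you use.

```javascript
// Hypothetical helper (not part of amqplib): builds a connection URL in the
// amqp://{username}:{password}@{hostname}:{port} format.
function buildAmqpUrl(options) {
    // Percent-encode the credentials so characters such as '@' or ':'
    // do not break the URL structure (assumption: the client decodes them).
    var username = encodeURIComponent(options.username);
    var password = encodeURIComponent(options.password);
    return 'amqp://' + username + ':' + password +
        '@' + options.hostname + ':' + options.port;
}

// Example values matching the samples in this post.
var url = buildAmqpUrl({
    username: 'admin',
    password: 'admin',
    hostname: 'localhost',
    port: 5672
});
console.log(url); // amqp://admin:admin@localhost:5672
```

The resulting string can be passed to require('amqplib').connect(...) in the same way as the hard-coded URL used in the samples.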
  
A Sample Node.js Queue Publisher for WSO2 MB
    
    var queuename = 'MyQueue';

    // amqp://{username}:{password}@{hostname}:{port} is the default AMQP connection URL of WSO2 MB
    var openConn = require('amqplib').connect('amqp://admin:admin@localhost:5672');

    openConn.then(function(conn) {
        var ok = conn.createChannel();
        ok = ok.then(function(channel) {
            // Make sure the queue exists before publishing to it
            channel.assertQueue(queuename);
            // The message body is sent as bytes
            channel.sendToQueue(queuename, Buffer.from('New Message'));
        });
        return ok;
    }).then(null, console.warn);

A Sample Node.js Queue Consumer for WSO2 MB
    var queuename = 'MyQueue';

    // amqp://{username}:{password}@{hostname}:{port} is the default AMQP connection URL of WSO2 MB
    var openConn = require('amqplib').connect('amqp://admin:admin@localhost:5672');

    openConn.then(function(conn) {
        var ok = conn.createChannel();
        ok = ok.then(function(channel) {
            // Make sure the queue exists before consuming from it
            channel.assertQueue(queuename);
            channel.consume(queuename, function(msg) {
                // msg is null if the server cancels the consumer
                if (msg !== null) {
                    // The body arrives as bytes; convert it to text
                    console.log(msg.content.toString());
                    channel.ack(msg);
                }
            });
        });
        return ok;
    }).then(null, console.warn);
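
Since message bodies travel as bytes and are turned back into text on the receiving side, structured data has to be serialized before publishing. The following is a minimal sketch of that round trip using JSON. The helper names encodeMessage and decodeMessage and the sample order object are hypothetical and only for illustration.

```javascript
// Hypothetical helpers: convert between JavaScript values and the byte
// payload (a Buffer) that is handed to channel.sendToQueue().
function encodeMessage(value) {
    return Buffer.from(JSON.stringify(value), 'utf8');
}

// Reverse step for the consumer side, where msg.content is a Buffer.
function decodeMessage(content) {
    return JSON.parse(content.toString('utf8'));
}

// Round trip without a broker, just to show the conversion.
var payload = encodeMessage({ orderId: 42, status: 'NEW' });
var decoded = decodeMessage(payload);
console.log(decoded.status); // NEW
```

In the publisher you would pass encodeMessage(...) as the second argument of channel.sendToQueue, and in the consumer you would call decodeMessage(msg.content) instead of msg.content.toString().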

Thursday, July 25, 2013

Providing Dynamic Queue Support when Integrating WSO2 Message Broker with Other Products


WSO2 Message Broker can be integrated with many other WSO2 products, such as WSO2 Enterprise Service Bus, Application Server and Data Services Server, via the JMS transport. Previously, when starting the integrated product (say WSO2 Application Server) after connecting it with the Message Broker, the following exception might be thrown continuously.

ERROR {org.apache.axis2.transport.base.threads.NativeWorkerPool} - Uncaught exception
java.lang.UnsupportedOperationException: The new addressing based sytanx is not supported for AMQP 0-8/0-9 versions
at org.wso2.andes.client.AMQSession_0_8.handleAddressBasedDestination(AMQSession_0_8.java:572)
at org.wso2.andes.client.AMQSession.registerConsumer(AMQSession.java:2838)
at org.wso2.andes.client.AMQSession.access$500(AMQSession.java:117)
at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:2031)
at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:1997)
at org.wso2.andes.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:305)
at org.wso2.andes.client.AMQConnection.executeRetrySupport(AMQConnection.java:621)
at org.wso2.andes.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:102)
at org.wso2.andes.client.AMQSession.createConsumerImpl(AMQSession.java:1995)
at org.wso2.andes.client.AMQSession.createConsumer(AMQSession.java:993)
at org.apache.axis2.transport.jms.JMSUtils.createConsumer(JMSUtils.java:642)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.createConsumer(ServiceTaskManager.java:871)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.getMessageConsumer(ServiceTaskManager.java:741)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.receiveMessage(ServiceTaskManager.java:498)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:420)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

The reason for this annoying error log is that, when WSO2 MB is integrated with another product, at server startup it tries to create a queue in MB for each service deployed in the other product (WSO2 AS in this case), named after the corresponding service. However, since there was no dynamic queue support previously, it refers to the 'jndi.properties' file in the <Carbon_Home>/Repository/conf folder to see whether a queue with the given service name is defined. When it is not defined there, the server throws the above exception.
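
To make the static lookup described above more concrete, here is a rough sketch of the idea in JavaScript. It is not how the JMS transport is implemented. It only mimics the check "is there a queue entry for this service name in jndi.properties?". The "queue.<name> = <name>" key format is an assumption based on the usual Qpid-style JNDI properties file, and the service names are made up.

```javascript
// Sketch only: parses properties-file text into a plain object.
function parseProperties(text) {
    var props = {};
    text.split(/\r?\n/).forEach(function (line) {
        var trimmed = line.trim();
        if (trimmed === '' || trimmed.charAt(0) === '#') {
            return; // skip blank lines and comments
        }
        var idx = trimmed.indexOf('=');
        if (idx === -1) {
            return; // not a key/value line
        }
        props[trimmed.slice(0, idx).trim()] = trimmed.slice(idx + 1).trim();
    });
    return props;
}

// Assumption: a statically declared queue appears as "queue.<name> = <name>".
function isQueueDefined(props, serviceName) {
    return Object.prototype.hasOwnProperty.call(props, 'queue.' + serviceName);
}

// Hypothetical file content; the service names are made up.
var sample = [
    '# statically declared queues',
    'queue.SimpleStockQuoteService = SimpleStockQuoteService'
].join('\n');

var props = parseProperties(sample);
console.log(isQueueDefined(props, 'SimpleStockQuoteService')); // true
console.log(isQueueDefined(props, 'HelloService'));            // false
```

Without dynamic queue support, a service in the position of the second one (no entry in the file) is the kind that ends up producing the exception shown earlier.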

With the current dynamic queue support, there is no need to define the queue/topic names in advance: they are no longer limited to the 'jndi.properties' file, and the queues for all available services are created on the fly. This feature is supported via the Axis2 JMS transport. WSO2 ESB 4.6.0, AS 5.1.0 and DSS 3.0.1 use the axis2-transport-jms_1.1.0.wso2v7 version. Since axis2-transport-jms_1.1.0.wso2v8, the JMS transport version which facilitates dynamic queues, has not been released yet, to get these changes you need to replace the axis2-transport-jms library in <Carbon_Home>/Repository/components/plugins with the axis2-transport-jms_1.1.0.wso2v7.jar file added in this location. However, the new versions of WSO2 products will ship with axis2-transport-jms_1.1.0.wso2v8, and from then on there is no need to do the above replacement.

The following are the new versions which will facilitate dynamic queue support with WSO2 MB by default.

Note: According to the Message Broker roadmap, 'Dynamic Queue Support' will be officially provided with MB 3.0.0, which will be released early next year. Therefore, this is not the final verified implementation of this feature.

WSO2 ESB 4.7.0 : Already Released, You can go to this doc to see the integration guide.

WSO2 AS 5.2.0 : Will be released soon, find the integration guide for AS 5.1.0 here.

WSO2 DSS 3.0.2 : Will be released soon, find the integration guide for DSS 3.0.1 here.

Saturday, July 20, 2013

Avoiding javax.management.InstanceAlreadyExistsException when starting WSO2 Message Broker in Clustering Mode

WSO2 Message Broker can be set up in a clustered environment according to five different patterns. Coordination between broker nodes happens through Apache ZooKeeper, and in clustered mode we can configure WSO2 MB to use either an external ZooKeeper server or the built-in ZooKeeper server which is shipped with MB by default.

When starting a broker node in a cluster using the built-in ZooKeeper server, as described in this pattern, you might sometimes face the following error.

TID: [0] [MB] [2013-07-17 20:43:55,894]  INFO {org.wso2.carbon.coordination.server.CoordinationServer} -  Starting Coordination server in clustered mode... {org.wso2.carbon.coordination.server.CoordinationServer}
TID: [0] [MB] [2013-07-17 20:43:55,916] ERROR {org.apache.log4j.jmx.AppenderDynamicMBean} -  Could not add DynamicLayoutMBean for [CARBON_LOGFILE,layout=org.wso2.carbon.utils.logging.TenantAwarePatternLayout]. {org.apache.log4j.jmx.AppenderDynamicMBean}
javax.management.InstanceAlreadyExistsException: log4j:appender=CARBON_LOGFILE,layout=org.wso2.carbon.utils.logging.TenantAwarePatternLayout
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:453)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.internal_addObject(DefaultMBeanServerInterceptor.java:1484)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:963)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:917)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:312)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:483)
    at org.apache.log4j.jmx.AppenderDynamicMBean.registerLayoutMBean(Unknown Source)
    at org.apache.log4j.jmx.AppenderDynamicMBean.preRegister(Unknown Source)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.preRegisterInvoke(DefaultMBeanServerInterceptor.java:1010)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:938)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:917)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:312)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:483)
    at org.apache.log4j.jmx.LoggerDynamicMBean.registerAppenderMBean(Unknown Source)
    at org.apache.log4j.jmx.LoggerDynamicMBean.appenderMBeanRegistration(Unknown Source)
    at org.apache.log4j.jmx.LoggerDynamicMBean.postRegister(Unknown Source)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.postRegisterInvoke(DefaultMBeanServerInterceptor.java:1035)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:974)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:917)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:312)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:483)
    at org.apache.log4j.jmx.HierarchyDynamicMBean.addLoggerMBean(Unknown Source)
    at org.apache.log4j.jmx.HierarchyDynamicMBean.addLoggerMBean(Unknown Source)
    at org.apache.zookeeper.jmx.ManagedUtil.registerLog4jMBeans(ManagedUtil.java:67)
    at org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:122)
    at org.wso2.carbon.coordination.server.CoordinationServer.run(CoordinationServer.java:78)

This doesn't break any functionality of Message Broker; however, it is annoying to see a long error log when the server starts up. The reason is that Apache ZooKeeper enables JMX by default during ZooKeeper server startup. With that, it also tries to register new Log4j MBeans to manage Log4j through JMX, which causes the error.

To overcome this and start the server normally, we can simply disable ZooKeeper's Log4j MBean registration at Message Broker startup by starting MB with the following command.

sh wso2server.sh -Dzookeeper.jmx.log4j.disable=true

This tells the JVM to disable Log4j MBean registration when starting ZooKeeper.
You can learn more about all five clustering patterns of WSO2 MB by visiting the Message Broker clustering documentation here.

Monday, July 15, 2013

Creating an OSGi Bundle out of a Third Party Library for Using as WSO2 Product Dependency

The WSO2 platform supports a modular architecture where third party libraries used in WSO2 products are exposed to the environment as OSGi bundles. This blog post is about how to OSGify a third party library so that it can be used as a dependency inside WSO2 MB.

We have recently used 'Disruptor', a high performance inter-thread messaging library, as a dependency in WSO2 Message Broker to improve performance by using Disruptor-based message writing operations into Cassandra storage. To do this, we first need to create an OSGi bundle out of the 'Disruptor' library so that it can be referenced at runtime when Message Broker is running.

As is common to all OSGi bundle generation, you first need a manifest.mf file which describes the bundle information, version info, which packages need to be imported/exported, etc. However, as it is not easy to write this file correctly by hand, we use maven-bundle-plugin to get this done. This process is common to any third party dependency, and you can follow the same process to build an OSGi bundle for any of them.

1. First, let's create a directory named 'disruptor' inside the <WSO2_Carbon_Source>/platform/dependencies/orbit/ directory. As we have used Disruptor version 2.10.4, I created a new directory called 2.10.4-wso2v1 inside the disruptor directory. (<LibraryVersion>-wso2v<VersionNumber> is the common notation across the platform when naming the versions of dependencies.)

2. Add a new Maven build file (pom.xml) inside the 2.10.4-wso2v1 directory.

3. Now let's add the details required to generate the OSGi bundle to this file. This pom file can be used as a sample template by replacing the required entries when adding another dependency.

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<parent>
   <groupId>org.wso2.carbon</groupId>
   <artifactId>carbon-dependents</artifactId>
   <version>4.1.0</version>
   <relativePath>../../../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.googlecode.disruptor.wso2</groupId>
<artifactId>disruptor</artifactId>
<packaging>bundle</packaging>
<name>WSO2 Carbon - Orbit - disruptor</name>
<version>2.10.4-wso2v1</version>
<description>This bundle exports packages from disruptor jar files</description>
<url>http://wso2.org</url>

<dependencies>
   <dependency>
      <groupId>com.googlecode.disruptor</groupId>
      <artifactId>disruptor</artifactId>
      <version>2.10.4</version>
      <optional>true</optional>
   </dependency>
</dependencies>

<build>
     <plugins>
            <plugin>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
                 <version>1.4.0</version>
                 <extensions>true</extensions>
                 <configuration>
                      <instructions>
                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
                        <Bundle-Name>${project.artifactId}</Bundle-Name>
                        <Export-Package>
                                com.lmax.disruptor.*;version=2.10.4,
                         </Export-Package>
                        <Import-Package>
                                !sun.misc,
                                *
                        </Import-Package>
                      </instructions>
                </configuration>
            </plugin>
     </plugins>
</build>

<!--<repositories>
    <repository>
       <snapshots>
          <enabled>true</enabled>
          <updatePolicy>daily</updatePolicy>
          <checksumPolicy>ignore</checksumPolicy>
    </snapshots>
    <id>wso2-maven2-snapshot-repository</id>
    <name>WSO2 Maven2 Snapshot Repository</name>
    <url>http://dist.wso2.org/snapshots/maven2/</url>
    <layout>default</layout>
  </repository>
</repositories>-->

<properties>
     <disruptor.build.version>2.10.4</disruptor.build.version>
     <disruptor.version>${disruptor.build.version}-wso2v1</disruptor.version>
     <disruptor.orbit.version>${disruptor.build.version}.wso2v1</disruptor.orbit.version>
</properties>
</project>


Let's go through the important entries in the file.

<dependencies>
   <dependency>
      <groupId>com.googlecode.disruptor</groupId>
      <artifactId>disruptor</artifactId>
      <version>2.10.4</version>
      <optional>true</optional>
   </dependency>
</dependencies>


As we are using an external library in the OSGi bundle, we need to add it as a Maven dependency entry for the bundle. You can find Maven dependency entries for a lot of libraries here.

<build>
   <plugins>
      <plugin>
         <groupId>org.apache.felix</groupId>
         <artifactId>maven-bundle-plugin</artifactId>
         <version>1.4.0</version>
         <extensions>true</extensions>
         <configuration>
            <instructions>
               <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
               <Bundle-Name>${project.artifactId}</Bundle-Name>
               <Export-Package>
                  com.lmax.disruptor.*;version=2.10.4,
               </Export-Package>
               <Import-Package>
                  !sun.misc,
                  *
               </Import-Package>
            </instructions>
         </configuration>
      </plugin>
   </plugins>
</build>


This is how we use maven-bundle-plugin to generate the Disruptor OSGi bundle. The Export-Package, Import-Package and DynamicImport-Package tags are used to control the exposure of certain packages to other bundles. With the <Export-Package> tag, the plugin finds the matching project classes or dependencies and copies them into the OSGi bundle. With <Import-Package>, we can import the external classes that are referenced from our OSGi bundle. Keeping * as the value, which is the default, imports all the referenced packages that the bundle needs. However, when using the default value for the Disruptor bundle, I encountered the following error.

org.osgi.framework.BundleException: The bundle could not be resolved. Reason: Missing Constraint: Import-Package: sun.misc; version="0.0.0"

So I added the entry '!sun.misc' to resolve this issue. It seems that, as the sun.misc package is already available to the OSGi framework, we need to state that it does not have to be imported explicitly in this case. This entry is specific to the Disruptor bundle, and you don't have to add the same configuration to each bundle you create. However, if a bundle's classes use the sun.misc package, this setting might be useful in resolving similar errors.
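
To illustrate how an ordered instruction list such as '!sun.misc, *' behaves, here is a small sketch in JavaScript. It is a simplified model written for this post, not the actual logic of bnd or maven-bundle-plugin. The idea it shows is that instructions are checked in order, the first matching one decides, and a leading '!' means the package is left out. The function names and the list of referenced packages are hypothetical.

```javascript
// Simplified pattern matching: '*' matches everything, 'a.b.*' matches
// 'a.b' and its sub-packages, anything else must match exactly.
function matches(pattern, pkg) {
    if (pattern === '*') {
        return true;
    }
    if (pattern.slice(-2) === '.*') {
        var prefix = pattern.slice(0, -2);
        return pkg === prefix || pkg.indexOf(prefix + '.') === 0;
    }
    return pattern === pkg;
}

// Returns the packages that would end up in Import-Package.
function selectImports(instructions, referencedPackages) {
    return referencedPackages.filter(function (pkg) {
        for (var i = 0; i < instructions.length; i++) {
            var negated = instructions[i].charAt(0) === '!';
            var pattern = negated ? instructions[i].slice(1) : instructions[i];
            if (matches(pattern, pkg)) {
                return !negated; // first match wins
            }
        }
        return false; // no instruction mentions this package
    });
}

// Hypothetical set of packages referenced by the bundle's classes.
var referenced = ['sun.misc', 'javax.management', 'org.example.util'];
console.log(selectImports(['!sun.misc', '*'], referenced));
// [ 'javax.management', 'org.example.util' ]
```

The order matters: with '*' placed first, sun.misc would already be matched by the wildcard and the negation would never be reached.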


4. That's it! Save the pom file and build it with Maven (WSO2 projects use Maven 3). You will see the newly created OSGi bundle 'disruptor-2.10.4-wso2v1.jar' in the <WSO2_Carbon_Source>/platform/dependencies/orbit/disruptor/2.10.4-wso2v1/target folder.
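
As a side note on the version naming used in the steps above, the Maven version of the bundle is written with a hyphen (2.10.4-wso2v1), while the OSGi Bundle-Version uses a dot before the qualifier (2.10.4.wso2v1). The following sketch derives both forms from the library version. The helper name orbitVersions is hypothetical and only for illustration.

```javascript
// Hypothetical helper illustrating the <LibraryVersion>-wso2v<VersionNumber>
// naming convention and its OSGi counterpart.
function orbitVersions(libraryVersion, wso2Revision) {
    var qualifier = 'wso2v' + wso2Revision;
    return {
        // Used as the Maven <version> and in the jar file name
        maven: libraryVersion + '-' + qualifier,
        // Used as the OSGi Bundle-Version (qualifier separated by a dot)
        osgi: libraryVersion + '.' + qualifier
    };
}

var v = orbitVersions('2.10.4', 1);
console.log('disruptor-' + v.maven + '.jar'); // disruptor-2.10.4-wso2v1.jar
console.log(v.osgi);                          // 2.10.4.wso2v1
```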

If you look at the manifest.mf file of the new bundle, it will be as follows.

Manifest-Version: 1.0
Export-Package: com.lmax.disruptor.collections;version="2.10.4",com.lm
 ax.disruptor;uses:="com.lmax.disruptor.util";version="2.10.4",com.lma
 x.disruptor.dsl;uses:="com.lmax.disruptor,com.lmax.disruptor.util";ve
 rsion="2.10.4",com.lmax.disruptor.util;uses:="com.lmax.disruptor";ver
 sion="2.10.4"
Ignore-Package: sun.misc
Built-By: <your_host_name>
Tool: Bnd-0.0.238
Bundle-Name: disruptor
Created-By: Apache Maven Bundle Plugin
Bundle-Version: 2.10.4.wso2v1
Build-Jdk: 1.6.0_30
Bnd-LastModified: 1373904351577
Bundle-ManifestVersion: 2
Bundle-Description: This bundle exports packages from disruptor jar fi
 les
Bundle-SymbolicName: disruptor
Import-Package: com.lmax.disruptor;version="2.10.4",com.lmax.disruptor
 .collections;version="2.10.4",com.lmax.disruptor.dsl;version="2.10.4"
 ,com.lmax.disruptor.util;version="2.10.4"
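
Long manifest headers are hard to read because the JAR manifest format wraps lines at 72 bytes and marks each continuation line with a single leading space. If you want to inspect such a manifest, a small sketch like the following can join the wrapped lines back together. The function name unfoldManifest is my own, and it only handles the simple "Name: value" main section, not multiple manifest sections.

```javascript
// Sketch: joins wrapped manifest lines back into complete headers.
// Continuation lines start with a single space, which is dropped.
function unfoldManifest(text) {
    var headers = {};
    var lastKey = null;
    text.split(/\r?\n/).forEach(function (line) {
        if (line.charAt(0) === ' ' && lastKey !== null) {
            headers[lastKey] += line.slice(1); // continuation of previous header
            return;
        }
        var idx = line.indexOf(': ');
        if (idx > 0) {
            lastKey = line.slice(0, idx);
            headers[lastKey] = line.slice(idx + 2);
        }
    });
    return headers;
}

// Two headers taken from the manifest of the Disruptor bundle.
var sample = [
    'Bundle-Description: This bundle exports packages from disruptor jar fi',
    ' les',
    'Bundle-SymbolicName: disruptor'
].join('\n');

var headers = unfoldManifest(sample);
console.log(headers['Bundle-Description']);
// This bundle exports packages from disruptor jar files
```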



To learn more about creating OSGi bundles with maven-bundle-plugin, you can read the following article in the WSO2 Library.


[1]. http://wso2.com/library/tutorials/develop-osgi-bundles-using-maven-bundle-plugin