Sunday, February 26, 2012

Scalable CXF applications using JMS transport

Java Message Service (JMS) is wide spread and popular messaging API. As far JMS is standardized, the same application code can successfully work with different JMS implementations: WS MQ, Active MQ, Tibco, Joram, BEA WebLogic, OpenJMS. CXF provides a transport that enables endpoints to use JMS queues and topics.

Default CXF consumer and producer using JMS
Implementing CXF client and service using JMS transport is trivial.
Basically, it is enough to configure two things in WSDL:
a) specify jms transport URI in binding element;
b) define jms address in port element


WSDL binding and port should look like:


<wsdl:definitions xmlns:jms="http://cxf.apache.org/transports/jms" ...
    <wsdl:binding name="Greeter_SOAPBinding" type="tns:Greeter">
        <soap:binding style="document"
            transport="http://cxf.apache.org/transports/jms" />
        ...
    </wsdl:binding>
    <wsdl:service name="JMSGreeterService">
        <wsdl:port binding="tns:JMSGreeterPortBinding" name="GreeterPort">
            <jms:address destinationStyle="queue"
                jndiConnectionFactoryName="ConnectionFactory"
                jndiDestinationName="dynamicQueues/test.cxf.jmstransport.queue">
                <jms:JMSNamingProperty name="java.naming.factory.initial"
                    value="org.apache.activemq.jndi.ActiveMQInitialContextFactory" />
                <jms:JMSNamingProperty name="java.naming.provider.url"
                    value="tcp://localhost:61616" />
            </jms:address>
        </wsdl:port>
    </wsdl:service> 

CXF clients and servers implemented in java or using Spring configuration magically work for this WSDL (under the hood CXF selects correct JMS Conduit and Destination based on address URL). Details are described in [REF-3].
CXF also delivers jms_queue and jms_pubsub examples illustrating using of JMS transport with default settings for ActiveMQ [REF-4].


Scalability problems
Unfortunately there are two main scalability drawbacks when using default JMS configuration:


  1. It doesn't provide sessions pooling and consumers/producers cache (*).
  2. Default JMS message consumer is single threaded. It means that only one thread will get messages from the queue or topic and pass them to further processing.
Both aspects are critical for enterprise applications and their implementation is not an easy task. Is there any easy solution? Yes: Spring JMS functionality and CXF Features. Let discuss them in detail.

(*) - Some JMS vendors provide integrated session pooling and consumers/producers cache in ConnectionFactory. In this case using Spring CachingConnectionFactory is not necessary. Please refer vendor documentation to clear it.


Spring JMS functionality
Spring provides a number of useful classes that helps to implement scalable JMS application. Important for us are: org.springframework.jms.connection.CachingConnectionFactory
org.springframework.jms.listener.DefaultMessageListenerContainer

1. CachingConnectionFactory
CachingConnectionFactory provides the possibility to configure session pooling, consumers and producers cache. Bellow is a sample configuration of CachingConnectionFactory:>


<bean id="cachingConnectionFactory"
 class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
    <property name="sessionCacheSize" value="20" />
    <property name="cacheProducers" value="true" />
    <property name="cacheConsumers" value="true" />
</bean>

As you can see it is possible to set the size of the session pool and switch on producers and consumers caching.

2. DefaultMessageListenerContainer
DefaultMessageListenerContainer enables getting messages from the destination in parallel, using multiple threads.
Configuration of DefaultMessageListenerContainer looks like:

<bean id="queueContainerListener"
 class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destinationName" value="Q_WM_OUT" />
    <property name="messageListener" ref="simpleListener" />
    <property name="cacheLevel" value="3" />
    <property name="concurrentConsumers" value="10" />
    <property name="maxConcurrentConsumers" value="50" />
</bean> 

It is possible to define here:
  • Initial and maximal number of concurrent consumers. This tells the Spring to always start up a initial number of consumers (concurrentConsumers). When a new message has been received, if the maxConcurrentConsumers has not been reached, then a new consumer is created to process the message.
  • Cache level (3- cache connections, sessions and consumers; 2 – cache connections and sessions, 1 – cache connections only)
  • Specify message listener class (implementing MessageListener interface) and connection factory.
It is important to be aware of following things related to consumers caching:
  • Normally it makes no sense to increase the number of concurrent consumers for a JMS topic. It leads to concurrent consumption of the same message which is not desirable.
  • The concurrentConsumers property and the maxConcurrentConsumers property can be modified at runtime, for example, via JMX.
Details about spring based configuration are very good described in Bruce Snider's Blog [REF-5].
You can see that Spring provides solution for both mentioned scalability aspects. But how we can use it in CXF?

CXF JMS Configuration Feature
As the CXF JMS implementation is based the Spring JMS classes the user can benefit from described Spring JMS functionality.
CXF provides a Feature to configure details of the JMS transport: JmsConfigFeature. A CXF Feature is something that is able to customize a Server, Client, or Bus, typically adding capabilities. In our case we will add a feature in jaxws:endpoint and jaxws:client to tune the JMS transport. Let see how to configure CXF client and service using this feature.

Server configuration

<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
    <property name="sessionCacheSize" value="20" />
    <property name="cacheConsumers" value="true" />
</bean>
<bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration"
    p:connectionFactory-ref="cachingConnectionFactory"

   p:cacheLevel="3"
    p:concurrentConsumers="16" 

    p:maxConcurrentConsumers="16"
    p:targetDestination="Q_HSC"
    p:wrapInSingleConnectionFactory="false" />
<jaxws:endpoint id=" JMSGreeterService" address="jms://"
    implementor="#JMSGreeterServiceImpl">
    <jaxws:features>
        <bean class="org.apache.cxf.transport.jms.JMSConfigFeature">
            <p:jmsConfig-ref = jmsConfig ">
        </bean>
    </jaxws:features>
</jaxws:endpoint>

The jaxws:endpoint configuration containing the JMSConfigFeature. This feature has a property with JMSConfiguration type.
JMSConfiguration property supports all settings that we have seen in Spring DefaultMessageListenerContainer: cached connection factory with session pool size, number of concurrent consumers, cache level. All settings of JMSConfiguration are described in [REF-6].
Using this configuration the server application can be tuned to achieve optimal performance in our target environment. 
 
Client configuration


<bean id="cachingConnectionFactory"
   class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
    <property name="sessionCacheSize" value="20" />
    <property name="cacheProducers" value="true" />
</bean>
<bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration"
    p:connectionFactory-ref="connectionFactory" p:targetDestination="Q_HSC"
    p:cacheLevel="3" p:concurrentConsumers="16"
    p:maxConcurrentConsumers="16" p:wrapInSingleConnectionFactory="false" />
<jaxws:client id="JMSGreeterService" address="jms://"
    serviceClass="com.sopera.services.tpoc.eventgenerator.EventGenerator">
<jaxws:features>
<bean class="
    org.apache.cxf.transport.jms.JMSConfigFeature ">
    <property name="jmsConfig" ref="jmsConfig" />
</bean>
</jaxws:features>
</jaxws:client>


Client configuration looks very similar to the server one except one thing: CachingConnectionFactory activates producers caching instead consumers caching.

Conclusion
CXF gives a user the possibility to tune the standard transports. To achieve a high scalability of a CXF client and service in JMS communication they can use the CXF JMS Configuration Feature. It is not necessary to write any line of code, just configure and leverage already existing stuff.
Using this feature can have essential influence on the performance for some environments: in one Prove of Concept I have improved CXF service throughput on 360% (from 500 to 1800 msg/sec) just using session pool and multithread JMS consumer!
Reference performance numbers for SOAP over JMS are represented in [REF-7], you can easily compare it with own results and make appropriate tuning if necessary.

References
REF-3: CXF JMS Transport http://cxf.apache.org/docs/jms-transport.html
REF-4: cxf-distribution/examples: jms_queue; jms_pubsub
REF-5: Bruce Snider's blog http://bsnyderblog.blogspot.de/2010/05/tuning-jms-message-consumption-in.html
REF-6: CXF JMS Configuration Feature http://cxf.apache.org/docs/using-the-jmsconfigfeature.html
REF-7: Christian Schneider's blog: CXF performance http://www.liquid-reality.de/pages/viewpage.action?pageId=5865562