This documentation is for WSO2 Enterprise Integrator version 6.1.0 . View documentation for the latest release in the 6.x.x family and the latest release in the 7.x.x family.

All docs This doc
Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

The Message Broker profile in WSO2 Enterprise Integrator (WSO2 EI) supports XA transactions (distributed transactions). This capability ensures that a message delivered to multiple queues in the broker profile can be returned from all queues if at least one queue fails to accept the message.

For example, consider a use case where the Integrator profile in WSO2 EI is configured to distribute messages to three separate queues (mbqueue1, mbqueue2 and mbqueue3) defined in the broker profile. When the message is dispatched from the Integrator profile to the queues, if the message delivery to at least one queue fails, the message should be rolled back without delivering to any one of the queues.

Before you begin:

You can configure the maximum number of distributed transactions that can be handled in parallel by the Message Broker profile of WSO2 EI. To do this, set the following parameter in the broker.xml file (stored in the <EI_HOME>/wso2/broker/repository/conf directory).

<transaction>
    <maxParallelDtxChannels>20</maxParallelDtxChannels>
</transaction>

You can see how this works by following the steps given below.

  1. Follow the instructions on configuring the Integrator profile with the Message Broker profile
  2. Start the Message Broker profile and create three queues: mbqueue1, mbqueue2, and mbqueue3
  3. Start the Integrator profile and add a proxy service to mediate messages to the broker. To handle XA transactions, the proxy service should be configured as shown below. In this example, WSO2 EI listens to a JMS queue named MyJMSQueue and consumes messages and sends messages to multiple JMS queues in a transactional manner.

    <?xml version="1.0" encoding="UTF-8"?>
    <proxy xmlns="http://ws.apache.org/ns/synapse"
           name="JMSListenerProxy"
           startOnLoad="true"
           statistics="disable"
           trace="disable"
           transports="jms,http,https">
       <target>
          <inSequence>
             <property name="OUT_ONLY" value="true"/>
             <log level="custom">
                <property expression="get-property('MessageID')" name="MESSAGE_ID_A"/>
             </log>
             <log level="custom">
                <property expression="$body" name="BEFORE"/>
             </log>
             <property expression="get-property('MessageID')"
                       name="MESSAGE_ID_B"
                       scope="operation"
                       type="STRING"/>
             <property description="FailureResultProperty"
                       name="failureResultProperty"
                       scope="default">
                <result xmlns="">failure</result>
             </property>
             <enrich>
                <source clone="true" xpath="$ctx:failureResultProperty"/>
                <target type="body"/>
             </enrich>
             <log level="custom">
                <property expression="$body" name="AFTER"/>
             </log>
             <property name="BEFORE1" scope="axis2" type="STRING" value="ABCD"/>
             <callout serviceURL="jms:/MBQueue1?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=begin">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                        xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                        xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
             </callout>
             <callout serviceURL="jms:/MBQueue2?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                        xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                        xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
             </callout>
             <callout serviceURL="jms:/MBQueue3?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=end">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                        xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                        xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
             </callout>
             <drop/>
          </inSequence>
          <faultSequence>
             <log level="custom">
                <property name="Transaction Action" value="Rollbacked"/>
             </log>
             <callout serviceURL="jms:/MBQueueDLQ?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=rollback">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                        xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                        xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
             </callout>
          </faultSequence>
       </target>
       <parameter name="transport.jms.Destination">MyJMSQueue</parameter>
       <parameter name="transport.jms.ContentType">
          <rules xmlns="">
             <jmsProperty>contentType</jmsProperty>
             <default>application/xml</default>
          </rules>
       </parameter>
       <description/>
    </proxy>
  4. Now, you can disable one queue in the Broker profile and send a message to the Integrator. The proxy service will attempt to dispatch the message to all three queues. However, since one queue is unavailable, the message will not be delivered to any of the queues.

  • No labels