The Message Broker profile in WSO2 Enterprise Integrator (WSO2 EI) supports XA transactions (distributed transactions). This capability ensures that when a message is delivered to multiple queues in the broker profile, the delivery is rolled back from all of the queues if at least one queue fails to accept the message.
For example, consider a use case where the Integrator profile in WSO2 EI is configured to distribute messages to three separate queues (mbqueue1, mbqueue2, and mbqueue3) defined in the broker profile. When the message is dispatched from the Integrator profile to the queues, if delivery to at least one queue fails, the whole dispatch should be rolled back so that the message is not delivered to any of the queues.
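The all-or-nothing behavior described above can be sketched with a small in-memory model. This is only a toy illustration, not how the broker profile implements XA transactions: the `ToyBroker` class, the `dispatch_all` method, and the `QueueUnavailable` exception are hypothetical names introduced for this example.

```python
class QueueUnavailable(Exception):
    """Raised when a (simulated) queue cannot accept a message."""


class ToyBroker:
    """In-memory stand-in for a broker; not the WSO2 implementation."""

    def __init__(self, queue_names):
        self.queues = {name: [] for name in queue_names}
        self.disabled = set()

    def dispatch_all(self, message, targets):
        """Deliver message to every target queue, or to none of them."""
        staged = []  # sends held back until the transaction commits
        try:
            for name in targets:
                if name in self.disabled or name not in self.queues:
                    raise QueueUnavailable(name)
                staged.append(name)
        except QueueUnavailable:
            return False  # rollback: the staged sends are discarded
        for name in staged:  # commit: every send becomes visible together
            self.queues[name].append(message)
        return True


targets = ["mbqueue1", "mbqueue2", "mbqueue3"]
broker = ToyBroker(targets)

ok = broker.dispatch_all("order-1", targets)      # all queues available

broker.disabled.add("mbqueue2")                   # simulate one failing queue
failed = broker.dispatch_all("order-2", targets)  # nothing is delivered

print(ok, failed, broker.queues)
```

In this model the second message never reaches mbqueue1 or mbqueue3, even though both were available, because the failure of mbqueue2 discards the whole dispatch.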
You can see how this works by following the steps given below.
- Follow the instructions on configuring the Integrator profile with the Message Broker profile.
- Start the Message Broker profile and create three queues: mbqueue1, mbqueue2, and mbqueue3.
- Start the Integrator profile and add a proxy service to mediate messages to the broker. To handle XA transactions, the proxy service should be configured as shown below. In this example, the proxy service listens to a JMS queue named MyJMSQueue, consumes the messages it receives, and sends each message to multiple JMS queues within a single transaction.
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="JMSListenerProxy" startOnLoad="true" statistics="disable" trace="disable" transports="jms,http,https">
    <target>
        <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <log level="custom">
                <property expression="get-property('MessageID')" name="MESSAGE_ID_A"/>
            </log>
            <log level="custom">
                <property expression="$body" name="BEFORE"/>
            </log>
            <property expression="get-property('MessageID')" name="MESSAGE_ID_B" scope="operation" type="STRING"/>
            <property description="FailureResultProperty" name="failureResultProperty" scope="default">
                <result xmlns="">failure</result>
            </property>
            <enrich>
                <source clone="true" xpath="$ctx:failureResultProperty"/>
                <target type="body"/>
            </enrich>
            <log level="custom">
                <property expression="$body" name="AFTER"/>
            </log>
            <property name="BEFORE1" scope="axis2" type="STRING" value="ABCD"/>
            <!-- First callout: TransactionCommand=begin starts the transaction -->
            <callout serviceURL="jms:/MBQueue1?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=begin">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/" xmlns:s12="http://www.w3.org/2003/05/soap-envelope" xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
            </callout>
            <!-- Second callout: no TransactionCommand parameter -->
            <callout serviceURL="jms:/MBQueue2?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/" xmlns:s12="http://www.w3.org/2003/05/soap-envelope" xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
            </callout>
            <!-- Third callout: TransactionCommand=end completes the transaction -->
            <callout serviceURL="jms:/MBQueue3?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=end">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/" xmlns:s12="http://www.w3.org/2003/05/soap-envelope" xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
            </callout>
            <drop/>
        </inSequence>
        <faultSequence>
            <log level="custom">
                <property name="Transaction Action" value="Rollbacked"/>
            </log>
            <!-- On failure: TransactionCommand=rollback undoes the transaction -->
            <callout serviceURL="jms:/MBQueueDLQ?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=rollback">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/" xmlns:s12="http://www.w3.org/2003/05/soap-envelope" xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
            </callout>
        </faultSequence>
    </target>
    <parameter name="transport.jms.Destination">MyJMSQueue</parameter>
    <parameter name="transport.jms.ContentType">
        <rules xmlns="">
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <description/>
</proxy>
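The transaction boundaries in the proxy above are carried by the `transport.jms.TransactionCommand` parameter in each callout's `serviceURL`. The following sketch only parses such URLs to make that sequence visible. It uses shortened URLs (the JNDI parameters are omitted for brevity), and the `transaction_command` helper is a hypothetical name for this example, not part of WSO2 EI.

```python
from urllib.parse import parse_qs, urlsplit

# Parameters shared by all three callouts (shortened for this illustration).
PARAMS = ("transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory"
          "&transport.jms.DestinationType=queue")

service_urls = [
    "jms:/MBQueue1?" + PARAMS + "&transport.jms.TransactionCommand=begin",
    "jms:/MBQueue2?" + PARAMS,
    "jms:/MBQueue3?" + PARAMS + "&transport.jms.TransactionCommand=end",
]


def transaction_command(service_url):
    """Return (queue name, transaction command or None) for a JMS URL."""
    parts = urlsplit(service_url)
    params = parse_qs(parts.query)
    queue = parts.path.lstrip("/")
    command = params.get("transport.jms.TransactionCommand", [None])[0]
    return queue, command


commands = [transaction_command(url) for url in service_urls]
for queue, command in commands:
    print(queue, command)
```

Only the first and last callouts carry a command. The configuration suggests that the callout in between takes part in the transaction that is already open, which is why a failure on any of the three queues can be handled by the single rollback in the fault sequence.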
Now, you can disable one of the queues in the broker profile and send a message to the Integrator profile. The proxy service will attempt to dispatch the message to all three queues. However, since one queue is unavailable, the fault sequence rolls back the transaction and the message is not delivered to any of the queues.