This documentation is for WSO2 Enterprise Integrator version 6.2.0 . View documentation for the latest release in the 6.x.x family and the latest release in the 7.x.x family.

All docs This doc
Skip to end of metadata
Go to start of metadata

In JMS 2.0, it is mandatory for JMS providers to set the JMSXDeliveryCount property, which allows an application that receive a message to determine how many times the message is redelivered.

If a message is being redelivered, it means that a previous attempt to deliver the message failed due to some reason. If a message is being redelivered multiple times, it can be because the message is bad in some way. When such a message is being redelivered over and over again, it wastes resources and prevents subsequent good messages from being processed.

When you work with the ESB Profile of WSO2 Enterprise Integrator (WSO2 EI), you can detect such repeatedly redelivered messages using the  JMSXDeliveryCount property that is set in messages.

Being able to detect repeatedly redelivered messages is particularly useful because you can take necessary steps to handle such messages in a proper manner. For example, you can consume such a message and send it to a separate queue.

The following diagram illustrates how the ESB Profile of WSO2 EI can be used to detect repeatedly redelivered messages, and store such messages in an internal message store. 

To demonstrate the scenario illustrated above, let's configure the JMS inbound endpoint in the ESB Profile of WSO2 EI using HornetQ as the message broker. This sample scenario includes the following sections:

Prerequisites

Configuring and starting the HornetQ message broker

  1. Open the <HornetQ_HOME>/config/stand-alone/non-clustered/hornetq-jms.xml file in a text editor and add the following configuration:

    <connection-factory name="QueueConnectionFactory">
          <xa>false</xa>
          <connectors>
             <connector-ref connector-name="netty"/>
          </connectors>
          <entries>
             <entry name="/QueueConnectionFactory"/>
          </entries>
    </connection-factory>
     
    <connection-factory name="TopicConnectionFactory">
          <xa>false</xa>
          <connectors>
             <connector-ref connector-name="netty"/>
          </connectors>
          <entries>
             <entry name="/TopicConnectionFactory"/>
          </entries>
    </connection-factory>
    
    <queue name="wso2">
          <entry name="/queue/mySampleQueue"/>
    </queue>
    
    <topic name="sampleTopic">
          <entry name="/topic/exampleTopic"/>
    </topic>
  2. Start the HornetQ message broker
    • To start the message broker on Linux, navigate to the <HornetQ_HOME>/bin/ directory and execute the run.sh command with root privileges.

Building the sample scenario

The XML configuration for this sample scenario is as follows:

<definitions xmlns="http://ws.apache.org/ns/synapse">
    <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
        <parameter name="cachableDuration">15000</parameter>
    </registry>
    <taskManager provider="org.wso2.carbon.mediation.ntask.NTaskTaskManager">
        <parameter name="cachableDuration">15000</parameter>
    </taskManager>
     <sequence name="request" onError="fault">
        <log level="full"/>
        <filter regex="1"
            source="get-property('default','jms.message.delivery.count')" xmlns:ns="http://org.apache.synapse/xsd">
            <then>
                <log>
                    <property name="DeliveryCounter" value="1"/>
                </log>
            </then>
            <else>
                <store messageStore="JMS-Redelivered-Store"/>
                <log>
                    <property name="DeliveryCounter" value="more than 1"/>
                </log>
            </else>
        </filter>
        <drop/>
    </sequence>
    <sequence name="fault">
        <log level="full">
            <property name="MESSAGE" value="Executing default &quot;fault&quot; sequence"/>
            <property expression="get-property('ERROR_CODE')" name="ERROR_CODE"/>
            <property expression="get-property('ERROR_MESSAGE')" name="ERROR_MESSAGE"/>
        </log>
        <drop/>
    </sequence>
    <sequence name="main">
        <log level="full"/>
        <drop/>
    </sequence>
    <messageStore name="JMS-Redelivered-Store"/>
    <inboundEndpoint name="jms_inbound" onError="fault" protocol="jms"
        sequence="request" suspend="false">
        <parameters>
            <parameter name="interval">1000</parameter>
            <parameter name="transport.jms.Destination">queue/mySampleQueue</parameter>
            <parameter name="transport.jms.CacheLevel">1</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
            <parameter name="sequential">true</parameter>
            <parameter name="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory</parameter>
            <parameter name="java.naming.provider.url">jnp://localhost:1099</parameter>
            <parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
            <parameter name="transport.jms.SessionTransacted">false</parameter>
            <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
        </parameters>
    </inboundEndpoint>
</definitions>

This configuration creates an inbound endpoint to the JMS broker and has a simple sequence that logs the message status using the JMSXDeliveryCount value. 

To build the sample

  • Start the ESB Profile of WSO2 EI with the sample configuration.

Executing the sample scenario

  • Run the following java file to publish a message to the JMS queue:

    SOAPPublisher.java
    package JMSXDeliveryCount;
    
    import java.util.Properties;
    import java.util.logging.Logger;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSContext;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    
    public class SOAPPublisher {
    	private static final Logger log = Logger.getLogger(SOAPPublisher.class.getName());
    
    	// Set up all the default values
    	private static final String param = "IBM";
    
    	// with header for inbounds
    	private static final String MESSAGE_WITH_HEADER =
    			"<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
    					"   <soapenv:Header/>\n" +
    					"<soapenv:Body>\n" +
    					"<m:placeOrder xmlns:m=\"http://services.samples\">\n" +
    					"    <m:order>\n" +
    					"        <m:price>" +
    					getRandom(100, 0.9, true) +
    					"</m:price>\n" +
    					"        <m:quantity>" +
    					(int) getRandom(10000, 1.0, true) +
    					"</m:quantity>\n" +
    					"        <m:symbol>" +
    					param +
    					"</m:symbol>\n" +
    					"    </m:order>\n" +
    					"</m:placeOrder>" +
    					"   </soapenv:Body>\n" +
    					"</soapenv:Envelope>";
    	private static final String DEFAULT_CONNECTION_FACTORY = "QueueConnectionFactory";
    	private static final String DEFAULT_DESTINATION = "queue/mySampleQueue";
    	private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory";
    	private static final String PROVIDER_URL = "jnp://localhost:1099";
    
    	public static void main(String[] args) {
    
    		Context namingContext = null;
    
    		try {
    
    			// Set up the namingContext for the JNDI lookup
    			final Properties env = new Properties();
    			env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
    			env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));
    			namingContext = new InitialContext(env);
    
    			// Perform the JNDI lookups
    			String connectionFactoryString =
    					System.getProperty("connection.factory",
    					                   DEFAULT_CONNECTION_FACTORY);
    			log.info("Attempting to acquire connection factory \"" + connectionFactoryString + "\"");
    			ConnectionFactory connectionFactory =
    					(ConnectionFactory) namingContext.lookup(connectionFactoryString);
    			log.info("Found connection factory \"" + connectionFactoryString + "\" in JNDI");
    
    			String destinationString = System.getProperty("destination", DEFAULT_DESTINATION);
    			log.info("Attempting to acquire destination \"" + destinationString + "\"");
    			Destination destination = (Destination) namingContext.lookup(destinationString);
    			log.info("Found destination \"" + destinationString + "\" in JNDI");
    
    			// String content = System.getProperty("message.content",
    			// DEFAULT_MESSAGE);
    			String content = System.getProperty("message.content", MESSAGE_WITH_HEADER);
    
    			try (JMSContext context = connectionFactory.createContext()) {
    				log.info("Sending  message");
    				// Send the message
    				context.createProducer().send(destination, content);
    			}
    
    		} catch (NamingException e) {
    			log.severe(e.getMessage());
    		} finally {
    			if (namingContext != null) {
    				try {
    					namingContext.close();
    				} catch (NamingException e) {
    					log.severe(e.getMessage());
    				}
    			}
    		}
    	}
    
    
    	private static double getRandom(double base, double varience, boolean onlypositive) {
    		double rand = Math.random();
    		return (base + (rand > 0.5 ? 1 : -1) * varience * base * rand) *
    				(onlypositive ? 1 : rand > 0.5 ? 1 : -1);
    	}
    }

Analyzing the output

When you analyze the output on the ESB console, you will see an entry similar to the following:

INFO - LogMediator To: , MessageID: ID:60868ca5-d174-11e5-b7de-f9743c9bcc9e, Direction: request, DeliveryCounter = 1
  • No labels