This documentation is for WSO2 Message Broker 3.0.0. View documentation for the latest release.
Due to a known issue do not use JDK1.8.0_151 with WSO2 products. Use JDK 1.8.0_144 until JDK 1.8.0_162-ea is released.
Skip to end of metadata
Go to start of metadata

This sample demonstrates how transactional messages work with WSO2 MB.

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

About the sample

In this sample, a JMS subscriber connects to WSO2 MB and publishes messages to a queue using a 'transacted' session. Using this session ensures that the messages published will persist in WSO2 MB (i.e. will be stored to the DB) only when they are committed. Therefore, as demonstrated by this sample, publishing messages to WSO2 MB through a transacted session involves two steps:

  1. The messages have to be sent from the publisher client.
  2. The messages have to be committed from the publisher client.

The <MB_HOME>/Samples/JMSQueueClient/src/org/sample/jms directory has the following classes implementing the behaviour explained above.

package org.sample.jms;
import org.apache.log4j.Logger;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
 * This class contains methods which is used in creating and using a transactional JMS message publisher.
 */
public class TransactionalQueuePublisher {
    private static Logger log = Logger.getLogger(TransactionalQueuePublisher.class);
    /**
     * Andes initial context factory.
     */
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    /**
     * Connection factory name prefix.
     */
    public static final String CF_NAME_PREFIX = "connectionfactory.";
    /**
     * Andes connection factory name.
     */
    public static final String CF_NAME = "andesConnectionfactory";
    /**
     * The authorized username for the AMQP connection url.
     */
    private static final String userName = "admin";
    /**
     * The authorized password for the AMQP connection url.
     */
    private static final String password = "admin";
    /**
     * Client id for the AMQP connection url.
     */
    private static final String CARBON_CLIENT_ID = "carbon";
    /**
     * MB's Virtual host name should be match with this, default name is "carbon" can be configured.
     */
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    /**
     * IP Address of the host for AMQP connection url.
     */
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    /**
     * Standard AMQP port number for the connection url.
     */
    private static final String CARBON_DEFAULT_PORT = "5672";
    /**
     * Queue prefix for initializing context.
     */
    private static final String QUEUE_NAME_PREFIX = "queue.";
    /**
     * The queue connection in which the messages would be published.
     */
    private QueueConnection queueConnection;
    /**
     * The queue session in which the messages would be published.
     */
    private QueueSession queueSession;
    /**
     * The queue in which the messages would be published.
     */
    private Queue queue;
    /**
     * Creates a transactional JMS publisher.
     *
     * @param queueName The name of the queue to which messages should be published.
     * @throws NamingException
     * @throws JMSException
     */
    public TransactionalQueuePublisher(String queueName) throws NamingException, JMSException {
        // Creating properties for the initial context
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        // Creating initial context
        InitialContext initialContext = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) initialContext.lookup(CF_NAME);
        // Create a JMS connection
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        // Create JMS session object. Here we mentioned that the messages will be published transactionally to the
        // broker.
        queueSession = queueConnection.createQueueSession(true, QueueSession.SESSION_TRANSACTED);
        // Look up a JMS queue
        queue = (Queue) initialContext.lookup(queueName);
        // Adding a shutdown hook listener
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    shutdownPublisher();
                } catch (JMSException jmsException) {
                    throw new RuntimeException(jmsException.getMessage(), jmsException);
                }
            }
        });
    }
    /**
     * Publishes a JMS message.
     *
     * @param messageContent The message content to publish.
     * @throws JMSException
     */
    public void sendMessage(String messageContent) throws JMSException {
        // Create the message to send
        TextMessage textMessage = queueSession.createTextMessage(messageContent);
        // Sending a message
        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.send(textMessage);
        log.info("Message sent : " + textMessage.getText());
    }
    /**
     * Committing all messages that are being sent.
     *
     * @throws JMSException
     */
    public void commitMessages() throws JMSException {
        log.info("Committing messages.");
        queueSession.commit();
    }
    /**
     * Rollbacks all sent messages.
     *
     * @throws JMSException
     */
    public void rollbackMessages() throws JMSException {
        log.info("Rollbacks all uncommitted messages.");
        queueSession.rollback();
    }
    /**
     * Gets an AMQP connection string.
     *
     * @param username authorized username for the connection string.
     * @param password authorizes password for the connection string.
     * @return AMQP Connection URL
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
                .append("'")
                .toString();
    }
    /**
     * Shutting down the consumer.
     *
     * @throws JMSException
     */
    public void shutdownPublisher() throws JMSException {
        log.info("Shutting down publisher.");
        // Housekeeping
        if (null != queueSession) {
            queueSession.close();
        }
        if (null != queueConnection) {
            queueConnection.close();
        }
    }
}
package org.sample.jms;
import org.apache.log4j.Logger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
 * This class contains methods and properties relate to Queue Receiver (Subscriber)
 */
public class QueueConsumer {
    private static Logger log = Logger.getLogger(QueueConsumer.class);
    /**
     * Andes initial context factory.
     */
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    /**
     * Connection factory name prefix.
     */
    public static final String CF_NAME_PREFIX = "connectionfactory.";
    /**
     * Andes connection factory name.
     */
    public static final String CF_NAME = "andesConnectionfactory";
    /**
     * The authorized username for the AMQP connection url.
     */
    private static final String userName = "admin";
    /**
     * The authorized password for the AMQP connection url.
     */
    private static final String password = "admin";
    /**
     * Client id for the AMQP connection url.
     */
    private static final String CARBON_CLIENT_ID = "carbon";
    /**
     * MB's Virtual host name should be match with this, default name is "carbon" can be configured.
     */
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    /**
     * IP Address of the host for AMQP connection url.
     */
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    /**
     * Standard AMQP port number for the connection url.
     */
    private static final String CARBON_DEFAULT_PORT = "5672";
    /**
     * Queue prefix for initializing context.
     */
    private static final String QUEUE_NAME_PREFIX = "queue.";
    /**
     * The queue connection in which the messages would be published.
     */
    private QueueConnection queueConnection;
    /**
     * The queue session in which the messages would be published.
     */
    private QueueSession queueSession;
    /**
     * The message consumer for the subscriber.
     */
    private MessageConsumer consumer;
    /**
     * Creating a Message Consumer.
     *
     * @param queueName The name of the queue in which the subscriber should listen to.
     * @throws NamingException
     * @throws JMSException
     */
    public QueueConsumer(String queueName) throws NamingException, JMSException {
        // Creating properties for the initial context
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        // Creating initial context
        InitialContext initialContext = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) initialContext.lookup(CF_NAME);
        // Create a JMS connection
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        // Create JMS session object
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Look up a JMS queue
        Queue queue = (Queue) initialContext.lookup(queueName);
        // Create JMS consumer
        consumer = queueSession.createConsumer(queue);
        // Adding a shutdown hook listener
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    shutdownConsumer();
                } catch (JMSException jmsException) {
                    throw new RuntimeException(jmsException.getMessage(), jmsException);
                }
            }
        });
    }
    /**
     * Receives a single message through the subscriber.
     *
     * @return true if a message was received, else false
     * @throws NamingException
     * @throws JMSException
     */
    public boolean receiveMessage() throws NamingException, JMSException {
        long waitingTime = 5000;
        Message receivedMessage = this.consumer.receive(waitingTime);
        if (null == receivedMessage) {
            log.info("No messages were received within " + waitingTime / 1000 + " seconds.");
            return false;
        } else {
            TextMessage message = (TextMessage) receivedMessage;
            log.info("Received message : " + message.getText());
            return true;
        }
    }
    /**
     * Gets an AMQP connection string.
     *
     * @param username authorized username for the connection string.
     * @param password authorizes password for the connection string.
     * @return AMQP Connection URL
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
                .append("'")
                .toString();
    }
    /**
     * Shutting down the consumer.
     */
    public void shutdownConsumer() throws JMSException {
        log.info("Shutting down consumer.");
        // Housekeeping
        if (null != consumer) {
            consumer.close();
        }
        if (null != queueSession) {
            queueSession.close();
        }
        if (null != queueConnection) {
            queueConnection.stop();
        }
        if (null != queueConnection) {
            queueConnection.close();
        }
    }
}
package org.sample.jms;
import org.apache.log4j.Logger;
import javax.jms.JMSException;
import javax.naming.NamingException;
/**
 * The following class contains a publisher transactional sample. This sample uses publisher transactions so that it
 * would help in recovering published messages in case if the server goes down. This helps to prevent message loss.
 */
public class MainClass {
    private static final Logger log = Logger.getLogger(MainClass.class);
    /**
     * The main method for the transactional publishing sample.
     *
     * @param args The arguments passed.
     * @throws NamingException
     * @throws JMSException
     */
    public static void main(String[] args) throws NamingException, JMSException {
        // Creating a message consumer
        QueueConsumer queueConsumer = new QueueConsumer("Transactional-Queue");
        // Creating a transactional message publisher
        TransactionalQueuePublisher transactionalQueuePublisher = new TransactionalQueuePublisher("Transactional-Queue");
        log.info("------Sample for Message Sending and Committing.------");
        // Publishes a messages
        transactionalQueuePublisher.sendMessage("My First Message.");
        // Attempts to receive a message. No messages were received here as the send message was not committed.
        queueConsumer.receiveMessage();
        // Publishes a messages
        transactionalQueuePublisher.sendMessage("My Second Message.");
        // Committing all published messages.
        transactionalQueuePublisher.commitMessages();
        // Receives a message.
        queueConsumer.receiveMessage();
        // Receives a message.
        queueConsumer.receiveMessage();
        log.info("------Sample for Message Sending, Rollback and Committing.------");
        // Publishes a messages
        transactionalQueuePublisher.sendMessage("My Third Message.");
        // Attempts to receive a message. No messages were received here as the sent message was not committed.
        queueConsumer.receiveMessage();
        // Rollbacks all published messages. This can be used in-case if the server has gone down and in need of
        // recovering published messages.
        transactionalQueuePublisher.rollbackMessages();
        // Publishes a messages
        transactionalQueuePublisher.sendMessage("My Forth Message.");
        // Committing all published messages.
        transactionalQueuePublisher.commitMessages();
        // Receives a message.
        queueConsumer.receiveMessage();
        // Attempts to receive a message. No messages were received here as all the messages were received.
        queueConsumer.receiveMessage();
        // Shutting down the sample.
        System.exit(0);
    }
}

Building the sample

Run the ant command from <MB_HOME>/Samples/TransactionalPublisher directory.

Analyzing the output

The result log shown above explains how the transactional session has worked when publishing messages:

[java] INFO : org.sample.jms.MainClass - ------Sample for Message Sending and Committing.------
[java] INFO : org.sample.jms.TransactionalQueuePublisher - Message sent : My First Message.
[java] INFO : org.sample.jms.QueueConsumer - No messages were received within 5 seconds.
[java] INFO : org.sample.jms.TransactionalQueuePublisher - Message sent : My Second Message.
[java] INFO : org.sample.jms.TransactionalQueuePublisher - Committing messages.
[java] INFO : org.sample.jms.QueueConsumer - Received message : My First Message.
[java] INFO : org.sample.jms.QueueConsumer - Received message : My Second Message.
[java] INFO : org.sample.jms.MainClass - ------Sample for Message Sending, Rollback and Committing.------
[java] INFO : org.sample.jms.TransactionalQueuePublisher - Message sent : My Third Message.
[java] INFO : org.sample.jms.QueueConsumer - No messages were received within 5 seconds.
[java] INFO : org.sample.jms.TransactionalQueuePublisher - Rollbacks all uncommitted messages.
[java] INFO : org.sample.jms.TransactionalQueuePublisher - Message sent : My Forth Message.
[java] INFO : org.sample.jms.TransactionalQueuePublisher - Committing messages.
[java] INFO : org.sample.jms.QueueConsumer - Received message : My Forth Message.
[java] INFO : org.sample.jms.QueueConsumer - No messages were received within 5 seconds.
[java] INFO : org.sample.jms.TransactionalQueuePublisher - Shutting down publisher.
[java] INFO : org.sample.jms.QueueConsumer - Shutting down consumer.
  • No labels