Due to a known issue do not use JDK1.8.0_151 with WSO2 products. Use JDK 1.8.0_144 until JDK 1.8.0_162-ea is released.
This documentation is for WSO2 Message Broker version 3.2.0. For the latest documentation, see the documentation for WSO2 Enterprise Integrator.
Skip to end of metadata
Go to start of metadata

This sample demonstrates how the Time to Live (TTL)  can be set for messages published to WSO2 message broker (WSO2 MB). Go to Configuring Message Expiration for more information on this feature.

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

About the sample

This sample demonstrates first introduces a sample JMS client named QueueSender, which is used to send messages with or without a TTL value set for a queue in WSO2 Message Broker. Then it uses a sample JMS client named QueueReceiver to receive the messages, which are not expired at that time and prints the number of received messages on the console.

The <MB_HOME>/Samples/JmsExpirationSample/src/org/sample/jms directory has the following classes:

package org.sample.jms;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Sample sender to send the messages with/without TTL
 */
public class SampleQueueSender {

    public static final String QPID_ICF =  	                  "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "expirationTestQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    /**
     * Send the specified number of messages with the specified ttl.
     * @param noOfMessages Number of messages that need to be sent
     * @param timeToLive Time to live value for mesages
     * @throws NamingException
     * @throws JMSException
     */
    public void sendMessages(int noOfMessages, long timeToLive) throws  		   NamingException,JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, 						     getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory)     ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Send message
        Queue queue = (Queue)ctx.lookup(queueName);
        // create the message to send
        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
        javax.jms.QueueSender queueSender = queueSession.createSender(queue);
        for(int i = 0; i < noOfMessages; i++){
	    //send the text message in persistent delivery mode with a time to live value at priority level 4
            queueSender.send(textMessage, DeliveryMode.PERSISTENT,4,timeToLive);
        }
        queueSender.close();
        queueSession.close();
        queueConnection.close();
    }

    /**
     * Creates amqp url.
     *
     * @param username The username for the amqp url.
     * @param password The password for the amqp url.
     * @return AMQP url.
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
                .append("'")
                .toString();
    }
}
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Sample Receiver to receive the messages which were not expired
 */
public class SampleQueueReceiver {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "expirationTestQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    /**
     * Register Subscriber for a queue.
     * @return MessageConsumer The message consumer object of the subscriber.
     * @throws NamingException
     * @throws JMSException
     */
    public MessageConsumer registerSubscriber() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("queue."+ queueName,queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        //Receive message
        Queue queue =  (Queue) ctx.lookup(queueName);
        MessageConsumer consumer = queueSession.createConsumer(queue);
        return consumer;
    }

    /**
     * Receive messages using the consumer.
     * @param consumer The message consumer object of the subscriber.
     * @throws NamingException
     * @throws JMSException
     */
    public void receiveMessages(MessageConsumer consumer) throws NamingException, JMSException {

        int receivedMessageCount = 0;
	//have 5 seconds as receive timeout value to stop the consumer
        while(null != consumer.receive(5000)){
            receivedMessageCount ++;
        }
        System.out.println("Received message count: " + receivedMessageCount);

    }

    /**
     * Close the connections at the end of operation
     * @param consumer The message consumer object of the subscriber.
     * @throws JMSException
     */
    public void closeConnections(MessageConsumer consumer) throws JMSException{
        consumer.close();
        queueSession.close();
        queueConnection.stop();
        queueConnection.close();
    }

    /**
     * Creates amqp url.
     *
     * @param username The username for the amqp url.
     * @param password The password for the amqp url.
     * @return AMQP url.
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
                .append("'")
                .toString();
    }
}
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

/**
 * Sample executor class for message TTL
 */
public class Main {

    public static void main(String[] args) throws NamingException, JMSException {

        SampleQueueReceiver queueReceiver = new SampleQueueReceiver();
        MessageConsumer consumer = queueReceiver.registerSubscriber();

        //Send messages with very less time to live value
        System.out.println("Sending 5 messages with TTL value of 1sec");
        SampleQueueSender queueSenderWithTTL = new SampleQueueSender();
        queueSenderWithTTL.sendMessages(5,1000);
        queueReceiver.receiveMessages(consumer);

        //send messages without time to live value
        System.out.println("Sending 5 messages without TTL");
        SampleQueueSender queueSenderWithoutTTL = new SampleQueueSender();
        queueSenderWithoutTTL.sendMessages(5,0);
        queueReceiver.receiveMessages(consumer);

        //send messages with considerable time to live value
        System.out.println("Sending 5 messages TTL value of 10sec");
        SampleQueueSender queueSenderWithMediumTTL = new SampleQueueSender();
        queueSenderWithMediumTTL.sendMessages(5,10000);
        queueReceiver.receiveMessages(consumer);
        //close the connection
        queueReceiver.closeConnections(consumer);
    }
}

Building the sample

Run the ant command from the <MB_HOME>/Samples/JmsExpirationSample directory.

Analyzing the output

You will get the following log in your console.

[java] Sending 5 messages with TTL value of 1sec
[java] Received message count: 0
[java] Sending 5 messages without TTL
[java] Received message count: 5
[java] Sending 5 messages TTL value of 10sec
[java] Received message count: 5

The first 5 messages were published with a TTL value of one second and none of them got delivered since they expired. In the second case, 5 messages were sent without a TTL value and all of them got delivered. In the last case, 5 messages were sent with a TTL value of ten seconds and all of them got delivered since they could reach the recipient before the messages expire.

  • No labels