Due to a known issue, do not use JDK 1.8.0_151 with WSO2 products. Use JDK 1.8.0_144 until JDK 1.8.0_162-ea is released.
This documentation is for WSO2 Message Broker version 3.2.0. For the latest documentation, see the documentation for WSO2 Enterprise Integrator.
Skip to end of metadata
Go to start of metadata

Durable topics keep messages persistently until a suitable consumer is available to consume them. Durable topic subscribers are used when an application needs to receive messages that are published even while the application is inactive. See Creating Durable Topic Subscriptions for more information.

About the sample

The <MB_HOME>/samples/DurableTopicSubscriber/src/org/sample/jms directory has the following classes.

  • DurableTopicSubscriber.java class creates a durable topic subscription named mySub1.

  • SampleMessageListener.java class is the message listener that prints each message delivered to the durable topic subscription.

  • TopicPublisher.java class creates a publisher that publishes messages to the topic.

  • Main.java defines the method for calling the three clients mentioned above. 

Click the relevant tab to see the code.

package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
/**
 * Sample JMS client that creates a durable subscription on a topic.
 *
 * <p>{@link #subscribe()} opens a connection using the Andes JNDI initial
 * context factory, creates a durable subscriber identified by
 * {@code subscriptionId} and then either attaches a
 * {@link SampleMessageListener} (the default, {@code useListener == true}) or
 * receives {@code messageCount} messages synchronously.
 * {@link #stopSubscriber()} closes the JMS resources without unsubscribing.
 */
public class DurableTopicSubscriber {
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";
    private String topicName = "newTopic";
    private String subscriptionId = "mySub1";
    private boolean useListener = true;
    private int delayBetMessages = 200;
    private int messageCount = 10;
    private SampleMessageListener messageListener;
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private TopicSubscriber topicSubscriber;

    /**
     * Connects to the broker and starts consuming from the durable subscription.
     *
     * <p>Failures are printed to standard error rather than propagated. When a
     * failure occurs, any connection that was already opened is closed so that
     * it is not leaked, and a later {@link #stopSubscriber()} call is a no-op.
     */
    public void subscribe() {
        try {
            System.out.println("Starting the subscriber");
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("topic." + topicName, topicName);
            InitialContext ctx = new InitialContext(properties);
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            topicSession =
                    topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            // create durable subscriber with subscription ID
            Topic topic = (Topic) ctx.lookup(topicName);
            topicSubscriber = topicSession.createDurableSubscriber(topic, subscriptionId);
            if (!useListener) {
                for (int count = 0; count < messageCount; count++) {
                    Message message = topicSubscriber.receive();
                    System.out.println("count = " + count);
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println(count + ". textMessage.getText() = " + textMessage.getText());
                    }
                    if (delayBetMessages != 0) {
                        Thread.sleep(delayBetMessages);
                    }
                }
                topicConnection.close();
            } else {
                messageListener = new SampleMessageListener(delayBetMessages);
                topicSubscriber.setMessageListener(messageListener);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            closeAfterFailure();
        } catch (Exception e) {
            e.printStackTrace();
            closeAfterFailure();
        }
    }

    /**
     * Builds the AMQP connection URL in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username user name placed in the URL
     * @param password password placed in the URL
     * @return the connection URL string
     */
    public String getTCPConnectionURL(String username, String password) {
        return new StringBuilder()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }

    /**
     * Closes the subscriber, session and connection that {@link #subscribe()}
     * opened. Resources that were never created are skipped, so this method is
     * safe to call even if {@code subscribe()} failed.
     *
     * @throws JMSException if closing any of the JMS resources fails
     */
    public void stopSubscriber() throws JMSException {
        try {
            if (topicSubscriber != null) {
                topicSubscriber.close();
            }
            if (topicSession != null) {
                topicSession.close();
            }
        } finally {
            // Always attempt to close the connection, even if an earlier close failed.
            if (topicConnection != null) {
                topicConnection.close();
            }
        }
        System.out.println("Closing Subscriber");
    }

    /**
     * Closes a connection left open by a failed {@link #subscribe()} call and
     * clears the resource fields. Close errors are printed, not thrown.
     */
    private void closeAfterFailure() {
        if (topicConnection != null) {
            try {
                topicConnection.close();
            } catch (JMSException closeError) {
                closeError.printStackTrace();
            }
        }
        topicSubscriber = null;
        topicSession = null;
        topicConnection = null;
    }
}
package org.sample.jms;
import javax.jms.*;
public class SampleMessageListener implements MessageListener {
    private int delay = 0;
    private int currentMsgCount = 0;
    public SampleMessageListener(int delay) {
        this.delay = delay;
    }
    public void onMessage(Message message) {
        TextMessage receivedMessage = (TextMessage) message;
        try {
            System.out.println("Got the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText());
            currentMsgCount++;
            if(delay != 0) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    //silently ignore
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
 * Sample JMS client that publishes text messages to a topic through the Andes
 * JNDI initial context factory.
 */
public class TopicPublisher {
    public static final String ANDES_ICF  = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";
    String topicName = "newTopic";

    /**
     * Publishes the text message {@code "Test Message"} to the topic
     * {@code numOfMsgs} times, sleeping one second after each publish.
     *
     * <p>The connection is closed in a {@code finally} block so that it is
     * released even when publishing fails or the thread is interrupted.
     *
     * @param numOfMsgs number of messages to publish
     * @throws NamingException if the connection factory or topic lookup fails
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName,topicName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        try {
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic)ctx.lookup(topicName);
            // Create the messages to send
            TextMessage textMessage = topicSession.createTextMessage("Test Message");
            javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
            System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
            for (int i = 0; i < numOfMsgs; i++) {
                topicPublisher.publish(textMessage);
                Thread.sleep(1000);
            }
            topicPublisher.close();
            topicSession.close();
        } finally {
            // Runs on both the success and failure paths so the connection never leaks.
            topicConnection.close();
        }
    }

    /**
     * Builds the AMQP connection URL in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username user name placed in the URL
     * @param password password placed in the URL
     * @return the connection URL string
     */
    public String getTCPConnectionURL(String username, String password) {
        return new StringBuilder()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

/**
 * Entry point of the durable topic sample. Runs three publish rounds: one while
 * a durable subscriber is active, one after it has been stopped, and one after
 * a new subscriber instance has subscribed again.
 */
public class Main {
    /** Number of messages published in each round. */
    private static final int MESSAGES_PER_ROUND = 5;
    /** Pause in milliseconds after each publish round. */
    private static final long PAUSE_AFTER_ROUND_MILLIS = 5000;

    /**
     * Runs the subscribe / publish / stop sequence of the sample.
     *
     * @param args command-line arguments (not used)
     * @throws NamingException if a publisher's JNDI lookup fails
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        // Round 1: subscriber is active while messages are published.
        DurableTopicSubscriber firstSubscriber = new DurableTopicSubscriber();
        firstSubscriber.subscribe();
        publishRound();
        firstSubscriber.stopSubscriber();

        // Round 2: no subscriber is active while messages are published.
        publishRound();

        // Round 3: a new subscriber instance subscribes, then more messages are published.
        DurableTopicSubscriber secondSubscriber = new DurableTopicSubscriber();
        secondSubscriber.subscribe();
        publishRound();
        secondSubscriber.stopSubscriber();
    }

    /** Publishes one batch of messages with a fresh publisher, then pauses. */
    private static void publishRound() throws NamingException, JMSException, InterruptedException {
        new TopicPublisher().publishMessage(MESSAGES_PER_ROUND);
        Thread.sleep(PAUSE_AFTER_ROUND_MILLIS);
    }
}

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Executing the sample

Run the ant command from the <MB_HOME>/samples/DurableTopicSubscriber directory.

Analyzing the output

The scenario used in this sample to demonstrate durable topic subscriptions is as follows.

  1. durableTopicSubscriber is run to create a durable topic subscriber.
  2. 5 messages are sent to the newTopic topic. The messages will be received and printed by the subscriber named durableTopicSubscriber.
  3. The durableTopicSubscriber is stopped.
  4. The publisher is run again and 5 more messages are sent.
  5. The durableTopicSubscriber is run again, and 5 more messages are sent to the same topic. You will see that all 10 messages (including the 5 messages sent to the topic while the subscriber was absent) are consumed by the durableTopicSubscriber.
  • No labels