Due to a known issue, do not use JDK 1.8.0_151 with WSO2 products. Use JDK 1.8.0_144 until JDK 1.8.0_162-ea is released.
This documentation is for WSO2 Message Broker version 3.2.0. For the latest documentation, see the documentation for WSO2 Enterprise Integrator.
Skip to end of metadata
Go to start of metadata

This sample demonstrates how to publish messages to topics and sub topics in a topic hierarchy and to create hierarchical topic subscriptions.


About the sample

The <MB_HOME>/samples/HierarchicalTopicsSubscriber/src/org/sample/jms directory has the following classes:

  • SampleHierarchicalTopicsClient.java class defines a client that subscribes to a hierarchical topic structure of which the main topic is Games.

  • TopicPublisher.java class defines a client that publishes messages in the hierarchical topic structure mentioned above.

  • Main.java class defines the main method, which starts the subscriber client, waits until its subscriptions are created, and then runs the publisher.

Click the relevant tab to see the code.

package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Subscriber client that runs on its own thread, creates one subscription for
 * each of the topic patterns {@code Games.Cricket.*} and {@code Games.Cricket.#},
 * and prints every text message received on them.
 *
 * <p>Another thread can poll {@link #isSubscriptionComplete()} to find out when
 * both subscriptions exist.
 */
public class SampleHierarchicalTopicsClient extends Thread {
    /** JNDI initial context factory class used to build the naming context. */
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";
    /** How long a single receive call waits before a subscriber is considered drained. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000;

    String userName = "admin";
    String password = "admin";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";
    // Topic patterns this client subscribes to.
    String topicName_6 = "Games.Cricket.*";
    String topicName_7 = "Games.Cricket.#";

    // volatile: written by this thread, polled from another thread via isSubscriptionComplete().
    private volatile boolean isSubscriptionComplete = false;

    /**
     * Thread entry point: runs {@link #subscribe()} and prints the stack trace
     * of any naming or JMS failure instead of propagating it.
     */
    @Override
    public void run() {
        try {
            subscribe();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects to the broker, subscribes to both topic patterns, and prints the
     * text messages received on each subscription until a receive call times out.
     * The first subscription is drained completely before the second one is read.
     *
     * @throws NamingException if the JNDI context cannot be created or a lookup fails
     * @throws JMSException    if connecting, subscribing, or receiving fails
     */
    public void subscribe() throws NamingException, JMSException {
        InitialContext ctx = init();
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        try {
            topicConnection.start();

            // Use a separate session per subscriber.
            TopicSession topicSession1 =
                    topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            TopicSession topicSession2 =
                    topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

            Topic topic6 = (Topic) ctx.lookup(topicName_6);
            Topic topic7 = (Topic) ctx.lookup(topicName_7);
            TopicSubscriber topicSubscriber1 = topicSession1.createSubscriber(topic6);
            TopicSubscriber topicSubscriber2 = topicSession2.createSubscriber(topic7);

            // Signal waiting threads only after both subscriptions exist.
            isSubscriptionComplete = true;

            printReceivedMessages(topicSubscriber1, "subscriber1", topicName_6);
            printReceivedMessages(topicSubscriber2, "subscriber2", topicName_7);

            topicSubscriber1.close();
            topicSubscriber2.close();
            topicSession1.close();
            topicSession2.close();
            topicConnection.stop();
        } finally {
            // Always release the connection, including when an earlier step threw.
            topicConnection.close();
        }
    }

    /**
     * Prints a header for the topic and then every text message the subscriber
     * delivers, stopping at the first receive call that times out.
     *
     * @param subscriber      subscriber to read from
     * @param subscriberLabel label used in the printed output for this subscriber
     * @param topicName       topic name shown in the header line
     * @throws JMSException if receiving or reading a message fails
     */
    private void printReceivedMessages(TopicSubscriber subscriber, String subscriberLabel,
                                       String topicName) throws JMSException {
        System.out.println(" Receiving messages for " + topicName + " :");
        Message message;
        while ((message = subscriber.receive(RECEIVE_TIMEOUT_MILLIS)) != null) {
            // Non-text messages are consumed but not printed.
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Got Message from " + subscriberLabel + " => " + textMessage.getText());
            }
        }
    }

    /**
     * Builds the JNDI context holding the connection factory entry and the two
     * topic patterns that {@link #subscribe()} looks up.
     *
     * @return the configured initial context
     * @throws NamingException if the context cannot be created
     */
    private InitialContext init() throws NamingException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic." + topicName_6, topicName_6);
        properties.put("topic." + topicName_7, topicName_7);
        return new InitialContext(properties);
    }

    /**
     * Builds the broker connection URL in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username broker user name
     * @param password broker password
     * @return the connection URL
     */
    private String getTCPConnectionURL(String username, String password) {
        return "amqp://" + username + ":" + password
                + "@" + CARBON_CLIENT_ID
                + "/" + CARBON_VIRTUAL_HOST_NAME
                + "?brokerlist='tcp://" + CARBON_DEFAULT_HOSTNAME + ":" + CARBON_DEFAULT_PORT + "'";
    }

    /**
     * @return {@code true} once both subscriptions have been created
     */
    public boolean isSubscriptionComplete() {
        return this.isSubscriptionComplete;
    }
}
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Publisher client that sends one text message to each of the five topics
 * {@code Games}, {@code Games.Cricket}, {@code Games.Cricket.SL},
 * {@code Games.Cricket.India} and {@code Games.Cricket.India.Delhi}.
 */
public class TopicPublisher {
    /** JNDI initial context factory class used to build the naming context. */
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";

    String userName = "admin";
    String password = "admin";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";

    /**
     * Connects to the broker and publishes one text message per topic, in the
     * order the topics are declared. The connection is closed even when a
     * lookup or publish fails.
     *
     * @throws NamingException if the JNDI context cannot be created or a lookup fails
     * @throws JMSException    if connecting or publishing fails
     */
    public void publishMessage() throws NamingException, JMSException {
        // Topic names and message texts are paired by index.
        String[] topicNames = {topicName_1, topicName_2, topicName_3, topicName_4, topicName_5};
        String[] messageTexts = {
                "Message for Games",
                "Message for Cricket",
                "Message for SL",
                "Message for India",
                "Message for Delhi"
        };

        InitialContext ctx = init();
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        try {
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

            // Resolve every topic before publishing anything, so a failed lookup sends nothing.
            Topic[] topics = new Topic[topicNames.length];
            for (int i = 0; i < topicNames.length; i++) {
                topics[i] = (Topic) ctx.lookup(topicNames[i]);
            }

            for (int i = 0; i < topics.length; i++) {
                // Fully qualified: this class shares its simple name with the JMS interface.
                javax.jms.TopicPublisher publisher = topicSession.createPublisher(topics[i]);
                TextMessage textMessage = topicSession.createTextMessage(messageTexts[i]);
                publisher.publish(textMessage);
            }

            topicSession.close();
        } finally {
            // Always release the connection, including when an earlier step threw.
            topicConnection.close();
        }
    }

    /**
     * Builds the JNDI context holding the connection factory entry and the five
     * topics that {@link #publishMessage()} looks up.
     *
     * @return the configured initial context
     * @throws NamingException if the context cannot be created
     */
    private InitialContext init() throws NamingException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic." + topicName_1, topicName_1);
        properties.put("topic." + topicName_2, topicName_2);
        properties.put("topic." + topicName_3, topicName_3);
        properties.put("topic." + topicName_4, topicName_4);
        properties.put("topic." + topicName_5, topicName_5);
        return new InitialContext(properties);
    }

    /**
     * Builds the broker connection URL in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username broker user name
     * @param password broker password
     * @return the connection URL
     */
    private String getTCPConnectionURL(String username, String password) {
        return "amqp://" + username + ":" + password
                + "@" + CARBON_CLIENT_ID
                + "/" + CARBON_VIRTUAL_HOST_NAME
                + "?brokerlist='tcp://" + CARBON_DEFAULT_HOSTNAME + ":" + CARBON_DEFAULT_PORT + "'";
    }
}
package org.sample.jms;
 
import javax.jms.JMSException;
import javax.naming.NamingException;
/**
 * Entry point of the sample: starts the subscriber on its own thread, waits
 * until it reports that its subscriptions are created, and then publishes the
 * sample messages.
 */
public class Main {
    /** Delay between two checks of the subscriber's state. */
    private static final long POLL_INTERVAL_MILLIS = 500;

    /**
     * Runs the subscriber and then the publisher.
     *
     * @param args unused
     * @throws NamingException       if the publisher's JNDI setup or lookups fail
     * @throws JMSException          if the publisher fails to connect or publish
     * @throws InterruptedException  if the wait for the subscriber is interrupted
     * @throws IllegalStateException if the subscriber thread ends before its
     *                               subscriptions are created
     */
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
        hierarchicalTopicsClient.start();

        // Also stop waiting when the subscriber thread has died: its run() only prints
        // the failure, so polling the flag alone would never end.
        while (!hierarchicalTopicsClient.isSubscriptionComplete() && hierarchicalTopicsClient.isAlive()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        if (!hierarchicalTopicsClient.isSubscriptionComplete()) {
            throw new IllegalStateException(
                    "Subscriber thread ended before its subscriptions were created; no messages were published.");
        }

        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage();
    }

}

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Executing the sample

Run the ant command from the <MB_HOME>/samples/HierarchicalTopicsSubscriber directory.

Analyzing the output

When you run the sample, you will see the following in the output log in the console.

     [java]  Receiving messages for Games.Cricket.* :
     [java]  Receiving messages for Games.Cricket.# :
  • No labels