Due to a known issue do not use JDK1.8.0_151 with WSO2 products. Use JDK 1.8.0_144 until JDK 1.8.0_162-ea is released.
This documentation is for WSO2 Message Broker version 3.2.0. For the latest documentation, see the documentation for WSO2 Enterprise Integrator.
Skip to end of metadata
Go to start of metadata

This sample demonstrates how WSO2 MB handles message arrivals, delivery completions and connections lost when the MQTT transport is used.

About the sample

The <MB_HOME>/Samples/MqttRetainSample/src/main/java/org/wso2/sample/mqtt directory has the following classes:

This class defines a callback handler that handles message arrivals, delivery completions and connections lost. The code of this class is as follows:

package org.wso2.sample.mqtt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * The MQTT client callback handler which handles message arrivals, delivery completions and connection lost.
 */
public class SimpleMQTTCallback implements MqttCallback {
    private static final Log log = LogFactory.getLog(SimpleMQTTCallback.class);
    /**
     * Inform when connection with server is lost.
     *
     * @param throwable Connection lost cause
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("Mqtt client lost connection with the server", throwable);
    }
    /**
     * Inform when a message is received through a subscribed topic.
     *
     * @param topic       The topic message received from
     * @param mqttMessage The message received
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("Message arrived on topic : \"" + topic + "\" Message : \"" +
                 mqttMessage.toString() + "\"");
    }
    /**
     * Inform when message delivery is complete for a published message.
     *
     * @param iMqttDeliveryToken The message complete token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        for (String topic : iMqttDeliveryToken.getTopics()) {
            log.info("Message delivered successfully to topic : \"" + topic + "\".");
        }
    }
}

This class applies to the quality of service levels in MQTT. The code of this class is as follows:

package org.wso2.sample.mqtt;
/**
 * The quality of service levels in MQTT.
 */
public enum QualityOfService {
    /**
     * The message is delivered at most once, or it may not be delivered at all. Its delivery across the network is
     * not acknowledged. The message is not stored. The message could be lost if the client is disconnected,
     * or if the server fails. QoS0 is the fastest mode of transfer. It is sometimes called "fire and forget".
     */
    MOST_ONCE(0),
    /**
     * The message is always delivered at least once. It might be delivered multiple times if there is a failure
     * before an acknowledgment is received by the sender. The message must be stored locally at the sender,
     * until the sender receives confirmation that the message has been published by the receiver. The message is
     * stored in case the message must be sent again.
     */
    LEAST_ONCE(1),
    /**
     * The message is always delivered exactly once. The message must be stored locally at the sender,
     * until the sender receives confirmation that the message has been published by the receiver. The message is
     * stored in case the message must be sent again. QoS2 is the safest, but slowest mode of transfer. A more
     * sophisticated handshaking and acknowledgement sequence is used than for QoS1 to ensure no duplication of
     * messages occurs.
     */
    EXACTLY_ONCE(2);
    private final int qos;
    /**
     * Initialize with the given Quality of Service.
     * @param qos The quality of service level
     */
    private QualityOfService(int qos) {
        this.qos = qos;
    }
    /**
     * Get the corresponding value for the given quality of service.
     * Retrieve this value whenever quality of service level needs to feed into external libraries.
     *
     * @return The integer representation of this quality of service
     */
    public int getValue() {
        return qos;
    }
}

This class defines the method that is used for calling both the classes mentioned above. The code of this class is as follows:

package org.wso2.sample.mqtt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * If MQTT Retain enabled broker should keep the retain enabled message for future subscribers.
 * This samples demonstrates how MQTT retain feature works.
 */
public class Main {
    private static final Log log = LogFactory.getLog(Main.class);
    /**
     * Java temporary directory location
     */
    private static final String JAVA_TMP_DIR = System.getProperty("java.io.tmpdir");
    /**
     * The MQTT broker URL
     */
    private static final String brokerURL = "tcp://localhost:1883";
    /**
     * topic name for subscriber and publisher
     */
    private static String topic;
    /**
     * retain state for published topic
     */
    private static boolean retained;
    /**
     * The main method which runs the sample.
     *
     * @param args Commandline arguments
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        // buffer reader for read console inputs
        BufferedReader bufferReader = new BufferedReader(new InputStreamReader(System.in));
        String subscriberClientId = "subscriber";
        String publisherClientId = "publisher";
        int maxWaitTimeUntilReceiveMessages = 1000;
        // default topic name
        topic = "simpleTopic";
        // default retain state
        retained = true;
        // default payload
        byte[] payload = "sample message payload".getBytes();
        log.info("Retain Topic Sample");
        getUserInputs(bufferReader);
        log.info("Start sample with Topic Name " + topic + " with retain " + retained + ".");
        try {
            // Creating mqtt subscriber client
            MqttClient mqttSubscriberClient = getNewMqttClient(subscriberClientId);
            // Creating mqtt publisher client
            MqttClient mqttPublisherClient = getNewMqttClient(publisherClientId);
            // Publishing to mqtt topic "simpleTopic"
            mqttPublisherClient.publish(topic, payload, QualityOfService.LEAST_ONCE.getValue(),
                                        retained);
            log.info("Publish topic message with retain enabled for topic name " + topic);
            // Subscribing to mqtt topic "simpleTopic"
            mqttSubscriberClient.subscribe(topic, QualityOfService.LEAST_ONCE.getValue());
            log.info("Subscribe for topic name " + topic);
            Thread.sleep(maxWaitTimeUntilReceiveMessages);
            mqttPublisherClient.disconnect();
            mqttSubscriberClient.disconnect();
            log.info("Clients Disconnected!");
        } catch (MqttException e) {
            log.error("Error running the sample", e);
        }
    }
    /**
     * Read user inputs and set to relevant parameters
     *
     * @param bufferReader buffer text from character input stream
     * @throws IOException
     */
    private static void getUserInputs(BufferedReader bufferReader) throws IOException {
        String lineSeparator = System.getProperty("line.separator");
        log.info("Enter topic name : ");
        String bufferReaderString = bufferReader.readLine();
        if (!bufferReaderString.isEmpty()) {
            topic = bufferReaderString;
        } else {
            log.info("Topic name not valid. Continuing with default topic name : " + topic);
        }
        log.info("Set retain enable [Y/N] : ");
        bufferReaderString = bufferReader.readLine();
        if (bufferReaderString.contentEquals("Y")) {
            // set retain enable
            retained = true;
        } else if (bufferReaderString.contentEquals("N")) {
            // set retain disable
            retained = false;
        } else {
            log.info("Retain state not valid. Continuing with default retain state : " + retained);
        }
        log.info(lineSeparator + "Enter Y to continue with " + topic + " topic name and" +
                 " retain state " + retained + "." + lineSeparator +
                 "Enter N to revise parameters [Y/N] : ");
        bufferReaderString = bufferReader.readLine();
        if (bufferReaderString.contentEquals("N")) {
            getUserInputs(bufferReader);
        }
    }
    /**
     * Crate a new MQTT client and connect it to the server.
     *
     * @param clientId The unique mqtt client Id
     * @return Connected MQTT client
     * @throws MqttException
     */
    private static MqttClient getNewMqttClient(String clientId) throws MqttException {
        //Store messages until server fetches them
        MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(JAVA_TMP_DIR + "/" + clientId);
        MqttClient mqttClient = new MqttClient(brokerURL, clientId, dataStore);
        SimpleMQTTCallback callback = new SimpleMQTTCallback();
        mqttClient.setCallback(callback);
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setUserName("admin");
        connectOptions.setPassword("admin".toCharArray());
        connectOptions.setCleanSession(true);
        mqttClient.connect(connectOptions);
        return mqttClient;
    }
}

Prerequisites

Before you build the sample, the prerequisites for MB samples should be in place.

Building the sample

If you are building an MQTT sample for the first time, you need to build the sample using Maven. This will download the Maven dependencies needed for your MQTT samples. Once the dependencies are downloaded, you can build any of the MQTT samples using either Maven or Ant.

Using Maven:
  1. Navigate to the MqttRetainSample sample folder in the <MB_HOME>/samples directory.
  2. Execute the mvn clean install command to build and run the sample.
  3. If the build is successful, the output will be printed in the terminal as shown below.
Using Ant:

Be sure that the Maven dependencies required for MQTT samples are already downloaded as explained here.

  1. Navigate to the MqttRetainSample sample folder in the <MB_HOME>/samples directory.

  2. Execute the ant command to build and run the mqtt retain sample.

  3. Once the sample is started, you will be asked to enter a topic name to publish/subscribe. Enter an appropriate name and proceed.

  4. Then it will ask if the retain feature should be enabled for the given topic. Enter Y to enable the retain feature. You can check the behaviour before the retain feature by entering N as the value.

  5. If all parameters are set correctly enter Y and proceed. Or else, you can re-enter values by type N.
  6. If the build is successful, the output will be printed in the terminal as shown below.

Analyzing the output

Following is the output is printed in the terminal if the retain message is successfully received for future subscriber:

run:
     [java] INFO  [org.wso2.sample.mqtt.Main] - Retain Topic Sample
     [java] INFO  [org.wso2.sample.mqtt.Main] - Enter topic name : new1
     [java] INFO  [org.wso2.sample.mqtt.Main] - Set retain enable [Y/N] : Y
     [java] INFO  [org.wso2.sample.mqtt.Main] - 
     [java] Enter Y to continue with new1 topic name and retain state true.
     [java] Enter N to revise parameters [Y/N] : Y
     
     [java] INFO  [org.wso2.sample.mqtt.Main] - Start sample with Topic Name new1 with retain true.
     [java] INFO  [org.wso2.sample.mqtt.Main] - Publish topic message with retain enabled for topic name new1
     [java] INFO  [org.wso2.sample.mqtt.SimpleMQTTCallback] - Message delivered successfully to topic : "new1".
     [java] INFO  [org.wso2.sample.mqtt.Main] - Subscribe for topic name new1
     [java] INFO  [org.wso2.sample.mqtt.SimpleMQTTCallback] - Message arrived on 
     topic : "new1" Message : "sample message payload"
     
     [java] INFO  [org.wso2.sample.mqtt.Main] - Clients Disconnected!
  • No labels