Due to a known issue do not use JDK1.8.0_151 with WSO2 products. Use JDK 1.8.0_144 until JDK 1.8.0_162-ea is released.
This documentation is for WSO2 Message Broker version 3.2.0. For the latest documentation, see the documentation for WSO2 Enterprise Integrator.
Skip to end of metadata
Go to start of metadata

This sample demonstrates how to send and receive messages in WSO2 Message broker via the MQTT transport.

About the sample

The <MB_HOME>/Samples/SimpleMqttClient/src/main/java/org/wso2/sample/mqtt directory has the following files:
 

This file defines the callback handler client which handles the messages returned from MB and prints them in the output log of the console. The Callback handler handles messages returned from the Message Broker. These messages are categorized into 3 types as follows:

  • Connection Lost
  • Message Arrived
  • Delivery Complete

The configuration of this class is as follows:

package org.wso2.sample.mqtt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * The MQTT client callback handler which handles message arrivals, delivery completions and connection lost.
 */
public class SimpleMQTTCallback implements MqttCallback {
    private static final Log log = LogFactory.getLog(SimpleMQTTCallback.class);
    /**
     * Inform when connection with server is lost.
     *
     * @param throwable Connection lost cause
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("Mqtt client lost connection with the server", throwable);
    }
    /**
     * Inform when a message is received through a subscribed topic.
     *
     * @param topic       The topic message received from
     * @param mqttMessage The message received
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("Message arrived on topic : \"" + topic + "\" Message : \"" + mqttMessage.toString() + "\"");
    }
    /**
     * Inform when message delivery is complete for a published message.
     *
     * @param iMqttDeliveryToken The message complete token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        for (String topic : iMqttDeliveryToken.getTopics()) {
            log.info("Message delivered successfully to topic : \"" + topic + "\".");
        }
    }
}

This class defines the MQTT Quality of Service levels that need to be applied. The configuration of this class is as follows:

package org.wso2.sample.mqtt;
/**
 * The quality of service levels in MQTT.
 */
public enum QualityOfService {
    /**
     * The message is delivered at most once, or it may not be delivered at all. Its delivery across the network is
     * not acknowledged. The message is not stored. The message could be lost if the client is disconnected,
     * or if the server fails. QoS0 is the fastest mode of transfer. It is sometimes called "fire and forget".
     */
    MOST_ONCE(0),
    /**
     * The message is always delivered at least once. It might be delivered multiple times if there is a failure
     * before an acknowledgment is received by the sender. The message must be stored locally at the sender,
     * until the sender receives confirmation that the message has been published by the receiver. The message is
     * stored in case the message must be sent again.
     */
    LEAST_ONCE(1),
    /**
     * The message is always delivered exactly once. The message must be stored locally at the sender,
     * until the sender receives confirmation that the message has been published by the receiver. The message is
     * stored in case the message must be sent again. QoS2 is the safest, but slowest mode of transfer. A more
     * sophisticated handshaking and acknowledgement sequence is used than for QoS1 to ensure no duplication of
     * messages occurs.
     */
    EXACTLY_ONCE(2);
    private final int qos;
    /**
     * Initialize with the given Quality of Service.
     * @param qos The quality of service level
     */
    private QualityOfService(int qos) {
        this.qos = qos;
    }
    /**
     * Get the corresponding value for the given quality of service.
     * Retrieve this value whenever quality of service level needs to feed into external libraries.
     *
     * @return The integer representation of this quality of service
     */
    public int getValue() {
        return qos;
    }
}

This class defines the method to be used for running both the classes mentioned above.

package org.wso2.sample.mqtt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
/**
 * This samples demonstrates how to write a simple MQTT client to send/receive message via MQTT in WSO2 Message Broker.
 */
public class Main {
    private static final Log log = LogFactory.getLog(Main.class);
    // Java temporary directory location
    private static final String JAVA_TMP_DIR = System.getProperty("java.io.tmpdir");
    // The MQTT broker URL
    private static final String brokerURL = "tcp://localhost:1883";
    /**
     * The main method which runs the sample.
     *
     * @param args Commandline arguments
     */
    public static void main(String[] args) {
        String subscriberClientId = "subscriber";
        String publisherClientId = "publisher";
        String topic = "simpleTopic";
        boolean retained = false;
        log.info("Running sample");
        byte[] payload = "hello".getBytes();
        try {
            // Creating mqtt subscriber client
            MqttClient mqttSubscriberClient = getNewMqttClient(subscriberClientId);
            // Creating mqtt publisher client
            MqttClient mqttPublisherClient = getNewMqttClient(publisherClientId);
            // Subscribing to mqtt topic "simpleTopic"
            mqttSubscriberClient.subscribe(topic, QualityOfService.LEAST_ONCE.getValue());
            // Publishing to mqtt topic "simpleTopic"
            mqttPublisherClient.publish(topic, payload, QualityOfService.LEAST_ONCE.getValue(), retained);
            mqttPublisherClient.disconnect();
            mqttSubscriberClient.disconnect();
            log.info("Clients Disconnected!");
        } catch (MqttException e) {
            log.error("Error running the sample", e);
        }

    }
    /**
     * Crate a new MQTT client and connect it to the server.
     *
     * @param clientId The unique mqtt client Id
     * @return Connected MQTT client
     * @throws MqttException
     */
    private static MqttClient getNewMqttClient(String clientId) throws MqttException {
        //Store messages until server fetches them
        MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(JAVA_TMP_DIR + "/" + clientId);
        MqttClient mqttClient = new MqttClient(brokerURL, clientId, dataStore);
        SimpleMQTTCallback callback = new SimpleMQTTCallback();
        mqttClient.setCallback(callback);
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setUserName("admin");
        connectOptions.setPassword("admin".toCharArray());
        connectOptions.setCleanSession(true);
        mqttClient.connect(connectOptions);

        return mqttClient;
    }
}

Prerequisites

Before you build the sample, the prerequisites for MB samples should be in place.

Building the sample

If you are building an MQTT sample for the first time, you need to build the sample using Maven. This will download the Maven dependencies needed for your MQTT samples. Once the dependencies are downloaded, you can build any of the MQTT samples using either Maven or Ant.

Using Maven:
  1. Navigate to the SimpleMqttSample sample folder in the <MB_HOME>/samples directory.
  2. Execute the mvn clean install command to build and run the sample.
  3. If the build is successful, the output will be printed in the terminal as shown below.
Using Ant:

Be sure that the Maven dependencies required for MQTT samples are already downloaded as explained here.

  1. Navigate to the SimpleMqttSample sample folder in the <MB_HOME>/samples directory.

  2. Execute the ant command to build and run the SimpleMqttSample sample.

  3. If the build is successful, the output will be printed in the terminal as shown below.

Run the ant or the mvn clean install command from the <MB_HOME>/samples/SimpleMQTTClient directory. 

Analyzing the output

When you run this sample, the following will be displayed in the output log of the console depending on the command you used:

For the ant command:

[java] INFO  [org.wso2.sample.mqtt.Main] - Running sample
[java] INFO  [org.wso2.sample.mqtt.SimpleMQTTCallback] - Message delivered successfully to topic : "simpleTopic".
[java] INFO  [org.wso2.sample.mqtt.Main] - Clients Disconnected!

For the maven clean install command:

INFO  [org.wso2.sample.mqtt.Main] - Running sample
INFO  [org.wso2.sample.mqtt.SimpleMQTTCallback] - Message delivered successfully to topic : "simpleTopic".
INFO  [org.wso2.sample.mqtt.Main] - Clients Disconnected!
  • No labels