Due to a known issue do not use JDK1.8.0_151 with WSO2 products. Use JDK 1.8.0_144 until JDK 1.8.0_162-ea is released.
This documentation is for WSO2 Message Broker version 3.2.0. For the latest documentation, see the documentation for WSO2 Enterprise Integrator.
Skip to end of metadata
Go to start of metadata

This sample demonstrates how WSO2 MB can be used to create a chat client that uses MQTT.

About the sample

The <MB_HOME>/Samples/MqttChatClient/src/main/java/org/wso2/sample/mqtt directory has the following classes:

This class holds a basic MQTT client. It also implements the Callback handler for this client. The code is as follows:

package org.wso2.sample.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import java.io.File;
/**
 * The MQTT clients which is used by the chat client to send/receive messages.
 */
public class AndesMQTTClient implements MqttCallback {
    /**
     * The Message Broker URL
     */
    private static final String brokerURL = "tcp://localhost:1883";
    /**
     * The temporary directory for mqtt client to work with
     */
    private static final String tmpDir = System.getProperty("java.io.tmpdir");
    /**
     * The MQTT client which is used to communicate with the server
     */
    private MqttClient mqttClient;
    /**
     * The unique MQTT client Id
     */
    private final String clientId;
    
    /**
     * Credentials to be used when connecting to MQTT server
     */
    private static final String DEFAULT_USER_NAME = "admin";
    
    private static final String DEFAULT_PASSWORD = "admin";
    
    
    /**
     * Create a new MQTT client with the given client Id. Return after the connection is successful.
     *
     * @param clientId The unique client Id
     * @throws MqttException
     */
    public AndesMQTTClient(String clientId) throws MqttException {
        this.clientId = clientId;
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(DEFAULT_USER_NAME);
        options.setPassword(DEFAULT_PASSWORD.toCharArray());
        
        mqttClient = new MqttClient(brokerURL, clientId, new MqttDefaultFilePersistence(tmpDir + File.separator +
                clientId));
        mqttClient.setCallback(this);
        mqttClient.connect(options);
    }
    /**
     * Subscribe to a given topic in given qos. Return after the subscription is complete.
     *
     * @param topic The topic to subscribe to
     * @param qos   The quality of service
     * @throws MqttException
     */
    public void subscribe(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }
    /**
     * Un-subscribe from a given topic after publishing to the chat server that this client is going to leave the
     * given chat.
     *
     * @param topic The topic to un-subscribe from
     * @throws MqttException
     */
    public void unsubscribe(String topic) throws MqttException {
        mqttClient.publish(topic, (clientId + " has left the conversation").getBytes(), 2, false);
        mqttClient.unsubscribe(topic);
    }
    /**
     * Send message to a given topic.
     *
     * @param topic   The topic to send message to
     * @param message The message to send
     * @param qos     The quality of service
     * @throws MqttException
     */
    public void sendMessage(String topic, String message, int qos) throws MqttException {
        String encodedMessage = ChatWindow.encodeMessage(clientId, message);
        mqttClient.publish(topic, encodedMessage.getBytes(), qos, false);
    }
    /**
     * Disconnect the mqtt client from the server.
     *
     * @throws MqttException
     */
    public void disconnect() throws MqttException {
        mqttClient.disconnect();
    }
    /**
     * Handle if connection is lost with the server.
     *
     * @param throwable Cause
     */
    @Override
    public void connectionLost(Throwable throwable) {
        ChatWindow.outputToChatWindow("Connection lost");
    }
    /**
     * Handle receiving a message from the server.
     *
     * @param topic       The topic message received from
     * @param mqttMessage The received message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        synchronized (this.getClass()) {
            String chatFrom = null;
            // If message is received through the personal channel it is a personal message. Otherwise it is a group
            // chat
            if (!clientId.equals(topic)) {
                chatFrom = topic;
            }
            ChatWindow.decodeAndOutputMessage(chatFrom, mqttMessage.toString());
        }
    }
    /**
     * On delivery complete notify it to the chat console.
     *
     * @param iMqttDeliveryToken Delivery information token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        synchronized (this.getClass()) {
            for (String topic : iMqttDeliveryToken.getTopics()) {
                String chatName = null;
                if (!topic.equals(clientId)) {
                    chatName = topic;
                }
                ChatWindow.decodeAndOutputMessage(chatName, "Message Sent");
            }
        }
    }
}

This class holds an AndesMQTTClient that subscribes/publishes via MQTT. The code is as follows:

package org.wso2.sample.mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
/**
 * Represents a chat client which hosts a mqtt client.
 */
public class ChatClient {
    // For a chat client messages should be received exactly once, which is qos 2 in MQTT.
    private static final int qos = 2;
    private AndesMQTTClient mqttClient;
    /**
     * Create a chat client an initialises a mqtt client on it's name.
     *
     * @param name The name of the chat client
     * @throws MqttException
     */
    public ChatClient(String name) throws MqttException {
        mqttClient = new AndesMQTTClient(name);
        mqttClient.subscribe(name, qos);
    }
    /**
     * Start/Join a group chat.
     *
     * @param groupName The group name
     * @throws MqttException
     */
    public void startGroupConversation(String groupName) throws MqttException {
        mqttClient.subscribe(groupName, qos);
        ChatWindow.outputToChatWindow("Joined to the group : " + groupName);
    }
    /**
     * Leave a group chat.
     *
     * @param groupName The group name
     * @throws MqttException
     */
    public void endGroupConversation(String groupName) throws MqttException {
        mqttClient.unsubscribe(groupName);
        ChatWindow.outputToChatWindow("Left the group : " + groupName);
    }
    /**
     * Send a chat message.
     *
     * @param chatName The person/group to send message to
     * @param message  The message
     * @throws MqttException
     */
    public void sendMessage(String chatName, String message) throws MqttException {
        mqttClient.sendMessage(chatName, message, qos);
    }
    /**
     * Close the chat client.
     *
     * @throws MqttException
     */
    public void closeClient() throws MqttException {
        mqttClient.disconnect();
    }
}

This class represents a chat window in a chat application. The code is as follows:

package org.wso2.sample.mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Scanner;
/**
 * Represents a chat console.
 */
public final class ChatWindow {
    /**
     * The new line character
     */
    private static final String NEW_LINE = "\n";
    /**
     * Message header and content separating string
     */
    private static final String SEPARATOR = "::";
    /**
     * Scanner to read user input
     */
    private static final Scanner scanner = new Scanner(System.in);
    /**
     * Output StreamWriter to write to the console
     */
    private static final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(System.out));
    /**
     * The delimiter to separate each keyword in a user input command
     */
    private static final String COMMAND_DELIMITER = " ";
    /**
     * The command to exit
     */
    private static final String EXIT_COMMAND = "exit";
    /**
     * The command keyword to join a group chat
     */
    private static final String JOIN_GROUP_COMMAND = "join";
    /**
     * The command keyword to leave a group chat
     */
    private static final String LEAVE_GROUP_COMMAND = "leave";
    /**
     * The command keyword to get help
     */
    private static final String HELP_COMMAND = "help";
    /**
     * The command line helper string
     */
    private static final String HELP_STRING = "Use <alias/group message> to chat to a desired group or a person" +
            NEW_LINE + "<join group_name> to join a group chat" + NEW_LINE + "<leave group_name> to leave a group chat"
            + NEW_LINE + "<exit> to exit" + NEW_LINE;
    /**
     * Print a given message to the chat window console
     *
     * @param message The message to print to the console
     */
    public static void outputToChatWindow(String message) {
        try {
            writer.write(">" + message + NEW_LINE);
            writer.flush();
        } catch (IOException ignore) {
            // Silently ignore since there is no other way than this method itself to print to the output
        }
    }
    public static String getInputFromChatWindow() {
        return scanner.nextLine();
    }
    /**
     * Decode a given message and output to the chat window console.
     * This is invoked when a new message is received to the chat client.
     *
     * @param chatName The chat name to decide on which chat the message should be shown given that there are
     *                 multiple active chats
     * @param message  The received message
     */
    public static void decodeAndOutputMessage(String chatName, String message) {
        StringBuilder output = new StringBuilder();
        if (null == chatName) {
            output.append("Personal message ");
        } else {
            output.append("chat with ").append(chatName).append(NEW_LINE);
        }
        String decoder[] = message.split(SEPARATOR);
        if (decoder.length == 1) { // Info message
            output.append("Info : ").append(decoder[0]);
        } else if (decoder.length == 2) { // chat message
            output.append("from ").append(decoder[0]).append(NEW_LINE).append(decoder[1]);
        } else {
            output.append("Invalid message received from the server.");
        }
        output.append(NEW_LINE).append("Waiting for your input. Use <help> for more info").append(NEW_LINE);
        outputToChatWindow(output.toString());
    }
    /**
     * Encode a given message with sender::message.
     *
     * @param sender  The message sender Id
     * @param message The message to send
     * @return The encoded message
     */
    public static String encodeMessage(String sender, String message) {
        return sender + SEPARATOR + message;
    }
    /**
     * Request and read user input from console giving a message to specify the request.
     *
     * @param message The input request message
     * @return User input line
     */
    public static String getInput(String message) {
        ChatWindow.outputToChatWindow(message);
        return getInputFromChatWindow();
    }
    /**
     * Directly read user input from the console. Use when user has already been notified about what to input.
     *
     * @return User input line
     */
    public static String getInput() {
        return getInputFromChatWindow();
    }
    /**
     * Process a given user input and take actions accordingly.
     * - Set exit flag
     * - Send messages
     * - Join a group conversation
     * - Leave a group conversation
     *
     * @param input      The user input line
     * @param chatClient The mqtt client to use when
     * @return Running condition
     * @throws MqttException
     */
    public static boolean processInput(String input, ChatClient chatClient) throws MqttException {
        boolean running = true;
        if (EXIT_COMMAND.equalsIgnoreCase(input)) {
            running = false;
        } else if (HELP_COMMAND.equalsIgnoreCase(input)) {
            printHelper();
        } else {
            String[] inputArgs = input.split(COMMAND_DELIMITER, 2);
            int argsLength = inputArgs.length;
            if (2 == argsLength) {
                String arg1 = inputArgs[0];
                String arg2 = inputArgs[1];
                if (JOIN_GROUP_COMMAND.equalsIgnoreCase(arg1)) {
                    chatClient.startGroupConversation(arg2);
                } else if (LEAVE_GROUP_COMMAND.equalsIgnoreCase(arg1)) {
                    chatClient.endGroupConversation(arg2);
                } else {
                    chatClient.sendMessage(arg1, arg2);
                }
            } else {
                outputToChatWindow("Incorrect command.");
                printHelper();
            }
        }
        return running;
    }
    /**
     * Print the help string to the output window.
     */
    public static void printHelper() {
        outputToChatWindow(HELP_STRING);
    }
}

This class defines the method to call the other classes. The code is as follows:

package org.wso2.sample.mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
import java.util.concurrent.TimeUnit;
/**
 * This sample demonstrates how to use WSO2 Message Broker to create a chat client which uses MQTT.
 * <p/>
 * The Main class which executes the sample.
 * - Creates several chat clients
 * - Initiates personal conversations
 * - Initiates group conversations
 */
public class Main {
    private static ChatClient chatClient;
    private static boolean running = true;
    /**
     * The main method which invokes the sample.
     * - Creates a chat client
     * - Takes user input
     *
     * @param args Command line arguments
     * @throws MqttException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws MqttException, InterruptedException {
        String alias = ChatWindow.getInput("Please enter your chat alias : ");
        chatClient = new ChatClient(alias);
        ChatWindow.printHelper();
        while (running) {
            String input = ChatWindow.getInput();
            running = ChatWindow.processInput(input, chatClient);
            TimeUnit.SECONDS.sleep(1L);
        }
        disconnect();
    }
    /**
     * Disconnect all the chat clients from the server.
     *
     * @throws MqttException
     */
    private static void disconnect() throws MqttException {
        chatClient.closeClient();
    }

}

Prerequisites

Before you build the sample, the prerequisites for MB samples should be in place.

Building the sample

If you are building an MQTT sample for the first time, you need to build the sample using Maven. This will download the Maven dependencies needed for your MQTT samples. Once the dependencies are downloaded, you can build any of the MQTT samples using either Maven or Ant.

Using Maven:
  1. Navigate to the MqttChatClient sample folder in the <MB_HOME>/samples directory.
  2. Execute the mvn clean install command to build and run the sample.
  3. If the build is successful, you can analyze the output as shown below.
Using Ant:

Be sure that the Maven dependencies required for MQTT samples are already downloaded as explained here.

  1. Navigate to the MqttChatClient sample folder in the <MB_HOME>/samples directory.

  2. Execute the ant command to build and run the mqtt retain sample.

  3. If the build is successful, you can anlyze the output as shown below.

Analyzing the output

Once you run the sample, you can carry out the following activities.

ActivityCommand
Sending personal messages or group messages.name/group_name message to send
Joining a group chat join group_name
Leaving a group chatjoin group_name
Exiting the sampleexit

Example

  1. Run two instances of this sample by running the ant or the mvn clean install command from the <MB_HOME>/samples/MqttChatClient directory directory in two consoles. 
  2. You will be requested for a chat alias in both instances. Enter abc in one terminal and def in the other. The following would appear in the log of both instances.

    [java] >Use <alias/group message> to chat to a desired group or a person
    [java] <join group_name> to join a group chat
    [java] <leave group_name> to leave a group chat
    [java] <exit> to exit
  3. Enter the following command in the terminal of the abc chat alias.

    def Hello 

    The following will appear in the log of the def chat alias.

    [java] >Personal message from abc
    [java] hello
    [java] Waiting for your input. Use <help> for more info
  4. Connect both chat aliases to a group chat using the following command.

    join Chat1Group

    You will get the following log for both instances.

    [java] >Joined to the group : Chat1Group
  5. Enter the command leave Chat1Group in the terminal for ABC chat alias. The following would appear in the log.

    [java] >Left the group : Chat1Group
  6. Type exit in the same terminal to stop the chat application. 

  • No labels