This documentation is for WSO2 MB version 2.2.0. You can [view this page (if available) in the latest documentation] or go to the latest version's home page.

This sample demonstrates how the Siddhi engine can be used with a JMS event broker to receive, process, and publish XML messages.

In this sample, CEP receives stock quote information and fires an output whenever the last traded amount varies by more than 2 percent from the average traded price within the past 2 minutes.

from allStockQuotesStream#window.time(120000)
insert into fastMovingStockQuotesStream
symbol,avg(price) as avgPrice, price
group by symbol
having ((price > (avgPrice*1.02)) or ((avgPrice*0.98)>price ));
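
To make the condition concrete, the following is a minimal plain-Java sketch of the same check: a 120000 ms sliding window per symbol, an average over that window, and the 2 percent comparison. It does not use Siddhi. The class name FastMovingDetector and its onQuote method are hypothetical, and the sketch assumes that the average includes the quote that has just arrived.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the query logic; not part of Siddhi or WSO2 CEP.
public class FastMovingDetector {
    private static final long WINDOW_MILLIS = 120000L; // same value as window.time(120000)

    private static final class Quote {
        final long timestamp;
        final double price;

        Quote(long timestamp, double price) {
            this.timestamp = timestamp;
            this.price = price;
        }
    }

    // One sliding window per symbol, mirroring "group by symbol".
    private final Map<String, Deque<Quote>> windows = new HashMap<String, Deque<Quote>>();

    /**
     * Adds a quote to the symbol's window and reports whether its price differs
     * from the window average by more than 2 percent (assumption: the average
     * includes the quote that was just added).
     */
    public boolean onQuote(String symbol, double price, long nowMillis) {
        Deque<Quote> window = windows.get(symbol);
        if (window == null) {
            window = new ArrayDeque<Quote>();
            windows.put(symbol, window);
        }
        window.addLast(new Quote(nowMillis, price));
        // Drop quotes that have been in the window for 2 minutes or longer.
        while (!window.isEmpty() && nowMillis - window.peekFirst().timestamp >= WINDOW_MILLIS) {
            window.removeFirst();
        }
        double sum = 0.0;
        for (Quote q : window) {
            sum += q.price;
        }
        double avgPrice = sum / window.size();
        // Same condition as the "having" clause of the query.
        return (price > avgPrice * 1.02) || (avgPrice * 0.98 > price);
    }

    public static void main(String[] args) {
        FastMovingDetector detector = new FastMovingDetector();
        System.out.println(detector.onQuote("MSFT", 100.0, 0L));
        System.out.println(detector.onQuote("MSFT", 110.0, 1000L));
    }
}
```

Keeping one window per symbol is what makes a quote for one stock leave the average of another untouched.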

Here we publish events from a JMS client to a JMS topic called AllStockQuotes. The outputs fired by the bucket are sent to a JMS topic called FastMovingStockQuotes, where another JMS client receives them and logs them to the console.

The following configuration is used in this sample.

<cep:bucket xmlns:cep="http://wso2.org/carbon/cep" name="XMLStockQuoteAnalyzer">
    <cep:description>
This bucket analyzes stock quotes and triggers an event if the last
traded amount varies by 2 percent from the average traded
price within the past 2 minutes.
    </cep:description>
    <cep:engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <cep:property name="siddhi.persistence.snapshot.time.interval.minutes">0</cep:property>
        <cep:property name="siddhi.enable.distributed.processing">false</cep:property>
    </cep:engineProviderConfiguration>
    <cep:input topic="AllStockQuotes" brokerName="MBJmsBroker">
        <cep:xmlMapping queryEventType="Tuple" stream="allStockQuotesStream">
            <cep:xpathDefinition prefix="quotedata" namespace="http://ws.cdyne.com/"/>
            <cep:property name="price" xpath="//quotedata:StockQuoteEvent/quotedata:LastTradeAmount"
                          type="java.lang.Double"/>
            <cep:property name="symbol" xpath="//quotedata:StockQuoteEvent/quotedata:StockSymbol"
                          type="java.lang.String"/>
        </cep:xmlMapping>
    </cep:input>
    <cep:query name="StockDetector">
        <cep:expression>
            <![CDATA[
from allStockQuotesStream#window.time(120000)
insert into fastMovingStockQuotesStream
symbol,avg(price) as avgPrice, price
group by symbol
having ((price > (avgPrice*1.02)) or ((avgPrice*0.98)>price ));
            ]]></cep:expression>
        <cep:output topic="FastMovingStockQuotes" brokerName="MBJmsBroker">
            <cep:xmlMapping>
                <quotedata:StockQuoteDataEvent xmlns:quotedata="http://ws.cdyne.com/"
                                               xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                                               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                    <quotedata:StockSymbol>{symbol}</quotedata:StockSymbol>
                    <quotedata:AvgLastTradeAmount>{avgPrice}</quotedata:AvgLastTradeAmount>
                    <quotedata:LastTradeAmount>{price}</quotedata:LastTradeAmount>
                </quotedata:StockQuoteDataEvent>
            </cep:xmlMapping>
        </cep:output>
    </cep:query>
</cep:bucket>
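
The input mapping above pulls two values out of each incoming XML message with namespace-qualified XPath expressions. As a rough illustration of what that extraction amounts to, here is a minimal sketch that uses only the JDK's javax.xml.xpath API. The CEP server performs this mapping internally; the class QuoteXPathSketch and its methods are hypothetical and exist only for this example.

```java
import java.io.StringReader;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical illustration; not the code that WSO2 CEP uses for its XML mapping.
public class QuoteXPathSketch {
    private static final String QUOTEDATA_NS = "http://ws.cdyne.com/";

    // Evaluates an XPath expression against an XML string, resolving the "quotedata" prefix.
    static String evaluate(String xml, String expression) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // needed so that prefixed XPath steps can match
            Document document = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                public String getNamespaceURI(String prefix) {
                    return "quotedata".equals(prefix) ? QUOTEDATA_NS : XMLConstants.NULL_NS_URI;
                }

                public String getPrefix(String namespaceURI) {
                    return QUOTEDATA_NS.equals(namespaceURI) ? "quotedata" : null;
                }

                public Iterator<String> getPrefixes(String namespaceURI) {
                    throw new UnsupportedOperationException();
                }
            });
            return xpath.evaluate(expression, document);
        } catch (Exception e) {
            throw new IllegalStateException("Could not evaluate " + expression, e);
        }
    }

    // Same expression as the "symbol" property in the bucket.
    static String symbol(String xml) {
        return evaluate(xml, "//quotedata:StockQuoteEvent/quotedata:StockSymbol");
    }

    // Same expression as the "price" property; converted to java.lang.Double.
    static Double price(String xml) {
        return Double.valueOf(
                evaluate(xml, "//quotedata:StockQuoteEvent/quotedata:LastTradeAmount").trim());
    }

    public static void main(String[] args) {
        String xml = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">"
                + "<quotedata:StockQuoteEvent>"
                + "<quotedata:StockSymbol>MSFT</quotedata:StockSymbol>"
                + "<quotedata:LastTradeAmount>126.36</quotedata:LastTradeAmount>"
                + "</quotedata:StockQuoteEvent>"
                + "</quotedata:AllStockQuoteStream>";
        System.out.println(symbol(xml) + " " + price(xml));
    }
}
```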

Prerequisites

  • Apache Ant, to build and deploy the sample and service and to run the client. See Installation Prerequisites for instructions on installing Apache Ant.

Steps to configure the sample

The steps are as follows:

  1. Install the WSO2 Complex Event Processor. See the Installation Guide for instructions.
  2. Start the WSO2 Complex Event Processor. See Running the Product for instructions.
  3. Start WSO2 Message Broker with a port offset of 1 (assuming the setup is done on a single machine). See Running WSO2 MB for instructions.
  4. Then configure WSO2 Message Broker as the JMS Broker for CEP server as described in Using WSO2 MB as A JMS Broker for WSO2 CEP Server.

    For CEP to connect to MB, add the following system property to the wso2server.sh file:

    -Dqpid.dest_syntax=BURL \


  5. Copy the above bucket configuration to the <CEP_HOME>/repository/deployment/server/cepbuckets folder. Note that the bucket configuration uses the JMS broker "MBJmsBroker" created in step 4.

Steps to run the sample

To run the sample, we have to do two things:

  1. Publish events to the CEP server. We do this by publishing events to the AllStockQuotes topic.
  2. Subscribe to the events that the CEP server generates according to the query defined in the bucket configuration above. This can be done in two ways:
    1. Subscribe a JMS topic message subscriber to the topic FastMovingStockQuotes and receive the events.
    2. Subscribe a web service client to the topic FastMovingStockQuotes and receive the events through the web service instead of a JMS client.

Publishing events to CEP server 

We will use a JMS client for this purpose.

The following class creates the initial context needed to run the event publisher client. Note that the event subscriber JMS client uses this class as well.

package org.wso2.cep.sample.jms.andes;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class JNDIContext {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "ConnectionFactory";
    private static final String userName = "admin";
    private static final String password = "admin";
    private static String CARBON_CLIENT_ID = "clientid";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5673";
    private InitialContext initContext = null;
    private TopicConnectionFactory topicConnectionFactory = null;
    public static JNDIContext instance = null;
    private JNDIContext() {
        createInitialContext();
        createConnectionFactory();
    }
    public InitialContext getInitContext() {
        return initContext;
    }
    public TopicConnectionFactory getTopicConnectionFactory() {
        return topicConnectionFactory;
    }
    public static JNDIContext getInstance() {
        if (instance == null) {
            instance = new JNDIContext();
        }
        return instance;
    }
    /**
     * Create Connection factory with initial context
     */
    private void createConnectionFactory() {
        try {
            topicConnectionFactory = (TopicConnectionFactory) initContext.lookup("ConnectionFactory");
        } catch (NamingException e) {
            System.out.println("Can not create topic connection factory." + e);
        }
    }
    /**
     * Create Initial Context with given configuration
     */
    private void createInitialContext() {

        try {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            System.out.println("TCPConnectionURL: = " + getTCPConnectionURL(userName, password));
            initContext = new InitialContext(properties);
        } catch (NamingException e) {
            System.out.println("Can not create initial context with given parameters." + e);
        }
    }

    public String getTCPConnectionURL(String username, String password) {
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
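
The port 5673 in the class above is consistent with a Message Broker started with a port offset of 1, on the assumption that the broker's default AMQP port is 5672. The following minimal sketch shows how the connection URL changes with the offset. The class AmqpUrlSketch and its connectionUrl method are hypothetical; check the default port against your own MB configuration.

```java
// Hypothetical helper; it only mirrors the string that getTCPConnectionURL builds.
public class AmqpUrlSketch {
    // Assumption: default AMQP port of the broker before any port offset is applied.
    static final int DEFAULT_AMQP_PORT = 5672;

    static String connectionUrl(String user, String password, String clientId,
                                String virtualHost, String host, int portOffset) {
        int port = DEFAULT_AMQP_PORT + portOffset;
        return "amqp://" + user + ":" + password
                + "@" + clientId
                + "/" + virtualHost
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    public static void main(String[] args) {
        // Same credentials, client id, and virtual host as in JNDIContext, with port offset 1.
        System.out.println(connectionUrl("admin", "admin", "clientid", "carbon", "localhost", 1));
    }
}
```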

 

The following class is the event publisher client. Running it generates three events and sends them to the CEP server (the events are actually published to the topic at WSO2 MB, which is registered at the CEP server).

package org.wso2.cep.sample.jms.andes.xmlMessage;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axiom.om.util.StAXUtils;
import org.wso2.cep.sample.jms.andes.JNDIContext;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import java.io.ByteArrayInputStream;
public class AllStockQuotesPublisher {
    private static InitialContext initContext = null;
    private static TopicConnectionFactory topicConnectionFactory = null;
    public static void main(String[] args) throws XMLStreamException {

        String xmlElement1 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
                " <quotedata:StockQuoteEvent>\n" +
                " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
                " <quotedata:LastTradeAmount>126.36 </quotedata:LastTradeAmount>\n" +
                " <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
                " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
                " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
                " <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
                " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
                " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
                " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
                " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
                " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
                " <quotedata:PE>10.88</quotedata:PE>\n" +
                " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
                " <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
                " </quotedata:StockQuoteEvent>\n" +
                " </quotedata:AllStockQuoteStream>";
        String xmlElement2 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
                " <quotedata:StockQuoteEvent>\n" +
                " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
                " <quotedata:LastTradeAmount>36.36</quotedata:LastTradeAmount>\n" +
                " <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
                " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
                " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
                " <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
                " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
                " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
                " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
                " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
                " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
                " <quotedata:PE>10.88</quotedata:PE>\n" +
                " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
                " <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
                " </quotedata:StockQuoteEvent>\n" +
                " </quotedata:AllStockQuoteStream>";
        String xmlElement3 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
                " <quotedata:StockQuoteEvent>\n" +
                " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
                " <quotedata:LastTradeAmount>6.36</quotedata:LastTradeAmount>\n" +
                " <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
                " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
                " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
                " <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
                " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
                " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
                " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
                " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
                " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
                " <quotedata:PE>10.88</quotedata:PE>\n" +
                " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
                " <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
                " </quotedata:StockQuoteEvent>\n" +
                " </quotedata:AllStockQuoteStream>";

        initContext = JNDIContext.getInstance().getInitContext();
        topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
        AllStockQuotesPublisher publisher = new AllStockQuotesPublisher();
        XMLStreamReader reader1 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(
                xmlElement1.getBytes()));
        StAXOMBuilder builder1 = new StAXOMBuilder(reader1);
        OMElement OMMessage1 = builder1.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage1);
        XMLStreamReader reader2 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(
                xmlElement2.getBytes()));
        StAXOMBuilder builder2 = new StAXOMBuilder(reader2);
        OMElement OMMessage2 = builder2.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage2);
        XMLStreamReader reader3 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(
                xmlElement3.getBytes()));
        StAXOMBuilder builder3 = new StAXOMBuilder(reader3);
        OMElement OMMessage3 = builder3.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage3);
    }
   
    /**
     * Publish message to given topic
     *
     * @param topicName - topic name to publish messages
     * @param message   - message to send
     */

      public void publish(String topicName, OMElement message) {
        // create topic connection
        TopicConnection topicConnection = null;
        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
            topicConnection.start();
        } catch (JMSException e) {
            System.out.println("Can not create topic connection." + e);
            return;
        }
        // create session, producer, message and send message to given destination(topic)
        // OMElement message text is published here.
        Session session = null;
        try {
            session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            MessageProducer producer = session.createProducer(topic);
            TextMessage jmsMessage = session.createTextMessage(message.toString());
            producer.send(jmsMessage);
            producer.close();
            session.close();
            topicConnection.stop();
            topicConnection.close();
        } catch (JMSException e) {
            System.out.println("Can not publish message." + e);
        }
    }
}
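
The three messages in the publisher differ only in LastTradeAmount. If you want to experiment with other prices, a small builder such as the following sketch may be convenient. The class QuoteMessageSketch and its buildQuote method are hypothetical. The sketch emits only the two elements that the bucket's input mapping reads, on the assumption that the remaining elements are not needed for this query, and it assumes the symbol contains no characters that need XML escaping.

```java
// Hypothetical helper for producing test messages; not part of the WSO2 sample.
public class QuoteMessageSketch {
    private static final String QUOTEDATA_NS = "http://ws.cdyne.com/";

    // Builds a message with only StockSymbol and LastTradeAmount,
    // the two elements referenced by the bucket's input mapping.
    static String buildQuote(String symbol, double lastTradeAmount) {
        return "<quotedata:AllStockQuoteStream xmlns:quotedata=\"" + QUOTEDATA_NS + "\">\n"
                + " <quotedata:StockQuoteEvent>\n"
                + " <quotedata:StockSymbol>" + symbol + "</quotedata:StockSymbol>\n"
                + " <quotedata:LastTradeAmount>" + lastTradeAmount + "</quotedata:LastTradeAmount>\n"
                + " </quotedata:StockQuoteEvent>\n"
                + " </quotedata:AllStockQuoteStream>";
    }

    public static void main(String[] args) {
        // Same prices as the three messages in AllStockQuotesPublisher.
        double[] prices = {126.36, 36.36, 6.36};
        for (double price : prices) {
            System.out.println(buildQuote("MSFT", price));
        }
    }
}
```

The resulting strings could be parsed into an OMElement and published in the same way as xmlElement1 to xmlElement3.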

 

Subscribing for filtered events and notifications from CEP server

a. Using a JMS client receiver

The following class acts as a JMS topic subscriber client. It registers a subscription for the filtered events that CEP fires according to the query in the bucket defined above (strictly speaking, it subscribes to a topic created at WSO2 Message Broker, to which CEP publishes filtered events and notifications internally).

package org.wso2.cep.sample.jms.andes.xmlMessage;
import org.wso2.cep.sample.jms.andes.JNDIContext;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.xml.stream.XMLStreamException;
import java.util.Enumeration;
public class FastMovingStockQuotesSubscriber implements MessageListener {
    private static InitialContext initContext = null;
    private static TopicConnectionFactory topicConnectionFactory = null;
    private boolean messageReceived = false;
    static String TOPIC = "FastMovingStockQuotes";
    public static void main(String[] args) throws XMLStreamException {
        initContext = JNDIContext.getInstance().getInitContext();
        topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
        new FastMovingStockQuotesSubscriber().subscribe(TOPIC);
    }
    public void subscribe(String topicName) {
        // create connection
        TopicConnection topicConnection = null;
        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
        } catch (JMSException e) {
            System.out.println("Can not create topic connection." + e);
            return;
        }
        // create session, subscriber, message listener and listen on that topic
        TopicSession session = null;
        try {
            session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            TopicSubscriber subscriber = session.createSubscriber(topic);
            subscriber.setMessageListener(this);
            topicConnection.start();
            synchronized (this) {
                while (!messageReceived) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        } catch (JMSException e) {
            System.out.println("Can not subscribe." + e);
        }
    }
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("output = " + textMessage.getText());
                synchronized (this) {
                    messageReceived = true;
                    // wake the main thread that is blocked in subscribe()
                    this.notifyAll();
                }
            } catch (JMSException e) {
                System.out.println("error at getting text out of received message. = " + e);
            }
        } else if (message instanceof MapMessage) {
            try {
                Enumeration enumeration = ((MapMessage) message).getMapNames();
                for (; enumeration.hasMoreElements(); ) {
                    System.out.println(((MapMessage) message).getString((String) enumeration.nextElement()));
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("Received message is not a text/map message.");
        }
    }
}
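
The subscriber blocks its main thread until the listener thread reports that a message has arrived. As an alternative to wait() on a shared flag, the same handshake can be sketched with java.util.concurrent.CountDownLatch. The class LatchHandshakeSketch below is hypothetical and contains no JMS code; its onText method stands in for onMessage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the listener-to-main-thread handshake; no JMS involved.
public class LatchHandshakeSketch {
    private final CountDownLatch received = new CountDownLatch(1);
    private volatile String lastText;

    // Stand-in for onMessage: called on the listener thread.
    public void onText(String text) {
        lastText = text;
        received.countDown(); // releases any thread blocked in awaitFirst()
    }

    // Blocks until the first message arrives or the timeout expires (returns null on timeout).
    public String awaitFirst(long timeoutMillis) {
        try {
            return received.await(timeoutMillis, TimeUnit.MILLISECONDS) ? lastText : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        final LatchHandshakeSketch sketch = new LatchHandshakeSketch();
        new Thread(new Runnable() {
            public void run() {
                sketch.onText("<event/>");
            }
        }).start();
        System.out.println("output = " + sketch.awaitFirst(5000L));
    }
}
```

A timeout on the wait keeps the client from hanging forever if no event is ever fired.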

 

b. Using a web service message receiver

Deploying receiver service

First, deploy a web service on any WSO2 server to act as the receiver for messages from the CEP server. We will use CEP itself to host this web service.

The steps are as follows:    

  1. In a command prompt, switch to the FastMovingStockQuoteReceiverService services directory: <CEP_HOME>/samples/services/FastMovingStockQuoteReceiverService
    For example, in Linux: cd <CEP_HOME>/samples/services/FastMovingStockQuoteReceiverService
  2. From there, type ant. This deploys the FastMovingStockQuoteReceiverService in CEP itself. You can follow the server logs to check whether FastMovingStockQuoteReceiverService.aar has been properly deployed. You will also be able to see the Axis2 service in the services list.

Configuring receiver service

We need to configure the FastMovingStockQuoteReceiverService to receive the output events that the bucket emits under the FastMovingStockQuotes topic. To do so, we create the FastMovingStockQuotes topic in WSO2 Message Broker and subscribe FastMovingStockQuoteReceiverService to that topic.

The steps are as follows:

  1. Sign In. Enter your user name and password to log on to the Message Broker Management Console.
  2. Click Add under the Topics menu in the Manage section of the left panel.
  3. Specify the topic name in the topic input text box. In this case, the topic name is "FastMovingStockQuotes" (the output topic). Click Add Topic. The topic is added to the server and you will be directed to the Topic Browser page.
  4. Once you click on the topic in the Topic Browser page, you will be able to see four links. Click the Subscribe link and you will be directed to the Subscribe page.
  5. Create a subscription with the following details. Once you are done, click Subscribe.

    Topic             : FastMovingStockQuotes (output topic)
    Subscription mode : Topic only subscription
    URL               : http://localhost:9763/services/FastMovingStockQuoteService/getOMElement
    Expiration time   : select a future date from the calendar

    Once you click Subscribe, you will be directed to the Topic Browser page. 

  6. You can verify whether you have correctly subscribed to the topic by clicking the Details link of that topic in the topic browser page. 

    You are then directed to the "topic details" page, which lists all subscriptions for that topic and its children (there are none in this case) along with the permissions on that topic. In addition, the Publish section lets you publish a test XML message to the topic to check whether your subscription URL has been properly subscribed.

Running the Samples

  1. If you are using a JMS subscriber client (section a, "Using a JMS client receiver"), build and run the topic subscriber class provided above. If you are using the web service message receiver, the configuration in section (b) is sufficient.
  2. Run the JMS publisher client.
  3. CEP analyzes the published events and fires outputs if the last traded amount varies by 2 percent from the average traded price within the past 2 minutes.

Observation

Observe the console where the JMS subscriber client is running, or the server console where the message-receiving web service is deployed. You will see log entries for the events published to the FastMovingStockQuotes topic.
