This sample demonstrates how the Siddhi engine can be used with a JMS event broker to receive, process, and publish XML messages.
In this sample, CEP receives stock quote information and fires an output whenever the last traded amount differs by more than 2 percent from the average traded price over the past 2 minutes.
from allStockQuotesStream#window.time(120000) insert into fastMovingStockQuotesStream symbol,avg(price) as avgPrice, price group by symbol having ((price > (avgPrice*1.02)) or ((avgPrice*0.98)>price ));
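The condition in the query above can be read as the following plain-Java sketch. It is not Siddhi code and makes two assumptions: the average includes the newest quote, and a quote leaves the window once it is 120000 ms old. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java reading of the query; an illustration, not Siddhi code. */
final class FastMovingQuoteSketch {

    /** One quote: arrival time in milliseconds and last traded price. */
    private static final class Quote {
        final long timestampMillis;
        final double price;

        Quote(long timestampMillis, double price) {
            this.timestampMillis = timestampMillis;
            this.price = price;
        }
    }

    // Same length as window.time(120000) in the query.
    private static final long WINDOW_MILLIS = 120_000L;

    // One sliding window per symbol, mirroring "group by symbol".
    private final Map<String, Deque<Quote>> windows = new HashMap<>();

    /**
     * Adds a quote and returns true when its price is more than 2 percent
     * above or below the average price of that symbol within the window.
     * Assumption: the average includes the quote that was just added.
     */
    boolean accept(String symbol, long timestampMillis, double price) {
        Deque<Quote> window = windows.computeIfAbsent(symbol, k -> new ArrayDeque<>());

        // Drop quotes that have aged out of the 2-minute window.
        while (!window.isEmpty()
                && timestampMillis - window.peekFirst().timestampMillis >= WINDOW_MILLIS) {
            window.removeFirst();
        }
        window.addLast(new Quote(timestampMillis, price));

        double sum = 0.0;
        for (Quote quote : window) {
            sum += quote.price;
        }
        double avgPrice = sum / window.size();

        // Same shape as the "having" clause of the query.
        return price > avgPrice * 1.02 || avgPrice * 0.98 > price;
    }

    public static void main(String[] args) {
        FastMovingQuoteSketch sketch = new FastMovingQuoteSketch();
        double[] prices = {126.36, 36.36, 6.36};
        long timestamp = 0L;
        for (double price : prices) {
            boolean fired = sketch.accept("MSFT", timestamp, price);
            System.out.println(price + " -> " + (fired ? "fires" : "no output"));
            timestamp += 1_000L;
        }
    }
}
```

Fed the three MSFT prices that the publisher in this sample sends (126.36, 36.36, 6.36) one second apart, the sketch reports no output for the first quote and an output for the second and third.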
Here we publish events to a JMS topic called AllStockQuotes using a JMS client. The outputs fired by the bucket are sent to a JMS topic called FastMovingStockQuotes, where another JMS client receives them and logs them to the console.
The following configuration is used in this sample.
The steps are as follows :
In order to run the above sample we have to do two things.
We will use a JMS client for this purpose.
The following class creates the InitialContext needed to run the event publisher client. Note that this class is also used by the event subscriber JMS client.
package org.wso2.cep.sample.jms.andes;

import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class JNDIContext {

    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "ConnectionFactory";
    private static final String userName = "admin";
    private static final String password = "admin";
    private static String CARBON_CLIENT_ID = "clientid";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5673";

    private InitialContext initContext = null;
    private TopicConnectionFactory topicConnectionFactory = null;
    public static JNDIContext instance = null;

    private JNDIContext() {
        createInitialContext();
        createConnectionFactory();
    }

    public InitialContext getInitContext() {
        return initContext;
    }

    public TopicConnectionFactory getTopicConnectionFactory() {
        return topicConnectionFactory;
    }

    public static JNDIContext getInstance() {
        if (instance == null) {
            instance = new JNDIContext();
        }
        return instance;
    }

    /**
     * Create Connection factory with initial context
     */
    private void createConnectionFactory() {
        try {
            topicConnectionFactory = (TopicConnectionFactory) initContext.lookup("ConnectionFactory");
        } catch (NamingException e) {
            System.out.println("Can not create topic connection factory." + e);
        }
    }

    /**
     * Create Initial Context with given configuration
     */
    private void createInitialContext() {
        try {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            System.out.println("TCPConnectionURL: = " + getTCPConnectionURL(userName, password));
            initContext = new InitialContext(properties);
        } catch (NamingException e) {
            System.out.println("Can not create initial context with given parameters." + e);
        }
    }

    public String getTCPConnectionURL(String username, String password) {
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
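For reference, the connection URL that getTCPConnectionURL assembles can be reproduced with plain string formatting, which makes it easy to see what to change when the broker runs on another host or port. The sketch below uses the default values from JNDIContext; the class ConnectionUrlSketch and its method are hypothetical and are not part of the sample.

```java
/** Hypothetical helper that mirrors JNDIContext.getTCPConnectionURL. */
final class ConnectionUrlSketch {

    /** Builds an AMQP connection URL in the same layout as the sample class. */
    static String tcpConnectionUrl(String username, String password, String clientId,
                                   String virtualHost, String host, String port) {
        return String.format("amqp://%s:%s@%s/%s?brokerlist='tcp://%s:%s'",
                username, password, clientId, virtualHost, host, port);
    }

    public static void main(String[] args) {
        // Default values taken from the constants in JNDIContext.
        System.out.println(
                tcpConnectionUrl("admin", "admin", "clientid", "carbon", "localhost", "5673"));
        // prints: amqp://admin:admin@clientid/carbon?brokerlist='tcp://localhost:5673'
    }
}
```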
The following class is the event publisher client. Running this class generates three events and sends them to the CEP server (strictly speaking, the events are published to a topic at WSO2 MB that is registered with the CEP server).
package org.wso2.cep.sample.jms.andes.xmlMessage;

import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axiom.om.util.StAXUtils;
import org.wso2.cep.sample.jms.andes.JNDIContext;

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import java.io.ByteArrayInputStream;

public class AllStockQuotesPublisher {

    private static InitialContext initContext = null;
    private static TopicConnectionFactory topicConnectionFactory = null;

    public static void main(String[] args) throws XMLStreamException {
        String xmlElement1 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                + " <quotedata:StockQuoteEvent>\n"
                + " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                + " <quotedata:LastTradeAmount>126.36 </quotedata:LastTradeAmount>\n"
                + " <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                + " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                + " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                + " <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                + " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                + " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                + " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                + " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                + " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                + " <quotedata:PE>10.88</quotedata:PE>\n"
                + " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                + " <quotedata:QuoteError>false</quotedata:QuoteError>\n"
                + " </quotedata:StockQuoteEvent>\n"
                + " </quotedata:AllStockQuoteStream>";

        String xmlElement2 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                + " <quotedata:StockQuoteEvent>\n"
                + " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                + " <quotedata:LastTradeAmount>36.36</quotedata:LastTradeAmount>\n"
                + " <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                + " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                + " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                + " <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                + " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                + " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                + " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                + " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                + " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                + " <quotedata:PE>10.88</quotedata:PE>\n"
                + " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                + " <quotedata:QuoteError>false</quotedata:QuoteError>\n"
                + " </quotedata:StockQuoteEvent>\n"
                + " </quotedata:AllStockQuoteStream>";

        String xmlElement3 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                + " <quotedata:StockQuoteEvent>\n"
                + " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                + " <quotedata:LastTradeAmount>6.36</quotedata:LastTradeAmount>\n"
                + " <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                + " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                + " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                + " <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                + " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                + " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                + " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                + " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                + " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                + " <quotedata:PE>10.88</quotedata:PE>\n"
                + " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                + " <quotedata:QuoteError>false</quotedata:QuoteError>\n"
                + " </quotedata:StockQuoteEvent>\n"
                + " </quotedata:AllStockQuoteStream>";

        initContext = JNDIContext.getInstance().getInitContext();
        topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
        AllStockQuotesPublisher publisher = new AllStockQuotesPublisher();

        XMLStreamReader reader1 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(xmlElement1.getBytes()));
        StAXOMBuilder builder1 = new StAXOMBuilder(reader1);
        OMElement OMMessage1 = builder1.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage1);

        XMLStreamReader reader2 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(xmlElement2.getBytes()));
        StAXOMBuilder builder2 = new StAXOMBuilder(reader2);
        OMElement OMMessage2 = builder2.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage2);

        XMLStreamReader reader3 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(xmlElement3.getBytes()));
        StAXOMBuilder builder3 = new StAXOMBuilder(reader3);
        OMElement OMMessage3 = builder3.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage3);
    }

    /**
     * Publish message to given topic
     *
     * @param topicName - topic name to publish messages
     * @param message   - message to send
     */
    public void publish(String topicName, OMElement message) {
        // create topic connection
        TopicConnection topicConnection = null;
        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
            topicConnection.start();
        } catch (JMSException e) {
            System.out.println("Can not create topic connection." + e);
            return;
        }

        // create session, producer, message and send message to given destination(topic)
        // OMElement message text is published here.
        Session session = null;
        try {
            session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            MessageProducer producer = session.createProducer(topic);
            TextMessage jmsMessage = session.createTextMessage(message.toString());
            producer.send(jmsMessage);
            producer.close();
            session.close();
            topicConnection.stop();
            topicConnection.close();
        } catch (JMSException e) {
            System.out.println("Can not publish message." + e);
        }
    }
}
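The three payloads in the publisher differ only in their LastTradeAmount value. As a sketch, the JDK's own DOM parser can be used to build such a payload from parameters and to check that it is well formed and namespace-qualified before it is published. The class and method names below are hypothetical, only two of the fields from the sample message are included, and the values are assumed to contain no XML special characters.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/** Hypothetical helper for building and checking a reduced quote payload. */
final class QuotePayloadSketch {

    // Namespace used by the sample messages.
    static final String NS = "http://ws.cdyne.com/";

    /** Builds a reduced payload; values are inserted as-is, without XML escaping. */
    static String buildPayload(String symbol, String lastTradeAmount) {
        return "<quotedata:AllStockQuoteStream xmlns:quotedata=\"" + NS + "\">"
                + "<quotedata:StockQuoteEvent>"
                + "<quotedata:StockSymbol>" + symbol + "</quotedata:StockSymbol>"
                + "<quotedata:LastTradeAmount>" + lastTradeAmount + "</quotedata:LastTradeAmount>"
                + "</quotedata:StockQuoteEvent>"
                + "</quotedata:AllStockQuoteStream>";
    }

    /** Parses the payload and returns the trimmed text of the first matching element. */
    static String readField(String xml, String localName) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document document = builder.parse(
                    new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList nodes = document.getElementsByTagNameNS(NS, localName);
            if (nodes.getLength() == 0) {
                throw new IllegalArgumentException("Missing element: " + localName);
            }
            return nodes.item(0).getTextContent().trim();
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalStateException("Payload is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = buildPayload("MSFT", "36.36");
        System.out.println(readField(xml, "StockSymbol") + " " + readField(xml, "LastTradeAmount"));
    }
}
```

Passing an explicit charset to getBytes, as done here, avoids depending on the platform default encoding.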
The following class acts as a JMS topic subscriber client. It registers a subscription for the filtered events that CEP emits according to the query in the bucket we have defined (strictly speaking, it subscribes to a topic created at WSO2 Message Broker, to which CEP publishes filtered events and notifications internally).
package org.wso2.cep.sample.jms.andes.xmlMessage;

import org.wso2.cep.sample.jms.andes.JNDIContext;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.xml.stream.XMLStreamException;
import java.util.Enumeration;

public class FastMovingStockQuotesSubscriber implements MessageListener {

    private static InitialContext initContext = null;
    private static TopicConnectionFactory topicConnectionFactory = null;
    private boolean messageReceived = false;
    static String TOPIC = "FastMovingStockQuotes";

    public static void main(String[] args) throws XMLStreamException {
        initContext = JNDIContext.getInstance().getInitContext();
        topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
        new FastMovingStockQuotesSubscriber().subscribe(TOPIC);
    }

    public void subscribe(String topicName) {
        // create connection
        TopicConnection topicConnection = null;
        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
        } catch (JMSException e) {
            System.out.println("Can not create topic connection." + e);
            return;
        }

        // create session, subscriber, message listener and listen on that topic
        TopicSession session = null;
        try {
            session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            TopicSubscriber subscriber = session.createSubscriber(topic);
            subscriber.setMessageListener(this);
            topicConnection.start();
            synchronized (this) {
                while (!messageReceived) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        } catch (JMSException e) {
            System.out.println("Can not subscribe." + e);
        }
    }

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("output = " + textMessage.getText());
                synchronized (this) {
                    messageReceived = true;
                }
            } catch (JMSException e) {
                System.out.println("error at getting text out of received message. = " + e);
            }
        } else if (message instanceof MapMessage) {
            try {
                Enumeration enumeration = ((MapMessage) message).getMapNames();
                for (; enumeration.hasMoreElements(); ) {
                    System.out.println(((MapMessage) message).getString((String) enumeration.nextElement()));
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("Received message is not a text/map message.");
        }
    }
}
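In the subscriber above, onMessage sets messageReceived but never calls notify(), so the thread in subscribe normally stays in wait() until the process is stopped, which keeps the client listening. If the client should instead stop waiting after the first message, a java.util.concurrent.CountDownLatch is one way to express that hand-off. The sketch below is deliberately JMS-free so that it runs on its own: the hypothetical onText method stands in for onMessage, and a plain thread stands in for the JMS delivery thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical, JMS-free illustration of waiting for the first message with a latch. */
final class FirstMessageLatchSketch {

    private final CountDownLatch firstMessage = new CountDownLatch(1);
    private volatile String lastText;

    /** Stand-in for MessageListener.onMessage: record the text and release the waiter. */
    void onText(String text) {
        lastText = text;
        firstMessage.countDown();
    }

    /**
     * Blocks until a message has arrived or the timeout elapses.
     * Returns the recorded text, or null on timeout or interruption.
     */
    String awaitFirstMessage(long timeout, TimeUnit unit) {
        try {
            return firstMessage.await(timeout, unit) ? lastText : null;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        FirstMessageLatchSketch sketch = new FirstMessageLatchSketch();
        // Simulates the broker delivering a message on another thread.
        new Thread(() -> sketch.onText("<sample/>")).start();
        System.out.println("output = " + sketch.awaitFirstMessage(5, TimeUnit.SECONDS));
    }
}
```

Whether the sample should exit after one message or keep listening is a design choice; the latch only makes the chosen behaviour explicit.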
First, we have to deploy a web service on any WSO2 server to act as the receiver for messages from the CEP server. We will use CEP itself to deploy such a web service.
The steps are as follows:
We need to configure the FastMovingStockQuoteReceiverService to receive the output events emitted by the bucket under the FastMovingStockQuotes topic. Here we create the FastMovingStockQuotes topic in the WSO2 Message Broker and subscribe FastMovingStockQuoteReceiverService to that topic.
The steps are as follows:
Create a subscription with the following details. Once you are done, click the Subscribe button.
topic : FastMovingStockQuotes (Output topic)
subscription mode : Topic only
subscription URL : http://localhost:9763/services/FastMovingStockQuoteService/getOMElement
expiration time : select a future date from the calendar
Once you click on the "Subscribe" button, you will be directed to the Topic Browser page.
You can verify that you have subscribed to the topic correctly by clicking the "Details" link of that topic on the Topic Browser page.
Once you click it, you will be directed to the "topic details" page, where you will find all the subscriptions for that topic and its children (in this case there are none) and the permissions on that topic. In addition, the publish section lets you publish a test XML message to that topic and check whether your subscription URL receives it.
The steps are as follows :
Observe the console where the JMS subscriber client is running, or the console of the server where the message-receiving web service is deployed. You will see logs similar to the following.