Introduction
This sample demonstrates how to configure WSO2 CEP with Apache Storm in the distributed mode, and run the simple query below in a local Storm cluster.
    @name('query 1') @dist(parallel='4')
    from analyticsStats[meta_ipAdd != '192.168.1.1']
    select meta_ipAdd, meta_index, meta_timestamp, meta_nanoTime, userID
    insert into filteredStatStream;
The above query drops events from the analyticsStats stream whose meta_ipAdd is 192.168.1.1, and inserts the remaining events into the stream named filteredStatStream. The @dist(parallel='4') annotation denotes that this query needs to be run in four Storm tasks. You can specify a query group ID to group queries together using the execGroup attribute of the @dist annotation.

Prerequisites
Follow the steps below to set up the prerequisites for this sample.
- Set up the prerequisites required for all samples.
- Download and deploy Apache Storm. For instructions, see Apache Storm documentation.
Make the following changes in the <CEP_HOME>/repository/conf/axis2/axis2.xml file to enable Hazelcast clustering in WSO2 CEP.

    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
            ...
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- The host name or IP address of this member other than localhost/127.0.0.1 -->
            <parameter name="localMemberHost">localhost</parameter>
            ...
            <members>
                <member>
                    <hostName>localhost</hostName>
                    <port>4000</port>
                </member>
            </members>
            ...
        </clustering>
    </axisconfig>
Disable the SingleNode and HA processing modes and enable the Distributed mode in the <CEP_HOME>/repository/conf/event-processor.xml file as shown below, to configure WSO2 CEP to run with Apache Storm.

    <eventProcessorConfiguration>
        <mode name="SingleNode" enable="false">
            ...
        </mode>
        <!-- HA Mode Config -->
        <mode name="HA" enable="false">
            ...
        </mode>
        <!-- Distributed Mode Config -->
        <mode name="Distributed" enable="true">
            <nodeType>
                <worker enable="true"/>
                <manager enable="true">
                    <hostName>10.100.7.56</hostName>
                    <port>8904</port>
                </manager>
                <presenter enable="false">
                    <hostName>0.0.0.0</hostName>
                    <port>11000</port>
                </presenter>
            </nodeType>
            <management>
                <managers>
                    <manager>
                        <hostName>10.100.7.56</hostName>
                        <port>8904</port>
                    </manager>
                    <manager>
                        <hostName>10.100.7.56</hostName>
                        <port>8905</port>
                    </manager>
                </managers>
                <!--Connection re-try interval to connect to Storm Manager service in case of a connection failure-->
                <reconnectionInterval>20000</reconnectionInterval>
                <!--Heart beat interval (in ms) for event listeners in "Storm Receivers" and "CEP Publishers" to acknowledge their availability for receiving events-->
                <heartbeatInterval>5000</heartbeatInterval>
                <!--Storm topology re-submit interval in case of a topology submission failure-->
                <topologyResubmitInterval>10000</topologyResubmitInterval>
            </management>
            <transport>
                <!--Port range to be used for events listener servers in "Storm Receiver Spouts" and "CEP Publishers"-->
                <portRange>
                    <min>15000</min>
                    <max>15100</max>
                </portRange>
                <!--Connection re-try interval (in ms) for connection failures between "CEP Receiver" to "Storm Receiver" connections and "Storm Publisher" to "CEP Publisher" connections-->
                <reconnectionInterval>20000</reconnectionInterval>
                <!--Size of the output queue of each "CEP Receiver" which stores events to be published into "Storm Receivers". This must be a power of two-->
                <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
                <!--Size of the output queue of each "Storm Publisher" which stores events to be published into "CEP Publisher". This must be a power of two-->
                <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
                <!--Size of TCP event publishing client's send buffer in bytes-->
                <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
                <!--Character encoding of TCP event publishing client-->
                <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
                <!--Size of the event queue in each storm spout which stores events to be processed by storm bolts -->
                <stormSpoutBufferSize>10000</stormSpoutBufferSize>
                <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
            </transport>
            <presentation>
                <presentationOutputQueueSize>1024</presentationOutputQueueSize>
                <!--Size of TCP event publishing client's send buffer in bytes-->
                <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
                <!--Character encoding of TCP event publishing client-->
                <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
                <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
            </presentation>
            <statusMonitor>
                <lockTimeout>60000</lockTimeout>
                <updateRate>60000</updateRate>
            </statusMonitor>
            <stormJar>org.wso2.cep.storm.dependencies.jar</stormJar>
            <distributedUIUrl></distributedUIUrl>
            <memberUpdateCheckInterval>20000</memberUpdateCheckInterval>
        </mode>
    </eventProcessorConfiguration>
Integrate WSO2 CEP with Apache Storm. For instructions, see the distributed mode deployment in Clustered Deployment.
If you are using a clustered deployment, add the configurations for Nimbus, Zookeeper, etc. in the <CEP_HOME>/repository/conf/cep/storm/storm.yaml file on all nodes, so that WSO2 CEP can communicate with Apache Storm. However, these configurations are not necessary for this sample if you run Apache Storm locally.
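Before starting the server, it can help to sanity-check the event-processor.xml settings described above. The following is a minimal sketch in Python, not part of WSO2 CEP: it assumes the file has the element layout shown in this sample, and it runs against a trimmed, hypothetical stand-in string rather than the real file. The names check_config, is_power_of_two and SAMPLE_CONFIG are introduced here for illustration only.

```python
import xml.etree.ElementTree as ET

# Trimmed, hypothetical stand-in for <CEP_HOME>/repository/conf/event-processor.xml.
# Only the elements inspected below are kept.
SAMPLE_CONFIG = """
<eventProcessorConfiguration>
    <mode name="SingleNode" enable="false"/>
    <mode name="HA" enable="false"/>
    <mode name="Distributed" enable="true">
        <transport>
            <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
            <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
        </transport>
    </mode>
</eventProcessorConfiguration>
"""

# Queue sizes that the configuration comments say must be a power of two.
POWER_OF_TWO_FIELDS = ("cepReceiverOutputQueueSize", "stormPublisherOutputQueueSize")


def is_power_of_two(n):
    """True for 1, 2, 4, 8, ...; False for zero, negatives and everything else."""
    return n > 0 and (n & (n - 1)) == 0


def check_config(xml_text):
    """Return a list of problems found; an empty list means the checks passed."""
    root = ET.fromstring(xml_text)
    problems = []

    # Exactly one mode, Distributed, should be enabled for this sample.
    enabled = [m.get("name") for m in root.findall("mode") if m.get("enable") == "true"]
    if enabled != ["Distributed"]:
        problems.append(f"expected only Distributed mode enabled, found {enabled}")

    # Validate the power-of-two queue sizes inside the Distributed mode.
    for mode in root.findall("mode"):
        if mode.get("name") != "Distributed":
            continue
        for field in POWER_OF_TWO_FIELDS:
            node = mode.find(f"transport/{field}")
            text = (node.text or "").strip() if node is not None else ""
            if not text.isdigit():
                problems.append(f"{field} is missing or not a number")
            elif not is_power_of_two(int(text)):
                problems.append(f"{field}={text} is not a power of two")
    return problems


if __name__ == "__main__":
    for line in check_config(SAMPLE_CONFIG) or ["no problems found"]:
        print(line)
```

To check a real installation, the same function could be given the contents of the actual file, assuming that file uses no XML namespace, as in the snippet shown in this sample.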
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0501. For instructions, see Starting sample CEP configurations.
This sample configuration creates the following.
- Two streams with the IDs analytics_Statistics:1.3.0 and filteredStatStream:1.0.0
- An event receiver named WSO2EventReceiver
- An event publisher named WSO2EventPublisher
- An execution plan named PreprocessStats
Executing the sample
Follow the steps below to execute the sample.
Navigate to the <CEP_HOME>/samples/consumers/wso2-event/ directory, and execute the following Ant command in another terminal tab:

    ant -Dsn=0501

The other optional parameters that can be used in the above command are defined in the <CEP_HOME>/samples/consumers/wso2-event/build.xml file. The command builds the sample wso2event consumer and executes it. Do not close this terminal. It is required to keep the server running and receiving events.
Navigate to the <CEP_HOME>/samples/producers/wso2-event/ directory, and execute the following Ant command in another terminal tab:

    ant -DstreamId=analytics_Statistics:1.3.0 -Dsn=0501

The other optional parameters that can be used in the above command are defined in the <CEP_HOME>/samples/producers/wso2-event/build.xml file. The command builds and runs the wso2event producer, which sends analytics statistics data to the CEP server. You can view the details of the events that are sent in this terminal, and the output events received by the consumer from WSO2 CEP in the terminal where the consumer is running.
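To anticipate which events should reach the consumer, the filter and projection of 'query 1' can be mimicked outside CEP. The sketch below is plain Python and only illustrates the query's semantics; it is not how Siddhi or Storm executes the query. The function name apply_query, the sample event values and the extraField attribute are hypothetical.

```python
# Attributes projected by the select clause of 'query 1'.
SELECTED_FIELDS = ("meta_ipAdd", "meta_index", "meta_timestamp", "meta_nanoTime", "userID")

# IP address excluded by the filter [meta_ipAdd != '192.168.1.1'].
EXCLUDED_IP = "192.168.1.1"


def apply_query(events):
    """Drop events from EXCLUDED_IP and keep only the selected attributes."""
    return [
        {field: event[field] for field in SELECTED_FIELDS}
        for event in events
        if event["meta_ipAdd"] != EXCLUDED_IP
    ]


if __name__ == "__main__":
    # Hypothetical input events; the real producer sends its own values.
    sample_events = [
        {"meta_ipAdd": "192.168.1.1", "meta_index": 1, "meta_timestamp": 1000,
         "meta_nanoTime": 5000, "userID": "user1", "extraField": "ignored"},
        {"meta_ipAdd": "10.0.0.7", "meta_index": 2, "meta_timestamp": 1001,
         "meta_nanoTime": 6000, "userID": "user2", "extraField": "ignored"},
    ]
    for event in apply_query(sample_events):
        print(event)
```

With these inputs, only the second event passes the filter, and it is emitted without extraField because that attribute is not in the select clause.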