Introduction
This sample demonstrates how to configure WSO2 CEP with Apache Storm in the distributed mode, and run the sample query below in a local/distributed Storm cluster.
    @name('query1')
    @dist(parallel='4', execGroup='1')
    from analyticsStats
    select meta_timestamp, str:contains(userID, 'wso2') as isValidUserID, userID
    insert into filteredAnalyticsStats;

    @name('query2')
    @dist(parallel='4', execGroup='1')
    from filteredAnalyticsStats[isValidUserID == true]
    select *
    insert into validAnalyticsStat;

    @name('query3')
    @dist(parallel='2', execGroup='2')
    partition with (userID of validAnalyticsStat)
    begin
        from validAnalyticsStat#window.lengthBatch(3)
        select userID, max(meta_timestamp) as latestTimestamp, min(meta_timestamp) as earliestTimestamp
        insert into processedAnalyticsStats;
    end;

    @name('query4')
    @dist(parallel='3', execGroup='3')
    from processedAnalyticsStats
    select userID, (latestTimestamp - earliestTimestamp) as difference
    insert into processedStream;
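To make the data flow of the four queries easier to follow, here is a minimal Python sketch of the same logic running in a single process. It is an illustration only, not how Siddhi or Storm execute the plan: the function name run_pipeline and the tuple-based event format are hypothetical, and the sketch assumes that query3 emits one aggregated event per full batch of three events for each userID.

```python
from collections import defaultdict

BATCH_SIZE = 3  # mirrors window.lengthBatch(3) in query3


def run_pipeline(events):
    """Simulate query1..query4 on (meta_timestamp, userID) tuples given in arrival order."""
    batches = defaultdict(list)  # one pending batch per userID (the partition key)
    output = []
    for meta_timestamp, user_id in events:
        # query1 + query2: keep only events whose userID contains 'wso2'
        if "wso2" not in user_id:
            continue
        # query3: collect timestamps per userID until a batch of three is full
        batch = batches[user_id]
        batch.append(meta_timestamp)
        if len(batch) == BATCH_SIZE:
            latest, earliest = max(batch), min(batch)
            batch.clear()
            # query4: difference between the latest and earliest timestamps
            output.append((user_id, latest - earliest))
    return output


if __name__ == "__main__":
    sample = [
        (100, "wso2_a"),
        (105, "guest"),   # dropped by the filter
        (110, "wso2_a"),
        (120, "wso2_b"),  # batch for this user never fills
        (130, "wso2_a"),  # completes the batch for wso2_a
    ]
    print(run_pipeline(sample))  # [('wso2_a', 30)]
```

The sketch ignores parallelism entirely. In the actual deployment, the parallel and execGroup attributes control how this work is spread over Storm tasks.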
The sample query filters events from the analyticsStats stream, and inserts the results into the stream named processedStream. The @dist(parallel='4') annotation denotes that the annotated query needs to run in four Storm tasks. You can group queries together by specifying a query group ID with the execGroup attribute of the @dist annotation.
Prerequisites
Follow the steps below to set up the prerequisites for this sample.
- Set up the prerequisites required for all samples.
- Download Apache Storm and set up a Storm cluster. For instructions, refer to documentation on setting up a Storm cluster.
Make the following changes in the <CEP_HOME>/repository/conf/axis2/axis2.xml file to enable Hazelcast clustering in WSO2 CEP.
    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
            ...
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- The host name or IP address of this member other than localhost/127.0.0.1 -->
            <parameter name="localMemberHost">127.0.0.1</parameter>
            ...
            <members>
                <member>
                    <hostName>127.0.0.1</hostName>
                    <port>4000</port>
                </member>
            </members>
            ...
        </clustering>
    </axisconfig>
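A quick way to double-check the clustering settings after editing is to parse the file and print the values that matter. The following Python sketch does that for a trimmed copy of the fragment above. The helper name clustering_summary is hypothetical, and the sketch assumes that the file uses no XML namespace, as in the fragment shown.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the clustering fragment, with the elided parts left out.
AXIS2_FRAGMENT = """
<axisconfig name="AxisJava2.0">
    <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
        <parameter name="membershipScheme">wka</parameter>
        <parameter name="localMemberHost">127.0.0.1</parameter>
        <members>
            <member>
                <hostName>127.0.0.1</hostName>
                <port>4000</port>
            </member>
        </members>
    </clustering>
</axisconfig>
"""


def clustering_summary(xml_text):
    """Return the clustering flag, key parameters, and well-known members."""
    root = ET.fromstring(xml_text.strip())
    clustering = root.find("clustering")
    # Collect <parameter name="...">value</parameter> pairs into a dict
    params = {p.get("name"): (p.text or "").strip() for p in clustering.findall("parameter")}
    members = [
        (m.findtext("hostName"), int(m.findtext("port")))
        for m in clustering.findall("members/member")
    ]
    return {
        "enabled": clustering.get("enable") == "true",
        "membershipScheme": params.get("membershipScheme"),
        "localMemberHost": params.get("localMemberHost"),
        "members": members,
    }


if __name__ == "__main__":
    print(clustering_summary(AXIS2_FRAGMENT))
```

To check a real installation, read the contents of axis2.xml from disk and pass them to the function instead of the inline fragment.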
Make the following changes in the <CEP_HOME>/repository/conf/event-processor.xml file to disable the HA processing mode, enable the Distributed processing mode, and configure WSO2 CEP to run with Apache Storm.
    <eventProcessorConfiguration>
        <!-- HA Mode Config -->
        <mode name="HA" enable="false">
            ...
        </mode>
        <!-- Distributed Mode Config -->
        <mode name="Distributed" enable="true">
            <nodeType>
                <worker enable="true"/>
                <manager enable="true">
                    <!-- The host name or IP address of this member -->
                    <hostName>127.0.0.1</hostName>
                    <port>8904</port>
                </manager>
                <presenter enable="true">
                    <!-- The host name or IP address of this member -->
                    <hostName>127.0.0.1</hostName>
                    <port>11000</port>
                </presenter>
            </nodeType>
            <management>
                <managers>
                    <manager>
                        <hostName>127.0.0.1</hostName>
                        <port>8904</port>
                    </manager>
                </managers>
                ...
            </management>
            ...
        </mode>
    </eventProcessorConfiguration>
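Because the HA and Distributed modes are switched in the same file, it is easy to leave both enabled by mistake. The Python sketch below reports which modes are enabled and which manager endpoints are listed, using a trimmed copy of the fragment above. The function names enabled_modes and listed_managers are hypothetical, and the sketch assumes that the file has no XML namespace.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the event-processor.xml fragment, with the elided parts left out.
EVENT_PROCESSOR_FRAGMENT = """
<eventProcessorConfiguration>
    <mode name="HA" enable="false"/>
    <mode name="Distributed" enable="true">
        <nodeType>
            <worker enable="true"/>
            <manager enable="true">
                <hostName>127.0.0.1</hostName>
                <port>8904</port>
            </manager>
        </nodeType>
        <management>
            <managers>
                <manager>
                    <hostName>127.0.0.1</hostName>
                    <port>8904</port>
                </manager>
            </managers>
        </management>
    </mode>
</eventProcessorConfiguration>
"""


def enabled_modes(xml_text):
    """Map each processing mode name to True or False according to its enable attribute."""
    root = ET.fromstring(xml_text.strip())
    return {m.get("name"): m.get("enable") == "true" for m in root.findall("mode")}


def listed_managers(xml_text):
    """Return (host, port) pairs from the Distributed mode's <managers> list."""
    root = ET.fromstring(xml_text.strip())
    path = "mode[@name='Distributed']/management/managers/manager"
    return [(m.findtext("hostName"), int(m.findtext("port"))) for m in root.findall(path)]


if __name__ == "__main__":
    print(enabled_modes(EVENT_PROCESSOR_FRAGMENT))    # {'HA': False, 'Distributed': True}
    print(listed_managers(EVENT_PROCESSOR_FRAGMENT))  # [('127.0.0.1', 8904)]
```

For this sample, the expected result is that HA is disabled, Distributed is enabled, and the manager list contains 127.0.0.1 on port 8904.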
Integrate WSO2 CEP with Apache Storm. For instructions, see the distributed mode deployment in Clustered Deployment.
If you are using a clustered deployment, add the configurations for Nimbus, ZooKeeper, etc. to the <CEP_HOME>/repository/conf/cep/storm/storm.yaml file on all nodes, so that WSO2 CEP can communicate with Apache Storm. These configurations are not necessary for this sample if you run Apache Storm locally.
<DAS_HOME>/repository/conf/cep/storm directory. This jar can be created by building the pom file in this location.
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0504. For instructions, see Starting sample CEP configurations.
This sample configuration creates the following.
- Two streams with the IDs analytics_Statistics:1.3.0 and processedStream:1.0.0
- An event receiver named WSO2EventReceiver
- An event publisher named LoggerPublisher
- An execution plan named StatExecutionPlan
Executing the sample
Follow the steps below to execute the sample.
Navigate to the <CEP_HOME>/samples/cep/producers/wso2-event/ directory, and execute the following Ant command using another tab in the CLI:
    ant -DstreamId=analytics_Statistics:1.3.0 -Dsn=0504
The other optional parameters that can be used in the above command are defined in the <CEP_HOME>/cep/samples/producers/wso2-event/build.xml file.
The command builds and runs the wso2event producer, which sends analytics statistics data to the CEP server. You can view the details of the events that are sent.
You can view the output events received by the consumer from WSO2 CEP via the terminal opened when running CEP in the foreground.