WSO2 Complex Event Processor is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.

Introduction

This sample demonstrates how to configure WSO2 CEP with Apache Storm in distributed mode, and how to run the sample queries below in a local or distributed Storm cluster.

@name('query1')
@dist(parallel='4', execGroup='1')
from analyticsStats
select meta_timestamp,  str:contains(userID, 'wso2') as isValidUserID, userID
insert into filteredAnalyticsStats;

@name('query2')
@dist(parallel='4', execGroup='1')
from filteredAnalyticsStats[isValidUserID == true]
select *
insert into validAnalyticsStat;

@name('query3')
@dist(parallel='2', execGroup='2')
partition with (userID of validAnalyticsStat)
begin
    from validAnalyticsStat#window.lengthBatch(3)
    select userID, max(meta_timestamp) as latestTimestamp, min(meta_timestamp) as earliestTimestamp
    insert into processedAnalyticsStats;
end ;

@name('query4')
@dist(parallel='3', execGroup='3')
from processedAnalyticsStats
select userID , (latestTimestamp - earliestTimestamp) as difference
insert into processedStream;

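Together, the above queries filter events from the analyticsStats stream for user IDs that contain 'wso2', find the latest and earliest timestamps in each batch of three events per user ID, and insert the difference between the two into the stream named processedStream.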
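
The following Python sketch mimics the four queries on a handful of in-memory events, to make the data flow easier to follow. It is not Siddhi code and does not involve Storm. The function name run_pipeline and the sample events are hypothetical, and the sketch assumes that the lengthBatch(3) window produces one aggregated result per completed batch of three events in each userID partition.

```python
from collections import defaultdict

def run_pipeline(events, batch_size=3):
    """Simulate query1 to query4 over (meta_timestamp, userID) tuples."""
    # Assumption: one lengthBatch window per userID partition (query3).
    batches = defaultdict(list)
    output = []
    for meta_timestamp, user_id in events:
        # query1: isValidUserID = str:contains(userID, 'wso2')
        is_valid_user_id = 'wso2' in user_id
        # query2: keep only events where isValidUserID == true
        if not is_valid_user_id:
            continue
        # query3: collect timestamps until the partition holds a full batch
        batch = batches[user_id]
        batch.append(meta_timestamp)
        if len(batch) == batch_size:
            latest_timestamp = max(batch)
            earliest_timestamp = min(batch)
            batch.clear()
            # query4: difference between the latest and earliest timestamps
            output.append((user_id, latest_timestamp - earliest_timestamp))
    return output

# Hypothetical sample events: (meta_timestamp, userID)
events = [
    (100, 'wso2_alice'), (105, 'bob'), (110, 'wso2_alice'),
    (120, 'wso2_carol'), (150, 'wso2_alice'), (160, 'wso2_carol'),
]
result = run_pipeline(events)
print(result)
```

In this run, only wso2_alice completes a batch of three events. The events from bob are dropped by the filter, and wso2_carol has sent only two events so far.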

The @dist(parallel='4') annotation denotes that the annotated query needs to be run in four Storm tasks. To group queries together, specify a query group ID in the execGroup attribute of the @dist annotation.
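
As a rough illustration of how the annotations in this sample group the queries, the Python sketch below extracts the @name and @dist values from the annotation lines and lists the queries in each execGroup with their parallel values. The regular expressions and the group_queries function are hypothetical helpers written for this example only. They do not reflect how WSO2 CEP itself parses execution plans or builds the Storm topology.

```python
import re
from collections import defaultdict

# Annotation lines copied from the sample queries (query bodies omitted).
PLAN = """
@name('query1')
@dist(parallel='4', execGroup='1')
@name('query2')
@dist(parallel='4', execGroup='1')
@name('query3')
@dist(parallel='2', execGroup='2')
@name('query4')
@dist(parallel='3', execGroup='3')
"""

NAME_RE = re.compile(r"@name\('([^']+)'\)")
DIST_RE = re.compile(r"@dist\(parallel='(\d+)',\s*execGroup='([^']+)'\)")

def group_queries(plan_text):
    """Group query names by execGroup, recording each group's parallel values."""
    names = NAME_RE.findall(plan_text)
    dists = DIST_RE.findall(plan_text)
    groups = defaultdict(lambda: {'queries': [], 'parallel': set()})
    # Assumption: every @name is followed by exactly one @dist, in order.
    for name, (parallel, exec_group) in zip(names, dists):
        groups[exec_group]['queries'].append(name)
        groups[exec_group]['parallel'].add(int(parallel))
    return dict(groups)

groups = group_queries(PLAN)
for exec_group, info in sorted(groups.items()):
    print(exec_group, info['queries'], sorted(info['parallel']))
```

For this sample, query1 and query2 share execGroup 1 with a parallel value of 4, while query3 and query4 each form their own group.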

Prerequisites

Follow the steps below to set up the prerequisites for this sample.

  1. Set up the prerequisites required for all samples.
  2. Download Apache Storm and set up a Storm cluster. For instructions, refer to documentation on setting up a Storm cluster.
  3. Make the following changes in the <CEP_HOME>/repository/conf/axis2/axis2.xml file to enable Hazelcast clustering in WSO2 CEP.

    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
            ...    	
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- The host name or IP address of this member other than localhost/127.0.0.1 -->
            <parameter name="localMemberHost">127.0.0.1</parameter>
            ...
            <members>
                <member>
                    <hostName>127.0.0.1</hostName>
                    <port>4000</port>
                </member>
            </members>
        ...
        </clustering>
    </axisconfig>
  4. Make the following changes in the <CEP_HOME>/repository/conf/event-processor.xml file to disable the HA processing mode, enable the distributed processing mode, and configure WSO2 CEP to run with Apache Storm.

    <eventProcessorConfiguration>
    
        <!-- HA Mode Config -->
        <mode name="HA" enable="false">
            ...
        </mode>
    
        <!-- Distributed Mode Config -->
        <mode name="Distributed" enable="true">
            <nodeType>
                <worker enable="true"/>
                <manager enable="true">
                    <!-- The host name or IP address of this member -->
                    <hostName>127.0.0.1</hostName>
                    <port>8904</port>
                </manager>
                <presenter enable="true">
                    <!-- The host name or IP address of this member -->
                    <hostName>127.0.0.1</hostName>
                    <port>11000</port>
                </presenter>
            </nodeType>
            <management>
                <managers>
                    <manager>
                        <hostName>127.0.0.1</hostName>
                        <port>8904</port>
                    </manager>
                </managers>
                ...
            </management>
            ...
        </mode>
    
    </eventProcessorConfiguration>
  5. Integrate WSO2 CEP with Apache Storm. For instructions, see the distributed mode deployment in Clustered Deployment.

    If you are using a clustered deployment, add the configurations for Nimbus, ZooKeeper, and so on in the <CEP_HOME>/repository/conf/cep/storm/storm.yaml file on all nodes, so that WSO2 CEP can communicate with Apache Storm. These configurations are not necessary for this sample if you run Apache Storm locally.

If you are executing this sample in WSO2 DAS, you need to place a built Storm JAR in the <DAS_HOME>/repository/conf/cep/storm directory. You can create this JAR by building the pom file in that directory.
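
Before starting the server, it can help to confirm that the processing modes are set as intended. The Python sketch below parses a trimmed copy of the event-processor.xml fragment from the steps above and reports which modes are enabled and where the manager listens. The embedded XML is an abbreviated assumption based on that fragment, not the full file shipped with WSO2 CEP, and the enabled_modes and manager_endpoint functions are hypothetical helpers.

```python
import xml.etree.ElementTree as ET

# Trimmed fragment modeled on the event-processor.xml example above.
EVENT_PROCESSOR_XML = """
<eventProcessorConfiguration>
    <mode name="HA" enable="false"/>
    <mode name="Distributed" enable="true">
        <nodeType>
            <worker enable="true"/>
            <manager enable="true">
                <hostName>127.0.0.1</hostName>
                <port>8904</port>
            </manager>
        </nodeType>
    </mode>
</eventProcessorConfiguration>
"""

def enabled_modes(xml_text):
    """Map each mode name to True if its enable attribute is 'true'."""
    root = ET.fromstring(xml_text)
    return {mode.get('name'): mode.get('enable') == 'true'
            for mode in root.findall('mode')}

def manager_endpoint(xml_text):
    """Return the (hostName, port) of the manager node in Distributed mode."""
    root = ET.fromstring(xml_text)
    manager = root.find("mode[@name='Distributed']/nodeType/manager")
    return manager.findtext('hostName'), int(manager.findtext('port'))

modes = enabled_modes(EVENT_PROCESSOR_XML)
endpoint = manager_endpoint(EVENT_PROCESSOR_XML)
print(modes)
print(endpoint)
```

To check a real installation, the same functions could be given the contents of the actual file, provided its element layout matches the fragment shown in this sample.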

Building the sample

Start the WSO2 CEP server with the sample configuration numbered 0504. For instructions, see Starting sample CEP configurations.

This sample configuration creates the following.

  • Two streams with the IDs analytics_Statistics:1.3.0 and processedStream:1.0.0

  • An event receiver named WSO2EventReceiver

  • An event publisher named LoggerPublisher

  • An execution plan named StatExecutionPlan

Executing the sample

Follow the steps below to execute the sample.

  1. Navigate to the <CEP_HOME>/samples/cep/producers/wso2-event/ directory, and execute the following Ant command using another tab in the CLI:

    ant -DstreamId=analytics_Statistics:1.3.0 -Dsn=0504

    The other optional parameters that can be used in the above command are defined in the <CEP_HOME>/samples/cep/producers/wso2-event/build.xml file.

  2. This builds and runs the wso2event producer, which sends analytics statistics data to the CEP server. You can view the details of the events that are sent in the CLI.


  3. You can view the output events that the consumer receives from WSO2 CEP in the terminal that was opened when running CEP in the foreground.
