WSO2 Complex Event Processor is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.
||
Skip to end of metadata
Go to start of metadata

Introduction

This sample demonstrates how to configure WSO2 CEP with Apache Storm in the distributed mode, and run the simple query below in a local Storm cluster. 

@name('query 1') @dist(parallel='4')
from analyticsStats[meta_ipAdd != '192.168.1.1']
select meta_ipAdd, meta_index, meta_timestamp, meta_nanoTime, userID
insert into filteredStatStream;

Above query filters events from analyticsStats stream, and inserts the results into the stream named filteredStatStream.

The @dist(parallel='4') annotation denotes that this query needs to be run in four Storm tasks. You can specify a query group ID to group queries together using the execGroup attribute of the @dist annotation.


Prerequisites

Follow the steps below to set up the prerequisites for this sample.

  1. Set up the prerequisites required for all samples.
  2. Download Apache Storm and set up a Storm cluster. For instructions, refer to documentation on setting up a Storm cluster.
  3. Do the following changes in the <CEP_HOME>/repository/conf/axis2/axis2.xml file, to enable Hazelcast clustering in WSO2 CEP. 

    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
            ...     
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- The host name or IP address of this member other then localhost/127.0.0.1 -->
            <parameter name="localMemberHost">127.0.0.1</parameter>
            ...
            <members>
                <member>
                    <hostName>127.0.0.1</hostName>
                    <port>4000</port>
                </member>
            </members>
        ...
        </clustering>
    </axisconfig>
  4. Do the following changes in the <CEP_HOME>/repository/conf/event-processor.xml file to disable the HA processing mode, enable Distributed processing mode, and configure WSO2 CEP to run with Apache Storm. 

    <eventProcessorConfiguration>
     
        <!-- HA Mode Config -->
        <mode name="HA" enable="false">
            ...
        </mode>
     
        <!-- Distributed Mode Config -->
        <mode name="Distributed" enable="true">
            <nodeType>
                <worker enable="true"/>
                <manager enable="true">
                    <!-- The host name or IP address of this member -->
                    <hostName>127.0.0.1</hostName>
                    <port>8904</port>
                </manager>
                <presenter enable="true">
                    <!-- The host name or IP address of this member -->
                    <hostName>127.0.0.1</hostName>
                    <port>11000</port>
                </presenter>
            </nodeType>
            <management>
                <managers>
                    <manager>
                        <hostName>127.0.0.1</hostName>
                        <port>8904</port>
                    </manager>
                </managers>
                ...
            </management>
            ...
        </mode>
     
    </eventProcessorConfiguration>
  5. Integrate WSO2 CEP with Apache Storm. For instructions, see the distributed mode deployment in Clustered Deployment.

    If you are using a clustered deployment, add configurations on Nimbus, Zookeeper, etc. in the <CEP_HOME>/repository/conf/cep/storm/storm.yaml file on all nodes, for WSO2 CEP to communicate with Apache Storm. However, these configurations are not necessary for this sample, if you run Apache Storm locally.

If you are executing this sample in WSO2 DAS, you need to place a built Storm jar in the <DAS_HOME>/repository/conf/cep/storm directory. This jar can be created by building the pom file in this location.

Building the sample

Start the WSO2 CEP server with the sample configuration numbered 0501. For instructions, see Starting sample CEP configurations.

This sample configuration creates the following.

  • Two streams with the iDs analytics_Statistics:1.3.0 and filteredStatStream:1.0.0

  • An event receiver named WSO2EventReceiver

  • An event publisher named WSO2EventPublisher

  • An execution plan named PreprocessStats

Executing the sample

Follow the steps below to execute the sample.

  1. Navigate to the <CEP_HOME>/samples/cep/consumers/wso2-event directory, and execute the following Ant command using another tab in the CLI: ant -Dsn=0501

    The other optional parameters that can be used in the above command are defined in the <CEP_HOME>/samples/cep/consumers/wso2-event/build.xml file.

    This builds the sample wso2event consumer and executes it. 

    Do not close this terminal. It is required to keep the server running and receiving events.

  2. Navigate to the <CEP_HOME>/samples/cep/producers/wso2-event/ directory, and execute the following Ant command using another tab in the CLI: ant -DstreamId=analytics_Statistics:1.3.0 -Dsn=0501

    The other optional parameters that can be used in the above command are defined in the <CEP_HOME>/samples/cep/producers/wso2-event/build.xml file.

    This builds and runs the wso2event producer, which will send analytics statistics data to the CEP server. You view the details of the events that are sent as shown below.

    sending events to the WSO2 CEP serverYou view the output events received by the consumer from WSO2 CEP via the terminal opened in step 2 above as shown below.

    output of the consumer

  • No labels