This documentation is for WSO2 Complex Event Processor 4.0.0. View documentation for the latest release.

Introduction

This sample demonstrates how to configure WSO2 CEP with Apache Storm in the distributed mode, and run the simple query below in a local Storm cluster. 

@name('query 1') @dist(parallel='4')
from analyticsStats[meta_ipAdd != '192.168.1.1']
select meta_ipAdd, meta_index, meta_timestamp, meta_nanoTime, userID
insert into filteredStatStream;

The above query filters events from the analyticsStats stream, and inserts the results into the stream named filteredStatStream.

The @dist(parallel='4') annotation denotes that this query needs to be run in four Storm tasks. You can specify a query group ID to group queries together using the execGroup attribute of the @dist annotation.
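
To make the filter semantics concrete, the following Python sketch mimics what query 1 does to each event. It only illustrates the filter-and-project logic and is not Siddhi or WSO2 CEP code. The function name `filter_stats`, the `extra` attribute, and the sample event values are hypothetical, and events are assumed to be plain dictionaries.

```python
# Illustrative only: mimics the filter and projection in 'query 1'.
# Events are modelled as plain dicts; this is not how Siddhi represents them.

EXCLUDED_IP = "192.168.1.1"
SELECTED = ("meta_ipAdd", "meta_index", "meta_timestamp", "meta_nanoTime", "userID")


def filter_stats(events):
    """Yield the selected attributes of each event whose meta_ipAdd differs from EXCLUDED_IP."""
    for event in events:
        if event["meta_ipAdd"] != EXCLUDED_IP:
            # Keep only the attributes named in the select clause.
            yield {name: event[name] for name in SELECTED}


# Hypothetical sample events (values invented for illustration).
sample = [
    {"meta_ipAdd": "192.168.1.1", "meta_index": 1, "meta_timestamp": 1000,
     "meta_nanoTime": 5, "userID": "a", "extra": "x"},
    {"meta_ipAdd": "10.0.0.7", "meta_index": 2, "meta_timestamp": 1001,
     "meta_nanoTime": 6, "userID": "b", "extra": "y"},
]
filtered = list(filter_stats(sample))
```

In this sketch the first event is dropped because its meta_ipAdd matches the excluded address, and the second event is passed on with only the five selected attributes.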

Prerequisites

Follow the steps below to set up the prerequisites for this sample.

  1. Set up the prerequisites required for all samples.
  2. Download and deploy Apache Storm. For instructions, see Apache Storm documentation.
  3. Make the following changes in the <CEP_HOME>/repository/conf/axis2/axis2.xml file to enable Hazelcast clustering in WSO2 CEP.

    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
            ...    	
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- The host name or IP address of this member other than localhost/127.0.0.1 -->
            <parameter name="localMemberHost">localhost</parameter>
            ...
            <members>
                <member>
                    <hostName>localhost</hostName>
                    <port>4000</port>
                </member>
            </members>
        ...
        </clustering>
    </axisconfig>
  4. Disable the SingleNode and HA processing modes and enable the Distributed mode in the <CEP_HOME>/repository/conf/event-processor.xml file as shown below, to configure WSO2 CEP to run with Apache Storm.

    <eventProcessorConfiguration>
        <mode name="SingleNode" enable="false">
            ...
        </mode>
    
        <!-- HA Mode Config -->
        <mode name="HA" enable="false">
            ...
        </mode>
    
        <!-- Distributed Mode Config -->
        <mode name="Distributed" enable="true">
            <nodeType>
                <worker enable="true"/>
                <manager enable="true">
                    <hostName>10.100.7.56</hostName>
                    <port>8904</port>
                </manager>
                <presenter enable="false">
                    <hostName>0.0.0.0</hostName>
                    <port>11000</port>
                </presenter>
            </nodeType>
            <management>
                <managers>
                    <manager>
                        <hostName>10.100.7.56</hostName>
                        <port>8904</port>
                    </manager>
                    <manager>
                        <hostName>10.100.7.56</hostName>
                        <port>8905</port>
                    </manager>
                </managers>
                <!--Connection re-try interval to connect to Storm Manager service in case of a connection failure-->
                <reconnectionInterval>20000</reconnectionInterval>
                <!--Heartbeat interval (in ms) for event listeners in "Storm Receivers" and "CEP Publishers" to acknowledge their
                availability for receiving events-->
                <heartbeatInterval>5000</heartbeatInterval>
                <!--Storm topology re-submit interval in case of a topology submission failure-->
                <topologyResubmitInterval>10000</topologyResubmitInterval>
            </management>
            <transport>
                <!--Port range to be used for events listener servers in "Storm Receiver Spouts" and "CEP Publishers"-->
                <portRange>
                    <min>15000</min>
                    <max>15100</max>
                </portRange>
                <!--Connection re-try interval (in ms) for connection failures between "CEP Receiver" to "Storm Receiver" connections
                and "Storm Publisher" to "CEP Publisher" connections-->
                <reconnectionInterval>20000</reconnectionInterval>
                <!--Size of the output queue of each "CEP Receiver" which stores events to be published into "Storm Receivers".
                This must be a power of two-->
                <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
                <!--Size of the output queue of each "Storm Publisher" which stores events to be published into "CEP Publisher".
                This must be a power of two-->
                <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
                <!--Size of TCP event publishing client's send buffer in bytes-->
                <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
                <!--Character encoding of TCP event publishing client-->
                <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
                <!--Size of the event queue in each storm spout which stores events to be processed by storm bolts -->
                <stormSpoutBufferSize>10000</stormSpoutBufferSize>
                <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
            </transport>
            <presentation>
                <presentationOutputQueueSize>1024</presentationOutputQueueSize>
                <!--Size of TCP event publishing client's send buffer in bytes-->
                <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
                <!--Character encoding of TCP event publishing client-->
                <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
                <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
            </presentation>
            <statusMonitor>
                <lockTimeout>60000</lockTimeout>
                <updateRate>60000</updateRate>
            </statusMonitor>
            <stormJar>org.wso2.cep.storm.dependencies.jar</stormJar>
            <distributedUIUrl></distributedUIUrl>
            <memberUpdateCheckInterval>20000</memberUpdateCheckInterval>
        </mode>
    </eventProcessorConfiguration>
  5. Integrate WSO2 CEP with Apache Storm. For instructions, see the distributed mode deployment in Clustered Deployment.

    If you are using a clustered deployment, add the Nimbus, ZooKeeper, and other related configurations to the <CEP_HOME>/repository/conf/cep/storm/storm.yaml file on all nodes, so that WSO2 CEP can communicate with Apache Storm. These configurations are not necessary for this sample if you run Apache Storm locally.
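
The comments in event-processor.xml state that cepReceiverOutputQueueSize and stormPublisherOutputQueueSize must each be a power of two. A quick sanity check before starting the server could look like the Python sketch below. It assumes the element names shown in step 4 and parses an abbreviated, inline copy of the configuration. The helper names `is_power_of_two` and `check_queue_sizes` are hypothetical, and this is not a tool shipped with WSO2 CEP.

```python
import xml.etree.ElementTree as ET

# Elements that the configuration comments say must be a power of two.
POWER_OF_TWO_ELEMENTS = ("cepReceiverOutputQueueSize", "stormPublisherOutputQueueSize")


def is_power_of_two(n):
    """Return True for 1, 2, 4, 8, ... using the single-set-bit test."""
    return n > 0 and (n & (n - 1)) == 0


def check_queue_sizes(xml_text):
    """Map each queue-size element in the Distributed mode to (value, is_valid)."""
    root = ET.fromstring(xml_text)
    results = {}
    for mode in root.iter("mode"):
        if mode.get("name") != "Distributed":
            continue
        for name in POWER_OF_TWO_ELEMENTS:
            element = mode.find(f"./transport/{name}")
            if element is not None and element.text:
                value = int(element.text.strip())
                results[name] = (value, is_power_of_two(value))
    return results


# Abbreviated copy of the Distributed mode section, for illustration only.
SAMPLE = """
<eventProcessorConfiguration>
    <mode name="Distributed" enable="true">
        <transport>
            <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
            <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
        </transport>
    </mode>
</eventProcessorConfiguration>
"""
report = check_queue_sizes(SAMPLE)
```

To check a real installation, the same function could be given the contents of the event-processor.xml file instead of the inline sample.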

Building the sample

Start the WSO2 CEP server with the sample configuration numbered 0501. For instructions, see Starting sample CEP configurations.

This sample configuration creates the following.

  • Two streams with the IDs analytics_Statistics:1.3.0 and filteredStatStream:1.0.0

  • An event receiver named WSO2EventReceiver

  • An event publisher named WSO2EventPublisher

  • An execution plan named PreprocessStats

Executing the sample

Follow the steps below to execute the sample.

  1. Navigate to the <CEP_HOME>/samples/consumers/wso2-event/ directory, and execute the following Ant command using another tab in the CLI:

    ant -Dsn=0501

    The other optional parameters that can be used in the above command are defined in the <CEP_HOME>/samples/consumers/wso2-event/build.xml file.

    This builds the sample wso2event consumer and executes it. 

    Do not close this terminal. It is required to keep the server running and receiving events.

  2. Navigate to the <CEP_HOME>/samples/producers/wso2-event/ directory, and execute the following Ant command using another tab in the CLI:

    ant -DstreamId=analytics_Statistics:1.3.0 -Dsn=0501

    The other optional parameters that can be used in the above command are defined in the <CEP_HOME>/samples/producers/wso2-event/build.xml file.

    This builds and runs the wso2event producer, which sends analytics statistics data to the CEP server. You can view the details of the events that are sent, as shown below.

    sending events to the WSO2 CEP server

    You can view the output events that the consumer receives from WSO2 CEP in the terminal opened in step 1 above, as shown below.

    output of the consumer
