This documentation is for WSO2 Complex Event Processor 4.0.0. View documentation for the latest release.
WSO2 Complex Event Processor is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.
||
Skip to end of metadata
Go to start of metadata

To view a screencast of the Quick Start Guide, click here and scroll down.

WSO2 Complex Event Processor monitors the transactions and activities of an enterprise (referred to as events), analyses them and presents the results to a range of interfaces in real time. Not having to store this information enables it to process and present information with greater speed.

The following sections walk you through the basic features of the CEP to get you started.

Getting Started Video

You can also find the content here as the Getting Started Video

Before you begin

  1. Install Oracle Java SE Development Kit (JDK) version 1.7* or 1.8 and set the JAVA_HOME environment variable.
  2. Download WSO2 CEP.
  3. Start the CEP by going to <CEP_HOME>/bin using the command-line and executing wso2server.bat  (for Windows) or  wso2server.sh  (for Linux.) 

Creating a simple event flow

An event flow refers to a specific combination of event streams, event receivers, event publishers and execution plans. The following steps describe how to define these elements of an event flow.

Step 1: Add an event stream

Event stream defines the events which goes through a particular flow by defining the event's attributes and it's types. An event stream can be created in the CEP Management Console as follows.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Streams to open the Available Event Streams page. 
  2. Click Add Event Stream to open the Define New Event Stream page. 
  3. Enter information as follows to create a new event stream named org.wso2.event.sensor.stream.

    Event Stream Details

    Parameter NameValue
    Event Stream Nameorg.wso2.event.sensor.stream
    Event Stream Version1.0.0

    Stream Attributes

    Click Add to add the attribute after entering the attribute name and attribute type.

    Attribute CategoryAttributeAttribute Type
    Meta Datatimestamplong
     isPowerSaverEnabledbool
     sensorIdint
     sensorNamestring
    Correlation Datalongitudedouble
     latitudedouble
    Payload Datahumidityfloat
     sensorValuedouble
  4. Click Add Event Stream to save the information.

Step 2: Add an event receiver

Events received by the CEP server have different formats such as XML, JSON and Map. Event receivers transform these events to WSO2 Events that can be directed to an event stream defined in the CEP.

In this step, you will create an event receiver named httpReceiver which directs events to the event stream named  org.wso2.event.sensor.stream that was created in Step 1. The receiver can be created using the Management Console as follows.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Receivers to open the Available Receivers page.
  2. Click Add Event Receiver to open the Create a New Event Receiver page.
  3. Enter information as follows to create the new event receiver named httpReceiver.

    Parameter NameValue
    Event Receiver NamehttpReceiver
    Input Event Adapter Typehttp
    Transportsall
    Event Streamorg.wso2.event.sensor.stream
    Message Formatjson
  4. Click Add Event Receiver to save the information.

Step 3: Add an Event Publisher

Event publishers publish events processed by the WSO2 servers to external applications. These events are published via HTTP, Kafka, JMS, etc. in  JSON, XML, Map, text, and WSO2Event formats  to various endpoints and data stores.

In this step, you will create an event publisher named loggerPublisher to publish events from the event stream named  org.wso2.event.sensor.stream that was created in Step 1. Since the output event adapter type of this publisher is logger, the events published will be logged in the CEP CLI. The publisher can be created using the Management Console as follows.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Publishers to open the Available Publishers page.
  2. Click Add Event Publisher to open the Create a New Event Publisher page.
  3. Enter information as follows to create the new event publisher named  loggerPublisher.

    Parameter NameDescription
    Event Publisher NameloggerPublisher
    Event Sourceorg.wso2.event.sensor.stream
    Output Event Adapter Typelogger
    Message Formattext


  4. Click Add Event Publisher to save the information.

Step 4: View the simple event flow

This step involves viewing the event flow you created and understanding how the different elements in it are connected. The event flow can be viewed as follows.

  1. Log into the CEP Management Console if you are not already logged in. 
  2. Click the Main tab and then click Flow to open the CEP Event Flow page. The event flow you created is displayed as follows.
     
    This diagram indicates that httpReceiver forwards events to the org.wso2.event.sensor.stream.1.0.0 stream. These events are then published by the loggerPublisher.

Receive and log events

Step 5: Receive events via HTTP transport

Navigate to <CEP_HOME>/samples/producers/http and run the following command which sends events to the CEP via the HTTP transport.

ant -Durl=http://localhost:9763/endpoints/httpReceiver -Dsn=0001

This builds the HTTP client and sends the events in the <CEP_HOME>/samples/artifacts/0001/httpReceiver.txt file to the httpReceiver endpoint. You can view the details of the events that are sent as shown in the log below. These logs are published by the publisher created in Step 3.

The logs of the JSON events received by the CEP server will be displayed in the CLI as shown in the example below.

Processing events with an execution plan

Step 6: Add another event stream

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Streams to open the Available Event Streams page. 
  2. Click Add Event Stream to open the Define New Event Stream page. 
  3. Enter information as follows to create the event stream named org.wso2.event.sensor.filtered.stream.

    Event Stream Details

    Paramater NameValue
    Event Stream Nameorg.wso2.event.sensor.filtered.stream
    Event Stream Version1.0.0

    Stream Attributes

    Attribute CategoryAttributeAttribute Type
    Meta Datetimestamplong
     sensorNamestring
    Correlation Datalongitudedouble
     latitudedouble
    Payload DatasensorValuedouble
  4. Click Add Event Stream to save the information.

Step 7: Add an execution plan 

An Execution Plan can import one or more streams from the server for processing and push zero or more output streams back to the server. Refer Processing Events for further explanations.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Execution Plans to open the Available Execution Plans page. 
  2. Click Add Execution Plan to open the Create a New Execution Plan page.
  3. Enter information as follows to create the new execution plan

    Parameter NameValue
    Import Streamorg.wso2.event.sensor.stream:1.0.0
    AssensorStream
    Value OffilteredStream
    StreamIdorg.wso2.event.sensor.filtered.stream:1.0.0
  4. Click Import and then click Export. The section for query expressions will be updated as shown below.
  5. Add the following query expression.

    from sensorStream [sensorValue > 100]
    select meta_timestamp, meta_sensorName, correlation_longitude, correlation_latitude, sensorValue
    insert into filteredStream	

    This query includes the value sensorValue > 100. Therefore, when the execution plan forwards events from org.wso2.event.sensor.stream:1.0.0  to org.wso2.event.sensor.filtered.stream:1.0.0, events in which the value for the sensorValue attribute is less than 100 will be dropped.

  6. Click Validate  Query Expressions. Once you get a message to confirm that the queries are valid, click Add Execution Plan.

Publish events in dashboard

Step 8: Add a UI event publisher

In this step, you will add another publisher named uiPublisher to publish events from the stream named org.wso2.event.sensor.filtered stream to the Analytics Dashboard.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Publishers to open the Available Publishers page.
  2. Click Add Event Publisher to open the Create a New Event Publisher page.
  3. Enter information as follows to create the new event publisher named UIPublisher.

    Parameter NameDescription
    Event Publisher NameuiPublisher
    Event Sourceorg.wso2.event.sensor.filtered.stream:1.0.0
    Output Event Adapter Typeui
    Message Formatwso2event
  4. Click Add Event Publisher to save the information.

Step 9: Create a dashboard and a gadget

WSO2 Analytics Dashboard will be used as the tool to analyse the output of the event flow you created in this guide. This step creates a dashboard and a gadget which analyses events from the  org.wso2.event.sensor.filtered.stream stream published by the uiPublisher publisher.

  1. Log into the CEP Management Console. In the Main tab, click Analytics Dashboard.
  2. Log into the Analytics Dashboard with your username and password.
  3. Click CREATE GADGET to open the CREATE A GADGET page.
  4. Select  org.wso2.event.sensor.filtered.stream for the Analytics Table/Event Stream parameter. Then click Next.
  5. Select Line for the Chart Type parameter. Then enter values as follows in the rest of the parameters relating to the chart.

    Parameter NameValue
    TitleSensor Value VS Timestamp
    X-AxisTIMESTAMP
    Y-AxissensorValue
    Interpolation MethodMonotone
  6. Click Add to Gadget Store  to save the gadget in the gadget store. You will return to the DASHBOARDS page.
  7. Click CREATE DASHBOARD to open the CREATE A DASHBOARD page.
  8. Enter information as follows for the new dashboard and click Next.

    Parameter NameValue
    Name of your DashboardSensor Statistics
    DescriptionThis dashboard indicates the sensor value at different times in a particular location.
  9. Select the Single Column layout as the layout in the next page. The layout will open.
  10. Click the following icon to open the available gadgets.
     
  11. Drag and drop the Sensor Value VS Timestamp gadget to a grid in the layout as shown below.
     

Step 10: Send Events to the HTTP Receiver via Curl Command

This step sends events to the receiver named httpReceiver using a curl command. These events are processed by the event flow you created, and published in the CEP CLI by the publisher created in Step 3.

  1. Issue the following curl command.

    curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439468145264 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 96.5 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"

    The following log will appear in the CEP CLI.

    Note that the Sensor Statistics dashboard you created does not get updated. This is because the value for the sensorValue attribute is less than 100 in this event. Therefore, it gets dropped by the filter you created in the execution plan, and as a result, it is not forwarded to the  org.wso2.event.sensor.filtered. stream:1.0.0 stream.

  2. Issue another command with a value greater than 100 for the sensorValue attribute as follows.

    curl -X POST -d "{ "event": { "metaData": { "timestamp":1439467524120 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 156 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
    
    

     The following log will appear in he CEP CLI.

    The event is forwarded to the  org.wso2.event.sensor.filtered. stream:1.0.0 stream since the value for the sensorValue attribute is greater than 100. Therefore, the Sensor Statistics dashboard will be updated as shown below.
     

  3. Issue more curl commands as follows with several timestamps and sensor values. 

    curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467524120 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 156 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
    curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467890957 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 170 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
    curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467951518 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 131 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
    curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467992936 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 126 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
    curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439468050928 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 145 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"

    Since the value for the sensorValue attribute is greater than 100 in all these events, the dashboard will be updated as shown below. These events will also be logged in the CEP CLI.

    Send sensor events with several timestamps and sensorValues, The dashboard will now get effected with the sent event since sensorValue is over 100 and the event gets sent to the dashboard. These events will get logged in the CEP CLI as well.

Deploying execution plans using templates

Step 11: Create and deploy a template

The following is the configuration of the execution plan you created in this guide as a template. This configuration should be saved in a XML file in the <CEP_HOME>/repository/conf/cep/domain-template directory.

$filteringVal  can be configured to the expected filtering value from the Execution manager.

<templateDomain name="SensorStatistics">
    <description>Sensor Statistics Analysis</description>
    <templates>
        <template name="Filtering template">
            <description>To filter the sensor events with a given sensor value</description>
            <executionPlan>
                <![CDATA[
                /* Enter a unique ExecutionPlan */
                @Plan:name('ExecutionPlan')
                /* Enter a unique description for ExecutionPlan */
                -- @Plan:description('ExecutionPlan')
                /* define streams/tables and write queries here ... */
                @Import('org.wso2.event.sensor.stream:1.0.0')
                define stream sensorStream (meta_timestamp long, meta_isPowerSaverEnabled bool, 
					meta_sensorId int, meta_sensorName string, correlation_longitude double, correlation_latitude double, 
					humidity float, sensorValue double);
                @Export('org.wso2.event.sensor.filtered.stream:1.0.0')
                define stream filteredStream (meta_timestamp long, meta_sensorName string, correlation_longitude double, 
					correlation_latitude double, sensorValue double);
                from sensorStream [sensorValue > $filteringVal]
                select meta_timestamp, meta_sensorName, correlation_longitude, correlation_latitude, sensorValue
                insert into filteredStream
                 ]]>
            </executionPlan>
            <parameters>
                <parameter name="filteringVal" type="double">
                    <displayName>Filtering Value</displayName>
                    <description>Only the sensor values below filtering value will be dropped</description>
                    <defaultValue>100</defaultValue>
                </parameter>
            </parameters>
        </template>
    </templates>
    <streams>
        <stream>
        {
          "name": "org.wso2.event.sensor.filtered.stream",
          "version": "1.0.0",
          "nickName": "",
          "description": "",
          "metaData": [
            {
              "name": "timestamp",
              "type": "LONG"
            },
            {
              "name": "sensorName",
              "type": "STRING"
            }
          ],
          "correlationData": [
            {
              "name": "longitude",
              "type": "DOUBLE"
            },
            {
              "name": "latitude",
              "type": "DOUBLE"
            }
          ],
          "payloadData": [
            {
              "name": "sensorValue",
              "type": "DOUBLE"
            }
          ]
        }
        </stream>
        <stream>
        {
          "name": "org.wso2.event.sensor.stream",
          "version": "1.0.0",
          "nickName": "",
          "description": "",
          "metaData": [
            {
              "name": "timestamp",
              "type": "LONG"
            },
            {
              "name": "isPowerSaverEnabled",
              "type": "BOOL"
            },
            {
              "name": "sensorId",
              "type": "INT"
            },
            {
              "name": "sensorName",
              "type": "STRING"
            }
          ],
          "correlationData": [
            {
              "name": "longitude",
              "type": "DOUBLE"
            },
            {
              "name": "latitude",
              "type": "DOUBLE"
            }
          ],
          "payloadData": [
            {
              "name": "humidity",
              "type": "FLOAT"
            },
            {
              "name": "sensorValue",
              "type": "DOUBLE"
            }
          ]
        }
        </stream>
    </streams>
</templateDomain>

Step 12: Configure a template

Before you carry out this step, delete the event stream you created under Step1: Add an event stream, and Step 6: Add another event stream.

  1. If the CEP server was running when you created and deployed the template, restart the CEP server.
  2. Log into the CEP Management Console. Click the Main tab and then click Execution Manager. The dashboard home page with the available domains will be displayed as follows.

  3. Click on SensorStatistics to open the Configurations page. Then click Add Configuration.
     
  4. Enter information as follows and then click Add Configuration.

    Parameter NameValue
    Configuration NameFilterSensorValues
    DescriptionFilter events which
    Filtering Value125.6

    You will receive a message to confirm that the configurations were successfully saved. Close the message. The configuration added will be displayed in the Configurations page as follows.

Step 13: View the elements of the event flow

Log into the CEP Management Console and click the Main tab. Then click Flow   to open the CEP Event Flow page. The complete event flow you created in this guide will be displayed as follows.

The following is a summary of this guide which describes each element in the event flow.

ElementTypeRole
httpReceiverEvent ReceiverReceives CEP events in multiple formats and converts them all into the WSO2 Event format before forwarding them to the org.wso2.event.sensor.stream:1.0.0 event stream.
org.wso2.event.sensor.stream:1.0.0:Event StreamDefines the attributes on which selection of events to be processed by the event flow is based.
loggerPublisherEvent PublisherLogs events from the org.wso2.event.sensor.stream:1.0.0 event stream in the CEP CLI.
ExecutionPlanExecution PlanApplies a filter criteria to events in the org.wso2.event.sensor.stream:1.0.0 event stream and forwards the filtered events to org.wso2.event.sensor.filtered.stream:1.0.0 event stream.
SensorStatistics-FilterSensorValuesExecution PlanThis is an execution plan created from a template.
org.wso2.event.sensor.filtered.stream:1.0.0 Event StreamImports attributes from the org.wso2.event.sensor.stream:1.0.0 event stream and receives events filtered from that event stream by the execution plan.
uiPublisherEvent PublisherPublishes events from the org.wso2.event.sensor.filtered.stream:1.0.0 event stream in the Analytics Dashboard.
  • No labels