This documentation is for WSO2 Complex Event Processor 4.0.0. View documentation for the latest release.
WSO2 Complex Event Processor is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.
||
Skip to end of metadata
Go to start of metadata

Introduction

This sample demonstrates how to set up an execution plan to perform calculations over time by aggregating events. The queries use time windows and time batch windows to aggregate event over time. This sample uses the Event Simulator for inputs and the logger publisher for logging the custom output events to the CEP console. Custom events are events with custom mappings that does not adhere to the default event formats. For more information on event formats, see Event Formats

The queries used in the WindowBasedAvgTemp execution plan used in this sample are as follows:

-- with time sliding window of 1 mim
from TempStream#window.time(1 min)
select roomNo, avg(temp) as avgTemp
group by roomNo
insert all events into AvgRoomTempStream ;

-- with time batch (tumbling) window of 1 min
from TempStream#window.timeBatch(1 min)
select roomNo, avg(temp) as avgTemp
group by roomNo
insert all events into AvgRoomTempPerMinStream ;

The first query,

  • Processes the events received through the TempStream
  • Defines a sliding time window of 1 minute, that keeps each arriving event for 1 minute
  • Selects the attributes roomNo, avg(temp) from the events stored in the time window. Due to the group by clause used here, the average is calculated per roomNo here. The average of the temp values is named as avgTemp here. 
  • The all events clause in the insert statement makes the query to be triggered by both current events and expired events (a currrent event is the incoming events to the window. an expired event is an event emitted by the window after keeping it for 1 minute in this case)
  • Emits those events as output events through the AvgRoomTempStream stream.
  • Mathematically, this query calculates the moving average of the room temperature for each room and gives instantaneous results upon arrival/expiration of each incoming event.

The second query,

  • Processes the events received through the TempStream
  • Defines a time batch window of 1 minute, that keeps all incoming events and emits events periodically every 1 minute.
  • Selects the attributes roomNo, avg(temp) from the events stored in the time window. Due to the group by clause used here, the average is calculated per roomNo here. The average of the temp values is named as avgTemp here. 
  • The all events clause in the insert statement makes the query to be triggered by both current events and expired events (a currrent event is the incoming events to the window. an expired event is an event emitted by the window after keeping it for 1 minute in this case)
  • Emits those events as output events through the AvgRoomTempPerMinStream stream.
  • Similar to the first query, this also calculates a moving average of the temperature for each room, but emits them periodically every 1 minute.

 

There's another execution plan that continuously calculate the average temperature from the beginning which is as follows:

from TempStream
select roomNo, avg(temp) as avgTemp
insert into AvgTempFromStartStream ;

The third query,

  • Processes the events received through the TempStream.
  • When selecting the attributes, concatenate roomNo and avg(temp) renamed as avgTemp which is the average of the temperature for each room from the start. 
  • Emits those events as output events through the AvgTempFromStartStream.

Prerequisites

See Prerequisites in CEP Samples Setup page.

Building the sample

Start the WSO2 CEP server with the sample configuration numbered 0104. For instructions, see Starting sample CEP configurations. This sample configuration does the following:

  • Points the default Axis2 repo to <CEP_HOME>/sample/artifacts/0104 (by default, the Axis2 repo is <CEP_HOME>/repository/deployment/server).

Executing the sample

  1. Log into the CEP management console which is located at https://localhost:9443/carbon.

     

  2. Go to Tools -> Event Simulator. Under the 'Multiple Events' section, you can see the listed 'events.csv' file which contains some sample data. Click 'play' to start sending sample events from the file.
  3. See the output events received from the CEP console. This sample uses the logger adaptor to log output events to the console. Since this execution plan uses 1 minute time windows, observe the results for a few minutes to get all results from different queries.

    For example, given below is a screenshot of the output of the consumer sending events from the producer:

  • No labels