WSO2 Complex Event Processor is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.

Introduction

This sample demonstrates how to set up an execution plan with some basic queries that project and transform attributes and enrich events with new attributes. It uses the Event Simulator for inputs and the logger publisher to log the outputs to the CEP console.

The execution plan used in this sample is as follows:

@Import('TempStream:1.0.0')
define stream TempStream (deviceID long, roomNo int, temp double);


@Export('TransformedRoomTempStream:1.0.0')
define stream TransformedRoomTempStream (uuid string, temp double, scale string);


@Export('EnrichedRoomTempStream:1.0.0')
define stream EnrichedRoomTempStream (roomNo int, temp double, scale string);


from TempStream
select roomNo, temp
insert into RoomTempStream;


from RoomTempStream
select roomNo, temp, 'C' as scale
insert into EnrichedRoomTempStream;


from TempStream
select str:concat(roomNo, '-', deviceID) as uuid, (temp * 1.8000 + 32.00) as temp, 'F' as scale
insert into TransformedRoomTempStream;

The first query,

  • Processes the events received through the TempStream.
  • Selects the attributes specified under the select clause (roomNo, temp) from each event received.
  • Emits those events as output events through the RoomTempStream.

The second query,

  • Processes the events received through the RoomTempStream (which is the output of the previous query).
  • Selects the attributes specified under the select clause (roomNo, temp) from each event received. It also adds a new attribute named 'scale' with the constant value 'C'.
  • Emits those events as output events through the EnrichedRoomTempStream.
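
To make the chaining of the first two queries concrete, the following is a minimal sketch in plain Python (not Siddhi code, and not how CEP executes the plan internally). The function names and the sample event values are hypothetical and chosen only for illustration; the attribute names come from the stream definitions above.

```python
# Plain-Python emulation of the first two queries (illustration only, not Siddhi).
# The function names and the sample event values are hypothetical.

def project_room_temp(event):
    """First query: keep only roomNo and temp (projection)."""
    return {"roomNo": event["roomNo"], "temp": event["temp"]}


def enrich_with_scale(event):
    """Second query: pass roomNo and temp through and add the constant scale 'C'."""
    return {"roomNo": event["roomNo"], "temp": event["temp"], "scale": "C"}


# A hypothetical TempStream event: (deviceID long, roomNo int, temp double)
temp_event = {"deviceID": 1001, "roomNo": 12, "temp": 25.0}

room_temp_event = project_room_temp(temp_event)      # event on RoomTempStream
enriched_event = enrich_with_scale(room_temp_event)  # event on EnrichedRoomTempStream

print(room_temp_event)
print(enriched_event)
```

Here the deviceID attribute is dropped by the projection, and the enriched event carries the three attributes declared for EnrichedRoomTempStream.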

The third query,

  • Processes the events received through the TempStream.
  • When selecting the attributes from each event received, concatenates roomNo and deviceID with the str:concat function and names the result uuid using the 'as' keyword, converts the temp attribute to the Fahrenheit scale (from the incoming Celsius scale), and adds a new attribute named 'scale' with the constant value 'F'.
  • Emits those events as output events through the TransformedRoomTempStream.
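
The transformation in the third query can be sketched in plain Python as follows. This is an illustration under stated assumptions, not Siddhi code: the function name and the sample event values are hypothetical, and the string formatting only approximates what str:concat produces for numeric arguments.

```python
# Plain-Python emulation of the third query (illustration only, not Siddhi).
# The function name and the sample event values are hypothetical.

def transform_event(event):
    """Build uuid from roomNo and deviceID, convert temp from Celsius to Fahrenheit."""
    uuid = f"{event['roomNo']}-{event['deviceID']}"   # mirrors str:concat(roomNo, '-', deviceID)
    fahrenheit = event["temp"] * 1.8 + 32.0           # mirrors (temp * 1.8000 + 32.00)
    return {"uuid": uuid, "temp": fahrenheit, "scale": "F"}


# A hypothetical TempStream event: (deviceID long, roomNo int, temp double)
temp_event = {"deviceID": 1001, "roomNo": 12, "temp": 25.0}

transformed_event = transform_event(temp_event)  # event on TransformedRoomTempStream
print(transformed_event)
```

For this hypothetical input, the uuid is built from room 12 and device 1001, and 25 degrees Celsius corresponds to 77 degrees Fahrenheit.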

Prerequisites

Set up the prerequisites required for all samples.

Building the sample

Start the WSO2 CEP server with the sample configuration numbered 0102. For instructions, see Starting sample CEP configurations. This sample configuration does the following:

  • Points the default Axis2 repo to <CEP_HOME>/samples/cep/artifacts/0102 (by default, the Axis2 repo is <CEP_HOME>/repository/deployment/server).

Executing the sample

  1. Log in to the CEP management console, which is located at https://localhost:9443/carbon.

  2. Go to Tools -> Event Simulator. Under the 'Multiple Events' section, you can see the listed events.csv file which contains some sample data. Click Play to start sending sample events from the file. Here, you can perform other operations such as Pause, Stop and Resume if required.
  3. View the output events in the CEP console. This sample uses the logger adaptor to log output events to the console.

    For example, given below is a screenshot of the output events logged to the CEP console after the Event Simulator sends the sample events:
