Introduction
This sample demonstrates how to set up an execution plan with some basic queries that perform projections and transformations of attributes and enriching events with new attributes. This sample uses Event simulator for inputs and the logger publisher for logging the outputs to the CEP console.
The execution plan used in this sample are as follows:
@Import('TempStream:1.0.0') define stream TempStream (deviceID long, roomNo int, temp double); @Export('TransformedRoomTempStream:1.0.0') define stream TransformedRoomTempStream (uuid string, temp double, scale string); @Export('EnrichedRoomTempStream:1.0.0') define stream EnrichedRoomTempStream (roomNo int, temp double, scale string); from TempStream select roomNo, temp insert into RoomTempStream; from RoomTempStream select roomNo, temp, 'C' as scale insert into EnrichedRoomTempStream; from TempStream select str:concat(roomNo, '-', deviceID) as uuid, (temp * 1.8000 + 32.00) as temp, 'F' as scale insert into TransformedRoomTempStream;
The first query,
- Processes the events received through the TempStream
- Selects the attributes (roomNo, temp) specified under the
select
clause, from each event received. - Emits those events as output events through the RoomTemp stream.
The second query,
- Processes the events received through the RoomTempStream (which is the output of the previous query)
- Selects the attributes (roomNo, temp) specified under the
select
clause, from each event received. It also adds a new attribute named 'scale', with a constant value that it 'C'. - Emits those events as output events through the EnrichedRoomTempStream stream.
The third query,
- Processes the events received through the TempStream.
- When selecting the attributes, concatenate roomNo and deviceId with the inbuilt concat function and names it as uuid using the 'as' keyword, converts the temp attributes to Farenheit scale (from the incoming Celcius scale), and then adds a new attribute named 'scalw' with the value 'F' under the
select
clause, from each event received. - Emits those events as output events through the TransformedRoomTempStream.
Prerequisites
Set up the prerequisites required for all samples.
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0102. For instructions, see Starting sample CEP configurations. This sample configuration does the following:
- Points the default Axis2 repo to
<CEP_HOME>/samples/cep/artifacts/0102
(by default, the Axis2 repo is<CEP_HOME>/repository/deployment/server
).
Executing the sample
Log into the CEP management console which is located at https://localhost:9443/carbon.
- Go to Tools -> Event Simulator. Under the 'Multiple Events' section, you can see the listed
events.csv
file which contains some sample data. Click Play to start sending sample events from the file. Here, you can perform other operations such as Pause, Stop and Resume if required. See the output events received from the CEP console. This sample uses the logger adaptor to log output events to the console.
For example, given below is a screenshot of the output of the consumer sending events from the producer: