This documentation is for WSO2 Complex Event Processor 4.1.0. View documentation for the latest release.

This guide provides instructions to use the Siddhi Query Language 3.0 with WSO2 CEP using examples.

Introduction to Siddhi Query Language

Siddhi Query Language (SiddhiQL) is designed to process event streams to identify complex event occurrences. The following table provides definitions of a few terms in the Siddhi Query Language.

Term | Definition
Event Stream | A logical series of events ordered in time.
Event Stream Definition | This defines event streams. An event stream has a unique name and a set of attributes assigned specific types, with uniquely identifiable names defining its schema.
Event | An event is associated with only one event stream, and all events of that stream have an identical set of attributes assigned specific types (or the same schema). An event contains a timestamp and the attribute values according to the schema.
Attribute | An attribute has a unique name within the event stream. The attribute type can be string, int, long, float, double, bool or object.
Query | A logical construct that derives new streams by combining existing streams. A query contains one or more input streams, handlers to modify those input streams, and an output stream to which it publishes its output events.
Partition | A logical container that processes a subset of the queries based on a pre-defined rule of separation.
Event Table | A structured representation of stored data, allowing stored data to be accessed and manipulated at runtime.

Siddhi has the following language constructs:

  • Event Stream Definitions
  • Event Table Definitions
  • Partitions
  • Queries

The execution logic of Siddhi can be composed together as an execution plan, and all the above language constructs can be written as a script in an execution plan. Each construct should be separated by a semicolon ( ; ).

Event Stream

An event stream is a sequence of events with a defined schema. One or more event streams can be consumed and manipulated by queries in order to identify complex event conditions, and new event streams can be emitted to notify query responses.

Event Stream Definition

The event stream definition defines the event stream schema. An event stream definition contains a unique name and a set of attributes assigned specific types, with uniquely identifiable names within the stream.

define stream <stream name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );

E.g. A stream named TempStream can be created with the following attributes as shown below. 

Attribute Name | Attribute Type
deviceID | long
roomNo | int
temp | double

define stream TempStream (deviceID long, roomNo int, temp double);

Query

Each Siddhi query can consume one or more event streams and create a new event stream from them.

All queries contain an input section and an output section. Some also contain a projection section. A simple query with all three sections is as follows.

from <input stream name> 
select <attribute name>, <attribute name>, ...
insert into <output stream name> 

e.g., If you want to derive only the room temperature from the TempStream event stream defined above, a query can be defined as follows.

from TempStream 
select roomNo, temp
insert into RoomTempStream;

Inferred Stream: Here RoomTempStream is an inferred stream, i.e., RoomTempStream can be used as an input stream for another query without explicitly defining its event stream definition, because its event stream definition is inferred from the above query.

Query Projection 

SiddhiQL supports the following actions for query projection. Each action is described below, followed by an example.

Selecting required objects for projection

This involves selecting only some of the attributes in an input stream to be inserted into an output stream.

e.g., The following query selects only the roomNo and temp attributes from the TempStream stream.

from TempStream 
select roomNo, temp
insert into RoomTempStream;
Selecting all attributes for projection

This involves selecting all the attributes in an input stream to be inserted into an output stream. This can be done by using the asterisk sign ( * ) or by omitting the select statement.

e.g., Use one of the following queries to select all the attributes in the TempStream stream.

from TempStream 
select *
insert into NewTempStream;

or

from TempStream 
insert into NewTempStream;
Renaming attributes

This involves selecting attributes from the input streams and inserting them into the output stream with different names.

e.g., The following query renames roomNo to roomNumber and temp to temperature.

from TempStream 
select roomNo as roomNumber, temp as temperature
insert into RoomTempStream;
Introducing the default value

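This involves adding a default (constant) value and assigning it to an output attribute using as.

e.g., The following query adds the constant 'C' to every output event as the scale attribute.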

from TempStream 
select roomNo, temp, 'C' as scale
insert into RoomTempStream;
Using mathematical and logical expressions

This involves using attributes in mathematical and logical expressions, evaluated according to the precedence order given below, and assigning the results to output attributes using as.

Operator precedence

Operator | Description | Example
() | Scope | (cost + tax) * 0.05
IS NULL | Null check | deviceID is null
NOT | Logical NOT | not (price > 10)
* / % | Multiplication, division, modulo | temp * 9/5 + 32
+ - | Addition, subtraction | temp * 9/5 + 32
< <= > >= | Comparisons: less-than, less-than-equal, greater-than, greater-than-equal | totalCost >= price * quantity
== != | Comparisons: equal, not equal | totalCost != price * quantity
IN | Contains in table | roomNo in ServerRoomsTable
AND | Logical AND | temp < 40 and (humidity < 40 or humidity >= 60)
OR | Logical OR | temp < 40 and (humidity < 40 or humidity >= 60)

e.g., Converting Celsius to Fahrenheit and identifying server rooms

from TempStream 
select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom
insert into RoomTempStream;


 

Functions

A function consumes zero, one or more function parameters and produces a result value.

Function parameters 

Function parameters can be attributes (int, long, float, double, string, bool, object), results of other functions, results of mathematical or logical expressions, or time parameters.

Time is a special parameter that is defined as an integer value followed by its unit, in the form <int> <unit>. The supported units are listed below. When executed, a time expression returns its value in milliseconds as a long.

Unit | Syntax
Year | year | years
Month | month | months
Week | week | weeks
Day | day | days
Hour | hour | hours
Minutes | minute | minutes | min
Seconds | second | seconds | sec
Milliseconds | millisecond | milliseconds

E.g. Passing 1 hour and 25 minutes to the test function.

test(1 hour 25 min)
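
The conversion of a time expression to milliseconds can be illustrated outside Siddhi. The following is a minimal Python sketch, not Siddhi code: the helper name to_millis is hypothetical, and only the fixed-length units (week and smaller) are covered, because this guide does not state how many days Siddhi assumes for a month or a year.

```python
import re

# Milliseconds per unit for the fixed-length units listed in the table above.
# Month and year are left out on purpose: this guide does not say how many
# days Siddhi assumes for them.
UNIT_MILLIS = {
    "millisecond": 1, "milliseconds": 1,
    "sec": 1000, "second": 1000, "seconds": 1000,
    "min": 60_000, "minute": 60_000, "minutes": 60_000,
    "hour": 3_600_000, "hours": 3_600_000,
    "day": 86_400_000, "days": 86_400_000,
    "week": 604_800_000, "weeks": 604_800_000,
}

def to_millis(expression: str) -> int:
    """Sum every '<int> <unit>' pair in the expression, in milliseconds."""
    pairs = re.findall(r"(\d+)\s*([a-z]+)", expression.lower())
    if not pairs:
        raise ValueError(f"no '<int> <unit>' pair found in {expression!r}")
    return sum(int(value) * UNIT_MILLIS[unit] for value, unit in pairs)

print(to_millis("1 hour 25 min"))  # 5100000
```

Under these assumptions, the expression 1 hour 25 min corresponds to the long value 5100000.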

Functions, mathematical expressions, and logical expressions can be used in a nested manner. 

Inbuilt Functions

Siddhi supports the following inbuilt functions.

e.g., The following configuration converts the room number to string and adds a message ID to each event using convert and UUID functions.

from TempStream 
select convert(roomNo, 'string') as roomNo, temp, UUID() as messageID
insert into RoomTempStream;

Filter

Filters can be used with input streams to filter events based on the given filter condition. Filter conditions should be defined in square brackets next to the input stream name as shown below.

from <input stream name>[<filter condition>]
select <attribute name>, <attribute name>, ...
insert into <output stream name> 

 e.g., The following configuration filters all server rooms having a temperature greater than 40 degrees.

from TempStream [(roomNo >= 100 and roomNo < 110) and temp > 40 ]
select roomNo, temp
insert into HighTempStream;

Window

Windows allow you to capture a subset of events from an input event stream, based on a criterion, for calculation. They are defined next to the input stream using the '#window.' prefix, as shown below. Each input stream can have at most one window.

from <input stream name>[<filter condition>]#window.<window name>(<parameter>, <parameter>, ... )
select <attribute name>, <attribute name>, ...
insert into <output stream name> 

Windows emit two types of events for the events they consume: current events and expired events. A window emits a current event when a new event arrives at the window, and an expired event whenever an event in the window expires based on that window's criterion.
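
The two event types can be illustrated with a sliding length window, which keeps only the last N events. The following is a minimal Python sketch, not Siddhi code: the function name length_window is hypothetical, and the relative order of the expired and current event for a single arrival is an assumption.

```python
from collections import deque

def length_window(events, size):
    """Yield (category, event) pairs for a sliding window of `size` events."""
    if size < 1:
        raise ValueError("size must be at least 1")
    window = deque()
    for event in events:
        if len(window) == size:
            # The oldest event leaves the window to make room for the new one.
            yield ("expired", window.popleft())
        window.append(event)
        yield ("current", event)

output = list(length_window([21.0, 22.5, 24.0], size=2))
print(output)
# [('current', 21.0), ('current', 22.5), ('expired', 21.0), ('current', 24.0)]
```

Every arriving event is reported once as a current event, and it is reported again as an expired event only when it is pushed out of the window.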

Inbuilt Windows

Siddhi supports the following inbuilt windows.

Output Event Categories

Window output can be manipulated based on event categories, i.e., current and expired events. Use the following keywords with the output stream to control the output.

  • current events : Emits all the events that arrive at the window. This is the default behaviour if no event category is specified.
  • expired events : Emits all the events that expire from the window.
  • all events : Emits all the events that arrive at and expire from the window.

To use these keywords with the insert into statement, place them between 'insert' and 'into' as shown in the example below.

E.g. Delay all events in a stream by 1 minute.

from TempStream#window.time(1 min)
select *
insert expired events into DelayedTempStream;

Aggregate Functions

Aggregate functions can be used with windows to perform aggregate calculations within the defined window. 

Inbuilt Aggregate Functions

Siddhi supports the following inbuilt aggregate functions.

E.g. On every event arrival and expiry, emit the average temperature of all rooms based on all the events that arrived during the last 10 minutes.

from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;

Group By

Group by allows aggregations to be calculated separately for each distinct combination of the group by attribute values.

E.g. Find the average temperature per room and device ID for the last 10 min.

from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
group by roomNo, deviceID
insert into AvgTempStream;

Having

Having allows us to filter events after aggregation and after processing at the selector. 

E.g. Find the average temperature per room for the last 10 min and alert if it's more than 30 degrees.

from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo
group by roomNo
having avgTemp > 30
insert into AlertStream;
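
The interplay of the time window, group by and having in the query above can be sketched in Python. This is an illustration only, not Siddhi code: the function name high_avg_alerts is hypothetical, timestamps are passed in explicitly as milliseconds, only current events are modelled, and the exact expiry boundary of the window is an assumption.

```python
from collections import deque

def high_avg_alerts(events, window_ms=600_000, threshold=30.0):
    """Yield (avg_temp, room_no) for each arriving event whose room average
    over the sliding time window exceeds the threshold.

    events: iterable of (timestamp_ms, room_no, temp), ordered by time.
    """
    window = deque()
    for ts, room_no, temp in events:
        # Expire events that are at least window_ms older than the new event.
        while window and window[0][0] <= ts - window_ms:
            window.popleft()
        window.append((ts, room_no, temp))
        # group by roomNo: aggregate only the events of the same room.
        temps = [t for _, r, t in window if r == room_no]
        avg_temp = sum(temps) / len(temps)
        # having avgTemp > 30: filter after the aggregation.
        if avg_temp > threshold:
            yield avg_temp, room_no

events = [(0, 1, 28.0), (1_000, 2, 35.0), (2_000, 1, 36.0)]
print(list(high_avg_alerts(events)))  # [(35.0, 2), (32.0, 1)]
```

The first event produces no alert because the average for room 1 is 28. The third event raises the average for room 1 to 32, which passes the having condition.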

Output Rate Limiting

Output rate limiting allows queries to emit events periodically based on the condition specified.

Rate limiting uses the following syntax.

from <input stream name>...
select <attribute name>, <attribute name>, ...
output ({<output-type>} every (<time interval>|<event interval> events) | snapshot every <time interval>)
insert into <output stream name> 

With "<output-type>" you can specify which events are emitted. The possible keywords are "first", "last" and "all", which emit only the first event, only the last event, or all events from the arrived events. If the keyword is omitted, it defaults to "all" and all events are emitted.

With "<time interval>" the time interval for the periodic event emission can be specified.

With "<event interval>" the number of events that need to arrive for the periodic event emission can be specified.

Based on number of events

Here the events are emitted every time the predefined number of events has arrived. When emitting, you can specify whether to emit only the first event, the last event, or all events from the arrived events.

e.g., Emit the last temperature event per sensor every 10 events.

from TempStream
select temp
group by deviceID
output last every 10 events
insert into LowRateTempStream;
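
The effect of the three output types on count-based rate limiting can be sketched in Python. This is an illustration only, not Siddhi code: the function name limit_by_count is hypothetical, grouping is ignored, and emitting "first" as soon as the first event of each batch arrives is an assumption.

```python
def limit_by_count(events, interval, output_type="all"):
    """Return the events emitted by 'output <type> every <interval> events'."""
    if output_type not in ("first", "last", "all"):
        raise ValueError(f"unknown output type: {output_type!r}")
    emitted, batch = [], []
    for event in events:
        batch.append(event)
        if output_type == "first" and len(batch) == 1:
            emitted.append(event)          # first event of a new batch
        if len(batch) == interval:         # the batch is complete
            if output_type == "last":
                emitted.append(batch[-1])
            elif output_type == "all":
                emitted.extend(batch)
            batch = []
    return emitted

events = list(range(1, 8))                 # seven events: 1..7
print(limit_by_count(events, 3, "first"))  # [1, 4, 7]
print(limit_by_count(events, 3, "last"))   # [3, 6]
print(limit_by_count(events, 3, "all"))    # [1, 2, 3, 4, 5, 6]
```

With "last" and "all", the seventh event is held back in this sketch until two more events complete its batch.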
Based on time

Here the events are emitted at every predefined time interval. When emitting, you can specify whether to emit only the first event, the last event, or all events from the arrived events.

E.g. Emit all temperature events every 10 seconds.

from TempStream
output every 10 sec
insert into LowRateTempStream;
Periodic snapshot

This works best with windows. When the input stream has a window attached, snapshot rate limiting emits, at every predefined time interval, all the current events that have arrived so far and do not yet have corresponding expired events. When no window is attached to the input stream, it emits only the last current event at every predefined time interval.

E.g. Emit a snapshot of the events in a time window of 5 seconds every second.

from TempStream#window.time(5 sec)
output snapshot every 1 sec
insert into SnapshotTempStream;

Joins

Join allows two event streams to be merged based on a condition. Each stream should be associated with a window (if no window is assigned, #window.length(0) is assigned to the input event stream). During the joining process, each incoming event on each stream is matched against all the events in the other input event stream's window based on the given condition, and an output event is generated for every matching event pair.

The syntax of a join is as follows.

from <input stream name>[<filter condition>]#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>} 
		join <input stream name>#window.<window name>(<parameter>,  ... ) {unidirectional} {as <reference>}
	on <join condition>
	within <time gap>
select <attribute name>, <attribute name>, ...
insert into <output stream name> 

With "on <join condition>" Siddhi joins only the events that match the condition.

With the "unidirectional" keyword, the trigger of the joining process can be controlled. By default, events arriving on either stream trigger the joining process. When the unidirectional keyword is used on an input stream, only the events arriving on that stream trigger the joining process. Note that the unidirectional keyword cannot be used on both input streams (as that is equivalent to the default behaviour, i.e., not using the unidirectional keyword at all).

With "within <time gap>" the joining process matches only the events that are within the defined time gap of each other.

When projecting the joined events, the attributes of each stream need to be referred to with the stream name (e.g., <stream name>.<attribute name>) or with its reference ID (e.g., <stream reference Id>.<attribute name>), which is especially needed when events of the same stream are joined. "select *" can be used, or the "select" statement can be omitted, if all the attributes of the joined events need to be projected. However, these can only be used when the two streams do not have any attributes with the same name.

e.g., Assume that the temperature regulators are updated every minute. The following switches on the temperature regulators if they are not already on for all the rooms that have a room temperature greater than 30 degrees.  

define stream TempStream(deviceID long, roomNo int, temp double);

define stream RegulatorStream(deviceID long, roomNo int, isOn bool);

from TempStream[temp > 30.0]#window.time(1 min) as T 
  join RegulatorStream[isOn == false]#window.length(1) as R
  on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;

Outer joins

The syntax of an outer join is as follows.

from <input stream name>[<filter condition>]#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>} 
     (left|right|full) outer join <input stream name>#window.<window name>(<parameter>,  ... ) {unidirectional} {as <reference>}
    on <join condition>
    within <time gap>
select <attribute name>, <attribute name>, ...
insert into <output stream name>
Left outer join

A left outer join allows two event streams to be merged based on a condition. However, it returns all the events of the left stream even if there are no matching events in the right stream. Here each stream should be associated with a window. During the joining process, each incoming event of each stream is matched against all the events in the other input event stream's window based on the given condition. An output event is generated for all the matching event pairs. An output event is also generated for incoming events of the left stream even if there are no matching events in the right stream.

e.g., The following generates output events for all the events in the stockStream stream whether there is a match for the symbol in the twitterStream stream or not.

from stockStream#window.length(2) 
left outer join twitterStream#window.length(1)
on stockStream.symbol== twitterStream.symbol
select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price
insert all events into outputStream ;
Right outer join

This is similar to the left outer join, but it returns all the events of the right stream even if there are no matching events in the left stream. Incoming events of the left stream are matched against all the events in the right event stream's window based on the given condition. An output event is generated for all the matching event pairs. An output event is also generated for incoming events of the right stream even if there are no matching events in the left stream.

e.g., The following generates output events for all the events in the twitterStream stream whether there is a match for the symbol in the stockStream stream or not.

from stockStream#window.length(2) 
right outer join twitterStream#window.length(1)
on stockStream.symbol== twitterStream.symbol
select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price
insert all events into outputStream ;
Full outer join

The full outer join combines the results of left outer join and right outer join. An output event is generated for each incoming event even if there are no matching events in the other stream.

e.g., The following generates output events for all the incoming events of each stream whether there is a match for the symbol in the other stream or not.

from stockStream#window.length(2) 
full outer join twitterStream#window.length(1)
on stockStream.symbol== twitterStream.symbol
select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price
insert all events into outputStream ;

Pattern

Patterns allow event streams to be correlated over time, detecting event patterns based on the order of event arrival. With a pattern, there can be other events in between the events that match the pattern condition. Siddhi internally creates state machines to track the states of the matching process. A pattern can correlate events over multiple input streams or over the same input stream. Hence, each matched input event needs to be referenced so that it can be accessed for future processing and output generation.

The syntax of a pattern is as follows.

from {every} <input event reference>=<input stream name>[<filter condition>] -> {every} <input event reference>=<input stream name>[<filter condition>] -> ...	
	within <time gap>
select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ...
insert into <output stream name>  

Input Streams cannot be associated with a window.

With "->" we can correlate incoming event arrivals, allowing zero or more other events to arrive in between the matching events.

With "<input event reference>=" the matched event can be stored for future reference.

With "within <time gap>" the pattern is matched only with events that are within the defined time gap of each other.

Without the "every" keyword the pattern is matched only once. Use the "every" keyword appropriately to trigger a new pattern matching process upon event arrival.

E.g. Alert if temperature of a room increases by 5 degrees within 10 min.  

from every( e1=TempStream ) -> e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp ]
	within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
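
The matching behaviour of the query above can be sketched in Python. This is an illustration only, not Siddhi code: the function name temp_rise_alerts is hypothetical, timestamps are passed in explicitly as milliseconds, and a plain list of partial matches stands in for the state machines that Siddhi creates internally.

```python
def temp_rise_alerts(events, rise=5.0, within_ms=600_000):
    """Return (room_no, initial_temp, final_temp) for every completed match.

    events: iterable of (timestamp_ms, room_no, temp), ordered by time.
    """
    pending = []   # e1 candidates still waiting for a matching e2
    alerts = []
    for ts, room_no, temp in events:
        still_pending = []
        for e1_ts, e1_room, e1_temp in pending:
            if ts - e1_ts > within_ms:
                continue  # outside "within 10 min": drop the partial match
            if e1_room == room_no and e1_temp + rise <= temp:
                alerts.append((room_no, e1_temp, temp))  # e2 found
            else:
                # "->" tolerates non-matching events in between.
                still_pending.append((e1_ts, e1_room, e1_temp))
        # "every": each arriving event also starts a new match as e1.
        still_pending.append((ts, room_no, temp))
        pending = still_pending
    return alerts

events = [
    (0, 1, 20.0),        # e1 for room 1
    (60_000, 2, 21.0),   # unrelated room, does not break the pattern
    (120_000, 1, 23.0),  # rise of 3 degrees, no match yet
    (180_000, 1, 26.0),  # rise of 6 degrees from the first event
    (900_000, 1, 40.0),  # all earlier candidates are older than 10 min
]
print(temp_rise_alerts(events))  # [(1, 20.0, 26.0)]
```

The last event does not raise an alert even though the temperature is much higher, because every earlier candidate has fallen outside the 10 minute gap.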
Logical Pattern 

A pattern not only matches events arriving in temporal order, but can also correlate events that have logical relationships.

Keywords such as "and" and "or" can be used instead of "->" to express the logical relationship.

With "and", the occurrence of two events in any order can be matched.

With "or", the occurrence of an event from either of the input streams can be matched.

E.g. Alert when the room temperature reaches the temperature set on the regulator (the pattern matching should be reset whenever the temperature set on the regulator changes).

define stream TempStream(deviceID long, roomNo int, temp double);

define stream RegulatorStream(deviceID long, roomNo int, tempSet double);
 
from every( e1=RegulatorStream ) -> e2=TempStream[e1.roomNo==roomNo and e1.tempSet <= temp ] or e3=RegulatorStream[e1.roomNo==roomNo]
select e1.roomNo, e2.temp as roomTemp
having e3 is null
insert into AlertStream;
Counting Pattern 

Counting patterns enable us to match multiple events based on the same matching condition. The expected number of events can be limited using the following postfixes.

With <1:4>, 1 to 4 events are matched.

With <2:>, 2 or more events are matched, and with <:5>, up to 5 events are matched.

With <5>, exactly 5 events are matched.

To refer to specific occurrences of the events that are matched based on count limits, square brackets can be used with numerical indexes and the "last" keyword. For example, e1[3] refers to the third event, e1[last] refers to the last event, and e1[last - 1] refers to the event before the last event of the matched event group.

E.g. Get the temperature difference between two regulator events.

define stream TempStream(deviceID long, roomNo int, temp double);

define stream RegulatorStream(deviceID long, roomNo int, tempSet double, isOn bool);
 
from every( e1=RegulatorStream) -> e2=TempStream[e1.roomNo==roomNo]<1:> -> e3=RegulatorStream[e1.roomNo==roomNo]
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
insert into TempDiffStream;

Sequence

Sequences allow event streams to be correlated over time, detecting event sequences based on the order of event arrival. With a sequence, there cannot be other events in between the events that match the sequence condition. Siddhi internally creates state machines to track the states of the matching process. A sequence can correlate events over multiple input streams or over the same input stream. Hence, each matched input event needs to be referenced so that it can be accessed for future processing and output generation.

The syntax of a sequence is as follows.

from {every} <input event reference>=<input stream name>[<filter condition>], <input event reference>=<input stream name>[<filter condition>]{+|*|?}, ...	
	within <time gap>
select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ...
insert into <output stream name>  

Input Streams cannot be associated with a window.

With "," we can correlate immediately consecutive event arrivals, with no other events arriving in between the matching events.

With "<input event reference>=" the matched event can be stored for future reference.

With "within <time gap>" the sequence is matched only with events that are within the defined time gap of each other.

Without the "every" keyword the sequence is matched only once. Use the "every" keyword at the beginning to trigger a new sequence matching process upon every event arrival.

E.g. Alert if there is more than 1 degree increase in temperature between two consecutive temperature events.  

from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp ]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
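
Because a sequence only matches events that arrive immediately after one another, the query above effectively compares each temperature event with the next one. The following minimal Python sketch illustrates this; it is not Siddhi code, and the function name consecutive_rise_alerts is hypothetical.

```python
def consecutive_rise_alerts(temps, rise=1.0):
    """Return (initial_temp, final_temp) for each pair of consecutive
    readings where the temperature increased by more than `rise`."""
    alerts = []
    # zip pairs every reading (e1) with the reading that follows it (e2).
    for previous, current in zip(temps, temps[1:]):
        if previous + rise < current:
            alerts.append((previous, current))
    return alerts

print(consecutive_rise_alerts([20.0, 20.5, 22.0, 22.5, 24.0]))
# [(20.5, 22.0), (22.5, 24.0)]
```

Unlike the pattern example, the reading 20.0 is never compared with 22.0, because another event arrived in between.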
Logical Sequence 

A sequence not only matches consecutive events arriving in temporal order, but can also correlate events that have logical relationships.

Keywords such as "and" and "or" can be used instead of "," to express the logical relationship.

With "and", the occurrence of two events in any order can be matched.

With "or", the occurrence of an event from either of the input streams can be matched.

E.g. Notify when a regulator event is followed by both the temperature and humidity events.  

define stream TempStream(deviceID long, temp double);
define stream HumidStream(deviceID long, humid double);
define stream RegulatorStream(deviceID long, isOn bool);
 
from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
select e2.temp, e3.humid
insert into StateNotificationStream;
Counting Sequence 

Counting sequences enable us to match multiple consecutive events based on the same matching condition. The expected number of events can be limited using the following postfixes.

With "*", zero or more events can be matched.

With "+", one or more events can be matched.

With "?", zero or one event can be matched.

To refer to specific occurrences of the events that are matched based on count limits, square brackets can be used with numerical indexes and the "last" keyword. For example, e1[3] refers to the third event, e1[last] refers to the last event, and e1[last - 1] refers to the event before the last event of the matched event group.

E.g. Identify peak temperatures.

define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, tempSet double, isOn bool);
 
from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[e2[last].temp > temp]
select e1.temp as initialTemp, e2[last].temp as peakTemp
insert into TempDiffStream;

Partition

With partitions, Siddhi can divide both incoming events and queries, and process them in parallel and in isolation. Each partition is tagged with a partition key, and only the events corresponding to that partition key are processed in that partition. Each partition has separate instances of the Siddhi queries, which isolates their processing states. A partition can contain more than one query.

The partition key can be defined using a categorical (string) attribute of the input event stream as a Variable Partition, or by defining separate ranges when the partition needs to be defined using numerical attributes of the input event stream as a Range Partition.

Variable Partition 

Partition using categorical (string) attributes will adhere to the following syntax.

partition with ( <attribute name> of <stream name>, <attribute name> of <stream name>, ... )
begin
	<query>
	<query>
	...
end;

E.g. Per sensor, calculate the maximum temperature over last 10 temperature events each sensor has emitted.

partition with ( deviceID of TempStream )
begin
	from TempStream#window.length(10)
	select roomNo, deviceID, max(temp) as maxTemp
	insert into DeviceTempStream
end;
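
The isolation that a partition provides can be sketched in Python by keeping one window per partition key. This is an illustration only, not Siddhi code: the function name max_temp_per_device is hypothetical, and a dictionary of deques stands in for the per-partition query instances.

```python
from collections import defaultdict, deque

def max_temp_per_device(events, length=10):
    """Yield (room_no, device_id, max_temp) for every arriving event.

    events: iterable of (device_id, room_no, temp).
    """
    # One isolated length window per partition key (deviceID).
    windows = defaultdict(lambda: deque(maxlen=length))
    for device_id, room_no, temp in events:
        window = windows[device_id]
        window.append(temp)  # deque(maxlen=...) evicts the oldest reading
        yield room_no, device_id, max(window)

events = [(1, 100, 20.0), (2, 101, 30.0), (1, 100, 25.0),
          (1, 100, 22.0), (2, 101, 28.0)]
print(list(max_temp_per_device(events, length=2)))
# [(100, 1, 20.0), (101, 2, 30.0), (100, 1, 25.0), (100, 1, 25.0), (101, 2, 30.0)]
```

The readings of device 2 never influence the maximum reported for device 1, which mirrors the separate processing state of each partition.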

Range Partition 

Partition using numerical attributes will adhere to the following syntax.

partition with ( <condition> as <partition key> or <condition> as <partition key> or ... of <stream name>, ... )
begin
	<query>
	<query>
	...
end;

E.g. Per office area calculate the average temperature over last 10 minutes.

partition with ( roomNo>=1030 as 'serverRoom' or roomNo<1030 and roomNo>=330 as 'officeRoom' or roomNo<330 as 'lobby' of TempStream )
begin
	from TempStream#window.time(10 min)
	select roomNo, deviceID, avg(temp) as avgTemp 
	insert into AreaTempStream
end;

Inner Streams 

Inner streams can be used for query instances of a partition to communicate between other query instances of the same partition. Inner Streams are denoted by a "#" in front of them, and these streams cannot be accessed outside of the partition block. 

E.g. Per sensor, calculate the maximum temperature over last 10 temperature events when the sensor is having an average temperature greater than 20 over the last minute.

partition with ( deviceID of TempStream )
begin
	from TempStream#window.time(1 min)
	select roomNo, deviceID, temp, avg(temp) as avgTemp
	insert into #AvgTempStream;

	from #AvgTempStream[avgTemp > 20]#window.length(10)
	select roomNo, deviceID, max(temp) as maxTemp
	insert into deviceTempStream;
end;

Event Table

Event tables allow Siddhi to work with stored events. An event table can be viewed as a stored version of an event stream or as a table of events. By default, events are stored in memory, and Siddhi also provides an extension to work with data/events stored in RDBMS data stores.

Event Table Definition

Event Table Definition defines the event table schema. An Event Table Definition contains a unique name and a set of typed attributes with uniquely identifiable names within the table. 

define table <table name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );

With the above definition, events are stored in memory in a linked list data structure.

E.g. A room type table named RoomTypeTable, with the attributes room number as int and type as string, can be created as follows.

define table RoomTypeTable (roomNo int, type string);
Indexing Event Table

An event table can be indexed for fast event access using the "IndexBy" annotation. With "IndexBy" only one attribute can be indexed, and when indexed, the table uses a map data structure to hold the events. Therefore, if multiple events with the same index value are inserted into the event table, only the last inserted event remains in the table.

E.g. An indexed room type table named RoomTypeTable, indexed by room number and with the attributes room number as int and type as string, can be created as follows.

@IndexBy('roomNo')
define table RoomTypeTable (roomNo int, type string);
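
The difference between the default list-backed table and an indexed, map-backed table can be sketched in Python. This is an illustration only, not Siddhi code: the function name insert_all is hypothetical, and Python's list and dict stand in for the linked list and map data structures mentioned above.

```python
def insert_all(rows, index_by=None):
    """Return the table contents after inserting all rows.

    Without index_by the table behaves like a list and keeps every event.
    With index_by it behaves like a map keyed by that attribute.
    """
    if index_by is None:
        return list(rows)
    table = {}
    for row in rows:
        # A later event with the same index value replaces the earlier one.
        table[row[index_by]] = row
    return list(table.values())

rows = [
    {"roomNo": 1, "type": "server"},
    {"roomNo": 2, "type": "office"},
    {"roomNo": 1, "type": "lobby"},
]
print(len(insert_all(rows)))                 # 3
print(insert_all(rows, index_by="roomNo"))
# [{'roomNo': 1, 'type': 'lobby'}, {'roomNo': 2, 'type': 'office'}]
```

With the index on roomNo, the first event for room 1 is overwritten by the last one, so only two events remain.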
Hazelcast Event Table

Event tables also support persisting event data in a distributed manner using Hazelcast in-memory data grids. This functionality can be enabled using the "From" annotation. The connection instructions for the Hazelcast cluster can also be assigned to the event table using the "From" annotation.

Connection instructions that can be used with the "From" annotation:

cluster.name : Hazelcast cluster name [Optional] (e.g., cluster.name='cluster_a').

cluster.password : Hazelcast cluster password [Optional] (e.g., cluster.password='pass@cluster_a').

cluster.addresses : Hazelcast cluster addresses as a comma-separated string [Optional] (e.g., cluster.addresses='192.168.1.1:5700,192.168.1.2:5700').

E.g. 1. Create an event table named RoomTypeTable, with the attributes room number as int and type as string, backed by a new Hazelcast Instance.

 

@from(eventtable = 'hazelcast')
define table RoomTypeTable(roomNo int, type string);

E.g. 2. Create an event table named RoomTypeTable, with the attributes room number as int and type as string, backed by a new Hazelcast Instance in a new Hazelcast Cluster.

@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a')
define table RoomTypeTable(roomNo int, type string);

E.g. 3. Create an event table named RoomTypeTable, with the attributes room number as int and type as string, backed by an existing Hazelcast Instance in an existing Hazelcast Cluster.

@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a', cluster.addresses='192.168.1.1:5700,192.168.1.2:5700')
define table RoomTypeTable(roomNo int, type string);

 

RDBMS Event Table

An event table can be backed by an RDBMS event store using the "From" annotation. With "From", the data source or the connection instructions can be assigned to the event table. The RDBMS table name can be different from the event table name defined in Siddhi, and Siddhi always refers to the defined event table name. However, the defined event table name cannot be the same as an already existing stream name, since syntactically both are considered the same within the Siddhi query language.

RDBMS event table has been tested with the following databases: 

    • MySQL

    • H2

    • Oracle

E.g. Create an event table named RoomTypeTable with the attributes roomNo (int) and type (string), backed by the RDBMS table named RoomTable from the data source named AnalyticsDataSource.

@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);

Note

The datasource.name given here is injected into the Siddhi engine by the CEP server. To configure data sources in the CEP, see Datasources in the Admin Guide.

E.g. Create an event table named RoomTypeTable with the attributes roomNo (int) and type (string), backed by the MySQL table named RoomTable in the database cepdb located at localhost:3306, with user name "root" and password "root".

@From(eventtable='rdbms', jdbc.url='jdbc:mysql://localhost:3306/cepdb', username='root', password='root', driver.name='com.mysql.jdbc.Driver', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);

Caching events with RDBMS Event Table

Several caches can be used with RDBMS-backed event tables in order to reduce I/O operations and improve performance. Currently, all cache implementations provide size-based algorithms. A cache is added using the "cache" element, and its size is defined using the "cache.size" element of the "From" annotation.

The supported cache implementations are as follows:

  1. Basic: Events are cached in a FIFO manner where the oldest event will be dropped when the cache is full. 
  2. LRU (Least Recently Used): The least recently used event is dropped when the cache is full.
  3. LFU (Least Frequently Used): The least frequently used event is dropped when the cache is full.

In the "From" annotation, if the "cache" element is not specified, the "Basic" cache is used by default, and if the "cache.size" element is not specified, the cache size defaults to 4096.

E.g. Create an event table named RoomTypeTable with the attributes roomNo (int) and type (string), backed by an RDBMS table and using the least recently used caching algorithm to cache 3000 events.

@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable', cache='LRU', cache.size='3000')
define table RoomTypeTable (roomNo int, type string);
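The Basic, LRU and LFU caches differ only in which entry they drop when the cache is full. The following Python sketch illustrates that difference. It is a simplified model written for illustration, not Siddhi's cache implementation, and the class names BasicCache, LRUCache and LFUCache are hypothetical.

```python
from collections import Counter, OrderedDict


class BasicCache:
    """FIFO: drops the oldest inserted entry when the cache is full."""

    def __init__(self, size):
        self.size = size
        self.entries = OrderedDict()

    def get(self, key):
        return self.entries.get(key)

    def put(self, key, value):
        if key not in self.entries and len(self.entries) >= self.size:
            self.entries.popitem(last=False)  # front = oldest insertion
        self.entries[key] = value


class LRUCache(BasicCache):
    """LRU: drops the entry that was read or written least recently."""

    def get(self, key):
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as most recently used
        return self.entries.get(key)

    def put(self, key, value):
        super().put(key, value)
        self.entries.move_to_end(key)


class LFUCache(BasicCache):
    """LFU: drops the entry with the fewest accesses."""

    def __init__(self, size):
        super().__init__(size)
        self.hits = Counter()

    def get(self, key):
        if key in self.entries:
            self.hits[key] += 1
        return self.entries.get(key)

    def put(self, key, value):
        if key not in self.entries and len(self.entries) >= self.size:
            # Ties are broken in favour of evicting the oldest entry.
            victim = min(self.entries, key=lambda k: self.hits[k])
            del self.entries[victim]
            del self.hits[victim]
        self.entries[key] = value
        self.hits[key] += 1
```

With a cache of size 2 holding a and b, reading a and then inserting c evicts a under Basic (it was inserted first) but evicts b under LRU and LFU (a was used more recently and more often).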

Using Bloom Filters

A Bloom Filter is a probabilistic data structure that can be used to perform quick membership searches. If you apply a Bloom Filter to a data set and carry out an isAvailable check on that Bloom Filter instance, the answer is always accurate when it reports that the search item is not available, whereas a positive answer can be a false positive. Ruling out absent items cheaply is what speeds up updates, joins and isAvailable checks.

The following example shows how to enable Bloom Filters on an event table that is populated by one query and checked by another.

define stream StockStream (symbol string, price float, volume long);
define stream CheckStockStream (symbol string, volume long);
@from(eventtable = 'rdbms' ,datasource.name = 'cepDB' , table.name = 'stockInfo' , bloom.filters = 'enable')
define table StockTable (symbol string, price float, volume long);
 
@info(name = 'query1')
from StockStream
insert into StockTable ;
 
@info(name = 'query2')
from CheckStockStream[(StockTable.symbol==symbol) in StockTable]
insert into OutStream;
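The asymmetry of a Bloom Filter (a negative answer is certain, a positive answer is only probable) can be seen in a minimal Python sketch. This is a generic textbook construction, not the filter used by Siddhi; the class name, the bit-array size and the number of hash functions are assumptions chosen for illustration.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter over string items (illustrative only)."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = [False] * num_bits

    def _positions(self, item):
        # Derive num_hashes bit positions by salting a single hash function.
        for i in range(self.num_hashes):
            data = ("%d:%s" % (i, item)).encode("utf-8")
            yield int(hashlib.sha256(data).hexdigest(), 16) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = True

    def might_contain(self, item):
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos] for pos in self._positions(item))
```

Every item that was added is reported as possibly present, and an item is reported as absent only if at least one of its bits was never set, which is why a negative answer can be trusted without consulting the underlying table.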

For more information about In-memory event tables, see Sample 0106 - Using in-memory event tables.

For more information about RDBMS event tables, see Sample 0107 - Using RDBMS event tables.

Insert into

A query that inserts events into an event table is similar to a query that inserts events into an event stream, and uses the "insert into <table name>" clause. To insert only a specific output event category, place the keywords "current events", "expired events" or "all events" between the 'insert' and 'into' keywords.

E.g. Insert all temperature events from TempStream into the TempTable event table.

from TempStream
select *
insert into TempTable;

Delete

A query that deletes events from an event table is written as a delete query with the following syntax:

from <input stream name> 
select <attribute name>, <attribute name>, ...
delete <table name> 
	on <condition> 

Here the "on <condition>" selects the events to delete. When writing this condition, attributes of the event table must always be qualified with the table name (for example, RoomTypeTable.roomNo), while attributes of the select must be referenced without any qualifier.

E.g. Delete the entries of the RoomTypeTable associated with the room numbers of DeleteStream.

define table RoomTypeTable (roomNo int, type string);
define stream DeleteStream (roomNumber int);
 
from DeleteStream 
delete RoomTypeTable
	on RoomTypeTable.roomNo == roomNumber;

To execute the delete only for a specific output event category, use "delete <table name> for <output event category> on <condition>" instead of "delete <table name> on <condition>", where "<output event category>" is one of the keywords "current events", "expired events" or "all events".
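The effect of a delete query can be pictured with ordinary Python data structures. The sketch below only models the semantics: the table is a list of dictionaries and delete_matching is a hypothetical helper, so it says nothing about how Siddhi stores or deletes events internally.

```python
def delete_matching(table, event, condition):
    """Remove every table row for which condition(row, event) is true."""
    table[:] = [row for row in table if not condition(row, event)]


room_type_table = [
    {"roomNo": 1, "type": "server"},
    {"roomNo": 2, "type": "office"},
]

# Models: from DeleteStream delete RoomTypeTable
#             on RoomTypeTable.roomNo == roomNumber
delete_matching(
    room_type_table,
    {"roomNumber": 2},  # one event arriving on DeleteStream
    lambda row, event: row["roomNo"] == event["roomNumber"],
)
```

In the condition, row plays the role of the table-qualified side (RoomTypeTable.roomNo) and event plays the role of the unqualified stream attribute (roomNumber).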

Update

A query that updates events in an event table is written as an update query with the following syntax:

from <input stream name> 
select <attribute name> as <table attribute name>, <attribute name> as <table attribute name>, ...
update <table name> 
	on <condition> 

Here the "on <condition>" selects the events to update. When writing this condition, attributes of the event table must always be qualified with the table name, while attributes of the select must be referenced without any qualifier.

With "as <table attribute name>", the selected attributes are renamed to the names defined in the event table, allowing Siddhi to identify which attributes need to be updated in the event table.

E.g. For each room denoted by its number, update the room types of the RoomTypeTable based on the events in UpdateStream.

define table RoomTypeTable (roomNo int, type string);
define stream UpdateStream (roomNumber int, roomType string);
 
from UpdateStream 
select roomType as type
update RoomTypeTable
	on RoomTypeTable.roomNo == roomNumber;

To execute the update only for a specific output event category, use "update <table name> for <output event category> on <condition>" instead of "update <table name> on <condition>", where "<output event category>" is one of the keywords "current events", "expired events" or "all events".

Insert Overwrite

A query that inserts events into an event table, or overwrites them if they already exist, is written as an insert-overwrite query with the following syntax:

from <input stream name> 
select <attribute name> as <table attribute name>, <attribute name> as <table attribute name>, ...
insert overwrite <table name> 
	on <condition> 

Here the "on <condition>" selects the events to update or insert. When writing this condition, attributes of the event table must always be qualified with the table name, while attributes of the select must be referenced without any qualifier.

With "as <table attribute name>", the selected attributes are renamed to the names defined in the event table, allowing Siddhi to identify which attributes need to be updated or inserted in the event table.

E.g. For each room denoted by its number, update the room types of the RoomTypeTable based on the events in UpdateStream, or insert them if no matching entry exists.

define table RoomTypeTable (roomNo int, type string);
define stream UpdateStream (roomNumber int, roomType string);
 
from UpdateStream 
select roomNumber as roomNo, roomType as type
insert overwrite RoomTypeTable
	on RoomTypeTable.roomNo == roomNo;
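The difference between update and insert overwrite is what happens when no table entry satisfies the condition. The following Python sketch models both operations on a list of dictionaries; the helper names update and insert_overwrite are hypothetical, and the code illustrates the semantics only.

```python
def update(table, event, condition):
    """Overwrite matching rows with the event's attributes.

    Returns the number of rows that matched; non-matching events are ignored.
    """
    matched = 0
    for row in table:
        if condition(row, event):
            row.update(event)
            matched += 1
    return matched


def insert_overwrite(table, event, condition):
    """Update matching rows, or append the event as a new row if none match."""
    if update(table, event, condition) == 0:
        table.append(dict(event))
```

Here event stands for the output of the select clause, so its keys already carry the table attribute names (roomNo and type in the example above).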

In

Whether an attribute value exists in an event table can be checked using a condition with the following syntax:

 <condition> in <table name> 

Here the "<condition>" selects the matching attribute. When writing this condition, attributes of the event table must always be qualified with the table name, while attributes of the incoming stream must be referenced without any qualifier.

E.g. By checking ServerRoomTable, output only the temperature events associated with the server rooms.

define table ServerRoomTable (roomNo int);
define stream TempStream (deviceID long, roomNo int, temp double);
 
from TempStream[ServerRoomTable.roomNo == roomNo in ServerRoomTable] 
insert into ServerTempStream;
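An "in" condition acts as a filter that lets an event through only if at least one table entry matches it. A rough Python model of the example above follows; in_table is a hypothetical helper, and the sample rows and events are made up for illustration.

```python
def in_table(table, event, condition):
    """True if at least one table row satisfies condition(row, event)."""
    return any(condition(row, event) for row in table)


server_room_table = [{"roomNo": 10}, {"roomNo": 11}]
temp_stream = [
    {"deviceID": 1, "roomNo": 10, "temp": 21.5},
    {"deviceID": 2, "roomNo": 42, "temp": 19.0},
]

# Models: TempStream[ServerRoomTable.roomNo == roomNo in ServerRoomTable]
server_temp_stream = [
    event
    for event in temp_stream
    if in_table(server_room_table, event,
                lambda row, ev: row["roomNo"] == ev["roomNo"])
]
```

Only the event from room 10 reaches server_temp_stream, and it is passed on unchanged because the check does not add any table attributes to it.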

Join

A stream can be joined with an event table to retrieve data from it. To do so, a simple join query can be used. In the join, the event table must not be associated with window operations, because an event table is not an active construct. For the same reason, an event table cannot be joined with another event table in Siddhi.

E.g. Update the events in temperature stream with their room type based on the RoomTypeTable.

define table RoomTypeTable (roomNo int, type string);
define stream TempStream (deviceID long, roomNo int, temp double);
 
from TempStream join RoomTypeTable
	on RoomTypeTable.roomNo == TempStream.roomNo
select deviceID, RoomTypeTable.roomNo as roomNo, type, temp 
insert into EnhancedTempStream;
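Unlike an "in" check, a join copies attributes from the matching table entries into the output event. The Python sketch below models the example query, assuming that the plain join behaves like an inner join (events without a matching table entry produce no output). The function name join_with_table is hypothetical.

```python
def join_with_table(event, table):
    """Yield one enriched event per table row whose roomNo matches."""
    for row in table:
        if row["roomNo"] == event["roomNo"]:
            # Models: select deviceID, RoomTypeTable.roomNo as roomNo, type, temp
            yield {
                "deviceID": event["deviceID"],
                "roomNo": row["roomNo"],
                "type": row["type"],
                "temp": event["temp"],
            }
```

The table is only read when a stream event arrives, which mirrors the statement above that the event table is not an active construct.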

Event Trigger

Triggers allow events to be created periodically based on time, or once at Siddhi start. An event trigger generates events on an event stream that has the same name as the trigger and a single attribute named "triggered_time" of type long.

Event Trigger Definition

An event trigger definition specifies when events are triggered, using the following syntax.

define trigger <trigger name> at {'start'| every <time interval>| '<cron expression>'};

With "'start'", an event is triggered at Siddhi start.

With "every <time interval>", an event is triggered periodically at the given time interval.

With "'<cron expression>'", an event is triggered periodically based on the given cron expression. Refer to the Quartz Scheduler documentation for configuration details.

E.g. Trigger an event every 5 minutes.

define trigger FiveMinTriggerStream at every 5 min;

E.g. Trigger an event at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday.

define trigger FiveMinTriggerStream at '0 15 10 ? * MON-FRI';
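A Quartz cron expression consists of six mandatory fields (seconds, minutes, hours, day of month, month, day of week) and an optional seventh field for the year. The following Python sketch labels the fields of an expression so that it is easier to read. The helper describe_cron is hypothetical and does not validate the field values.

```python
QUARTZ_FIELDS = [
    "seconds", "minutes", "hours",
    "day_of_month", "month", "day_of_week",
    "year",  # optional seventh field
]


def describe_cron(expression):
    """Map each whitespace-separated field of a Quartz cron expression to its name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError("a Quartz cron expression has 6 or 7 fields")
    return dict(zip(QUARTZ_FIELDS, parts))


fields = describe_cron("0 15 10 ? * MON-FRI")
# fields["hours"] is "10", fields["minutes"] is "15",
# and fields["day_of_week"] is "MON-FRI"
```

Read this way, '0 15 10 ? * MON-FRI' fires at second 0 of minute 15 of hour 10, on any day of the month, in every month, from Monday to Friday.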

Eval Script

Eval script allows Siddhi to process events using other programming languages by defining functions in them. Eval script functions are defined in the same way as event tables or streams, and are referred to in queries in the same way as the inbuilt functions of Siddhi.

Eval Script Definition

An eval script definition specifies the operation of the function, using the following syntax.

define function <function name>[<language name>] return <return type> {
	<operation of the function>
};

With "<function name>", the function is given the name by which it is used in queries. Note that a function defined this way overrides an inbuilt function of the same name.

With "<language name>", the language in which the function is executed is specified, e.g. JavaScript, R or Scala.

With "<return type>", the return type of the function is defined; it can be int, long, float, double, string, bool or object. The function implementer is responsible for returning a value of the defined <return type> for the function to work properly.

With "<operation of the function>", the execution logic of the function is written in the language given in <language name>, returning a value of type <return type>.

Supported Eval Script Languages  

  • JavaScript
  • R
  • Scala 

JavaScript

E.g. Concatenating function in JavaScript.

define function concatFn[JavaScript] return string {
	var str1 = data[0];
	var str2 = data[1];
	var str3 = data[2];
	var response = str1 + str2 + str3;
	return response;
};

define stream TempStream(deviceID long, roomNo int, temp double);

from TempStream 
select concatFn(roomNo,'-',deviceID) as id, temp   
insert into DeviceTempStream;

R

E.g. Concatenating function in R.

define function concatFn[R] return string {
	return(paste(data, collapse=""));
};

define stream TempStream(deviceID long, roomNo int, temp double);

from TempStream 
select concatFn(roomNo,'-',deviceID) as id, temp  
insert into DeviceTempStream;

Scala

E.g. Concatenating function in Scala.

define function concatFn[Scala] return string {
    // Start from an empty string and append each argument in order.
    var concatenatedString = ""
    for (i <- 0 until data.length) {
        concatenatedString += data(i).toString
    }
    concatenatedString
};

define stream TempStream(deviceID long, roomNo int, temp double);

from TempStream 
select concatFn(roomNo,'-',deviceID) as id, temp  
insert into DeviceTempStream;

Siddhi Extensions

Siddhi provides an extension architecture that allows custom code and functions to be incorporated into Siddhi in a seamless manner. Extensions use the following syntax:

<namespace>:<function name>(<parameter1>, <parameter2>, ... )

Here the namespace will allow Siddhi to identify the function as an extension and its extension group, the function name will denote the extension function within the given group, and the parameters will be the inputs that can be passed to the extension for evaluation and/or configuration.  

E.g. A window extension created with namespace foo and function name unique can be referred as follows:

from StockExchangeStream[price >= 20]#window.foo:unique(symbol)
select symbol, price
insert into StockQuote;
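The "<namespace>:<function name>(<parameters>)" form can be taken apart mechanically, which helps when checking which extension a query refers to. The Python sketch below does this with a regular expression. It is a simplified illustration with a hypothetical helper name, and it assumes flat parameter lists without nested calls or commas inside quoted strings; it is not Siddhi's parser.

```python
import re

# <namespace>:<function name>(<parameter1>, <parameter2>, ...)
EXTENSION_CALL = re.compile(
    r"^(?P<namespace>\w+):(?P<name>\w+)\((?P<params>.*)\)$"
)


def parse_extension_call(text):
    """Return (namespace, function name, [parameters]) for an extension call."""
    match = EXTENSION_CALL.match(text.strip())
    if match is None:
        raise ValueError("not of the form <namespace>:<function name>(...)")
    # Assumption: parameters are simple, comma-separated tokens.
    params = [p.strip() for p in match.group("params").split(",") if p.strip()]
    return match.group("namespace"), match.group("name"), params
```

For "foo:unique(symbol)" this yields the namespace foo, the function name unique and the single parameter symbol, while a call without a namespace, such as "sum(price)", is rejected.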

Extension Types 

Siddhi supports the following five types of extensions:

Function Extension

For each event, it consumes zero or more parameters and outputs a single attribute. This can be used to manipulate event attributes to generate a new attribute, like the Function operator. It is implemented by extending "org.wso2.siddhi.core.executor.function.FunctionExecutor".

E.g. "math:sin(x)": here the sin function of the math extension returns the sine of its parameter x.

Aggregate Function Extension

For each event, it consumes zero or more parameters and outputs a single attribute holding a result aggregated from the input parameters. This can be used in conjunction with a window in order to find the aggregated results based on the given window, like the Aggregate Function operator. It is implemented by extending "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator".

E.g. "custom:std(x)": here the std aggregate function of the custom extension returns the standard deviation of the value x based on the window assigned to its query.

Window Extension

Allows events to be collected and expired, without altering the event format, based on the given input parameters, like the Window operator. It is implemented by extending "org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor".

E.g. "custom:unique(key)": here the unique window of the custom extension emits every event as a current event upon arrival, and when an event arrives with the same value for the "key" parameter as a previously arrived event, the previously arrived event is emitted as an expired event.

Stream Function Extension

Allows events to be altered by adding one or more attributes to them. Events can be output upon each event arrival. It is implemented by extending "org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor".

E.g. "custom:pol2cart(theta,rho)": here the pol2cart function of the custom extension returns all events after calculating the Cartesian coordinates x and y and adding them as new attributes to the existing events.

Stream Processor Extension

Allows events to be collected and expired while altering the event format based on the given input parameters. It is implemented by extending "org.wso2.siddhi.core.query.processor.stream.StreamProcessor".

E.g. "custom:perMinResults(arg1, arg2, ...)": here the perMinResults function of the custom extension adds one or more attributes to each event based on its conversion logic and emits the result as a current event upon arrival. At expiration, it can emit expired events whose attribute values are set so that they match the attribute count and types of the current events.

Available Extensions 

Siddhi currently has several prewritten extensions, as follows:

Extensions released under Apache License v2 : 

  • math : Supporting mathematical operations 
  • str : Supporting String operations 
  • geo : Supporting geocode operations
  • regex : Supporting regular expression operations
  • time : Supporting time expression operations
  • ml : Supporting Machine Learning expression operations
  • timeseries : Supporting Time Series operations
  • kf (Kalman Filter) : Supporting filtering capabilities by detecting outliers of the data.
  • map : Supporting to send a map object inside Siddhi stream definitions and use it inside queries.
  • reorder : Supporting for reordering events from an unordered event stream using Kslack algorithm.

Extensions released under GNU/GPL License v3 : 

  • geo : Supporting geographical processing operations   
  • r :  Supporting R executions
  • nlp : Supporting Natural Language Processing expression operations
  • pmml : Supporting Predictive Model Markup Language expression operations

You can get them from https://github.com/wso2-gpl/siddhi

Writing Custom Extensions

Custom extensions can be written to cater for use-case-specific logic that is not available out of the box in Siddhi or as an existing extension.

To create a custom extension, two things need to be done:

  1. Implement the extension logic by extending the well-defined Siddhi interfaces. E.g. implementing a UniqueWindowProcessor by extending org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.

    package org.wso2.test;

    import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;

    public class UniqueWindowProcessor extends WindowProcessor {
       ...
    }
  2. Add an extension mapping file that maps the extension class to its function name and namespace. The extension mapping file must be named "<namespace>.siddhiext". E.g. to map the UniqueWindowProcessor extension to the function name "unique" and the namespace "foo", the mapping file must be named foo.siddhiext and its content must be as follows:

    # function name to class mapping of 'foo' extension
    unique=org.wso2.test.UniqueWindowProcessor
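The mapping file is a plain list of "function name=class name" lines, with the namespace taken from the file name. The following Python sketch shows how such a file relates the name used in queries to the implementing class. The helper parse_siddhiext is hypothetical and only models the file format described above; it is not how Siddhi loads extensions.

```python
def parse_siddhiext(namespace, text):
    """Map '<namespace>:<function name>' to the implementing class name.

    namespace is the file name without the .siddhiext suffix.
    """
    mapping = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        name, _, class_name = line.partition("=")
        mapping["%s:%s" % (namespace, name.strip())] = class_name.strip()
    return mapping


foo_siddhiext = """
# function name to class mapping of 'foo' extension
unique=org.wso2.test.UniqueWindowProcessor
"""
extensions = parse_siddhiext("foo", foo_siddhiext)
```

The resulting key foo:unique is the name by which the extension is referred to in a query, for example as #window.foo:unique(symbol).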

Refer to the following for examples of implementing the different types of Siddhi extensions.
