The latest version for DAS is WSO2 Data Analytics Server 3.1.0. View documentation for the latest release.
WSO2 Data Analytics Server is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.


This guide provides the specification of Siddhi Query Language 3.0 and instructions for using it with WSO2 DAS, with examples.

Introduction to Siddhi Query Language

Siddhi Query Language (SiddhiQL) is designed to process event streams to identify complex event occurrences. The following table provides definitions of a few terms in the Siddhi Query Language.

Siddhi has the following language constructs:

  • Event Stream Definitions
  • Event Table Definitions
  • Partitions
  • Queries

The execution logic of Siddhi can be composed together as an execution plan, and all the above language constructs can be written as a script in an execution plan. Each construct should be separated by a semicolon ( ; ).

Event Stream

An event stream is a typed sequence of events with a defined schema. One or more event streams can be consumed and manipulated by queries to identify complex event conditions, and new event streams can be emitted to notify query responses.

Event Stream Definition

The event stream definition defines the event stream schema. An event stream definition contains a unique name and a set of attributes assigned specific types, with uniquely identifiable names within the stream.

define stream <stream name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );

E.g. A stream named TempStream can be created with the following attributes as shown below. 

Attribute Name    Attribute Type
deviceID          long
roomNo            int
temp              double

define stream TempStream (deviceID long, roomNo int, temp double);

Query

Each Siddhi query can consume one or more event streams and create a new event stream from them.

All queries contain an input section and an output section. Some also contain a projection section. A simple query with all three sections is as follows.

from <input stream name> 
select <attribute name>, <attribute name>, ...
insert into <output stream name> 

e.g., If you want to derive only the room temperature from the TempStream event stream defined above, a query can be defined as follows.

from TempStream 
select roomNo, temp
insert into RoomTempStream;

Inferred Stream: Here RoomTempStream is an inferred stream, i.e., RoomTempStream can be used as an input stream for another query without explicitly defining its event stream definition, because the definition is inferred from the above query.

Query Projection 

SiddhiQL supports the following for query projection.

Selecting required objects for projection

This involves selecting only some of the attributes in an input stream to be inserted into an output stream.

e.g., The following query selects only the  roomNo and temp attributes from the TempStream stream.

from TempStream 
select roomNo, temp
insert into RoomTempStream;

Selecting all attributes for projection

This involves selecting all the attributes in an input stream to be inserted into an output stream. This can be done by using the asterisk sign ( * ) or by omitting the select statement.

e.g., Use one of the following queries to select all the attributes in the TempStream stream.

from TempStream 
select *
insert into NewTempStream;

or

from TempStream 
insert into NewTempStream;

Renaming attributes

This involves selecting attributes from the input streams and inserting them into the output stream with different names.

e.g., The following query renames roomNo to roomNumber and temp to temperature.

from TempStream 
select roomNo as roomNumber, temp as temperature
insert into RoomTempStream;

Introducing the default value

This involves adding a default value and assigning it to an attribute using as.

e.g.,

from TempStream 
select roomNo, temp, 'C' as scale
insert into RoomTempStream;

Using mathematical and logical expressions

This involves using attributes in mathematical and logical expressions, evaluated in the precedence order given below, and assigning the results to output attributes using as.

Operator precedence

Operator    Description                                                                Example
()          Scope                                                                      (cost + tax) * 0.05
IS NULL     Null check                                                                 deviceID is null
NOT         Logical NOT                                                                not (price > 10)
* / %       Multiplication, division, modulo                                           temp * 9/5 + 32
+ -         Addition, subtraction                                                      temp * 9/5 + 32
< <= > >=   Comparisons: less-than, less-than-equal, greater-than, greater-than-equal  totalCost >= price * quantity
== !=       Comparisons: equal, not equal                                              totalCost != price * quantity
IN          Contains in table                                                          roomNo in ServerRoomsTable
AND         Logical AND                                                                temp < 40 and (humidity < 40 or humidity >= 60)
OR          Logical OR                                                                 temp < 40 and (humidity < 40 or humidity >= 60)

e.g., Converting Celsius to Fahrenheit and identifying server rooms

from TempStream 
select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom
insert into RoomTempStream;


 

Functions

A function consumes zero, one or more function parameters and produces a result value.

Function parameters 

Function parameters can be attributes (int, long, float, double, string, bool, object), results of other functions, results of mathematical or logical expressions, or time parameters.

Time is a special parameter that is defined using the time value as an int followed by its unit type, in the form <int> <unit>. The supported unit types are listed below. Upon execution, a time expression returns its value in milliseconds as a long.

Unit            Syntax
Year            year | years
Month           month | months
Week            week | weeks
Day             day | days
Hour            hour | hours
Minutes         minute | minutes | min
Seconds         second | seconds | sec
Milliseconds    millisecond | milliseconds

E.g. Passing 1 hour and 25 minutes to the test function.

test(1 hour 25 min)
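
As a rough illustration of how such a time expression resolves to a single millisecond value, the following Python sketch sums the value and unit pairs. The function name to_millis and the lookup table are illustrative assumptions and not part of Siddhi. Year and month are left out because this guide does not state their exact length.

```python
# Illustrative model only: shows how a time expression such as
# "1 hour 25 min" adds up to one millisecond value.
# Year and month are omitted because their lengths are not given in this guide.
MILLIS_PER_UNIT = {
    "millisecond": 1, "milliseconds": 1,
    "sec": 1000, "second": 1000, "seconds": 1000,
    "min": 60_000, "minute": 60_000, "minutes": 60_000,
    "hour": 3_600_000, "hours": 3_600_000,
    "day": 86_400_000, "days": 86_400_000,
    "week": 604_800_000, "weeks": 604_800_000,
}


def to_millis(expression: str) -> int:
    """Sum every '<int> <unit>' pair in the expression, in milliseconds."""
    tokens = expression.split()
    if len(tokens) % 2 != 0:
        raise ValueError("expected '<int> <unit>' pairs")
    total = 0
    for value, unit in zip(tokens[0::2], tokens[1::2]):
        total += int(value) * MILLIS_PER_UNIT[unit]
    return total


print(to_millis("1 hour 25 min"))  # 5100000
```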

Functions, mathematical expressions, and logical expressions can be used in a nested manner. 

Inbuilt Functions

Siddhi supports the following inbuilt functions.

E.g. Using the convert and UUID functions to convert the room number to a string and to add a message ID to each event.

from TempStream 
select convert(roomNo, 'string') as roomNo, temp, UUID() as messageID
insert into RoomTempStream;

Filter

Filters can be used with input streams to filter events based on a given filter condition. The filter condition should be defined in square brackets next to the input stream name.

from <input stream name>[<filter condition>]
select <attribute name>, <attribute name>, ...
insert into <output stream name> 

E.g. Filtering all server rooms having a temperature greater than 40 degrees.

from TempStream [(roomNo >= 100 and roomNo < 110) and temp > 40 ]
select roomNo, temp
insert into HighTempStream;

Window

Windows capture a subset of events from an input event stream, based on a given criterion, for calculation. A window is defined next to the input stream using the '#window.' prefix, and each input stream can have at most one window, as follows.

from <input stream name>[<filter condition>]#window.<window name>(<parameter>, <parameter>, ... )
select <attribute name>, <attribute name>, ...
insert into <output stream name> 

Windows emit two types of events for each event they consume: current-events and expired-events. A window emits a current-event when a new event arrives at the window, and an expired-event whenever an event in the window expires based on that window's criteria.
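
The difference between current and expired events can be sketched with a small Python model of a sliding length window. This is a simplified illustration and not Siddhi's implementation. The function name length_window and the ordering of expired before current output are assumptions made for the sketch.

```python
from collections import deque


def length_window(events, length):
    """Simplified model of a sliding length window.

    Returns (category, event) tuples in the order they would be emitted.
    """
    if length < 1:
        raise ValueError("this sketch assumes a window length of at least 1")
    window = deque()
    output = []
    for event in events:
        if len(window) == length:
            # The window is full: the oldest event leaves it and is
            # emitted as an expired event.
            output.append(("expired", window.popleft()))
        window.append(event)
        # Every arriving event is emitted as a current event.
        output.append(("current", event))
    return output


for category, temp in length_window([21.0, 22.5, 24.0], length=2):
    print(category, temp)
```

With a window length of 2, the third event pushes the first one out, so the first event appears twice in the output: once as a current event and once as an expired event.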

Inbuilt Windows

Siddhi supports the following inbuilt windows.

Output Event Categories

Window output can be manipulated based on event categories, i.e., current and expired events. Use the following keywords with the output stream to manipulate the output.

  • current events : Emits all the events that arrive at the window. This is the default behaviour if no event category is specified.
  • expired events : Emits all the events that expire from the window.
  • all events : Emits all the events that arrive at and expire from the window.

To use these with an insert into statement, place the above keywords between 'insert' and 'into' as given in the example below.

E.g. Delay all events in a stream by 1 minute.

from TempStream#window.time(1 min)
select *
insert expired events into DelayedTempStream;

Aggregate Functions

Aggregate functions can be used with windows to perform aggregate calculations within the defined window. 

Inbuilt Aggregate Functions

Siddhi supports the following inbuilt aggregate functions.

E.g. Upon every event arrival and expiry, notify the average temperature of all rooms based on all events that arrived during the last 10 minutes.

from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;

Group By

Group by allows aggregations to be grouped based on the group by attributes.

E.g. Find the average temperature per room and device ID for the last 10 min.

from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
group by roomNo, deviceID
insert into AvgTempStream;

Having

Having allows us to filter events after aggregation and after processing at the selector. 

E.g. Find the average temperature per room for the last 10 min and alert if it's more than 30 degrees.

from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo
group by roomNo
having avgTemp > 30
insert into AlertStream;

Output Rate Limiting

Output rate limiting allows queries to emit events periodically based on the condition specified.

Rate limiting uses the following syntax.

from <input stream name>...
select <attribute name>, <attribute name>, ...
output ({<output-type>} every (<time interval>|<event interval> events) | snapshot every <time interval>)
insert into <output stream name> 

With "<output-type>", the events to be emitted can be specified. The possible keywords are "first", "last" and "all", which emit only the first event, only the last event, or all events from the arrived events. If the keyword is omitted, it defaults to "all", emitting all events.

With "<time interval>", the time interval for the periodic event emission can be specified.

With "<event interval>", the number of events that need to arrive for the periodic event emission can be specified.

Based on number of events

Here, events are emitted every time the predefined number of events has arrived. When emitting, the query can be specified to emit only the first event, the last event, or all events from the arrived events.
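
The effect of the output type on event-count based limiting can be sketched in Python as follows. This is a simplified model that shows which events are emitted, not when they are emitted, and it ignores grouping. The function name limit_by_count is an illustrative assumption.

```python
def limit_by_count(events, interval, output_type="all"):
    """Simplified model of 'output <output-type> every <interval> events'."""
    if output_type not in ("first", "last", "all"):
        raise ValueError("output_type must be 'first', 'last' or 'all'")
    emitted = []
    batch = []
    for event in events:
        batch.append(event)
        if len(batch) == interval:
            # A full batch has arrived: pick the events to emit from it.
            if output_type == "first":
                emitted.append(batch[0])
            elif output_type == "last":
                emitted.append(batch[-1])
            else:
                emitted.extend(batch)
            batch = []
    # Events in an incomplete trailing batch are not emitted.
    return emitted


events = [1, 2, 3, 4, 5, 6, 7]
print(limit_by_count(events, 3, "first"))  # [1, 4]
print(limit_by_count(events, 3, "last"))   # [3, 6]
print(limit_by_count(events, 3, "all"))    # [1, 2, 3, 4, 5, 6]
```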

E.g. Emit the last temperature event per sensor every 10 events.

from TempStream
group by deviceID
output last every 10 events
insert into LowRateTempStream;

Based on time

Here, events are emitted at every predefined time interval. When emitting, the query can be specified to emit only the first event, the last event, or all events from the arrived events.

E.g. Emit all temperature events every 10 seconds.

from TempStream
output every 10 sec
insert into LowRateTempStream;

Periodic snapshot

This works best with windows. When the input stream has a window attached, snapshot rate limiting emits, at every predefined time interval, all current events that have arrived so far and do not yet have corresponding expired events. When no window is attached to the input stream, it emits only the last current event at every predefined time interval.

E.g. Emit a snapshot of the events in a time window of 5 seconds every 1 second.

from TempStream#window.time(5 sec)
output snapshot every 1 sec
insert into SnapshotTempStream;

Joins

Join allows two event streams to be merged based on a condition. Each stream should be associated with a window (if no window is assigned, #window.length(0) will be assigned to the input event stream). During the joining process, each incoming event on each stream is matched against all events in the other input event stream's window based on the given condition, and an output event is generated for every matching event pair.

The syntax of a join is as follows.

from <input stream name>[<filter condition>]#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>} 
		join <input stream name>#window.<window name>(<parameter>,  ... ) {unidirectional} {as <reference>}
	on <join condition>
	within <time gap>
select <attribute name>, <attribute name>, ...
insert into <output stream name> 

With "on <join condition>", Siddhi joins only the events that match the condition.

With the "unidirectional" keyword, the trigger of the joining process can be controlled. By default, events arriving on both streams trigger the joining process. When the unidirectional keyword is used on an input stream, only the events arriving on that stream trigger the joining process. Note that the unidirectional keyword cannot be used for both input streams (as that is equal to the default behaviour, which is not using the unidirectional keyword at all).

With "within <time gap>", the joining process matches only the events that are within the defined time gap of each other.

When projecting the joined events, the attributes of each stream need to be referred to with the stream name (e.g., <stream name>.<attribute name>) or with its reference ID (especially when events of the same stream are joined) (e.g., <stream reference Id>.<attribute name>). "select *" can be used, or the "select" statement itself can be omitted, if all attributes of the joined events need to be projected, but these can only be used when the two streams do not have any attributes with the same name.

E.g. Switch on the temperature regulators that are not already on, in all rooms that have a current temperature greater than 30 degrees.

define stream TempStream(deviceID long, roomNo int, temp double);

define stream RegulatorStream(deviceID long, roomNo int, isOn bool);

from TempStream[temp > 30.0]#window.time(1 min) as T 
  join RegulatorStream[isOn == false]#window.length(1) as R
  on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;

Pattern

Pattern allows event streams to be correlated over time and event patterns to be detected based on the order of event arrival. With pattern, there can be other events in between the events that match the pattern condition. Siddhi internally creates state machines to track the states of the matching process. A pattern can correlate events over multiple input streams or over the same input stream, hence each matched input event needs to be referenced so that it can be accessed for future processing and output generation.

The syntax of a pattern is as follows.

from {every} <input event reference>=<input stream name>[<filter condition>] -> {every} <input event reference>=<input stream name>[<filter condition>] -> ...	
	within <time gap>
select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ...
insert into <output stream name>  

Input Streams cannot be associated with a window.

With "->", incoming event arrivals can be correlated while allowing zero or more other events to arrive in between the matching events.

With "<input event reference>=", the matched event can be stored for future reference.

With "within <time gap>", the pattern is matched only with events that are within the defined time gap of each other.

Without the "every" keyword, the pattern can be matched only once. Use the "every" keyword appropriately to trigger a pattern matching process upon event arrival.
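
The interplay of "->", "every" and "within" can be sketched with a small Python model that alerts when the temperature of a room rises by 5 degrees within 10 minutes. This is a simplified illustration and not Siddhi's state machine. The function name detect_temp_rise and the tuple layout (timestamp in milliseconds, room number, temperature) are assumptions made for the sketch.

```python
TEN_MINUTES_MS = 10 * 60 * 1000


def detect_temp_rise(events, rise=5.0, within_ms=TEN_MINUTES_MS):
    """events: iterable of (timestamp_ms, room_no, temp), ordered by time.

    Returns (room_no, initial_temp, final_temp) tuples for each match.
    """
    pending = []  # partial matches: events captured as e1
    alerts = []
    for ts, room, temp in events:
        # 'within': discard partial matches that are too old.
        pending = [p for p in pending if ts - p[0] <= within_ms]
        still_pending = []
        for p_ts, p_room, p_temp in pending:
            if p_room == room and p_temp + rise <= temp:
                # This event matches as e2, so the pattern is complete.
                alerts.append((room, p_temp, temp))
            else:
                # '->': non-matching events in between are simply skipped.
                still_pending.append((p_ts, p_room, p_temp))
        # 'every': each arriving event also starts a new partial match as e1.
        still_pending.append((ts, room, temp))
        pending = still_pending
    return alerts


events = [(0, 1, 20.0), (60_000, 1, 22.0), (120_000, 1, 26.0), (1_000_000, 1, 40.0)]
print(detect_temp_rise(events))  # [(1, 20.0, 26.0)]
```

The last event does not raise an alert because all earlier partial matches are more than 10 minutes old by the time it arrives.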

E.g. Alert if temperature of a room increases by 5 degrees within 10 min.  

from every( e1=TempStream ) -> e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp ]
	within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;

Logical Pattern

Pattern not only matches events arriving in temporal order, but can also correlate events having logical relationships.

Keywords like "and" and "or" can be used instead of "->" to express the logical relationship.

With "and", the occurrence of two events in any order can be matched.

With "or", the occurrence of an event from either of the input streams can be matched.

E.g. Alert when the room temperature reaches the temperature set on the regulator (the pattern matching should be reset whenever the temperature set on the regulator changes).

define stream TempStream(deviceID long, roomNo int, temp double);

define stream RegulatorStream(deviceID long, roomNo int, tempSet double);
 
from every( e1=RegulatorStream ) -> e2=TempStream[e1.roomNo==roomNo and e1.tempSet <= temp ] or e3=RegulatorStream[e1.roomNo==roomNo]
select e1.roomNo, e2.temp as roomTemp
having e3 is null
insert into AlertStream;

Counting Pattern

Counting pattern enables multiple events to be matched based on the same matching condition. The expected number of events can be limited using the following postfixes.

With <1:4>, 1 to 4 events are matched.

With <2:>, 2 or more events are matched, and with <:5>, up to 5 events are matched.

With <5>, exactly 5 events are matched.

To refer to specific occurrences of the events matched based on count limits, square brackets can be used with numerical indexes and the "last" keyword. For example, e1[3] refers to the third event, e1[last] refers to the last event, and e1[last - 1] refers to the event before the last event of the matched event group.

E.g. Get the temperature difference between two regulator events.

define stream TempStream(deviceID long, roomNo int, temp double);

define stream RegulatorStream(deviceID long, roomNo int, tempSet double, isOn bool);
 
from every( e1=RegulatorStream) -> e2=TempStream[e1.roomNo==roomNo]<1:> -> e3=RegulatorStream[e1.roomNo==roomNo]
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
insert into TempDiffStream;

Sequence

Sequence allows event streams to be correlated over time and event sequences to be detected based on the order of event arrival. With sequence, there cannot be other events in between the events that match the sequence condition. Siddhi internally creates state machines to track the states of the matching process. A sequence can correlate events over multiple input streams or over the same input stream, hence each matched input event needs to be referenced so that it can be accessed for future processing and output generation.

The syntax of a sequence is as follows.

from {every} <input event reference>=<input stream name>[<filter condition>], <input event reference>=<input stream name>[<filter condition>]{+|*|?}, ...	
	within <time gap>
select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ...
insert into <output stream name>  

Input Streams cannot be associated with a window.

With ",", the immediately following incoming event arrivals can be correlated, with no other events arriving in between the matching events.

With "<input event reference>=", the matched event can be stored for future reference.

With "within <time gap>", the sequence is matched only with events that are within the defined time gap of each other.

Without the "every" keyword, the sequence can be matched only once. Use the "every" keyword at the beginning to trigger a sequence matching process upon every event arrival.
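
Because a sequence only matches events that arrive immediately after each other, a two-step sequence over a single stream can be modelled in Python by comparing each event with its direct successor. The sketch below is a simplified illustration and not Siddhi's implementation. The function name detect_consecutive_rise is an assumption, and events are reduced to their temperature values.

```python
def detect_consecutive_rise(temps, threshold=1.0):
    """Return (initial_temp, final_temp) pairs of directly consecutive
    readings where the temperature rose by more than the threshold."""
    alerts = []
    # zip pairs every reading (e1) with the reading right after it (e2),
    # which mirrors starting a new match on every event.
    for previous, current in zip(temps, temps[1:]):
        if previous + threshold < current:
            alerts.append((previous, current))
    return alerts


print(detect_consecutive_rise([20.0, 20.5, 22.0, 21.0, 23.5]))
# [(20.5, 22.0), (21.0, 23.5)]
```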

E.g. Alert if there is more than 1 degree increase in temperature between two consecutive temperature events.  

from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp ]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;

Logical Sequence

Sequence not only matches consecutive events arriving in temporal order, but can also correlate events having logical relationships.

Keywords like "and" and "or" can be used instead of "," to express the logical relationship.

With "and", the occurrence of two events in any order can be matched.

With "or", the occurrence of an event from either of the input streams can be matched.

E.g. Notify when a regulator event is followed by both the temperature and humidity events.  

define stream TempStream(deviceID long, temp double);
define stream HumidStream(deviceID long, humid double);
define stream RegulatorStream(deviceID long, isOn bool);
 
from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
select e2.temp, e3.humid
insert into StateNotificationStream;

Counting Sequence

Counting sequence enables multiple consecutive events to be matched based on the same matching condition. The expected number of events can be limited using the following postfixes.

With "*", zero or more events can be matched.

With "+", one or more events can be matched.

With "?", zero or one event can be matched.

To refer to specific occurrences of the events matched based on count limits, square brackets can be used with numerical indexes and the "last" keyword. For example, e1[3] refers to the third event, e1[last] refers to the last event, and e1[last - 1] refers to the event before the last event of the matched event group.

E.g. Identify peak temperatures.

define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, tempSet double, isOn bool);
 
from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[e2[last].temp > temp]
select e1.temp as initialTemp, e2[last].temp as peakTemp
insert into TempDiffStream;

Partition

With partitions, Siddhi can divide both incoming events and queries, and process them in parallel and in isolation. Each partition is tagged with a partition key, and only the events corresponding to the given partition key are processed in that partition. Each partition has separate instances of its Siddhi queries, providing isolation of processing states. A partition can contain more than one query.

A partition key can be defined using a categorical (string) attribute of the input event stream, as a Variable Partition, or by defining separate ranges when the partition needs to be defined using numerical attributes of the input event stream, as a Range Partition.

Variable Partition 

Partition using categorical (string) attributes will adhere to the following syntax.

partition with ( <attribute name> of <stream name>, <attribute name> of <stream name>, ... )
begin
	<query>
	<query>
	...
end;

E.g. Per sensor, calculate the maximum temperature over last 10 temperature events each sensor has emitted.

partition with ( deviceID of TempStream )
begin
	from TempStream#window.length(10)
	select roomNo, deviceID, max(temp) as maxTemp
	insert into DeviceTempStream
end;

Range Partition 

Partition using numerical attributes will adhere to the following syntax.

partition with ( <condition> as <partition key> or <condition> as <partition key> or ... of <stream name>, ... )
begin
	<query>
	<query>
	...
end;

E.g. Per office area calculate the average temperature over last 10 minutes.

partition with ( roomNo>=1030 as 'serverRoom' or roomNo<1030 and roomNo>=330 as 'officeRoom' or roomNo<330 as 'lobby' of TempStream )
begin
	from TempStream#window.time(10 min)
	select roomNo, deviceID, avg(temp) as avgTemp 
	insert into AreaTempStream
end;
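
The way range conditions map events to partition keys can be sketched in Python as follows. This is a simplified model of the example above: it ignores the 10 minute time window and averages over all events given. The function names partition_key and average_temp_per_area are illustrative assumptions.

```python
from collections import defaultdict


def partition_key(room_no):
    """Map a room number to a partition key using the ranges of the example."""
    if room_no >= 1030:
        return "serverRoom"
    if room_no >= 330:
        return "officeRoom"
    return "lobby"


def average_temp_per_area(events):
    """events: iterable of (room_no, temp). Returns {partition key: average}."""
    temps = defaultdict(list)
    for room_no, temp in events:
        # Each partition keeps its own state, isolated from the others.
        temps[partition_key(room_no)].append(temp)
    return {key: sum(values) / len(values) for key, values in temps.items()}


events = [(1040, 18.0), (1050, 20.0), (400, 24.0), (12, 22.0)]
print(average_temp_per_area(events))
# {'serverRoom': 19.0, 'officeRoom': 24.0, 'lobby': 22.0}
```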

Inner Streams 

Inner streams allow query instances of a partition to communicate with other query instances of the same partition. Inner streams are denoted by a "#" in front of their names, and these streams cannot be accessed outside the partition block.

E.g. Per sensor, calculate the maximum temperature over last 10 temperature events when the sensor is having an average temperature greater than 20 over the last minute.

partition with ( deviceID of TempStream )
begin
	from TempStream#window.time(1 min)
	select roomNo, deviceID, temp, avg(temp) as avgTemp
	insert into #AvgTempStream;
 
	from #AvgTempStream[avgTemp > 20]#window.length(10)
	select roomNo, deviceID, max(temp) as maxTemp
	insert into deviceTempStream
end;

Event table

Event tables allow Siddhi to work with stored events. An event table can be viewed as a stored version of an event stream, or as a table of events. By default, events are stored in memory, and Siddhi also provides an extension to work with data/events stored in RDBMS data stores.

Event table definition

Event Table Definition defines the event table schema. An Event Table Definition contains a unique name and a set of typed attributes with uniquely identifiable names within the table. 

define table <table name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );

With the above definition, the event table stores all events in memory using a linked list data structure.

E.g. A room type table named RoomTypeTable can be created as below, with the attributes room number as int and type as string.

define table RoomTypeTable (roomNo int, type string);

Indexing Event Table

An event table can be indexed for fast event access using the "IndexedBy" annotation. With "IndexedBy", only one attribute can be indexed, and when indexed, the table uses a map data structure to hold the events. Therefore, if multiple events having the same index value are inserted into the event table, only the last inserted event remains in the table.
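
The consequence of holding events in a map keyed by the indexed attribute can be sketched with a plain Python dictionary. This is an illustrative model only. The helper name insert_indexed and the sample room types are assumptions.

```python
def insert_indexed(table, event, index_attribute="roomNo"):
    """Store the event under its index value, replacing any earlier event
    that has the same index value."""
    table[event[index_attribute]] = event


room_type_table = {}
insert_indexed(room_type_table, {"roomNo": 101, "type": "office"})
insert_indexed(room_type_table, {"roomNo": 101, "type": "server"})
insert_indexed(room_type_table, {"roomNo": 205, "type": "lobby"})

# Two inserts shared the index value 101, so only the later one remains.
print(len(room_type_table))          # 2
print(room_type_table[101]["type"])  # server
```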

E.g. A room type table named RoomTypeTable, indexed by room number, can be created as below with the attributes room number as int and type as string.

@IndexedBy('roomNo')
define table RoomTypeTable (roomNo int, type string);

RDBMS Event Table

An event table can be backed by an RDBMS event store using the "From" annotation. With "From", the data source and the connection instructions can be assigned to the event table. The RDBMS table name can be different from the event table name defined in Siddhi, and Siddhi will always refer to the defined event table name. However, the defined event table name cannot be the same as an already existing stream name, since syntactically both are considered the same within the Siddhi query language.

RDBMS event table has been tested with the following databases: 

    • MySQL

    • H2

    • Oracle

E.g. Create an event table with name RoomTypeTable having attributes room number as int & type as string, backed by RDBMS table named RoomTable from the data source named AnalyticsDataSource. 

@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);

Note

The datasource.name given here is injected to the Siddhi engine by the DAS server. To configure data sources in a WSO2 product, see WSO2 Administration Guide - Managing Datasources.

E.g. Create an event table with name RoomTypeTable having attributes room number as int & type as string, backed by MySQL table named RoomTable from the database cepdb located at localhost:3306 having user name "root" and password "root". 

@From(eventtable='rdbms', jdbc.url='jdbc:mysql://localhost:3306/cepdb', username='root', password='root', driver.name='com.mysql.jdbc.Driver', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);

Caching events with RDBMS Event Table

Several caches can be used with RDBMS-backed event tables in order to reduce I/O operations and improve their performance. Currently, all cache implementations provide size-based algorithms. Caches can be added using the "cache" element, and the size of the cache can be defined using the "cache.size" element of the "From" annotation.

The supported cache implementations are as follows:

  1. Basic: Events are cached in a FIFO manner where the oldest event will be dropped when the cache is full. 
  2. LRU (Least Recently Used): The least recently used event is dropped when the cache is full.
  3. LFU (Least Frequently Used): The least frequently used event is dropped when the cache is full.

In the "From" annotation, if the "cache" element is not specified the "Basic" cache will be assigned by default, and if the "cache.size" element is not assigned the default value 4096 will be assigned as the cache size.
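
The difference between the Basic (FIFO) and LRU eviction policies can be sketched in Python as follows. This is a simplified model and not the cache implementation used by Siddhi. The class names BasicCache and LruCache are illustrative assumptions, and the sketch assumes that both reads and writes count as a use for LRU.

```python
from collections import OrderedDict


class BasicCache:
    """FIFO cache: when full, the oldest inserted entry is dropped."""

    def __init__(self, size=4096):
        self.size = size
        self.entries = OrderedDict()

    def get(self, key):
        return self.entries.get(key)

    def put(self, key, value):
        if key not in self.entries and len(self.entries) >= self.size:
            self.entries.popitem(last=False)  # evict the entry at the front
        self.entries[key] = value


class LruCache(BasicCache):
    """LRU cache: when full, the least recently used entry is dropped."""

    def get(self, key):
        if key in self.entries:
            self.entries.move_to_end(key)  # a read counts as a use
        return self.entries.get(key)

    def put(self, key, value):
        super().put(key, value)
        self.entries.move_to_end(key)  # a write counts as a use


for cache in (BasicCache(size=2), LruCache(size=2)):
    cache.put(1, "office")
    cache.put(2, "server")
    cache.get(1)           # touch the oldest entry
    cache.put(3, "lobby")  # forces an eviction
    print(type(cache).__name__, list(cache.entries))
```

After the same operations, the Basic cache has dropped key 1 (the oldest insert), while the LRU cache has dropped key 2 (the entry that was not read).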

E.g. Create an event table with name RoomTypeTable having attributes room number as int & type as string, backed by RDBMS table using least recently used caching algorithm for caching 3000 events. 

@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable', cache='LRU', cache.size='3000')
define table RoomTypeTable (roomNo int, type string);

Insert into

The query for inserting events into a table is similar to the query for inserting events into event streams, using the "insert into <table name>" code snippet. To insert only a specified output event category, use the "current events", "expired events" or "all events" keywords between the 'insert' and 'into' keywords.

E.g. Insert all temperature events from TempStream into the temperature table.

from TempStream
select *
insert into TempTable;

Delete

A query for deleting events from an event table can be written using a delete query with the following syntax.

from <input stream name> 
select <attribute name>, <attribute name>, ...
delete <table name> 
	on <condition> 

Here, "on <condition>" can be used to select the events for deletion. When writing this condition, attribute names of the event table should always be referred to with the table name, and attributes of the select should not have a reference associated with them.

E.g. Delete the entries of the RoomTypeTable associated to the room numbers of DeleteStream.

define table RoomTypeTable (roomNo int, type string);
define stream DeleteStream (roomNumber int);
 
from DeleteStream 
delete RoomTypeTable
	on RoomTypeTable.roomNo == roomNumber;

To execute the delete only for a specified output event category, use "delete <table name> for <output event category> on <condition>" instead of the "delete <table name> on <condition>" code snippet, where "<output event category>" can be the "current events", "expired events" or "all events" keywords.

Update

A query for updating events in an event table can be written using an update query with the following syntax.

from <input stream name> 
select <attribute name> as <table attribute name>, <attribute name> as <table attribute name>, ...
update <table name> 
	on <condition> 

Here, "on <condition>" can be used to select the events for update. When writing this condition, attribute names of the event table should always be referred to with the table name, and attributes of the select should not have a reference associated with them.

With "<table attribute name>", the attributes can be referred to with the same names that are defined in the event table, allowing Siddhi to identify which attributes need to be updated in the event table.

E.g. For each room denoted by its number, update the room types of the RoomTypeTable based on the events in UpdateStream.

define table RoomTypeTable (roomNo int, type string);
define stream UpdateStream (roomNumber int, roomType string);
 
from UpdateStream 
select roomType as type
update RoomTypeTable
	on RoomTypeTable.roomNo == roomNumber;

To execute the update only for a specified output event category, use "update <table name> for <output event category> on <condition>" instead of the "update <table name> on <condition>" code snippet, where "<output event category>" can be the "current events", "expired events" or "all events" keywords.

Insert Overwrite

A query for inserting or overwriting events in an event table can be written using an insert-overwrite query with the following syntax.

from <input stream name> 
select <attribute name> as <table attribute name>, <attribute name> as <table attribute name>, ...
insert overwrite <table name> 
	on <condition> 

Here the "on <condition>" can be used to select the events for update or insert, and when writing this condition attribute names of the event tables should be always referred with table name and attributes of the select should not be have reference associated with them. 

With "<table attribute name>" the attributes could be referred with the same name thats defined in event table, allowing Siddhi to identify which attributes need to be updated/inserted on event table.

E.g. For each room denoted by its number, update the room types in RoomTypeTable based on the events in UpdateStream, or insert the room if it does not exist.

define table RoomTypeTable (roomNo int, type string);
define stream UpdateStream (roomNumber int, roomType string);
 
from UpdateStream 
select roomNumber as roomNo, roomType as type
insert overwrite RoomTypeTable
	on RoomTypeTable.roomNo == roomNo;

In

Whether an attribute value exists in an event table can be checked using a condition with the following syntax.

 <condition> in <table name> 

Here, "<condition>" selects the matching attribute. When writing this condition, always refer to event table attributes with the table name as a prefix, and refer to the attributes of the incoming stream without any prefix.

E.g. Output only the temperature events associated with server rooms, by checking them against ServerRoomTable.

define table ServerRoomTable (roomNo int);
define stream TempStream (deviceID long, roomNo int, temp double);
 
from TempStream[ServerRoomTable.roomNo == roomNo in ServerRoomTable] 
insert into ServerTempStream;

Join

A stream can be joined with an event table to retrieve data from the table. A simple join query is used for this. In the join, the event table must not be associated with window operations, because an event table is not an active construct. For the same reason, an event table cannot be joined with another event table in Siddhi.

E.g. Enrich the events in the temperature stream with their room type based on RoomTypeTable.

define table RoomTypeTable (roomNo int, type string);
define stream TempStream (deviceID long, roomNo int, temp double);
 
from TempStream join RoomTypeTable
	on RoomTypeTable.roomNo == TempStream.roomNo
select deviceID, RoomTypeTable.roomNo as roomNo, type, temp 
insert into EnhancedTempStream;

Event window

An event window is a window that can be shared across multiple queries. The events should be inserted from one or more streams. The event window publishes current and/or expired events as the output, and the time these events are published depends on the window type.

Event window definition

The syntax for an event window definition is as follows.

define window <event window name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... ) <window type>(<parameter>, <parameter>, …) <output event type>;

The above syntax contains the following:

Element | Description
<event window name> | A unique name for the window.
<attribute name> <attribute type> | These elements define a set of attributes assigned specific types.
<window type> | Any inbuilt window type available in Siddhi. For the complete list of available window types, see Inbuilt Windows.
<output event type> | The possible values for this parameter are as follows:

  • output current events : Windows with this output event type emit only current events.
  • output expired events : Windows with this output event type emit only expired events.
  • output all events : Windows with this output event type emit both current and expired events.

If the output event type is not specified for an event window, it emits both current and expired events.

Sample event window definitions
  • The output event type is not specified in the following window definition. Therefore, it emits both current and expired events as the output.

     define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second);
  • The output event type of the following window is output all events. Therefore, it emits both current and expired events as the output.

    define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second) output all events;
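
A window that emits only expired events could be defined along the same lines. The following is a sketch based on the syntax above; the window name ExpiredSensorWindow is hypothetical and is used only to distinguish it from the samples.

```siddhi
-- Emits events only when they expire from the 1-second batch window.
define window ExpiredSensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second) output expired events;
```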

Insert Into

The query for inserting events into a window is similar to the query for inserting events into event streams, and uses the insert into <window name> code snippet. To restrict the events inserted into the window to a specific category, use one of the following keywords between the insert and into keywords.

Keyword | Purpose
current events | To insert only current events into the event window.
expired events | To insert only expired events into the event window.
all events | To insert both current and expired events into the event window.

e.g., The following query inserts both current and expired events from an event stream named SensorStream to an event window named SensorWindow.

from SensorStream
insert into SensorWindow;
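
To illustrate restricting the inserted events by category, the following sketch inserts only the events that expire from a time window on SensorStream into SensorWindow. The stream definition and the 10-second duration are assumptions made for this example.

```siddhi
define stream SensorStream (name string, value float, roomNo int, deviceID string);
define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second);

-- Events reach SensorWindow only after they expire from the 10-second window.
from SensorStream#window.time(10 sec)
select *
insert expired events into SensorWindow;
```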

Output

An event window can be used as a stream in any query. However, an ordinary window cannot be applied to the output of an event window.

e.g., The following query selects the name, the maximum value of the value attribute, and the roomNo from an event window named SensorWindow, and inserts them into an event stream named MaxSensorReadingStream.

from SensorWindow
select name, max(value) as maxValue, roomNo
insert into MaxSensorReadingStream; 

Join

An event window can be joined with another event stream, an event table or an event window.

e.g., The following query sends an alert to an event stream named RegulatorActionStream if the temperature is greater than 30 and the regulator is off.

define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);
define window TempWindow(deviceID long, roomNo int, temp double) time(1 min);

from TempStream[temp > 30.0]
insert into TempWindow;

from TempWindow
join RegulatorStream[isOn == false]#window.length(1) as R
on TempWindow.roomNo == R.roomNo
select TempWindow.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;

Event Trigger

Triggers allow events to be created periodically based on time, or when Siddhi starts. An event trigger generates events on an event stream that has the same name as the trigger and a single attribute named "triggered_time" of type long.

Event Trigger Definition

An event trigger definition specifies when events are triggered, using the following syntax.

define trigger <trigger name> at {'start'| every <time interval>| '<cron expression>'};

With "'start'", an event is triggered when Siddhi starts.

With "every <time interval>", an event is triggered periodically at the given time interval.

With "'<cron expression>'", an event is triggered periodically based on the given cron expression. For details on cron expressions, refer to the quartz-scheduler documentation.

E.g. Trigger an event every 5 minutes.

define trigger FiveMinTriggerStream at every 5 min;

E.g. Trigger an event at 10:15 am every Monday, Tuesday, Wednesday, Thursday and Friday.

define trigger FiveMinTriggerStream at '0 15 10 ? * MON-FRI';
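
Because a trigger produces events on a stream with the same name, it can be consumed like any other stream. The following sketch triggers an event when Siddhi starts and forwards its time to another stream. The names StartTrigger, startTime and StartTimeStream are hypothetical.

```siddhi
define trigger StartTrigger at 'start';

-- The trigger's stream carries a single attribute, triggered_time (long).
from StartTrigger
select triggered_time as startTime
insert into StartTimeStream;
```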

Siddhi Logger

The Siddhi logger is used to log events that arrive in different logger priorities such as INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE.

The following syntax is used.

<void> log(<string> priority, <string> logMessage, <bool> isEventLogged)

The parameters used in the query are as follows.

Parameter | Description
priority | The logging priority. Possible values are INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE. If no value is specified for this parameter, INFO is printed as the priority by default.
logMessage | This parameter allows you to specify a message to be printed in the log.
isEventLogged | This parameter specifies whether the event body should be included in the log. Possible values are true and false. If no value is specified, the event body is not printed in the log by default.

The following examples illustrate the variations of the Siddhi logger.

e.g.,

  • The following query logs the event with the INFO logging priority. This is because the priority is not specified.

    from StockStream#log()
    select * 
    insert into OutStream;
  • The following query logs the event with the INFO logging priority (because the priority is not specified) and the test message text.

    from StockStream#log('test message')
    select * 
    insert into OutStream;
  • The following query logs the event with the INFO logging priority because a priority is not specified. The event itself is printed in the log.

    from StockStream#log(true)
    select * 
    insert into OutStream;
  • The following query logs the event with the INFO logging priority (because the priority is not specified) and the test message text. The event itself is printed in the log.

    from StockStream#log('test message', true)
    select * 
    insert into OutStream;
  • The following query logs the event with the WARN logging priority and the test message text.

    from StockStream#log('warn','test message')
    select * 
    insert into OutStream;
  • The following query logs the event with the WARN logging priority and the test message text.  The event itself is printed in the log.

    from StockStream#log('warn','test message',true)
    select * 
    insert into OutStream;

Siddhi Extensions

Siddhi supports an extension architecture that allows custom code and functions to be incorporated into Siddhi in a seamless manner. Extensions use the following syntax.

<namespace>:<function name>(<parameter1>, <parameter2>, ... )

Here, the namespace allows Siddhi to identify the function as an extension and to identify its extension group, the function name denotes the extension function within that group, and the parameters are the inputs passed to the extension for evaluation and/or configuration.

E.g. A window extension created with namespace foo and function name unique can be referred as follows:

from StockExchangeStream[price >= 20]#window.foo:unique(symbol)
select symbol, price
insert into StockQuote;

Extension Types 

Siddhi supports the following five types of extensions:

Function Extension 

For each event, a function extension consumes zero or more parameters and outputs a single attribute. It can be used to manipulate event attributes to generate new attributes, like the Function operator. It is implemented by extending "org.wso2.siddhi.core.executor.function.FunctionExecutor".

E.g. In "math:sin(x)", the sin function of the math extension returns the sine of its parameter x.
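
In a query, a function extension is called in the select clause like an inbuilt function. The following sketch assumes a hypothetical AngleStream with an angle attribute; the output names sinValue and SinStream are also made up for illustration.

```siddhi
define stream AngleStream (deviceID long, angle double);

-- math:sin is evaluated once per incoming event.
from AngleStream
select deviceID, math:sin(angle) as sinValue
insert into SinStream;
```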

Aggregate Function Extension

For each event, an aggregate function extension consumes zero or more parameters and outputs a single attribute that holds an aggregated result based on the input parameters. It can be used in conjunction with a window to find the aggregated results for that window, like the Aggregate Function operator. It is implemented by extending "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator".

E.g. In "custom:std(x)", the std aggregate function of the custom extension returns the standard deviation of the value x based on the window assigned to its query.
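
An aggregate function extension is typically combined with a window and, optionally, a group by clause. The following sketch uses the custom:std function named above, which is itself only an illustrative extension. The 10-minute window, the stdTemp attribute and the TempDeviationStream name are assumptions.

```siddhi
define stream TempStream (deviceID long, roomNo int, temp double);

-- Standard deviation of temp per room over the last 10 minutes.
from TempStream#window.time(10 min)
select roomNo, custom:std(temp) as stdTemp
group by roomNo
insert into TempDeviationStream;
```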

Window Extension

A window extension allows events to be collected and expired, without altering the event format, based on the given input parameters, like the Window operator. It is implemented by extending "org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor".

E.g. In "custom:unique(key)", the unique window of the custom extension emits every event as a current event upon arrival. When an event arrives with the same value for the "key" parameter as a previously arrived event, the previously arrived event is emitted as an expired event.

Stream Function Extension

A stream function extension allows events to be altered by adding one or more attributes to them. Events can be output upon each event arrival. It is implemented by extending "org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor".

E.g. In "custom:pol2cart(theta,rho)", the pol2cart function of the custom extension returns all events after calculating the cartesian coordinates x and y and adding them as new attributes to the existing events.
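
A stream function extension is applied to the input stream with the # operator, after which the added attributes can be selected. The following sketch uses the illustrative custom:pol2cart function named above; the PolarStream and CartesianStream names are hypothetical.

```siddhi
define stream PolarStream (theta double, rho double);

-- x and y are the attributes added to each event by the stream function.
from PolarStream#custom:pol2cart(theta, rho)
select theta, rho, x, y
insert into CartesianStream;
```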

Stream Processor Extension

A stream processor extension allows events to be collected and expired while altering the event format based on the given input parameters. It is implemented by extending "org.wso2.siddhi.core.query.processor.stream.StreamProcessor".

E.g. In "custom:perMinResults(arg1, arg2, ...)", the perMinResults function of the custom extension adds one or more attributes to the events based on the conversion logic and emits them as current events upon arrival. At expiration, expired events can be emitted with appropriate attribute values so that they match the attribute count and types of the current events.

Available Extensions 

Siddhi currently has several prewritten extensions, as follows:

Extensions released under Apache License v2 : 

  • math: Supporting mathematical operations 
  • str: Supporting String operations 
  • geo: Supporting geo coordinates
  • r: Supporting R executions
  • regex: Supporting regular expression operations

Extensions released under GNU/GPL License v3 : 

You can get them from https://github.com/wso2-gpl/siddhi

Writing Custom Extensions

Custom extensions can be written to cater to use-case-specific logic that is not available in Siddhi out of the box or as an existing extension.

To create a custom extension, two things need to be done.

  1. Implement the extension logic by extending well-defined Siddhi interfaces. E.g. Implementing a UniqueWindowProcessor by extending org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.

    package org.wso2.test;

    import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;

    public class UniqueWindowProcessor extends WindowProcessor {
       ...
    }
  2. Add an extension mapping file to map the extension class to the extension function name and namespace. The extension mapping file should be named "<namespace>.siddhiext". E.g. To map the UniqueWindowProcessor extension to the function name "unique" and the namespace "foo", the mapping file should be named foo.siddhiext and its content should be as follows:

    # function name to class mapping of 'foo' extension
    unique=org.wso2.test.UniqueWindowProcessor

Refer to the following for implementing the different types of Siddhi extensions, with examples:

  • Function Extension 
  • Aggregate Function Extension 
  • Window Extension 
  • Stream Function Extension
  • Stream Processor Extension

 
