This documentation is for WSO2 CEP 4.0.0.

This guide provides instructions to use the Siddhi Query Language 3.0 with WSO2 CEP using examples.

Introduction to Siddhi Query Language

Siddhi Query Language (SiddhiQL) is designed to process event streams to identify complex event occurrences. The following table provides definitions of a few terms in the Siddhi Query Language.

Term                      Definition
Event Stream              A logical series of events ordered in time.
Event Stream Definition   This defines event streams. An event stream has a unique name and a set of attributes assigned specific types, with uniquely identifiable names defining its schema.
Event                     An event is associated with only one event stream, and all events of that stream have an identical set of attributes assigned specific types (or the same schema). An event contains a timestamp and the attribute values according to the schema.
Attribute                 An attribute has a unique name within the event stream. The attribute type can be string, int, long, float, double, bool or object.
Query                     A logical construct that derives new streams by combining existing streams. A query contains one or more input streams, handlers to modify those input streams, and an output stream to which it publishes its output events.
Partition                 A logical container that processes a subset of the queries based on a pre-defined rule of separation.
Event Table               A structured representation of stored data, allowing stored data to be accessed and manipulated at runtime.

Siddhi has the following language constructs:

  • Event Stream Definitions
  • Event Table Definitions
  • Partitions
  • Queries

The execution logic of Siddhi can be composed together as an execution plan, and all the above language constructs can be written as a script in an execution plan. Each construct should be separated by a semicolon ( ; ).

Event Stream

An event stream is a sequence of events with a defined schema. One or more event streams can be consumed and manipulated by queries in order to identify complex event conditions, and new event streams can be emitted to notify query responses.

Event Stream Definition

The event stream definition defines the event stream schema. An event stream definition contains a unique name and a set of attributes assigned specific types, with uniquely identifiable names within the stream.

E.g. A stream named TempStream can be created with the following attributes as shown below. 

Attribute Name   Attribute Type
deviceID         long
roomNo           int
temp             double
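
The attributes listed above can be expressed as a stream definition along these lines. This is a minimal sketch that assumes the standard SiddhiQL "define stream" form.

```sql
-- Sketch: TempStream with the three attributes listed above.
define stream TempStream (deviceID long, roomNo int, temp double);
```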

Query

Each Siddhi query can consume one or more event streams and create a new event stream from them.

All queries contain an input section and an output section. Some also contain a projection section. A simple query with all three sections is as follows.

e.g., If you want to derive only the room temperature from the TempStream event stream defined above, a query can be defined as follows.
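
A minimal sketch of such a query, assuming the TempStream definition described earlier; the comments mark the input, projection and output sections.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

from TempStream             -- input section
select roomNo, temp         -- projection section
insert into RoomTempStream; -- output section
```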

Inferred Stream: Here RoomTempStream is an inferred stream, i.e., RoomTempStream can be used as an input stream for another query without explicitly defining its event stream definition, because its definition is inferred from the above query.

Query Projection 

SiddhiQL supports the following for query projection.

Selecting required objects for projection

This involves selecting only some of the attributes in an input stream to be inserted into an output stream.

e.g., The following query selects only the roomNo and temp attributes from the TempStream stream.
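
A sketch of this selection, assuming the TempStream definition described earlier:

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- Only roomNo and temp are carried to the output stream; deviceID is dropped.
from TempStream
select roomNo, temp
insert into RoomTempStream;
```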

Selecting all attributes for projection

This involves selecting all the attributes in an input stream to be inserted into an output stream. This can be done by using the asterisk sign ( * ) or by omitting the select statement.

e.g., Use one of the following queries to select all the attributes in the TempStream stream.
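
The two equivalent forms can be sketched as below. The output stream name NewTempStream is an assumption made for illustration.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- Form 1: explicit asterisk.
from TempStream
select *
insert into NewTempStream;

-- Form 2: select statement omitted; all attributes are still projected.
from TempStream
insert into NewTempStream;
```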

Renaming attributes

This involves selecting attributes from the input streams and inserting them into the output stream with different names.

e.g., The following query renames roomNo to roomNumber and temp to temperature.
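
A sketch of the renaming, assuming the TempStream definition described earlier:

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- "as" gives each selected attribute its name in the output stream.
from TempStream
select roomNo as roomNumber, temp as temperature
insert into RoomTempStream;
```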

Introducing the default value

This involves adding a default value and assigning it to an attribute using as.

e.g.,
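
A minimal sketch, assuming the intent is to tag every temperature event with a constant scale attribute. The attribute name scale and the value 'C' are illustrative assumptions.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- The constant 'C' is added to every output event as the attribute "scale".
from TempStream
select roomNo, temp, 'C' as scale
insert into RoomTempStream;
```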

Using mathematical and logical expressions

This involves using attributes with mathematical and logical expressions, evaluated according to the precedence order given below, and assigning the results to output attributes using as.

Operator precedence

Operator        Description
()              Scope
IS NULL         Null check
NOT             Logical NOT
*  /  %         Multiplication, division, modulo
+  -            Addition, subtraction
<  <=  >  >=    Comparisons: less-than, less-than-equal, greater-than, greater-than-equal
==  !=          Comparisons: equal, not equal
IN              Contains in table
AND             Logical AND
OR              Logical OR

e.g., Converting Celsius to Fahrenheit and identifying server rooms
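
A sketch of such a query. The room number range used to identify server rooms (100 to 109) and the output attribute names are assumptions made for illustration.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- temp * 9/5 + 32 converts Celsius to Fahrenheit;
-- the logical expression yields a bool that flags server rooms.
from TempStream
select roomNo, temp * 9/5 + 32 as temp, 'F' as scale,
       roomNo >= 100 and roomNo < 110 as isServerRoom
insert into RoomTempStream;
```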


 

Functions

A function consumes zero, one or more function parameters and produces a result value.

Function parameters 

Function parameters can be attributes (int, long, float, double, string, bool, object), results of other functions, results of mathematical or logical expressions, or time parameters.

Time is a special parameter that is defined by an int time value followed by its unit type, as <int> <unit>. The supported unit types are listed below. Upon execution, a time parameter returns its value in milliseconds as a long.

Unit          Syntax
Year          year | years
Month         month | months
Week          week | weeks
Day           day | days
Hour          hour | hours
Minute        minute | minutes | min
Second        second | seconds | sec
Millisecond   millisecond | milliseconds

E.g. Passing 1 hour and 25 minutes to test function. 
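
A sketch of the call. Here test is only the placeholder function name used in the sentence above, not an inbuilt function.

```sql
-- The time parameter evaluates to (60 + 25) * 60 * 1000 = 5100000 milliseconds, as a long.
test(1 hour 25 min)
```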

Functions, mathematical expressions, and logical expressions can be used in a nested manner. 

Inbuilt Functions

Siddhi supports the following inbuilt functions.

E.g. With convert and UUID function, converting room number to string and introducing message ID to each event.
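
A minimal sketch, assuming convert takes the target type as a string literal and UUID takes no parameters. The attribute name messageID is illustrative.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- convert() turns the int roomNo into a string; UUID() adds a unique ID to each event.
from TempStream
select convert(roomNo, 'string') as roomNo, temp, UUID() as messageID
insert into RoomTempStream;
```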

Filter

Filters can be used with input streams to filter events based on a given condition. The filter condition should be defined in square brackets next to the input stream name.

E.g. Filtering all server rooms having a temperature greater than 40 degrees.
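
A sketch of such a filter. The room number range that identifies server rooms and the output stream name HighTempStream are assumptions made for illustration.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- The condition in square brackets is evaluated for every incoming event.
from TempStream[(roomNo >= 100 and roomNo < 110) and temp > 40]
select roomNo, temp
insert into HighTempStream;
```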

Window

Windows capture a subset of events from an input event stream, based on a given criterion, for calculation. They are defined next to the input stream using the '#window.' prefix, and each input stream can have at most one window.

Windows emit two types of events: current events and expired events. A window emits a current event when a new event arrives at the window, and emits an expired event whenever an event in the window expires based on that window's criterion.

Inbuilt Windows

Siddhi supports the following inbuilt windows.

Output Event Categories

Window output can be manipulated based on event categories, i.e., current and expired events. Use the following keywords with the output stream to manipulate the output.

  • current events : Emits all the events that arrive at the window. This is the default when no event category is specified.
  • expired events : Emits all the events that expire from the window.
  • all events : Emits all the events that arrive at and expire from the window.

To use these with an insert into statement, place the keywords between 'insert' and 'into', as in the example below.

E.g. Delay all events in a stream by 1 minute.
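
A sketch of the delay. The output stream name DelayedTempStream is an assumption.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- Each event expires from the time window one minute after it arrives,
-- so emitting only expired events delays the stream by one minute.
from TempStream#window.time(1 min)
select *
insert expired events into DelayedTempStream;
```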

Aggregate Functions

Aggregate functions can be used with windows to perform aggregate calculations within the defined window. 

Inbuilt Aggregate Functions

Siddhi supports the following inbuilt aggregate functions.

E.g. Upon every event arrival and expiry, notify the average temperature of all rooms based on all events that arrived during the last 10 minutes.
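
A minimal sketch, assuming the avg aggregate function. The output names avgTemp and AvgTempStream are illustrative.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- "all events" makes the query emit on both arrival and expiry.
from TempStream#window.time(10 min)
select avg(temp) as avgTemp
insert all events into AvgTempStream;
```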

Group By

Group by allows aggregations to be grouped based on the group by attributes.

E.g. Find the average temperature per room and device ID for the last 10 min.
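
A sketch of the grouped aggregation. The output names are illustrative assumptions.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- One running average is maintained per (roomNo, deviceID) pair.
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
group by roomNo, deviceID
insert into AvgTempStream;
```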

Having

Having allows us to filter events after aggregation and after processing at the selector. 

E.g. Find the average temperature per room for the last 10 min and alert if it's more than 30 degrees.
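
A sketch of the alert. AlertStream and avgTemp are illustrative names.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- "having" is applied to the selector output, so it can refer to avgTemp.
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo
group by roomNo
having avgTemp > 30
insert into AlertStream;
```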

Output Rate Limiting

Output rate limiting allows queries to emit events periodically based on the condition specified.

Rate limiting follows the below syntax.
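
The general form can be sketched as follows, as recalled from the Siddhi 3.0 grammar. Placeholders are in angle brackets and braces mark optional parts.

```sql
-- General form of a rate-limited query.
from <input stream name>
select <attribute name>, <attribute name>, ...
output ({<output-type>} every (<time interval>|<event interval> events) | snapshot every <time interval>)
insert into <output stream name>
```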

With "<output-type>", the events to be emitted can be specified. The keywords "first", "last" and "all" emit only the first event, only the last event, or all events from the arrived events. If the keyword is omitted, it defaults to "all", emitting all events.

With "<time interval>", the time interval for periodic event emission can be specified.

With "<event interval>", the number of events that need to arrive for periodic event emission can be specified.

Based on number of events

Here, events are emitted every time the predefined number of events has arrived. When emitting, you can specify whether to emit only the first event, the last event, or all events from the arrived events.

e.g., Emit the last temperature event per sensor every 10 events
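
A sketch of this, assuming deviceID identifies the sensor. The output stream name LowRateTempStream is illustrative.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- group by deviceID makes "last" apply per sensor.
from TempStream
select temp, deviceID
group by deviceID
output last every 10 events
insert into LowRateTempStream;
```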

Based on time

Here, events are emitted at every predefined time interval. When emitting, you can specify whether to emit only the first event, the last event, or all events from the arrived events.

E.g. Emit all temperature events every 10 seconds
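
A sketch of time-based rate limiting. The output stream name is illustrative.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- No output type is given, so it defaults to "all".
from TempStream
output every 10 sec
insert into LowRateTempStream;
```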

Periodic snapshot

This works best with windows. When the input stream has a window attached, snapshot rate limiting emits, at every predefined time interval, all current events that have arrived so far and do not yet have corresponding expired events. When no window is attached to the input stream, it emits only the last current event at every predefined time interval.

E.g. Emit a snapshot of the events in a time window of 5 seconds every one second.
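
A sketch of snapshot rate limiting. The output stream name SnapshotTempStream is illustrative.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- Every second, the events currently held in the 5-second window are emitted.
from TempStream#window.time(5 sec)
output snapshot every 1 sec
insert into SnapshotTempStream;
```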

Joins

Join allows two event streams to be merged based on a condition. Each stream should be associated with a window (if no window is assigned, #window.length(0) is assigned to the input event stream). During the joining process, each incoming event on each stream is matched against all events in the other input event stream's window based on the given condition, and an output event is generated for every matching event pair.

The syntax of join looks like below. 
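
The general form can be sketched as follows, as recalled from the Siddhi 3.0 grammar. Placeholders are in angle brackets and braces mark optional parts.

```sql
-- General form of a join query.
from <input stream name>[<filter condition>]#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>}
         join <input stream name>#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>}
    on <join condition>
    within <time gap>
select <attribute name>, <attribute name>, ...
insert into <output stream name>
```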

With "on <join condition>", Siddhi joins only the events that match the condition.

With the "unidirectional" keyword, the trigger of the joining process can be controlled. By default, events arriving on both streams trigger the joining process; when the unidirectional keyword is used on an input stream, only the events arriving on that stream trigger the joining process. Note that the unidirectional keyword cannot be used on both input streams (as that is equal to the default behaviour, which is not using the unidirectional keyword at all).

With "within <time gap>", the joining process matches only the events that are within the defined time gap of each other.

When projecting the joined events, the attributes of each stream need to be referred to with the stream name (E.g. <stream name>.<attribute name>) or with its reference ID, especially when events of the same stream are joined (E.g. <stream reference Id>.<attribute name>). "select *" can be used, or the "select" statement can be omitted, if all attributes of the joined events need to be projected, but only when the two streams do not have any attributes with the same name.

e.g., Assume that the temperature regulators are updated every minute. The following switches on the temperature regulators if they are not already on for all the rooms that have a room temperature greater than 30 degrees.   
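
A sketch of this join. The RegulatorStream definition, the 'start' action value and the output stream name are assumptions made for illustration.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);
define stream RegulatorStream (deviceID long, roomNo int, isOn bool);

-- T holds the last minute of hot readings; R holds the latest "off" regulator state.
from TempStream[temp > 30.0]#window.time(1 min) as T
  join RegulatorStream[isOn == false]#window.length(1) as R
  on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;
```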

Pattern

Pattern allows event streams to be correlated over time and event patterns to be detected based on the order of event arrival. With a pattern, there can be other events in between the events that match the pattern condition. Siddhi internally creates state machines to track the states of the matching process. A pattern can correlate events over multiple input streams or over the same input stream, hence each matched input event needs to be referenced so that it can be accessed for further processing and output generation.

The syntax of pattern looks like below. 
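
The general form can be sketched as follows, as recalled from the Siddhi 3.0 grammar. Placeholders are in angle brackets and braces mark optional parts.

```sql
-- General form of a pattern query.
from {every} <input event reference>=<input stream name>[<filter condition>] ->
     {every} <input event reference>=<input stream name>[<filter condition>] -> ...
    within <time gap>
select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ...
insert into <output stream name>
```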

Input streams cannot be associated with a window.

With "->", incoming event arrivals can be correlated, with zero or more other events arriving in between the matching events.

With "<input event reference>=", the matched event can be stored for future reference.

With "within <time gap>", the pattern is matched only with events that are within the defined time gap of each other.

Without the "every" keyword, the pattern can be matched only once. Use the "every" keyword appropriately to trigger a new pattern matching process upon event arrival.

E.g. Alert if the temperature of a room increases by 5 degrees within 10 min.
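
A sketch of this pattern. The output names are illustrative assumptions.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- "every" starts a new match for each arriving e1; e2 must be a later event
-- from the same room that is at least 5 degrees warmer.
from every( e1=TempStream ) -> e2=TempStream[e1.roomNo == roomNo and (e1.temp + 5) <= temp]
    within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
```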

Logical Pattern 

A pattern not only matches events arriving in temporal order, but can also correlate events having logical relationships.

Keywords such as "and" and "or" can be used instead of "->" to express the logical relationship.

With "and", the occurrence of two events in any order can be matched.

With "or", the occurrence of an event from either of the input streams, in any order, can be matched.

E.g. Alert when the room temperature reaches the temperature set on the regulator (the pattern matching should be reset whenever the temperature set on the regulator changes).
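
A sketch of one way to express this, assuming a hypothetical RegulatorStream with a tempSet attribute, and assuming that an unmatched side of an "or" can be tested with "is null".

```sql
define stream TempStream (deviceID long, roomNo int, temp double);
define stream RegulatorStream (deviceID long, roomNo int, tempSet double);

-- The pattern completes either when the room reaches the set temperature (e2)
-- or when a new regulator event for the room arrives (e3), which resets it.
from every( e1=RegulatorStream ) ->
     e2=TempStream[e1.roomNo == roomNo and e1.tempSet <= temp] or e3=RegulatorStream[e1.roomNo == roomNo]
select e1.roomNo, e2.temp as roomTemp
having e3 is null
insert into AlertStream;
```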

Counting Pattern 

Counting patterns enable multiple events to be matched based on the same matching condition. The expected number of events can be limited using the following postfixes.

<1:4> matches 1 to 4 events.

<2:> matches 2 or more events, and <:5> matches up to 5 events.

<5> matches exactly 5 events.

To refer to specific occurrences of the events that are matched based on count limits, square brackets can be used with numerical indexes and the "last" keyword. For example, e1[3] refers to the third event, e1[last] refers to the last event, and e1[last - 1] refers to the event before the last event of the matched event group.

E.g. Get the temperature difference between two regulator events.
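
A sketch of a counting pattern for this case. The RegulatorStream definition is hypothetical, and the sketch assumes that e2[0] addresses the first collected event; check the indexing base against your Siddhi version.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);
define stream RegulatorStream (deviceID long, roomNo int, tempSet double, isOn bool);

-- e2 collects one or more temperature events between two regulator events of the same room.
from every( e1=RegulatorStream ) -> e2=TempStream[e1.roomNo == roomNo]<1:> -> e3=RegulatorStream[e1.roomNo == roomNo]
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
insert into TempDiffStream;
```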

Sequence

Sequence allows event streams to be correlated over time and event sequences to be detected based on the order of event arrival. With a sequence, there cannot be other events in between the events that match the sequence condition. Siddhi internally creates state machines to track the states of the matching process. A sequence can correlate events over multiple input streams or over the same input stream, hence each matched input event needs to be referenced so that it can be accessed for further processing and output generation.

The syntax of sequence looks like below. 
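
The general form can be sketched as follows, as recalled from the Siddhi 3.0 grammar. Placeholders are in angle brackets and braces mark optional parts.

```sql
-- General form of a sequence query.
from {every} <input event reference>=<input stream name>[<filter condition>],
     <input event reference>=<input stream name>[<filter condition>]{+|*|?}, ...
    within <time gap>
select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ...
insert into <output stream name>
```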

Input streams cannot be associated with a window.

With ",", immediately consecutive event arrivals can be correlated, with no other events arriving in between the matching events.

With "<input event reference>=", the matched event can be stored for future reference.

With "within <time gap>", the sequence is matched only with events that are within the defined time gap of each other.

Without the "every" keyword, the sequence can be matched only once. Use the "every" keyword at the beginning to trigger a new sequence matching process upon every event arrival.

E.g. Alert if there is more than a 1 degree increase in temperature between two consecutive temperature events.
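
A sketch of this sequence. The output names are illustrative assumptions.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- e2 must be the event immediately after e1 and more than 1 degree warmer.
from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
```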

Logical Sequence 

A sequence not only matches consecutive events arriving in temporal order, but can also correlate events having logical relationships.

Keywords such as "and" and "or" can be used instead of "," to express the logical relationship.

With "and", the occurrence of two events in any order can be matched.

With "or", the occurrence of an event from either of the input streams, in any order, can be matched.

E.g. Notify when a regulator event is followed by both the temperature and humidity events.
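
A sketch of a logical sequence. All three stream definitions and the output stream name are assumptions made for this example.

```sql
define stream TempStream (deviceID long, temp double);
define stream HumidStream (deviceID long, humid double);
define stream RegulatorStream (deviceID long, isOn bool);

-- After a regulator event, the temperature and humidity events may arrive in either order.
from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
select e2.temp, e3.humid
insert into StateNotificationStream;
```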

Counting Sequence 

Counting sequences enable multiple consecutive events to be matched based on the same matching condition. The expected number of events can be limited using the following postfixes.

With "*", zero or more events can be matched.

With "+", one or more events can be matched.

With "?", zero or one event can be matched.

To refer to specific occurrences of the events that are matched based on count limits, square brackets can be used with numerical indexes and the "last" keyword. For example, e1[3] refers to the third event, e1[last] refers to the last event, and e1[last - 1] refers to the event before the last event of the matched event group.

E.g. Identify peak temperatures.
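
A sketch of peak detection with a counting sequence. The output names are illustrative assumptions.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- e2 collects one or more non-decreasing readings; e3 is the first reading
-- that drops below the last of them, which makes e2[last] the peak.
from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[e2[last].temp > temp]
select e1.temp as initialTemp, e2[last].temp as peakTemp
insert into PeakTempStream;
```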

Partition

With partitions, Siddhi can divide both incoming events and queries, and process them in parallel in isolation. Each partition is tagged with a partition key, and only the events corresponding to that partition key are processed at that partition. Each partition has separate instances of the Siddhi queries, providing isolation of processing states. A partition can contain more than one query.

The partition key can be defined using a categorical (string) attribute of the input event stream, as a Variable Partition, or by defining separate ranges over numerical attributes of the input event stream, as a Range Partition.

Variable Partition 

Partition using categorical (string) attributes will adhere to the following syntax.

E.g. Per sensor, calculate the maximum temperature over the last 10 temperature events each sensor has emitted.
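
A sketch of a variable partition, as recalled from the Siddhi 3.0 grammar. It assumes deviceID identifies the sensor and is accepted as a partition key; the output stream name is illustrative.

```sql
-- General form:
--   partition with ( <attribute name> of <stream name>, ... )
--   begin
--       <query>; <query>; ...
--   end;
define stream TempStream (deviceID long, roomNo int, temp double);

partition with ( deviceID of TempStream )
begin
    -- Each deviceID gets its own window of 10 events.
    from TempStream#window.length(10)
    select roomNo, deviceID, max(temp) as maxTemp
    insert into DeviceTempStream;
end;
```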

Range Partition 

Partition using numerical attributes will adhere to the following syntax.

E.g. Per office area, calculate the average temperature over the last 10 minutes.
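
A sketch of a range partition, as recalled from the Siddhi 3.0 grammar. The room number ranges, the area names and the output stream name are assumptions made for illustration.

```sql
-- General form:
--   partition with ( <condition> as <partition key> or <condition> as <partition key> or ... of <stream name>, ... )
--   begin
--       <query>; <query>; ...
--   end;
define stream TempStream (deviceID long, roomNo int, temp double);

partition with ( roomNo >= 1030 as 'serverRoom' or
                 roomNo < 1030 and roomNo >= 330 as 'officeRoom' or
                 roomNo < 330 as 'lobby' of TempStream )
begin
    -- Each named range keeps its own 10-minute window.
    from TempStream#window.time(10 min)
    select roomNo, deviceID, avg(temp) as avgTemp
    insert into AreaTempStream;
end;
```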

Inner Streams 

Inner streams allow query instances of a partition to communicate with other query instances of the same partition. Inner streams are denoted by a "#" in front of their names, and cannot be accessed outside the partition block.

E.g. Per sensor, calculate the maximum temperature over the last 10 temperature events when the sensor has an average temperature greater than 20 over the last minute.
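
A sketch using an inner stream. It assumes deviceID identifies the sensor; the stream names #AvgTempStream and DeviceTempStream are illustrative.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

partition with ( deviceID of TempStream )
begin
    -- First query: publish the one-minute average to an inner stream.
    from TempStream#window.time(1 min)
    select roomNo, deviceID, temp, avg(temp) as avgTemp
    insert into #AvgTempStream;

    -- Second query: consume the inner stream within the same partition instance.
    from #AvgTempStream[avgTemp > 20]#window.length(10)
    select roomNo, deviceID, max(temp) as maxTemp
    insert into DeviceTempStream;
end;
```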

Event Table

Event tables allow Siddhi to work with stored events; an event table can be viewed as a stored version of an event stream, or a table of events. By default, events are stored in memory, and Siddhi also provides an extension to work with data/events stored in RDBMS data stores.

Event Table Definition

An event table definition defines the event table schema. It contains a unique name and a set of typed attributes with uniquely identifiable names within the table.

With a plain definition of this kind, the table stores all events in memory in a linked list data structure.

E.g. A room type table named RoomTypeTable, with the attributes room number as int and type as string, can be created as below.
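
A minimal sketch, assuming the standard "define table" form. The attribute names roomNo and type are chosen to match the description.

```sql
-- General form: define table <table name> (<attribute name> <attribute type>, ... );
define table RoomTypeTable (roomNo int, type string);
```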

Indexing Event Table

An event table can be indexed for fast event access using the "IndexBy" annotation. With "IndexBy", only one attribute can be indexed, and when indexed the table uses a map data structure to hold the events. Therefore, if multiple events with the same index value are inserted into the event table, only the last inserted event remains in the table.

E.g. An indexed room type table named RoomTypeTable, indexed by room number, with the attributes room number as int and type as string, can be created as below.
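
A sketch of an indexed table, assuming the annotation takes the attribute name as a quoted string.

```sql
-- roomNo becomes the map key, so a later event with the same roomNo replaces the earlier one.
@IndexBy('roomNo')
define table RoomTypeTable (roomNo int, type string);
```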

RDBMS Event Table

An event table can be backed by an RDBMS event store using the "From" annotation. With "From", the data source and the connection instructions can be assigned to the event table. The RDBMS table name can be different from the event table name defined in Siddhi, and Siddhi will always refer to the defined event table name. However, the defined event table name cannot be the same as an already existing stream name, since syntactically both are considered the same within the Siddhi query language.

RDBMS event table has been tested with the following databases: 

    • MySQL

    • H2

    • Oracle

E.g. Create an event table with name RoomTypeTable having attributes room number as int & type as string, backed by RDBMS table named RoomTable from the data source named AnalyticsDataSource. 
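
A sketch of such a definition. The annotation keys eventtable and table.name are recalled from the CEP 4.0.0 samples and should be treated as assumptions; datasource.name is the element mentioned in this guide.

```sql
-- RoomTypeTable is the name used in queries; RoomTable is the underlying RDBMS table.
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);
```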

Note

The datasource.name given here is injected into the Siddhi engine by the CEP server. To configure data sources in the CEP, see Datasources in the Admin Guide.

E.g. Create an event table with name RoomTypeTable having attributes room number as int & type as string, backed by MySQL table named RoomTable from the database cepdb located at localhost:3306 having user name "root" and password "root". 
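
A sketch with direct connection details. The keys jdbc.url, username, password, driver.name and table.name, and the driver class name, are recalled rather than confirmed by this guide, so verify them against your CEP version.

```sql
-- Connection details are given inline instead of through a named data source.
@From(eventtable='rdbms', jdbc.url='jdbc:mysql://localhost:3306/cepdb',
      username='root', password='root',
      driver.name='com.mysql.jdbc.Driver', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);
```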

Caching events with RDBMS Event Table

Several caches can be used with RDBMS-backed event tables in order to reduce I/O operations and improve their performance. Currently, all cache implementations provide size-based algorithms. Caches can be added using the "cache" element, and the size of the cache can be defined using the "cache.size" element of the "From" annotation.

The supported cache implementations are as follows:

  1. Basic: Events are cached in a FIFO manner where the oldest event will be dropped when the cache is full. 
  2. LRU (Least Recently Used): The least recently used event is dropped when the cache is full.
  3. LFU (Least Frequently Used): The least frequently used event is dropped when the cache is full.

In the "From" annotation, if the "cache" element is not specified the "Basic" cache will be assigned by default, and if the "cache.size" element is not assigned the default value 4096 will be assigned as the cache size.

E.g. Create an event table with name RoomTypeTable having attributes room number as int & type as string, backed by RDBMS table using least recently used caching algorithm for caching 3000 events. 
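
A sketch using the "cache" and "cache.size" elements described above. The data source and table names are carried over from the earlier example as assumptions, as is the exact spelling of the 'LRU' value.

```sql
-- Up to 3000 events are cached; the least recently used one is dropped when the cache is full.
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable',
      cache='LRU', cache.size='3000')
define table RoomTypeTable (roomNo int, type string);
```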

Using Bloom Filters

A Bloom Filter is a probabilistic data structure that can be used to perform quick membership checks. If you apply a Bloom Filter to a data set and carry out an isAvailable check on that specific Bloom Filter instance, an accurate answer is returned when the search item is not available (a positive answer may occasionally be a false positive). This speeds up updates, joins and isAvailable checks.

The following example shows how to include Bloom Filters in an event table update query.

For more information about In-memory event tables, see Sample 0106 - Using in-memory event tables.

For more information about RDBMS event tables, see Sample 0107 - Using RDBMS event tables.

Insert into

The query for inserting events into a table is similar to the query for inserting events into an event stream, and uses the "insert into <table name>" clause. To insert only a specified output event category, use the "current events", "expired events" or "all events" keywords between the 'insert' and 'into' keywords.

E.g. Insert all temperature events from TempStream into a temperature table
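
A minimal sketch. The table name TempTable and its schema are assumptions made for illustration.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);
define table TempTable (deviceID long, roomNo int, temp double);

-- Every event arriving on TempStream is stored in TempTable.
from TempStream
select *
insert into TempTable;
```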

Delete

A query for deleting events from an event table can be written using a delete query with the following syntax.

Here, "on <condition>" selects the events for deletion. When writing this condition, attribute names of the event table should always be referred to with the table name, and attributes of the select should not have a reference associated with them.

E.g. Delete the entries of the RoomTypeTable associated with the room numbers of DeleteStream.
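
A sketch of the delete query, with the general form in the leading comment. The DeleteStream definition is an assumption.

```sql
-- General form:
--   from <input stream name>
--   select <attribute name>, <attribute name>, ...
--   delete <table name>
--       on <condition>
define table RoomTypeTable (roomNo int, type string);
define stream DeleteStream (roomNumber int);

-- The table attribute carries the table name; the stream attribute does not.
from DeleteStream
delete RoomTypeTable
    on RoomTypeTable.roomNo == roomNumber;
```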

To execute the delete only for a specified output event category, use "delete <table name> for <output event category> on <condition>" instead of "delete <table name> on <condition>", where "<output event category>" can be "current events", "expired events" or "all events".

Update

A query for updating events in an event table can be written using an update query with the following syntax.

Here, "on <condition>" selects the events for update. When writing this condition, attribute names of the event table should always be referred to with the table name, and attributes of the select should not have a reference associated with them.

With "<table attribute name>", attributes are referred to by the same names that are defined in the event table, allowing Siddhi to identify which attributes need to be updated in the event table.

E.g. For each room denoted by its number, update the room types of the RoomTypeTable based on the events in UpdateStream.
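
A sketch of the update query, with the general form in the leading comment. The UpdateStream definition is an assumption.

```sql
-- General form:
--   from <input stream name>
--   select <attribute name> as <table attribute name>, ...
--   update <table name>
--       on <condition>
define table RoomTypeTable (roomNo int, type string);
define stream UpdateStream (roomNumber int, roomType string);

-- The select renames stream attributes to the table's attribute names.
from UpdateStream
select roomNumber as roomNo, roomType as type
update RoomTypeTable
    on RoomTypeTable.roomNo == roomNo;
```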

To execute the update only for a specified output event category, use "update <table name> for <output event category> on <condition>" instead of "update <table name> on <condition>", where "<output event category>" can be "current events", "expired events" or "all events".

In

Whether an attribute value is present in an event table can be checked using conditions with the following syntax.

Here, "<condition>" selects the matching attribute. When writing this condition, attribute names of the event table should always be referred to with the table name, and attributes of the incoming stream should not have a reference associated with them.

E.g. By checking ServerRoomTable, output only the temperature events associated with the server rooms.
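
A sketch of the "in" check, with the general form in the leading comment. The ServerRoomTable schema and the output stream name are assumptions.

```sql
-- General form: <condition> in <table name>
define table ServerRoomTable (roomNo int);
define stream TempStream (deviceID long, roomNo int, temp double);

-- An event passes the filter only if some table row has a matching roomNo.
from TempStream[ServerRoomTable.roomNo == roomNo in ServerRoomTable]
insert into ServerTempStream;
```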

Join

A stream can be joined with an event table to retrieve data from the event table. To join a stream with an event table, a simple join query can be used. In the join, the event table should not be associated with window operations, as an event table is not an active construct. For the same reason, an event table cannot be joined with another event table in Siddhi.

E.g. Update the events in temperature stream with their room type based on the RoomTypeTable.
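
A sketch of a stream-to-table join. The output stream name EnhancedTempStream is an assumption.

```sql
define table RoomTypeTable (roomNo int, type string);
define stream TempStream (deviceID long, roomNo int, temp double);

-- Each temperature event is enriched with the room type looked up from the table.
from TempStream join RoomTypeTable
    on RoomTypeTable.roomNo == TempStream.roomNo
select deviceID, RoomTypeTable.roomNo as roomNo, type, temp
insert into EnhancedTempStream;
```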

Event Trigger

Triggers allow events to be created periodically based on time, and at Siddhi start. An event trigger generates events on an event stream that has the same name as the trigger and a single attribute named "triggered_time" of type long.

Event Trigger Definition

An event trigger definition defines the event triggering interval, following the below syntax.

With "'start'", an event is triggered at Siddhi start.

With "every <time interval>", an event is triggered periodically at the given time interval.

With "'<cron expression>'", an event is triggered periodically based on the given cron expression; refer to the quartz-scheduler documentation for configuration details.

E.g. Trigger an event every 5 minutes.
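
A sketch of a periodic trigger, with the general form in the leading comment. The trigger name FiveMinTriggerStream is an assumption.

```sql
-- General form: define trigger <trigger name> at ('start' | every <time interval> | '<cron expression>');
-- Events appear on a stream named FiveMinTriggerStream with the attribute triggered_time.
define trigger FiveMinTriggerStream at every 5 min;
```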

E.g. Trigger an event at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday.
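
A sketch using a Quartz-style cron expression. The trigger name WorkStartTriggerStream is an assumption.

```sql
-- Quartz fields: second minute hour day-of-month month day-of-week.
define trigger WorkStartTriggerStream at '0 15 10 ? * MON-FRI';
```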

Eval Script

Eval script allows Siddhi to process events using other programming languages, by defining functions in those languages. Eval script functions can be defined like event tables or streams, and referred to in queries like the inbuilt functions of Siddhi.

Eval Script Definition

An eval script definition defines the function operation, following the below syntax.
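
The general form can be sketched as follows, as recalled from the Siddhi 3.0 grammar. Placeholders are in angle brackets.

```sql
-- General form of an eval script definition.
define function <function name>[<language name>] return <return type> {
    <operation of the function>
};
```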

With "<function name>", a function is defined for use in queries. Note that this function will override an inbuilt function of the same name.

With "<language name>", the execution language is defined, e.g., JavaScript, R, Scala.

With "<return type>", the return type of the function is defined; it can be int, long, float, double, string, bool or object. The function implementer is responsible for returning output of the defined <return type> for proper functionality.

With "<operation of the function>", the execution logic of the function is written in the language defined by <language name>, returning a value of <return type>.

Supported Eval Script Languages  

  • JavaScript
  • R
  • Scala 

JavaScript

E.g. Concatenating function in JavaScript.
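
A sketch of such a function and its use. It assumes the function parameters are exposed to the script as an array named data; the names concatFn and DeviceTempStream are illustrative.

```sql
-- The JavaScript body receives the call's parameters in the "data" array.
define function concatFn[JavaScript] return string {
    var str1 = data[0];
    var str2 = data[1];
    var str3 = data[2];
    return str1 + str2 + str3;
};

define stream TempStream (deviceID long, roomNo int, temp double);

-- The function is called like an inbuilt function.
from TempStream
select concatFn(roomNo, '-', deviceID) as id, temp
insert into DeviceTempStream;
```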

R

E.g. Concatenating function in R.

Scala

E.g. Concatenating function in Scala.

Siddhi Extensions

Siddhi supports an extension architecture that allows custom code and functions to be incorporated into Siddhi in a seamless manner. Extensions follow the syntax below:

Here the namespace will allow Siddhi to identify the function as an extension and its extension group, the function name will denote the extension function within the given group, and the parameters will be the inputs that can be passed to the extension for evaluation and/or configuration.  

E.g. A window extension created with namespace foo and function name unique can be referred as follows:
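
A sketch of the reference, with the general form in the leading comment. StockExchangeStream, its attributes and the output stream name are hypothetical.

```sql
-- General form: <namespace>:<function name>(<parameter1>, <parameter2>, ... )
define stream StockExchangeStream (symbol string, price double);

-- The window extension is addressed as #window.<namespace>:<function name>(...).
from StockExchangeStream[price >= 20]#window.foo:unique(symbol)
select symbol, price
insert into StockQuote;
```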

Extension Types 

Siddhi supports the following five types of extensions:

Function Extension 

For each event, it consumes zero or more parameters and outputs a single attribute. This can be used to manipulate event attributes to generate a new attribute, like the Function operator. Implemented by extending "org.wso2.siddhi.core.executor.function.FunctionExecutor".

E.g. "math:sin(x)": here the sin function of the math extension returns the sine of its parameter x.

Aggregate Function Extension

For each event, it consumes zero or more parameters and outputs a single attribute holding an aggregated result based on the input parameters. This can be used in conjunction with a window in order to find the aggregated results based on the given window, like the Aggregate Function operator. Implemented by extending "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator".

E.g. "custom:std(x)": here the std aggregate function of the custom extension returns the standard deviation of the value x based on the window assigned to its query.

Window Extension

Allows events to be collected and expired without altering the event format, based on the given input parameters, like the Window operator. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor".

E.g. "custom:unique(key)": here the unique window of the custom extension returns all events as current events upon arrival, and when an event arrives with the same "key" value as a previously arrived event, the previously arrived event is emitted as an expired event.

Stream Function Extension

Allows events to be altered by adding one or more attributes to them. Events can be output upon each event arrival. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor".

E.g. "custom:pol2cart(theta,rho)": here the pol2cart function of the custom extension returns all events after calculating the cartesian coordinates x and y and adding them as new attributes to the existing events.

Stream Processor Extension

Allows events to be collected and expired while altering the event format, based on the given input parameters. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.StreamProcessor".

E.g. "custom:perMinResults(arg1, arg2, ...)": here the perMinResults function of the custom extension returns all events with one or more attributes added based on the conversion logic, emitted as current events upon arrival. At expiration, expired events can be emitted with appropriate attribute values, matching the attribute counts and types of the current events.

Available Extensions 

Siddhi currently has several prewritten extensions, as follows:

Extensions released under Apache License v2 : 

  • math : Supporting mathematical operations 
  • str : Supporting String operations 
  • geo : Supporting geocode operations
  • regex : Supporting regular expression operations
  • time : Supporting time expression operations
  • ml : Supporting Machine Learning expression operations
  • timeseries : Supporting Time Series operations

Extensions released under GNU/GPL License v3 : 

  • geo : Supporting geographical processing operations   
  • r :  Supporting R executions
  • nlp : Supporting Natural Language Processing expression operations
  • pmml : Supporting Predictive Model Markup Language expression operations

You can get them from https://github.com/wso2-gpl/siddhi

Writing Custom Extensions

Custom extensions can be written to cater to use-case-specific logic that is not available out of the box in Siddhi or as an existing extension.

To create a custom extension, two things need to be done.

  1. Implement the extension logic by extending well-defined Siddhi interfaces. E.g., implementing a UniqueWindowProcessor by extending org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.

  2. Add an extension mapping file to map the written extension class to the extension function name and namespace. The extension mapping file should be named "<namespace>.siddhiext". E.g., to map the written UniqueWindowProcessor extension to the function name "unique" and namespace "foo", the mapping file should be named foo.siddhiext, and the content of the file should be as below:

Refer following for implementing different types of Siddhi extensions with examples
