In the previous tutorial, you looked at correlating events from multiple sources in a simple manner using joins. You used join statements between streams and stores and observed how aliases can be used on streams for easy reference.
In this tutorial, let's see how event correlation can be applied to Complex Event Processing (CEP) scenarios.
Let's consider the following two scenarios in the sweet factory.
- Scenario 1: The factory foreman needs to monitor the supply of materials and send an alert if there is a decrease of 10 units within 10 minutes.
- Scenario 2: The factory foreman needs to monitor both supply and production, and send an alert if production does not start within 15 minutes after the raw materials are received.
WSO2 Siddhi also supports another type of pattern, known as counting patterns. For more information, see the Siddhi Query Guide - Counting Pattern.
Before you begin:
Simple Siddhi patterns can be identified by the following syntax.
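The general shape can be sketched as the template below, which follows the pattern syntax described in the Siddhi Query Guide. The placeholders `x`, `y`, and `t` stand for the two streams and the time gap; the remaining placeholder names are illustrative only.

```siddhi
from every <event1>=<x> -> <event2>=<y>[<filter condition>]
    within <t>
select <event1>.<attribute>, <event2>.<attribute>
insert into <output stream>;
```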
This means that when events in the `x` stream are followed by events from the `y` stream within a time gap of `t`, the subsequent operations are carried out.
Let's consider the following real-world example to understand this.
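A minimal sketch of such a query is given here. The stream names `MyStream` and `OutStream` and the one-hour period come from the explanation that follows; the attributes `symbol` and `price` and the filter itself are hypothetical, chosen only to make the example concrete.

```siddhi
-- Hypothetical schemas: the attribute names are assumptions for illustration.
define stream MyStream (symbol string, price double);
define stream OutStream (symbol string, initialPrice double, laterPrice double);

-- Every event in MyStream starts a match attempt that waits up to one hour
-- for a later event with the same symbol and a higher price.
from every e1=MyStream -> e2=MyStream[e1.symbol == e2.symbol and e2.price > e1.price]
    within 1 hour
select e1.symbol, e1.price as initialPrice, e2.price as laterPrice
insert into OutStream;
```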
In the above example, each event that arrives at the `MyStream` stream starts a new match attempt (because the `every` keyword is used) and is checked against subsequent events arriving at the same stream that match the given filter condition. Each match attempt remains open for a time period of one hour. If any matching events are found, they are sent to the `OutStream` output stream. If the `every` keyword is not used, matching is initiated only once, when the first event arrives at the `MyStream` stream.
This covers the steps for the following two user scenarios.
User Scenario 1: Detecting a decrease in supply within a set time period
Let's get started!
Let's define an input stream for the raw material supply. This can be the same input stream you defined in Correlating Simple Events.
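A possible definition is sketched below. The attributes `name` and `amount` are the ones used by the filter later in this scenario, and `supplier` is needed for the alert output; the stream name is borrowed from User Scenario 2 and the attribute types are assumptions.

```siddhi
-- Attribute types are assumed; adjust them to match your own stream.
define stream MaterialSupplyStream (name string, amount double, supplier string);
```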
Now let's define an output stream as follows.
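As a starting point, the output stream could look like the sketch below. The name `SupplyDecreaseAlertStream` and its attributes are hypothetical; they are not taken from the original tutorial.

```siddhi
-- Hypothetical output stream for the supply alerts.
define stream SupplyDecreaseAlertStream (name string, amount double);
```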
Let's add a query to perform a simple insertion from the input stream to the output stream.
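Assuming the hypothetical stream names `MaterialSupplyStream` and `SupplyDecreaseAlertStream`, a plain pass-through query might be written like this:

```siddhi
-- Copies each supply event to the output stream without any correlation yet.
from MaterialSupplyStream
select name, amount
insert into SupplyDecreaseAlertStream;
```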
To define a pattern on this stream so that events within a 10-minute window are considered, let's update the query as follows.
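One way to express this step, again assuming the hypothetical stream names `MaterialSupplyStream` and `SupplyDecreaseAlertStream`, is to reference the same stream twice with the aliases `e1` and `e2` and limit the match to 10 minutes:

```siddhi
-- e1 is an earlier supply event and e2 is any later one within 10 minutes.
from every e1=MaterialSupplyStream -> e2=MaterialSupplyStream
    within 10 min
select e1.name, e1.amount
insert into SupplyDecreaseAlertStream;
```

No filter is applied yet, so any two consecutive supply events within the window produce an output event.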
The foreman requires each event be checked against the subsequent events to see if there's a decrease in the amount by 10 units. The Siddhi filter for this is as follows.
[e1.name == e2.name and e1.amount - e2.amount > 10]
Let's add it to the query as shown below.
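With the filter attached to the second event, the query could read as in the sketch below. The stream names remain the assumed ones (`MaterialSupplyStream`, `SupplyDecreaseAlertStream`); the filter is the one given above.

```siddhi
-- Only a later event for the same material, with an amount lower by
-- more than 10, completes the pattern.
from every e1=MaterialSupplyStream
    -> e2=MaterialSupplyStream[e1.name == e2.name and e1.amount - e2.amount > 10]
    within 10 min
select e1.name, e1.amount
insert into SupplyDecreaseAlertStream;
```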
To generate a more complete output, you need to add the following information to it.
- The name of the raw material
- The earlier amount
- The later amount (which is lower than the earlier amount by more than 10 units, as specified in the filter)
- The supplier whose supply has fallen below the threshold
Let's update the output stream definition as follows to include this information.
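A sketch of the extended definition, carrying the four items listed above, is given here. The stream name and the attribute names `originalAmount` and `laterAmount` are hypothetical.

```siddhi
-- One attribute per item to be reported; names and types are assumptions.
define stream SupplyDecreaseAlertStream (name string, originalAmount double, laterAmount double, supplier string);
```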
The completed Siddhi application is as follows.
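Putting the steps together, the application might look like the sketch below. The application name, the output stream name, the output attribute names, and the attribute types are assumptions; the pattern, the filter, and the 10-minute limit come from the steps above.

```siddhi
-- Hypothetical application name.
@App:name('SupplyDecreaseAlertApp')

define stream MaterialSupplyStream (name string, amount double, supplier string);

define stream SupplyDecreaseAlertStream (name string, originalAmount double, laterAmount double, supplier string);

-- Alert when a later supply of the same material is lower by more than 10
-- within 10 minutes of an earlier supply event.
from every e1=MaterialSupplyStream
    -> e2=MaterialSupplyStream[e1.name == e2.name and e1.amount - e2.amount > 10]
    within 10 min
select e1.name, e1.amount as originalAmount, e2.amount as laterAmount, e2.supplier
insert into SupplyDecreaseAlertStream;
```

The supplier is taken from `e2` here on the assumption that the alert should name the supplier of the reduced delivery.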
User Scenario 2: Detecting production delays after supplies are received
For this user scenario, let's consider a situation where the relevant data arrives in events from two different streams. Here, you need to identify delays in production, which means you have to check for events not arriving at a particular stream.
Let's begin by defining input streams for both the raw material supply and the sweet production.
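The two definitions could be sketched like this. The stream names are the ones used later in this scenario; the attribute lists and types are assumptions.

```siddhi
-- Raw material received at the factory.
define stream MaterialSupplyStream (name string, amount double, supplier string);

-- Raw material consumed when production starts.
define stream MaterialConsumptionStream (name string, amount double);
```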
- The information to be output is as follows.
Name of the raw material
To generate this output, let's define an output stream as follows.
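A minimal version carrying only the material name is sketched below. The stream name `ProductionDelayAlertStream` is hypothetical.

```siddhi
-- Hypothetical alert stream; it carries the name of the delayed material.
define stream ProductionDelayAlertStream (name string);
```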
To correlate events from the consumption and supply streams, let's define a simple pattern as follows.
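As a first step, a pattern without any condition might be written as in this sketch. The output stream name is assumed, and `every` is included here so that each supply event is tracked.

```siddhi
-- Matches a supply event followed by any consumption event.
from every e1=MaterialSupplyStream -> e2=MaterialConsumptionStream
select e1.name
insert into ProductionDelayAlertStream;
```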
- The query you added does not yet contain a condition on which the correlation is based. The condition to be added needs to consider events that have not arrived at the `MaterialConsumptionStream` input stream instead of new events that arrive there. To do this, you can use the logical `not` operator as part of a logical pattern specification as shown below.
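A sketch of such a query follows, assuming the hypothetical output stream `ProductionDelayAlertStream` and a match on the material name. The 15-minute wait comes from the scenario description.

```siddhi
-- Fires when no consumption event for the same material arrives
-- within 15 minutes of a supply event.
from every e1=MaterialSupplyStream
    -> not MaterialConsumptionStream[name == e1.name] for 15 min
select e1.name
insert into ProductionDelayAlertStream;
```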
The pattern you defined here differs from the pattern defined in User Scenario 1 in the following ways.
- In this scenario, you are not defining an event reference for the `MaterialConsumptionStream` stream (i.e., nothing similar to `e2=MaterialConsumptionStream`). This is because the criterion to be met is the non-arrival of events. Therefore, you cannot refer to an event `e2` in the non-arrival stream.
- Instead of the `within` keyword, you are using the `for` keyword. This is because the `not` pattern has to be terminated either by a single `and` clause (which denotes that an event arriving at a different stream can terminate the wait) or by a `for <time>` clause (which denotes that the wait time for events not arriving is `<time>`). For more details, see the `not` pattern in Siddhi Query Guide - Logical Patterns.
The completed Siddhi application looks as follows.
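Combining the definitions and the absence pattern gives something like the sketch below. The application name, the alert stream name, and all attribute lists are assumptions; the two input stream names, the `not ... for` construct, and the 15-minute wait come from the steps above.

```siddhi
-- Hypothetical application name.
@App:name('ProductionDelayAlertApp')

define stream MaterialSupplyStream (name string, amount double, supplier string);

define stream MaterialConsumptionStream (name string, amount double);

define stream ProductionDelayAlertStream (name string);

-- Alert when a material is received but not consumed within 15 minutes.
from every e1=MaterialSupplyStream
    -> not MaterialConsumptionStream[name == e1.name] for 15 min
select e1.name
insert into ProductionDelayAlertStream;
```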