This documentation is for WSO2 Stream Processor 4.2.0. View documentation for the latest release.

All docs This doc
||
Skip to end of metadata
Go to start of metadata

Introduction

In the previous two tutorials, you understood how event correlation is handled through joins as well as patterns. Now, let's look at how a more advanced form of event processing (namely trend analysis) can be carried out with Siddhi.

The factory foreman needs to detect all the input from the Sweet Bots to detect overall decreases in the production of a sweet. For example, if the production of any given sweet is showing a downward trend for 10 minutes, the factory manager needs to be alerted.

This tutorial covers the following Siddhi concepts:

  • Siddhi sequences
  • Siddhi partitions

Sequences are used in Siddhi for considering consecutive events for analysis. A sequence can be considered a special, more advanced form of Siddhi pattern where only the immediate subsequent event is analyzed. In contrast, a partition can be used to analyze multiple following events. Therefore, sequences use the , notation instead of -> during definition.

The following is an sample sequence definition that considers n sequential events from the stream1 stream.

from every e1=stream1, e2=stream1, e3=stream1, ... , en=stream1
select ...
insert into ...

Tutorial steps

Let's get started!

  1. First, let's add the SweetProductionStream input stream that you first created in Creating a Simple Siddhi Application to capture information from the Sweet Bots.

    define stream SweetProductionStream(name string, amount long);
  2. The performance analysis needs to be done based on time. Let's also assume that each Sweet Bot is sending events with time information, in the form of an additional field that denotes the UNIX. This additional field needs to be added as an attribute to the SweetProductionStream stream as shown below.

    define stream SweetProductionStream(name string, amount long, timestamp long);

    An example event (in JSON form) from a SweetBot is as follows:

    {
      "event": {
        "name": "Bonbon",
        "amount": 10,
        "timestamp": 1415463675
      }
    }
  3. Let's also define an output stream from which alerts are generated. The following information needs to be included in the output event.
    • The name of the sweet of which the production is showing a downward trend

    • Highest detected value within the 10-minute window (i.e. the initial amount based on which the checks are done)

    • The final amount detected

    • The timestamp at the end of processing

     The output stream definition with this information looks as follows.

    define stream DecreasingTrendAlertStream(name string, initialAmount long, finalAmount long, timestamp long);
  4. To enable the sequential processing of events, lets first define a simple sequence with no processing.

    from every e1=SweetProductionStream, e2=SweetProductionStream
    select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
    insert into DecreasingTrendAlertStream;

    Here, you are using the comma (,) to delimit events that you need to process in sequence.

  5. Next, you need to define the logic for the user scenario. It requires detecting a decreasing trend within 10 minutes in the production figures. This can be deconstructed into the following points:
      1. Every event has a value for the amount parameter.
      2. If the amount value for event e1 is v1, then we can say that a decreasing trend exists if the next event e2 has a value v2 as the amount, and v1 > v2.
      3. The above decreasing trend has to continue for 10 minutes to be flagged for alerting.
      4. If no event with a decreasing trend (i.e., v1 < v2) is detected within 10 minutes, no alert is generated.

    Let's construct a query taking the above points into consideration.
    1. Taking the two sequential events e1 and e2 into consideration, point ii can be converted into a filter form as follows.
      [e1.amount > e2.amount]
    2. As mentioned in point iii, the processing should consider a period of 10 minutes (10 * 60000 milliseconds). The filter form to do this is as follows.
      [e2.timestamp - e2.timestamp < 10 * 60000]
    3. When the above two filter forms are combined, the following created.
      [e1.amount > e2.amount and (e2.timestamp - e1.timestamp) < 10 * 60000] 

    Let's apply this filter to the sequence, so that the above conditions are applied to the second event e2.

    from every e1=SweetProductionStream, 
    e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]
    select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
    insert into DecreasingTrendAlertStream;

    Here, you are not referring to e2's attributes as e2.amount etc. You can refer to it as amount because it is self evident within the filter.

  6. For a complete trend analysis, the pattern matching must run continuously. When an event e2 that meets the conditions you specified arrives, you need to make sure that matching does not stop there and that the next event is also evaluated against the same criterion. In order to achieve this, sequences can be specified with regular expressions. This may include symbols such as * (0 or more instances), + (one or more instances) and ? (zero or one instance).

    In this scenario, you need the processing for the decreasing trend to be done continuously (for 0 or more matching events). Therefore, let's add the * regular expression to the filter as shown below.

    from every e1=SweetProductionStream, e2=SweetProductionStream
    select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
    insert into DecreasingTrendAlertStream;
  7. Finally, you need the processing to conclude at the end of 10 minutes. This means, there should be a clause to check timestamps and match events th timestamps greater than 10 * 60000 of the original event. In other words, adding a filter to match timestamps greater than 10 minutes at the end ensures that the processing can break out of the loop described above and proceed to the output stream.

    Therefore, let's define a new event e3 that follows the initial event e1, and zero or more e2 events:

    e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]

    Let's add this to the query as follows.

    from every e1=SweetProductionStream,

    e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,

    e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]

    select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp

    insert into DecreasingTrendAlertStream;


    The completed query is as follows.

    define stream SweetProductionStream(name string, amount long, timestamp long);
    
    define stream DecreasingTrendAlertStream(name string, initialAmount long, finalAmount long);
    
    from every e1=SweetProductionStream, 
    e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,
    e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
    select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
    insert into DecreasingTrendAlertStream;
  8. The Siddhi application still does not generate a product-wise decreasing trend alert. For example, the following two events arriving in succession must be considered a valid decreasing trend. 

    ProductAmountTimestamp
    Bonbon101415463675
    Pretzel71415474284

    To address this, you need to consider the processing of each sweet as a self-contained unit instead of considering them all together. This means, when processing events that contain the name attribute set to a particular sweet are only matched with other events with the same value for the attribute.

    Siddhi offers this functionality through partitions. With partitions, streams are virtually divided and incoming events are processed in isolated groups (known as partitions) that are completely independent from one another. Each partition (which can contain multiple operations within) is tagged with a key and only processes events that match their key. For more information about partitions, see Siddhi Query Guide - Partition.

    A typical partition definition looks as follows.

    partition with (myAttribute2 of MyStream )
    begin
        from MyStream
        select myAttribute1, myAttribute2, max(myAttribute3) as maxAttribute3
        insert into MyOutputStream;
    end;


    Here, the partition key is myAttribute2, and if there are three possible values for myAttribute2, each of these values are considered a separate partition.

    For this scenario, let's define a partition with the name attribute as follows so that each sweet group is considered separately based on its name.

    partition with (name of SweetProductionStream)
    begin
       . . .
    End;

    Let's copy the query with partitions you created above and place it within the partition as shown below.

    partition with (name of SweetProductionStream)
    begin
    from every e1=SweetProductionStream,

    e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,

    e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]

    select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp

    insert into DecreasingTrendAlertStream;
    end;

The completed Siddhi application looks as follows.

@App:name('DecreasingTrendAlertApp')

@source(type = 'http', @map(type = 'json'))
define stream SweetProductionStream(name string, amount long, timestamp long);

@sink(type='email', address='factory416@sweets-r-us.com', username='factory416', password='secret_password', subject='Downward production trend alert', to='bossman@sweets-r-us.com', @map(type = 'text', @payload("Hello,\n\nThe production of {{name}} has been displaying a decreasing trend for the past 10 minutes. The initial detected amount was {{initialAmount}} and the amount at the end of processing was {{finalAmount}}.\n\nThis message was generated automatically.")))
define stream DecreasingTrendAlertStream(name string, initialAmount long, finalAmount long, timestamp long);

partition with (name of SweetProductionStream)
begin
	from every e1=SweetProductionStream, 
e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,
e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
end;
  • No labels