This documentation is for WSO2 Stream Processor 4.2.0. View documentation for the latest release.


Incremental aggregation allows you to obtain aggregates in an incremental manner for a specified set of time periods.

This not only allows you to calculate aggregations with varied time granularity, but also allows you to access them in an interactive manner for reports, dashboards, and further processing. Its schema is defined via the aggregation definition. 

This section explains how to calculate aggregates in an incremental manner for different sets of time periods, store the calculated values in a data store and then retrieve that information.

Prerequisites

In order to define a query to calculate aggregate values, a pre-defined stream must exist.

Step 1: Calculate and persist time-based aggregate values

In this example, the Siddhi application defines an aggregation named TradeAggregation to calculate the average of the price attribute and the sum of the quantity attribute for events arriving at the TradeStream stream. These aggregates are calculated for the second, minute, hour, day, month and year durations, as specified by sec ... year. If minute ... day were given instead, aggregation would be done for the minute, hour and day durations. Comma-separated values such as every day, month are also supported. However, in the existing implementation, durations cannot be skipped in a comma-separated list: every sec, min, day is not a valid clause because the hour duration is skipped. Furthermore, the aggregates are calculated per symbol value because of the group by clause.

define stream TradeStream (symbol string, price double, quantity long, timestamp long);

@BufferSize('3')
@IgnoreEventsOlderThanBuffer('true')
@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/TestDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
define aggregation TradeAggregation
from TradeStream
select symbol, avg(price) as avgPrice, sum(quantity) as total
group by symbol
aggregate by timestamp every sec ... year;
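
The duration rules described before the example can be sketched in a few lines of Python. This is only an illustration of the rule, not how Siddhi parses the clause; the DURATIONS list and both helper names are assumptions made for this sketch.

```python
# Durations from smallest to largest, using the short names that appear in
# this section. The exact spellings Siddhi accepts are an assumption here.
DURATIONS = ["sec", "min", "hour", "day", "month", "year"]

def expand_range(first, last):
    """Durations covered by a range such as 'sec ... year'."""
    start, end = DURATIONS.index(first), DURATIONS.index(last)
    if start > end:
        raise ValueError("range must go from a smaller to a larger duration")
    return DURATIONS[start:end + 1]

def is_valid_list(durations):
    """A comma-separated list is valid only if no duration in between is skipped."""
    positions = [DURATIONS.index(d) for d in durations]
    return all(b - a == 1 for a, b in zip(positions, positions[1:]))

print(expand_range("min", "day"))            # ['min', 'hour', 'day']
print(is_valid_list(["day", "month"]))       # True
print(is_valid_list(["sec", "min", "day"]))  # False: 'hour' is skipped
```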

In the given example, aggregation would be executed based on an external timestamp, which is reflected by the value of the 'timestamp' attribute (i.e. the event's time is determined by the 'timestamp' attribute's value). This attribute's value could either be a long value (reflecting Unix timestamp in milliseconds) or a string value adhering to one of the following formats.

  • If a timezone needs to be given explicitly: <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z> (Here the ISO 8601 UTC offset must be provided for <Z>. For example, +05:30 reflects the India time zone. If the time is not in GMT, this value must be provided.)
  • If timezone is GMT: <yyyy>-<MM>-<dd> <HH>:<mm>:<ss>
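
To see how the two string formats relate to the long (Unix milliseconds) form, the following Python sketch converts either format to milliseconds. It is an illustration only; the function name to_unix_millis is hypothetical and the parsing shown is not Siddhi's own.

```python
from datetime import datetime, timezone

def to_unix_millis(text):
    """Convert '<yyyy>-<MM>-<dd> <HH>:<mm>:<ss> [<Z>]' to Unix time in milliseconds."""
    try:
        # Form with an explicit UTC offset such as +05:30.
        parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S %z")
    except ValueError:
        # No offset given: the time is taken to be in GMT.
        parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp()) * 1000

print(to_unix_millis("2018-01-05 00:00:00"))         # 1515110400000
print(to_unix_millis("2018-01-05 05:30:00 +05:30"))  # 1515110400000
```

Both calls describe the same instant, so they yield the same long value.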

Aggregation can also be done based on the event arrival time. To achieve this, the last line of the given example must be changed to 'aggregate every sec ... year;'

The group by clause is optional for incremental aggregation. When the group by clause is not given, all the events would be aggregated together, for each duration.

The following annotations are supported for incremental aggregation. All of them are optional, so an aggregation can be defined without any of these annotations.

Annotation and description:
@BufferSize(<positive integer>)

The buffer size is a positive integer that specifies how many older aggregations must be retained in an in-memory buffer to handle out of order data. The default buffer size is 0. A buffer size can be specified only if aggregation is based on an external timestamp (data processed on event arrival time can never be out of order). Furthermore, if a buffer size is given, the most granular time unit for which aggregation is done must be seconds, minutes, hours or days.

Buffering is done only for the most granular duration for which aggregation is done. In the given example, buffering is done for the second duration. That is, 3 older aggregations would be retained in an in-memory buffer. Whether an aggregation is 'old' or not is determined based on the latest event which has arrived so far.

For example, consider the following sequence of events (here, 51 refers to an event belonging to the 51st second):

event_1: 50 → event_2: 51 → event_3: 52 → event_4: 53 → event_5: 51 → event_6: 53 → event_7: 54 → event_8: 50

Note that event_5 is an out of order event. The latest event which has arrived so far (until event_5) is event_4, which belongs to the 53rd second. Since the buffer size is 3, three aggregations older than the aggregation for the 53rd second would be buffered. Hence, aggregations for the 50th, 51st and 52nd seconds are buffered. Therefore, event_5 would be properly aggregated together with event_2. event_6 too would be properly aggregated with event_4. When event_7 arrives for the 54th second, that becomes the latest event. Hence, the aggregation for the 50th second now expires from the buffer. Therefore, when event_8 arrives, there's no corresponding aggregation for the 50th second to process it with. How event_8 is aggregated is determined by the value of the @IgnoreEventsOlderThanBuffer annotation. Please refer to that description for further information.

@IgnoreEventsOlderThanBuffer(<true or false>)

The default value of this annotation is 'false'. An @IgnoreEventsOlderThanBuffer value can be assigned whether or not a @BufferSize is provided. If @IgnoreEventsOlderThanBuffer is not specified or if the value is given as 'false', any out of order event that is 'older' than the buffer would be aggregated with the oldest aggregation in the buffer. If the buffer size is not defined (the buffer size is then 0), the out of order event would be processed with the latest aggregation done for the most granular duration.

If @IgnoreEventsOlderThanBuffer value is given as 'true', any out of order event which happens to be outside the buffer would be ignored. When buffer size is not specified, any out of order event would be dropped.

If the example sequence of events for @BufferSize description is considered, event_8 would be processed as follows when @IgnoreEventsOlderThanBuffer value is false and true respectively.

@IgnoreEventsOlderThanBuffer('false') or @IgnoreEventsOlderThanBuffer is not given: event_8 would be processed with aggregation for the 51st second, as that is the oldest aggregation retained in the buffer at that time.

@IgnoreEventsOlderThanBuffer('true'): event_8 would be dropped.
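
The interaction between @BufferSize and @IgnoreEventsOlderThanBuffer can be replayed with a small Python sketch. It models only the routing decision described above (which second an event is aggregated with, or whether it is dropped) and is not the actual Siddhi implementation; the function names are hypothetical.

```python
def route_event(event_sec, latest_sec, buffer_size, ignore_older):
    """Return the second whose aggregation receives the event, or None if it is dropped."""
    oldest_buffered = latest_sec - buffer_size
    if event_sec >= oldest_buffered:
        return event_sec          # still inside the buffer (or the latest second)
    if ignore_older:
        return None               # older than the buffer: dropped
    return oldest_buffered        # older than the buffer: oldest retained aggregation

def simulate(events, buffer_size, ignore_older):
    latest = None
    routed = []
    for sec in events:
        if latest is None or sec > latest:
            latest = sec          # a newer event moves the buffer forward
        routed.append(route_event(sec, latest, buffer_size, ignore_older))
    return routed

events = [50, 51, 52, 53, 51, 53, 54, 50]   # event_1 ... event_8
print(simulate(events, 3, False))  # [50, 51, 52, 53, 51, 53, 54, 51]
print(simulate(events, 3, True))   # [50, 51, 52, 53, 51, 53, 54, None]
```

The last entry shows event_8: aggregated with the 51st second when the annotation is 'false', and dropped when it is 'true'.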

@store(type=<store type>, ...)

The aggregated results are stored in the database defined by the @store definition. For more information on how to define a store, please refer to the 'Configuring Event Tables to Store Data' section. Please note that the @PrimaryKey and @IndexBy annotations are not allowed with an aggregation definition, as a primary key is defined internally.

When a store is defined, tables corresponding to each aggregate duration would get created in the given database. If a store is not defined, all of these tables would be in-memory tables by default. Giving a store definition is useful in a production environment as otherwise, all aggregations would be lost in the case of a system failure. Names of the tables created would be of the format <aggregation name>_<duration>.

The following tables get created in the given example:

  • TradeAggregation_SECONDS
  • TradeAggregation_MINUTES
  • TradeAggregation_HOURS
  • TradeAggregation_DAYS
  • TradeAggregation_MONTHS
  • TradeAggregation_YEARS

Internally, the primary key of these tables is set to the AGG_TIMESTAMP attribute. If the aggregation has a group by clause, a composite primary key of AGG_TIMESTAMP and the group by key(s) is defined. AGG_TIMESTAMP is an internally calculated value that reflects the time bucket of an aggregation for a particular duration. For example, for the 'day' aggregations, AGG_TIMESTAMP = 1515110400000 indicates the aggregation for the 5th of January 2018 (1515110400000 is the Unix time in milliseconds for 2018-01-05 00:00:00 GMT).

AGG_TIMEZONE value is also stored in these tables, reflecting the timezone of the AGG_TIMESTAMP value. For aggregation based on external timestamp, a timezone could be sent with the event. For event arrival time-based aggregation, timezone would be the system timezone.
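
As a rough illustration of what a 'time bucket' means, the following Python sketch truncates an event timestamp to the start of its second, minute, hour, day, month or year. It assumes GMT for simplicity; how the product applies AGG_TIMEZONE when bucketing is not covered here, and the function name is hypothetical.

```python
from datetime import datetime, timezone

def bucket_start_millis(ts_millis, duration):
    """Start of the bucket (Unix milliseconds, GMT assumed) that contains ts_millis."""
    moment = datetime.fromtimestamp(ts_millis // 1000, tz=timezone.utc)
    truncated = {
        "seconds": moment,
        "minutes": moment.replace(second=0),
        "hours": moment.replace(minute=0, second=0),
        "days": moment.replace(hour=0, minute=0, second=0),
        "months": moment.replace(day=1, hour=0, minute=0, second=0),
        "years": moment.replace(month=1, day=1, hour=0, minute=0, second=0),
    }[duration]
    return int(truncated.timestamp()) * 1000

# An event at 2018-01-05 13:45:07.250 GMT falls into the day bucket 1515110400000.
event_time = 1515110400000 + (13 * 3600 + 45 * 60 + 7) * 1000 + 250
print(bucket_start_millis(event_time, "days"))    # 1515110400000
print(bucket_start_millis(event_time, "months"))  # 1514764800000
```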

The other values stored in the table are the aggregations and other function calculations defined in the aggregate definition. If a function calculation is not an aggregation, the output value corresponds to the function calculation for the latest event in that time bucket (e.g. if a multiplication is done, the stored value is the multiplication for the latest event of that duration). Please note that certain aggregations are internally stored as a collection of other aggregations. For example, the average aggregation is internally calculated as a function of sum and count. Hence, the table only reflects a sum and a count. The actual average is returned when the user retrieves the aggregate values as described in 'Step 2'.
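
One plausible reason for storing a sum and a count rather than the average itself is that sums and counts can be combined across time buckets, while averages cannot. The Python sketch below shows the arithmetic with made-up numbers; the helper names and the sample values are hypothetical.

```python
def merge(partials):
    """Combine (sum, count) pairs from finer buckets into one (sum, count) pair."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total, count

def average(total, count):
    return total / count

# Hypothetical per-second partials for one symbol: (sum of price, number of events)
seconds = [(100.0, 4), (60.0, 1)]
minute_total, minute_count = merge(seconds)
print(average(minute_total, minute_count))                     # 32.0
# Averaging the per-second averages would give a different, wrong value:
print(sum(average(s, c) for s, c in seconds) / len(seconds))   # 42.5
```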




When providing aggregation names, please be mindful of the name length, since for certain databases such as Oracle, table name length limitations exist (e.g. For Oracle it's 30 characters). Hence, the table name (<Aggregate name>_<Duration>) length must not exceed the maximum character length specified for the relevant database.
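
A quick way to check a candidate aggregation name against such a limit is sketched below in Python. The suffixes are taken from the table names listed earlier; the second aggregation name is made up for the example, and the helper names are hypothetical.

```python
DURATION_SUFFIXES = ["SECONDS", "MINUTES", "HOURS", "DAYS", "MONTHS", "YEARS"]

def table_names(aggregation_name, suffixes=DURATION_SUFFIXES):
    """Table names of the form <aggregation name>_<duration>."""
    return [f"{aggregation_name}_{suffix}" for suffix in suffixes]

def names_exceeding(aggregation_name, max_length):
    """Table names that would be longer than the database allows."""
    return [name for name in table_names(aggregation_name) if len(name) > max_length]

print(names_exceeding("TradeAggregation", 30))                   # []
print(len(names_exceeding("DailyTradeSummaryAggregation", 30)))  # 6
```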





Step 2: Retrieve calculated and persisted aggregate values

This query retrieves the aggregate values that were calculated and persisted in Step 1.

The query matches events in the TradeSummaryRetrievalStream stream with the TradeAggregation aggregation that was defined in Step 1, based on the value of the symbol attribute, and performs a join for each matching pair. Based on that join, it produces output events with the symbol, total and avgPrice attributes per day within the time range 2014-02-15 00:00:00 to 2014-03-16 00:00:00. The timezone is represented by +05:30. The output contains total and avgPrice per symbol, for all the days falling within the given time range.


define stream TradeSummaryRetrievalStream (symbol string);

from TradeSummaryRetrievalStream as b join TradeAggregation as a
on a.symbol == b.symbol 
within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30" 
per "days" 
select a.symbol, a.total, a.avgPrice 
insert into TradeSummaryStream;

The on condition is optional for retrieval. 

The within duration could be specified in the following formats:

  1. within <start_time>, <end_time>: The <start_time> and <end_time> could either be string or long values. If it is a string value, the format <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z> must be adhered to (<Z> represents the timezone. It could be omitted if the time is in GMT). If long values are given, those must reflect Unix timestamps (in milliseconds).
  2. within <within_duration>: The <within_duration> can only be a string value. The format of it could be one of the following 
    1. <yyyy>-**-** **:**:** <Z>
    2. <yyyy>-<MM>-** **:**:** <Z>
    3. <yyyy>-<MM>-<dd> **:**:** <Z>
    4. <yyyy>-<MM>-<dd> <HH>:**:** <Z>
    5. <yyyy>-<MM>-<dd> <HH>:<mm>:** <Z>
    6. <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z>
    The timezone <Z> can be omitted if the time is in GMT.
    Consider the clause, within "2018-01-** **:**:**" as an example. This means, within the 1st month of 2018. It's equivalent to the clause within "2018-01-01 00:00:00", "2018-02-01 00:00:00"
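
The equivalence between a wildcard pattern and an explicit start and end time can be sketched in Python as follows. This is an illustrative expansion only, not Siddhi's parser; the function name is hypothetical, and treating the fully specified form (no wildcards) as a one-second range is an assumption.

```python
import re
from datetime import datetime, timedelta

WITHIN = re.compile(
    r"^(\d{4})-(\d{2}|\*\*)-(\d{2}|\*\*) (\d{2}|\*\*):(\d{2}|\*\*):(\d{2}|\*\*)"
    r"( [+-]\d{2}:\d{2})?$"
)
STEPS = [timedelta(days=1), timedelta(hours=1), timedelta(minutes=1), timedelta(seconds=1)]

def expand_within(pattern):
    """Expand a within <within_duration> pattern into (start_time, end_time) strings."""
    match = WITHIN.match(pattern)
    if match is None:
        raise ValueError(f"unsupported within pattern: {pattern!r}")
    fields = list(match.groups()[:6])
    zone = match.group(7) or ""
    fixed = sum(1 for f in fields if f != "**")   # number of non-wildcard fields
    if "**" in fields[:fixed]:
        raise ValueError("wildcards must follow all fixed fields")
    defaults = [0, 1, 1, 0, 0, 0]                 # month and day start at 1
    values = [int(f) if f != "**" else d for f, d in zip(fields, defaults)]
    start = datetime(*values)
    if fixed == 1:                                # only the year is fixed
        end = start.replace(year=start.year + 1)
    elif fixed == 2:                              # year and month are fixed
        end = start.replace(year=start.year + start.month // 12,
                            month=start.month % 12 + 1)
    else:                                         # day, hour, minute or second
        end = start + STEPS[fixed - 3]
    fmt = "%Y-%m-%d %H:%M:%S"
    return start.strftime(fmt) + zone, end.strftime(fmt) + zone

print(expand_within("2018-01-** **:**:**"))
# ('2018-01-01 00:00:00', '2018-02-01 00:00:00')
```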

The 'per' clause defines for which time granularity, aggregations need to be retrieved. The 'per' value could be seconds, minutes, hours, days, months or years. Furthermore, this 'per' value must be a duration for which aggregation is already done as per the aggregate definition. For example, one is not allowed to have an aggregate by timestamp every sec ... day in the aggregate definition, and then attempt to retrieve per 'months' since month aggregation has not been done. 

You can retrieve the same resultant aggregate value by executing the following cURL command.

curl -X POST https://localhost:9443/stores/query -H "content-type: application/json" -u "admin:admin" \
-d '{"appName" : "<<Application Name>>", "query" : "<<query>>" }' -k
  • <<Application Name>> : This defines the application name, e.g., AggregationTest.
  • <<query>> : This defines the query that retrieves the aggregate value.

    Example
    from TradeAggregation as a 
        on a.symbol == "FB"
    within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"
    per "days"

For more information, see Managing Stored Data via REST API.
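
Because the query text itself contains double quotes, it has to be escaped when it is embedded in the JSON body. The Python sketch below builds the same request as the cURL command using only the standard library, so that the escaping is handled by json.dumps. It only constructs the request and does not send it; the function name is hypothetical, and the URL and credentials are the ones shown in the cURL command.

```python
import base64
import json
import urllib.request

def build_store_query_request(app_name, query, user="admin", password="admin",
                              url="https://localhost:9443/stores/query"):
    """Build (but do not send) the POST request for the store query endpoint."""
    payload = json.dumps({"appName": app_name, "query": query}).encode("utf-8")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
    )

# Query text copied from the example above.
query = ('from TradeAggregation as a on a.symbol == "FB" '
         'within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30" '
         'per "days"')
request = build_store_query_request("AggregationTest", query)
print(request.data.decode("utf-8"))  # JSON body with the inner quotes escaped
```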

Apart from providing these within and per values as constants in the query itself, the user has the capability of retrieving those values from attributes in the incoming stream. An example is as follows.

define stream TradeSummaryRetrievalStream (symbol string, startTime long, endTime long, perDuration string);

from TradeSummaryRetrievalStream as b join TradeAggregation as a
on a.symbol == b.symbol 
within b.startTime, b.endTime 
per b.perDuration 
select a.symbol, a.total, a.avgPrice 
insert into TradeSummaryStream;

Ultimately, the user receives the aggregate values for the given 'per' time granularity that fall within the duration specified by the 'within' clause.

  • Please note that the AGG_TIMESTAMP attribute can be included in the select clause. If AGG_TIMESTAMP is in the select statement, the order by AGG_TIMESTAMP clause can be used to get the final aggregations in ascending order, based on time. Such a query would look as follows:

    from TradeSummaryRetrievalStream as b join TradeAggregation as a
    on a.symbol == b.symbol 
    within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30" 
    per "days" 
    select AGG_TIMESTAMP, a.symbol, a.total, a.avgPrice 
    order by AGG_TIMESTAMP 
    insert into TradeSummaryStream;



Incremental Aggregation Behavior in Single-node Deployment and HA Deployment

Before describing how incremental aggregation behaves in a single-node and an HA setup, it is worth understanding how the aggregation is executed internally.

Consider an instance where aggregation is done for the second, minute and hour durations. Assume that events arrive in the following sequence, with the given timestamps:

  • event 0 → 2018-01-01 05:59:58
  • event 1 → 2018-01-01 05:59:58
  • event 2 → 2018-01-01 05:59:59
  • event 3 → 2018-01-01 06:00:00
  • event 4 → 2018-01-01 06:00:01
  • event 5 → 2018-01-01 06:00:02

Assume that no buffer has been defined (hence the buffer size is 0). When event 0 arrives, it is initially processed at the second level. This aggregation is retained in-memory until the 58th second elapses. When event 1 arrives, since it also belongs to the 58th second (same as event 0), the data of event 0 and event 1 is aggregated together.

When event 2 arrives, we know that the 58th second has now elapsed (since event 2 has a timestamp in the 59th second). Hence the aggregation for the 58th second (that is, the total aggregation for event 0 and event 1) is written to the <Aggregate name>_SECONDS table and then forwarded to be processed at the minute level. At this phase, the 59th second's data (event 2) is processed in-memory at the second executor level, and the forwarded aggregation is processed in-memory for the 59th minute at the minute executor level.

When event 3 arrives, the 59th second's aggregation (event 2) expires and is written to the <Aggregate name>_SECONDS table. It is also forwarded to the minute executor, where it is aggregated in-memory for the 59th minute. Effectively, the running in-memory aggregation at the minute level covers events 0, 1 and 2 at this phase.

Once event 4 arrives, the aggregation for timestamp 2018-01-01 06:00:00 (event 3) is written to the <Aggregate name>_SECONDS table and forwarded to the minute executor. When the aggregation for event 3 arrives at the minute executor, it identifies that the 59th minute has now elapsed. Hence the aggregation for the 59th minute is written to the <Aggregate name>_MINUTES table and then forwarded to the hour executor, which processes this forwarded aggregation for the 5th hour. At this stage, event 4 is processed in-memory at the second level, the aggregation for event 3 is processed in-memory at the minute level, and the aggregation for the 5th hour is processed in-memory at the hour level.

When event 5 arrives, the aggregation for event 4 is written to the <Aggregate name>_SECONDS table and forwarded to the minute executor. Hence events 3 and 4 are aggregated together, since they belong to the same minute (0th minute of the 6th hour). Event 5 is processed in-memory at the second level.

Hence the aggregation status would be as follows after all the events (event 0 to event 5) have arrived.

  1. Second level aggregation
    1. <Aggregate name>_SECONDS table
      1. Aggregation for 2018-01-01 05:59:58 (Aggregation of event 0 and 1)
      2. Aggregation for  2018-01-01 05:59:59 (event 2)
      3. Aggregation for 2018-01-01 06:00:00 (event 3)
      4. Aggregation for  2018-01-01 06:00:01 (event 4)
    2. Processing in-memory: Aggregation for  2018-01-01 06:00:02 (event 5)
  2. Minute level aggregation
    1. <Aggregate name>_MINUTES table
      1. Aggregation for 2018-01-01 05:59:00 (Aggregation of event 0, 1 and 2)
    2. Processing in-memory: Aggregation for  2018-01-01 06:00:00 (That is the 0th minute of the 6th hour. Aggregation of event 3 and 4)
  3. Hour level aggregation
    1. <Aggregate name>_HOURS table: No aggregation data is written to this table
    2. Processing in-memory: Aggregation for  2018-01-01 05:00:00 (That is the 5th hour. Aggregation of event 0, 1 and 2)
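
The walkthrough above can be replayed with a short Python sketch. It is a simplified model of the executor chain rather than the actual implementation: it assumes in-order events, a buffer size of 0 and GMT timestamps, and it tracks only which event IDs end up in each aggregation. All names in it are hypothetical.

```python
from datetime import datetime, timezone

# (table suffix, bucket width in seconds) for each executor level
LEVELS = [("SECONDS", 1), ("MINUTES", 60), ("HOURS", 3600)]

def simulate(events):
    """events: (event_id, epoch_seconds) pairs in arrival order."""
    tables = {name: [] for name, _ in LEVELS}      # persisted aggregations
    running = {name: None for name, _ in LEVELS}   # in-memory aggregation per level

    def offer(level, ts, ids):
        name, width = LEVELS[level]
        bucket = ts - ts % width
        current = running[name]
        if current is not None and current[0] != bucket:
            # The running bucket has elapsed: persist it, then forward it
            # to the next (coarser) executor.
            tables[name].append(current)
            if level + 1 < len(LEVELS):
                offer(level + 1, current[0], current[1])
            current = None
        if current is None:
            running[name] = (bucket, list(ids))
        else:
            current[1].extend(ids)

    for event_id, ts in events:
        offer(0, ts, [event_id])
    return tables, running

def at(hour, minute, second):
    return int(datetime(2018, 1, 1, hour, minute, second, tzinfo=timezone.utc).timestamp())

events = [(0, at(5, 59, 58)), (1, at(5, 59, 58)), (2, at(5, 59, 59)),
          (3, at(6, 0, 0)), (4, at(6, 0, 1)), (5, at(6, 0, 2))]
tables, running = simulate(events)
print([ids for _, ids in tables["SECONDS"]])  # [[0, 1], [2], [3], [4]]
print([ids for _, ids in tables["MINUTES"]])  # [[0, 1, 2]]
print(tables["HOURS"])                        # []
print(running["MINUTES"][1], running["HOURS"][1])  # [3, 4] [0, 1, 2]
```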


Single-node Deployment

In a single-node deployment, if an @store configuration is not given, the older aggregations are stored in in-memory tables (that is, the <Aggregate name>_MINUTES table, etc. would be in-memory tables). In such a situation, if the server fails for some reason, all the aggregations done so far would be lost. 'Older aggregations' refers to aggregations which are not running in-memory (aggregations which have already been completed for a particular second, minute, etc.).

When a DB configuration has been provided with the @store annotation, the older aggregations are stored in the defined external database. Hence, if the node fails, once we restart the server, the in-memory running aggregations are recreated from the stored data. In the given example, notice how the in-memory aggregation for the minute level can be recreated from the third and fourth entries of the <Aggregate name>_SECONDS table (the aggregations for 2018-01-01 06:00:00 and 2018-01-01 06:00:01). Likewise, the hour-level in-memory aggregation can be recreated with data in the <Aggregate name>_MINUTES table. However, this recreation can be done for all durations except the most granular duration for which aggregation is done (for the given example, we cannot recreate the in-memory aggregation for the second level, since there's no table for a prior duration).

Hence, for the given example (when @store details are provided), if the server fails after all the events have arrived, the in-memory aggregations for the minute and hour levels are recreated from the table information once we restart the server. Effectively, only event 5 would be lost, since it was being aggregated in-memory at the second level.

After such a recreation, if we resend events 1 to 5, the first 4 of these events would be ignored. Only event 5 would be processed at the second level. This filtering is done by checking whether the events arriving after the in-memory data is recreated are newer than or equal to the expiry time of the latest aggregation stored in the most granular table. In the example, the most granular table is the <Aggregate name>_SECONDS table. The latest aggregation in the table is for 2018-01-01 06:00:01, whose expiry time is 2018-01-01 06:00:02. Hence only the 5th event gets processed. After encountering at least one such event that is newer than or equal to the expiry time of the latest timestamp in the table, the decision of whether or not to process out of order events is based on the @IgnoreEventsOlderThanBuffer value.
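
The filtering rule applied right after a restart can be expressed as a one-line comparison. The Python sketch below applies it to the resent events from the example; it is an illustration of the rule as described, with hypothetical names, and it ignores the later hand-over to the @IgnoreEventsOlderThanBuffer logic.

```python
from datetime import datetime, timedelta

def accepted_after_restart(event_time, latest_stored_bucket,
                           bucket_width=timedelta(seconds=1)):
    """An event is processed only if it is not older than the expiry time of the
    latest aggregation stored in the most granular table."""
    expiry = latest_stored_bucket + bucket_width
    return event_time >= expiry

# Latest aggregation in the <Aggregate name>_SECONDS table: 2018-01-01 06:00:01
latest_in_seconds_table = datetime(2018, 1, 1, 6, 0, 1)
resent = {
    1: datetime(2018, 1, 1, 5, 59, 58),
    2: datetime(2018, 1, 1, 5, 59, 59),
    3: datetime(2018, 1, 1, 6, 0, 0),
    4: datetime(2018, 1, 1, 6, 0, 1),
    5: datetime(2018, 1, 1, 6, 0, 2),
}
processed = [event_id for event_id, ts in resent.items()
             if accepted_after_restart(ts, latest_in_seconds_table)]
print(processed)  # [5]
```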


HA Deployment

Let's now explore the behavior of Incremental Analytics in an HA setup. Please refer to the Minimum High Availability (HA) Deployment documentation for more information regarding an HA setup and to see how to configure one.

In a minimum HA setup, one WSO2 SP node is active whereas the other is passive. If the HA setup is configured properly, none of the processed aggregations would be lost even if an @store configuration is not provided, because the current state of the SP is snapshotted.

When a snapshot has already been created, the recreation from tables does not happen (since there's no necessity to do so). Hence, the events coming in after a server restart in an HA setup are processed as in the case where no server failure occurs. That is, the event filtering described at the end of the 'Single-node Deployment' section is not applicable here.


