Incremental aggregation allows you to obtain aggregates in an incremental manner for a specified set of time periods.
This not only allows you to calculate aggregations with varied time granularity, but also allows you to access them in an interactive manner for reports, dashboards, and for further processing. Its schema is defined via the aggregation definition.
Configuring aggregation queries
This section explains how to calculate aggregates in an incremental manner for different sets of time periods, store the calculated values in a data store and then retrieve that information.
To demonstrate this, consider a scenario where a business that sells multiple brands stores its sales data in a physical database so that it can retrieve the data later to perform sales analysis. Each sales transaction is received with the following details:
- symbol: The symbol that represents the brand of the items sold.
- price: The price at which each item was sold.
- amount: The number of items sold.
The Sales Analyst needs to retrieve the total number of items sold of each brand per month, per week, per day etc., and then retrieve these totals for specific time durations to prepare sales analysis reports.
To address the above use case, Siddhi queries need to be designed in two steps as follows:
Step 1: Calculate and persist time-based aggregate values
Before you begin:
Before creating queries to calculate and persist aggregate values, the following prerequisites need to be completed:
- When you are defining a physical store to store aggregate values, you need to first download, install and set up the required database type. For more information about setting up the database, see Defining Tables for Physical Stores.
- If the aggregation query included in the Siddhi application is only for read-only purposes, disable data purging.
To write the required Siddhi queries to calculate and persist aggregate values required for the scenario outlined above, follow the steps below:
To capture the input events based on which the aggregations are calculated, define an input stream as follows. The stream definition includes attributes named symbol, price and amount to capture the details described above. In addition, it has an attribute named timestamp to capture the time at which the sales transaction occurs. The aggregations are executed based on this time.
This attribute's value could either be a long value (reflecting the Unix timestamp in milliseconds), or a string value adhering to one of the following formats.
- <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z>: Use this format if the timezone needs to be specified explicitly. Here, the ISO 8601 UTC offset must be provided for <Z>. For example, +05:30 reflects the India time zone. If the time is not in GMT, this value must be provided.
- <yyyy>-<MM>-<dd> <HH>:<mm>:<ss>: Use this format if the time is in GMT.
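The conversion from either accepted timestamp form to Unix milliseconds can be sketched in Python. This only illustrates the formats described above and is not Siddhi's own parser; the function name to_epoch_millis is hypothetical.

```python
from datetime import datetime, timezone

def to_epoch_millis(value):
    """Convert a timestamp attribute value to Unix time in milliseconds.

    Accepts an int (already epoch millis), a string with an explicit UTC
    offset ('2018-01-05 05:30:00 +05:30'), or a string without an offset,
    which is interpreted as GMT ('2018-01-05 00:00:00').
    """
    if isinstance(value, int):
        return value
    try:
        # Explicit ISO 8601 UTC offset such as '+05:30'.
        dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S %z")
    except ValueError:
        # No offset given: treat the time as GMT.
        dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)
```

Both string forms below refer to the same instant, so they map to the same long value.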
To persist the aggregates that are calculated via your Siddhi application, include a store definition as follows. If you do not, the data is stored in memory and is lost when the Siddhi application is stopped.
Define an aggregation as follows. You can name it TradeAggregation.
When you save aggregate values in a store, the system uses the aggregation name you define here as part of the database table name. The table name is in the <Aggregation_Name>_<Granularity> format. Some database types have length limitations for table names (e.g., for Oracle, it is 30 characters). Therefore, check whether the database type you have used for the store has such a table name length limitation, and make sure that the aggregation name, together with the granularity suffix, does not exceed that maximum length.
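The following Python sketch checks the table names that an aggregation name would produce against a database's length limit. It assumes that the <Granularity> part is the upper-case duration name (SECONDS, MINUTES, HOURS, DAYS, MONTHS, YEARS), by analogy with the TradeAggregation_SECONDS table mentioned later in this section; the function names are hypothetical.

```python
# Assumed suffixes, by analogy with the TradeAggregation_SECONDS table name.
DURATIONS = ["SECONDS", "MINUTES", "HOURS", "DAYS", "MONTHS", "YEARS"]

def aggregation_table_names(aggregation_name, durations=DURATIONS):
    """Return the <Aggregation_Name>_<Granularity> table names for an aggregation."""
    return [f"{aggregation_name}_{d}" for d in durations]

def max_aggregation_name_length(table_name_limit, durations=DURATIONS):
    """Longest aggregation name whose table names all fit within the limit."""
    longest_suffix = max(len(d) for d in durations) + 1  # +1 for the underscore
    return table_name_limit - longest_suffix

def too_long_table_names(aggregation_name, table_name_limit, durations=DURATIONS):
    """List the generated table names that exceed the database's limit."""
    return [t for t in aggregation_table_names(aggregation_name, durations)
            if len(t) > table_name_limit]
```

Under these assumptions, Oracle's 30-character limit allows aggregation names of up to 22 characters when every granularity from seconds to years is aggregated.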
- To calculate aggregations, include a query as follows:
To get input events from the TradeStream stream that you previously defined, add a from clause as follows.
To select the attributes to be included in the output event, add a select clause as follows.
Based on this, each output event has the following attributes.
- The symbol representing the product sold.
- The average price at which the brand is sold. This is derived by applying the avg() function to the price attribute of each input event.
- The total number of items sold for each product. This is derived by applying the sum() function to the quantity attribute of each input event.
The average price and the total number of items are the aggregates calculated in this scenario.
To group the output by the symbol, add a group by clause as follows.
The group by clause is optional in incremental aggregations. If it is not provided, all the events are aggregated together for each duration.
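What the select and group by clauses compute for one time bucket can be modeled in a few lines of Python. This is a conceptual sketch, not how Siddhi executes the query; the (symbol, price, quantity) tuple layout and the sample symbols are hypothetical.

```python
from collections import defaultdict

def aggregate(events, group_by_symbol=True):
    """Compute (average price, total quantity) per group for one time bucket.

    events: iterable of (symbol, price, quantity) tuples.
    Without group by, every event falls into a single group keyed by None.
    """
    acc = defaultdict(lambda: [0.0, 0, 0])  # price sum, event count, quantity sum
    for symbol, price, quantity in events:
        key = symbol if group_by_symbol else None
        state = acc[key]
        state[0] += price
        state[1] += 1
        state[2] += quantity
    return {key: (s[0] / s[1], s[2]) for key, s in acc.items()}
```

For example, aggregate([("WSO2", 10.0, 2), ("WSO2", 20.0, 3), ("IBM", 5.0, 1)]) yields an average price of 15.0 and a total of 5 for WSO2.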
The timestamp included in each input event allows you to calculate aggregates for the range of time granularities from seconds to years. Therefore, to calculate aggregates for each time granularity within this range, add the aggregate by clause to this aggregation query as follows.
This results in the average price and the total number of items sold being calculated per second, minute, hour, day, month and year. These time-based calculations are saved in the MySQL store you defined.
If you exclude one or more time granularities from this range, you cannot later retrieve aggregates for them. For example, you cannot aggregate by timestamp every sec ... day in the aggregation definition and then attempt to retrieve values per months, because monthly values are not calculated and persisted.
You can also provide comma-separated values such as every day, month,...
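The rule that only the listed granularities can be retrieved later can be sketched as follows. The granularity names and the expand_every helper are simplified assumptions for illustration; they are not Siddhi's parser.

```python
# Simplified granularity names, ordered from finest to coarsest.
GRANULARITIES = ["sec", "min", "hour", "day", "month", "year"]

def expand_every(spec):
    """Expand 'sec ... day' (a range) or 'day, month' (a list) into granularity names."""
    if "..." in spec:
        start, end = (part.strip() for part in spec.split("..."))
        i, j = GRANULARITIES.index(start), GRANULARITIES.index(end)
        return GRANULARITIES[i:j + 1]
    return [part.strip() for part in spec.split(",")]

def can_retrieve(spec, per):
    """A granularity can be retrieved only if it was aggregated and persisted."""
    return per in expand_every(spec)
```

With every sec ... day, the month granularity is never calculated, so can_retrieve("sec ... day", "month") is False.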
The complete Siddhi application with all the definitions and queries added looks as follows.
The following is a list of optional annotations supported for incremental aggregation.
@store(type=<store type>, ...)
The aggregated results are stored in the database defined via this annotation in tables corresponding to each aggregate duration. For more information on how to define a store, see Defining Tables for Physical Stores.
When these database entries are created, the primary key is defined internally. Therefore, you must not include an @PrimaryKey annotation within this @store annotation in the aggregation definition.
If a store is not defined via this annotation, the aggregated results are stored in in-memory tables by default. Adding a specific store definition is useful in a production environment. This is because the aggregations stored in in-memory tables can be lost if a system failure occurs.
The names of the tables are created in the <AGGREGATION_NAME>_<DURATION> format. In this scenario, the following tables are created:
The primary keys of these tables are determined as follows:
AGG_TIMESTAMP and AGG_EVENT_TIMESTAMP are internally calculated values that reflect the time bucket of an aggregation for a particular duration. For example, in the table for the day granularity, AGG_TIMESTAMP = 1515110400000 reflects that it is the aggregation for the 5th of January 2018 (1515110400000 is the Unix time in milliseconds for 2018-01-05 00:00:00). All aggregations are based on the GMT timezone.
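The AGG_TIMESTAMP value for a bucket is the start of the GMT period that contains the event. A Python sketch of that calculation, assuming fixed-length second, minute, hour, and day buckets and calendar months and years (the function name is hypothetical):

```python
from datetime import datetime, timezone

def bucket_start_millis(event_millis, granularity):
    """Return the start (Unix millis, GMT) of the bucket that contains event_millis."""
    unit_millis = {"second": 1_000, "minute": 60_000, "hour": 3_600_000, "day": 86_400_000}
    if granularity in unit_millis:
        # Unix time starts at midnight GMT, so truncating by the unit size works.
        return event_millis - event_millis % unit_millis[granularity]
    dt = datetime.fromtimestamp(event_millis / 1000, tz=timezone.utc)
    if granularity == "month":
        start = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    elif granularity == "year":
        start = dt.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
    else:
        raise ValueError(f"unknown granularity: {granularity}")
    return int(start.timestamp() * 1000)
```

Any event on 2018-01-05 therefore lands in the day bucket 1515110400000, and its month and year buckets both start at 1514764800000 (2018-01-01 00:00:00).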
The other values stored in the table are aggregations and other function calculations done in the aggregation definition. If any such function calculation is not an aggregation, the output value corresponds to the function calculation for the latest event of that time bucket. For example, if a multiplication is carried out, the stored value corresponds to the multiplication for the latest event in that duration.
Certain aggregations are internally stored as a collection of other aggregations. e.g., the average aggregation is internally calculated as a function of sum and count. Hence, the table only reflects a sum and a count. The actual average is returned when the user retrieves the aggregate values as described in Step 2: Retrieve calculated and persisted aggregate values.
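Storing a sum and a count instead of an average matters because sums and counts from smaller buckets can be merged exactly, whereas averages cannot. A minimal Python sketch (the AvgState class is hypothetical):

```python
from dataclasses import dataclass

@dataclass
class AvgState:
    """Average kept as (sum, count), as described for the stored tables."""
    total: float = 0.0
    count: int = 0

    def add(self, value):
        self.total += value
        self.count += 1

    def merge(self, other):
        # Merging two buckets just adds their sums and counts.
        return AvgState(self.total + other.total, self.count + other.count)

    def value(self):
        return self.total / self.count if self.count else None

sec58 = AvgState(); sec58.add(10.0); sec58.add(20.0)   # average 15.0
sec59 = AvgState(); sec59.add(60.0)                     # average 60.0
minute = sec58.merge(sec59)
```

The merged minute average is 90.0 / 3 = 30.0, whereas averaging the two per-second averages would give 37.5, which is why the count is kept alongside the sum.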
In the scenario described in this section, a timestamp is assigned to each input event via the aggregate by clause, and symbol is the group by key. Therefore, the TestDB store defined via the @store annotation uses AGG_TIMESTAMP and the symbol attribute as primary keys. Each record in this store must have a unique combination of values for these two attributes.
@purge
This annotation specifies whether automatic data purging is enabled. If automatic data purging is enabled, you need to specify the time interval at which the data purging should be carried out. In addition, you need to specify the time period for which the data should be retained for each granularity by including the @retentionPeriod annotation (described below). If this annotation is not included in an incremental aggregation query, data purging is carried out every 15 minutes based on the default retention periods mentioned in the description of the @retentionPeriod annotation.
If you want to disable automatic data purging, you can use this annotation as follows:
You should disable data purging if the aggregation query is included in the Siddhi application for read-only purposes.
@retentionPeriod
This specifies the time period for which data needs to be retained before it is purged, for each granularity. The retention period defined for each granularity must be greater than or equal to the minimum retention period given below:
- second: 120 seconds
- minute: 120 minutes
- hour: 25 hours
- day: 32 days
- month: 13 months
If the retention period is not specified, the default retention period for each granularity is applied as given below:
- second: 120 seconds
- minute: 24 hours
- hour: 30 days
- day: 1 year
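The retention rules above can be sketched in Python as a validation step followed by a purge of expired buckets. This covers only the second to day granularities, approximates "1 year" as 365 days, and uses hypothetical function names; it is not the Siddhi purging implementation.

```python
from datetime import datetime, timedelta, timezone

# Minimum and default retention periods from the lists above.
MIN_RETENTION = {
    "second": timedelta(seconds=120),
    "minute": timedelta(minutes=120),
    "hour": timedelta(hours=25),
    "day": timedelta(days=32),
}
DEFAULT_RETENTION = {
    "second": timedelta(seconds=120),
    "minute": timedelta(hours=24),
    "hour": timedelta(days=30),
    "day": timedelta(days=365),  # "1 year", approximated as 365 days in this sketch
}

def effective_retention(granularity, requested=None):
    """Return the retention period to apply, rejecting values below the minimum."""
    if requested is None:
        return DEFAULT_RETENTION[granularity]
    minimum = MIN_RETENTION[granularity]
    if requested < minimum:
        raise ValueError(f"{granularity}: retention {requested} is below the minimum {minimum}")
    return requested

def purge(bucket_starts, granularity, now, requested=None):
    """Keep only the bucket start times that are still inside the retention window."""
    cutoff = now - effective_retention(granularity, requested)
    return [b for b in bucket_starts if b >= cutoff]
```

For the hour granularity with no explicit setting, a bucket from 31 days ago falls outside the default 30-day window and is purged, while a 10-day-old bucket is kept.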
Step 2: Retrieve calculated and persisted aggregate values
This step involves retrieving the aggregate values that you calculated and persisted in Step 1. To do this, let's add the Siddhi definitions and queries required for retrieval to the
TradeApp Siddhi application that you have already started creating for this scenario.
Retrieval logic for the same aggregation can be defined in a different Siddhi application. However, only one aggregation should carry out the processing (i.e., the aggregation input stream should feed events to only one aggregation definition).
For this scenario, let's assume that the Sales Analyst needs to retrieve the sales totals for the time duration between 15th February 2014 and 16th March 2014.
To retrieve aggregations, you need to make retrieval requests. To capture these requests as events, let's define a stream as follows.
To process the events captured via the TradeSummaryRetrievalStream stream you defined, add a new query as follows.
This query matches events in the TradeSummaryRetrievalStream stream and the TradeAggregation aggregation that was defined in Step 1 based on the value of the symbol attribute, and performs a join for each matching pair. Based on that join, it produces output events with the aggregated values (including the avgPrice attribute) for the day granularity within the time range from 2014-02-15 00:00:00 to 2014-03-16 00:00:00. The time zone is represented by the UTC offset given in the within clause.
Note the following about the query given above:
- The on condition is optional for retrieval.
- You can provide the within duration in two ways as follows:
  - within <start_time>, <end_time>: The start time and end time can be provided as STRING or LONG values. If a value is a string, the format needs to be <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z> (<Z> represents the timezone, and can be omitted if the time is in GMT). You can provide long values if you need to specify the times as Unix timestamps (in milliseconds). The query above specifies the time duration via this method.
  - within <within_duration>: This method allows you to enter the time duration only as a STRING value. The format can be one of the following:
    - <yyyy>-**-** **:**:** <Z>
    - <yyyy>-<MM>-** **:**:** <Z>
    - <yyyy>-<MM>-<dd> **:**:** <Z>
    - <yyyy>-<MM>-<dd> <HH>:**:** <Z>
    - <yyyy>-<MM>-<dd> <HH>:<mm>:** <Z>
    - <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z>
For example, if you specify 2018-01-** **:**:**, it means within the first month of 2018. This is equivalent to the within "2018-01-01 00:00:00", "2018-02-01 00:00:00" clause provided as per the previous method.
You do not need to specify the timezone via <Z> if the time is in GMT.
- The per clause specifies the time granularity for which the aggregations need to be retrieved. The value for this clause can be seconds, minutes, hours, days, months, or years. The time granularity for which you want to retrieve values must have been included in the time range you specified when calculating and persisting aggregates. For more information, see Calculating and persisting time-based aggregate values - Step 4, substep d.
- The output contains the symbol and its aggregated values for all the days falling within the given time range.
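The second form of the within clause can be read as a half-open [start, end) range, which is consistent with the 2018-01 example above. The following Python sketch expands a GMT pattern into that range; it assumes the <Z> suffix is omitted, and within_range is a hypothetical name.

```python
from datetime import datetime, timedelta

def within_range(pattern):
    """Expand a GMT pattern such as '2018-01-** **:**:**' into a half-open (start, end) pair."""
    date_part, time_part = pattern.split(" ")
    fields = date_part.split("-") + time_part.split(":")  # year, month, day, hour, minute, second
    specified = [f for f in fields if f != "**"]
    n = len(specified)
    # Wildcards are only allowed as a suffix: '2018-**-05 ...' is rejected.
    if n == 0 or fields[:n] != specified:
        raise ValueError(f"unsupported within pattern: {pattern!r}")
    defaults = [1, 1, 0, 0, 0]  # month, day, hour, minute, second
    start = datetime(*([int(f) for f in specified] + defaults[n - 1:]))
    if n == 1:
        end = start.replace(year=start.year + 1)
    elif n == 2:
        end = (start.replace(year=start.year + 1, month=1) if start.month == 12
               else start.replace(month=start.month + 1))
    else:
        step = {3: timedelta(days=1), 4: timedelta(hours=1),
                5: timedelta(minutes=1), 6: timedelta(seconds=1)}[n]
        end = start + step
    return start, end
```

For example, within_range("2018-01-** **:**:**") returns the range from 2018-01-01 00:00:00 up to, but not including, 2018-02-01 00:00:00.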
The following are other ways in which you can construct the query to retrieve aggregate values:
- Instead of providing the within and per values as constants in the query itself, you can retrieve them via attributes in the input stream definition as shown below.
Define the input stream as follows.
The above schema allows you to enter the start time and the end time of the time duration for which you want to retrieve aggregates in the aggregate retrieval request by including values for the startTime and endTime attributes. The time granularity for which the aggregations need to be retrieved can be specified as the value of another attribute in the same request.
Then add the query as follows.
The within and per clauses refer to the attributes in the input stream that specify the time duration and the per duration.
If you want the retrieved aggregates to be sorted in ascending order of time, include the AGG_TIMESTAMP attribute in the select clause as shown below. Once it is included in the select clause, you can add the order by AGG_TIMESTAMP clause.
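Taken together, a retrieval request selects stored records by symbol, granularity, and time range, and order by AGG_TIMESTAMP sorts them. A conceptual Python sketch, assuming an in-memory list of records, an inclusive start and exclusive end time, and hypothetical class and field names:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class AggregateRecord:
    agg_timestamp: int  # bucket start, Unix millis (GMT)
    granularity: str    # e.g. "days"
    symbol: str
    avg_price: float
    total: int

@dataclass(frozen=True)
class RetrievalRequest:
    symbol: str
    start_time: int     # inclusive, Unix millis
    end_time: int       # exclusive, Unix millis
    per: str

def retrieve(records, request):
    """Join on symbol, filter by the within range and per granularity, then sort by time."""
    matches = [r for r in records
               if r.symbol == request.symbol
               and r.granularity == request.per
               and request.start_time <= r.agg_timestamp < request.end_time]
    return sorted(matches, key=lambda r: r.agg_timestamp)
```

Because the request carries the time range and granularity as data, the same query logic serves every request, which mirrors the attribute-driven within and per clauses described above.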
Incremental aggregation in single-node deployments and HA deployments
To understand how incremental aggregation is carried out in different deployments, consider the following example that explains how the aggregation is executed internally.
Assume that six events arrive in the following sequence, with the given timestamps.
- event 0 → 2018-01-01 05:59:58
- event 1 → 2018-01-01 05:59:58
- event 2 → 2018-01-01 05:59:59
- event 3 → 2018-01-01 06:00:00
- event 4 → 2018-01-01 06:00:01
- event 5 → 2018-01-01 06:00:02
In this scenario, the aggregation is done for second, minute and hour durations. Therefore, based on the above timestamps, the second, minute, and hour during which each event occurred is as follows.
As mentioned before, when time-based aggregates are stored, a table is created for each time granularity in the <AGGREGATION_NAME>_<TIME_GRANULARITY> format. Assuming that the name of the aggregation is TradeAggregation, three tables are created as follows.
The incremental analysis related execution that is carried out by the system with the arrival of each event is described in the table below.
|Event||System execution|
|0||The system initially processes event 0 at the |
|1||This event also occurs during the 58th second (same as event 0). Therefore, the system aggregates events 0 and 1 together.|
|2||Event 2 arrives during the 59th second. The 58th second has elapsed at the time of this event arrival. Therefore, the system does the following:|
|3||Event 3 arrives during second 0 of the 06:00 minute. With this arrival, the aggregations for the 59th second and the 59th minute expire. Therefore, the system does the following:|
|4||At the time event 4 arrives during the 1st second of the 06:00 minute, second 0 of the same minute has elapsed. Therefore, the system does the following:|
|5||Event 5 arrives during the 2nd second. At this time, the 1st second has elapsed. Therefore, the system does the following:|
The following is the aggregation status after all six events have arrived.
|In-memory Table||Available Aggregation Records||Processing In-Memory|
Aggregation for 2018-01-01 06:00:00 (i.e., minute 0 of the 6th hour. Here, events 3 and 4 are aggregated together.)
|None||Aggregation for 2018-01-01 05:00:00 (i.e., the 5th hour. Here, events 0, 1, and 2 are aggregated together.)|
Now let's consider how the scenario described above works in a single node deployment and an HA deployment.
- Single node deployment
If the @store configuration is not provided, the system stores older aggregations (i.e., aggregations that are already completed for a particular second, minute, etc., and are therefore no longer running in memory) in in-memory tables. In such a situation, a server failure results in all the aggregations done up to the time of the failure being lost.
If a database configuration is provided via the @store annotation, the older aggregations are stored in the external database. Therefore, if the node fails, the system recreates the in-memory running aggregations from this stored data once you restart the server. In the given scenario, in-memory aggregations for the MINUTE execution level can be recreated with data in the TradeAggregation_SECONDS table (i.e., points 3 and 4 in the above aggregation status summary table). Similarly, in-memory aggregations for the HOUR execution level can be recreated from the TradeAggregation_MINUTES table.
However, the recreation described above cannot be done for the most granular duration for which aggregation is done. For example, in the given scenario, the system cannot recreate the in-memory aggregations for the SECOND execution level because there is no database table for a finer duration.
Assume that in the given scenario, a server failure occurs after all six events have arrived. Once you restart the server, only event 5 is lost because it was the only event being aggregated in memory at the SECOND execution level at that time.
- Minimum HA deployment
To understand how WSO2 SP runs as a minimum HA cluster, see Minimum High Availability (HA) Deployment.
In a minimum HA setup, one node runs as the active node and the other as the passive node. Because the current state of SP is snapshotted, no events are lost even if the @store configuration is not provided. When the state is restored from a snapshot of SP, the system does not need to recreate aggregations from tables. The newly active node (which was previously passive) continues to process the new events that arrive after the failure of the other node, as if no server failure had occurred.
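The single-node recovery behavior described above can be traced with a small Python simulation of the six events. It assumes a simplified model in which each level closes a bucket only when it receives data for a later bucket from the level below; the exact moment at which Siddhi expires a bucket may differ (the walkthrough above expires the 59th minute on event 3, while this model does so on event 4), but the end state matches the aggregation status summary. All names are hypothetical.

```python
LEVELS = ["second", "minute", "hour"]

def bucket(ts, level):
    """Truncate an 'HH:MM:SS' time (all events share one date) to its bucket start."""
    h, m, s = ts.split(":")
    return {"second": f"{h}:{m}:{s}", "minute": f"{h}:{m}:00", "hour": f"{h}:00:00"}[level]

def simulate(events):
    """Feed (event_id, 'HH:MM:SS') events through the second, minute and hour levels."""
    tables = {level: [] for level in LEVELS}       # persisted (bucket, event_ids) rows
    in_memory = {level: None for level in LEVELS}  # running (bucket, event_ids) per level

    def receive(index, ts, ids):
        level = LEVELS[index]
        b = bucket(ts, level)
        current = in_memory[level]
        if current is not None and current[0] != b:
            # The running bucket has expired: persist it and forward it one level up.
            tables[level].append(current)
            if index + 1 < len(LEVELS):
                receive(index + 1, current[0], current[1])
            current = None
        if current is None:
            current = (b, frozenset())
        in_memory[level] = (current[0], current[1] | ids)

    for event_id, ts in events:
        receive(0, ts, frozenset({event_id}))
    return tables, in_memory

def recover(tables):
    """Rebuild running aggregations after a restart using only the persisted tables."""
    recovered = {"second": None}  # no finer-grained table exists to rebuild SECOND from
    for lower, upper in zip(LEVELS, LEVELS[1:]):
        rows = tables[lower]
        if not rows:
            recovered[upper] = None
            continue
        b = bucket(rows[-1][0], upper)
        if any(row[0] == b for row in tables[upper]):
            recovered[upper] = None  # that bucket was already persisted at this level
            continue
        ids = frozenset().union(*(row[1] for row in rows if bucket(row[0], upper) == b))
        recovered[upper] = (b, ids)
    return recovered

events = [(0, "05:59:58"), (1, "05:59:58"), (2, "05:59:59"),
          (3, "06:00:00"), (4, "06:00:01"), (5, "06:00:02")]
tables, in_memory = simulate(events)
recovered = recover(tables)
persisted = frozenset().union(*(ids for rows in tables.values() for _, ids in rows))
lost = frozenset(e for e, _ in events) - persisted
```

In this model, the running minute (events 3 and 4) and hour (events 0, 1, and 2) aggregations are rebuilt from the persisted second and minute rows, and only event 5 is lost.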
The following is a summary of the retrievability of aggregates based on how they are stored and how WSO2 SP is deployed.
Scaling through distributed aggregations
Distributed aggregations partially process aggregations in different nodes. This allows you to assign one node to process only a part of an aggregation (e.g., for regional scaling). To do this, all the aggregations must have a physical database and must be linked to the same database.
The @PartitionById annotation must be added before the aggregation definition as shown below.
You can enable all Siddhi applications to partition aggregations in this manner by adding the following system parameters to the <SP_HOME>/conf/<PROFILE>/deployment.yaml file under
A unique ID must be provided for each node via a system parameter named shardId. This parameter is required when partitioning aggregations is enabled for a single Siddhi application as well as when it is enabled for all Siddhi applications.
To maintain data consistency, do not change the shard IDs after the first configuration.
When you enable the aggregation partitioning feature, a new column named SHARD_ID is introduced to the aggregation tables. Therefore, after enabling this feature, you need to do one of the following to avoid errors caused by differences in the table schema:
- Delete all the aggregation tables for
- Edit the aggregation tables by adding a new column named SHARD_ID, and specify it as a primary key.
The following query can be included in two Siddhi applications in two different nodes that are connected to the same database. Separate input events are generated for both nodes. Each node performs the aggregations and stores the results in the database. When the aggregations are retrieved, the collective result of both nodes is considered.
Let's assume that the following input events were generated for the two nodes during a specific hour.
Here, node 1 calculates an hourly total of 30, and node 2 calculates an hourly total of 40. When you retrieve the total for this hour via a retrieval query, the result is 70.
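The way partial results combine can be sketched in Python. The sketch assumes that each node writes its own row per time bucket keyed by (AGG_TIMESTAMP, SHARD_ID), ignoring group by keys, and that retrieval sums the rows for a bucket. The input values are hypothetical, chosen only so that the node totals are 30 and 40. The second half of the example shows why shard IDs must be unique.

```python
def store_shard_total(table, shard_id, agg_timestamp, values):
    """Upsert one node's total for a bucket; the key includes the shard ID."""
    table[(agg_timestamp, shard_id)] = sum(values)

def retrieve_total(table, agg_timestamp):
    """Combine the partial totals written by every shard for the same bucket."""
    return sum(total for (ts, _shard), total in table.items() if ts == agg_timestamp)

hour = 1515110400000  # hypothetical hour bucket

table = {}
store_shard_total(table, "node-1", hour, [10, 20])  # node 1: hourly total 30
store_shard_total(table, "node-2", hour, [15, 25])  # node 2: hourly total 40

# If both nodes reused the same shard ID, the second write would overwrite the first.
clash = {}
store_shard_total(clash, "node-1", hour, [10, 20])
store_shard_total(clash, "node-1", hour, [15, 25])
```

Retrieving the hour from table gives 70, while the clashing table reports only 40, which illustrates why each node needs its own stable shard ID.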
For more information about incremental aggregation, see Siddhi Query Guide - Incremental Aggregation .