The latest version for DAS is WSO2 Data Analytics Server 3.1.0. View documentation for the latest release.
WSO2 Data Analytics Server is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.


Incremental processing is a method in which only the data newly added to a dataset is processed when the existing data has already been processed, instead of re-processing the complete dataset. This improves efficiency by eliminating the system overhead of re-processing data that was already processed.

For example, suppose the data in an Analytics table needs to be summarized per day. The following table shows how the dataset is processed each time the summarization script is run.


| | Without Incremental Processing | With Incremental Processing |
| --- | --- | --- |
| 1st Run | The complete dataset is processed to summarize the data. | The complete dataset is processed to summarize the data. |
| 2nd Day | The complete dataset is processed to summarize the data. | Only the updates made to the dataset during the previous day are processed to summarize the data. |

Once the summarization is complete, you need to commit the status to indicate that the data was successfully processed. This is done via the following command.
INCREMENTAL_TABLE_COMMIT orders; 
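
The role of the commit can be modelled outside DAS in a few lines of Python. This is a minimal sketch rather than the DAS implementation: the `IncrementalTable` class and its methods are hypothetical names, and the behaviour that an uncommitted read is returned again on the next run is an assumption inferred from the description above. The time-period alignment described later on this page is deliberately left out.

```python
class IncrementalTable:
    """Toy model of a table read incrementally (hypothetical; not a DAS API)."""

    def __init__(self):
        self.rows = []            # (timestamp, value) pairs in arrival order
        self.committed_ts = None  # newest timestamp marked as processed
        self._pending_ts = None   # newest timestamp read since the last commit

    def add(self, timestamp, value):
        self.rows.append((timestamp, value))

    def read_new(self):
        """Return the rows that are newer than the last committed timestamp."""
        new_rows = [row for row in self.rows
                    if self.committed_ts is None or row[0] > self.committed_ts]
        if new_rows:
            self._pending_ts = max(ts for ts, _ in new_rows)
        return new_rows

    def commit(self):
        """Stand-in for INCREMENTAL_TABLE_COMMIT: record that the last read succeeded."""
        if self._pending_ts is not None:
            self.committed_ts = self._pending_ts
            self._pending_ts = None


orders = IncrementalTable()
orders.add(1, "order-a")
orders.add(2, "order-b")

first_read = orders.read_new()    # both rows
repeat_read = orders.read_new()   # still both rows: nothing was committed (assumed behaviour)
orders.commit()
orders.add(3, "order-c")
after_commit = orders.read_new()  # only the row added after the commit
```

In this model, skipping `commit()` means the same rows are handed out again, which is why the commit is issued only after the summarization has completed.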

Publishing events

Incremental analytics uses the timestamps of the events sent when retrieving the data for processing.

When defining event streams for incremental analytics, you can add an extra attribute to the event payload named _timestamp of the LONG attribute type. This allows a specific timestamp to be specified for each event. For more information, see Understanding Event Streams and Event Tables - Adding an event stream.

If the _timestamp attribute is not specified in the event stream definition, the timestamp of each event is derived from the system date at the time the event was persisted in the database.
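
The fallback rule above can be sketched as follows. This is an illustration only: `resolve_timestamp` and `to_epoch_ms` are hypothetical helpers, and treating the LONG value as epoch milliseconds is an assumption that this page does not state explicitly.

```python
import time
from datetime import datetime, timezone


def to_epoch_ms(moment):
    """Convert a timezone-aware datetime to epoch milliseconds (assumed unit)."""
    return round(moment.timestamp() * 1000)


def resolve_timestamp(payload, now_ms=None):
    """Pick the timestamp used for an event.

    Uses the _timestamp payload attribute when it is present; otherwise falls
    back to the system time at the moment the event is persisted.
    """
    if payload.get("_timestamp") is not None:
        return int(payload["_timestamp"])
    if now_ms is None:
        now_ms = round(time.time() * 1000)
    return now_ms


# An event that carries its own timestamp keeps it.
event = {
    "name": "orders-api",  # hypothetical sample values
    "count": 1,
    "_timestamp": to_epoch_ms(datetime(2016, 5, 27, 2, 0, 2, tzinfo=timezone.utc)),
}
explicit = resolve_timestamp(event)

# An event without _timestamp is stamped with the persistence time.
derived = resolve_timestamp({"name": "orders-api", "count": 1})
```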

Syntax

create temporary table orders using CarbonAnalytics options (tableName "ORDERS", schema "customerID STRING, phoneType STRING, OrderID STRING, cost DOUBLE, _timestamp LONG -i", incrementalParams "orders, DAY");

In order to apply incremental processing to an Analytics table, the incrementalParams attribute should be added to the table definition as shown in the extract above. If this parameter is not added, the table is considered a typical analytics table, and the complete table is processed for each query.

The incrementalParams attribute should be added to the definition of the table from which the data is read.

Parameters

The following parameters are configured to support incremental processing as shown in the above sample syntax.

Unique ID
Name: uniqueID
Required/Optional: Required
Description: This is the unique ID of the incremental analytics definition. This ID should be used in the incremental table commit command as shown in the sample syntax.

Time Period
Name: timePeriod
Required/Optional: Required
Description: The duration of the time period that you are processing. This can be MINUTE, HOUR, DAY, MONTH or YEAR. DAS processes the timestamp of each event and identifies the unit of time during which it was sent.

For example, if you specify HOUR as the time period, an event sent at 10:20 PM is identified as an event sent during the 22:00 - 23:00 hour.
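
Mapping a timestamp to its time period amounts to truncating it to the start of that unit. The sketch below illustrates this with a hypothetical `period_start` helper; it uses naive `datetime` values and ignores time zones, which is a simplification and not a statement about how DAS handles them.

```python
from datetime import datetime

# Fields to reset for each supported time period.
_RESET_FIELDS = {
    "MINUTE": dict(second=0, microsecond=0),
    "HOUR": dict(minute=0, second=0, microsecond=0),
    "DAY": dict(hour=0, minute=0, second=0, microsecond=0),
    "MONTH": dict(day=1, hour=0, minute=0, second=0, microsecond=0),
    "YEAR": dict(month=1, day=1, hour=0, minute=0, second=0, microsecond=0),
}


def period_start(ts, period):
    """Return the start of the MINUTE/HOUR/DAY/MONTH/YEAR that contains ts."""
    try:
        fields = _RESET_FIELDS[period]
    except KeyError:
        raise ValueError(f"unsupported time period: {period!r}") from None
    return ts.replace(**fields)


# An event sent at 10:20 PM falls into the hour that starts at 22:00.
print(period_start(datetime(2016, 5, 27, 22, 20), "HOUR"))  # 2016-05-27 22:00:00
```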

Example

Consider the following list of events, for which the number of orders placed during each day needs to be calculated.

| Customer ID | Phone Type | Order ID | Cost | _timestamp |
| --- | --- | --- | --- | --- |
| 1 | Nexus 5x | 33slsa2s | 400 | 26th May 2016 12:00:01 |
| 12 | Galaxy S7 | kskds221 | 600 | 27th May 2016 02:00:02 |
| 43 | iPhone 6s | sadl3122 | 700 | 27th May 2016 15:32:04 |
| 2 | MoTo X | sdda221s | 350 | 27th May 2016 16:22:10 |
| 32 | LG G5 | lka2s24dkQ | 550 | 27th May 2016 19:42:42 |

The summarization script is run on 27th May 2016, at 12.00 noon. The last processed order ID is kskds221.

Then the summarization table is as shown below.

| Day | Event Count |
| --- | --- |
| 26th May 2016 | 1 |
| 27th May 2016 | 1 |


As a result, the summarized table for 27th May 2016 should have one event (because the other events that arrived on 27th May 2016 were received after 12.00 noon).

The next time the script is run, WSO2 DAS checks the timestamp of the event that was processed last. In this example, the last processed event is order ID kskds221 with timestamp 27th May 2016 02:00:02. Then DAS retrieves the data from the time period starting at 27th May 2016 00:00:00 to process all the data that has a timestamp greater than 27th May 2016 00:00:00.

This updates the entry for 27th May 2016 in the summarization table to the value 4. The entry for 26th May 2016 is not affected, because no data with a timestamp earlier than 27th May 2016 00:00:00 is retrieved. The resulting summarization table is shown below.

| Day | Event Count |
| --- | --- |
| 26th May 2016 | 1 |
| 27th May 2016 | 4 |
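
The two runs in this example can be replayed with a short Python model. It is a sketch under stated assumptions and not DAS code: `run_summarization` is a hypothetical function, each event is assumed to arrive at the time given by its _timestamp, the second run is assumed to happen at midnight on 28th May 2016, and the window boundary is treated as inclusive so that a recount always covers the whole day.

```python
from datetime import date, datetime

# (order ID, _timestamp) pairs taken from the event list in the example.
EVENTS = [
    ("33slsa2s", datetime(2016, 5, 26, 12, 0, 1)),
    ("kskds221", datetime(2016, 5, 27, 2, 0, 2)),
    ("sadl3122", datetime(2016, 5, 27, 15, 32, 4)),
    ("sdda221s", datetime(2016, 5, 27, 16, 22, 10)),
    ("lka2s24dkQ", datetime(2016, 5, 27, 19, 42, 42)),
]


def day_start(ts):
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def run_summarization(summary, last_processed, run_time):
    """Run the daily count once and return the timestamp of the last processed event."""
    # Only events that have arrived by the time the script runs are visible.
    available = [ts for _, ts in EVENTS if ts <= run_time]
    if last_processed is None:
        window = available  # first run: the complete dataset
    else:
        # Later runs: start from the beginning of the day of the last processed event.
        start = day_start(last_processed)
        window = [ts for ts in available if ts >= start]
    counts = {}
    for ts in window:
        counts[ts.date()] = counts.get(ts.date(), 0) + 1
    # Recomputed days overwrite their old entries; other days stay untouched.
    summary.update(counts)
    return max(window) if window else last_processed


summary = {}
last = run_summarization(summary, None, datetime(2016, 5, 27, 12, 0, 0))
after_first_run = dict(summary)
last = run_summarization(summary, last, datetime(2016, 5, 28, 0, 0, 0))
after_second_run = dict(summary)
```

After the first run the model holds one event for each day, and after the second run the entry for 27th May 2016 is overwritten with 4, matching the tables above.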

The following sample further demonstrates how the incremental processing is carried out.


An event stream is defined with the following configuration. For more information about event streams, see Understanding Event Streams and Event Tables.

{
  "streamId": "APIStats:1.0.0",
  "name": "APIStats",
  "version": "1.0.0",
  "nickName": "",
  "description": "",
  "metaData": [],
  "correlationData": [],
  "payloadData": [
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "count",
      "type": "INT"
    },
    {
      "name": "_timestamp",
      "type": "LONG"
    }
  ]
}

The Spark script written to process the data received by this event stream is as follows.

create temporary table APIStats using CarbonAnalytics options (tableName "APIStats", schema "name STRING, count INT, _timestamp LONG", incrementalParams "api_stats_1, MINUTE");

create temporary table APIStatsMinuteSummary using CarbonAnalytics options (tableName "APIStatsMinSummary", schema "name STRING, count INT, _timestamp LONG, year INT, month INT, day INT, hour INT, min INT", primaryKeys "name, min, hour, day, month, year", incrementalParams "api_stats_min_1, HOUR");

create temporary table APIStatsHourSummary using CarbonAnalytics options (tableName "APIStatsHourSummary", schema "name STRING, count INT, _timestamp LONG, year INT, month INT, day INT, hour INT", primaryKeys "name, hour, day, month, year", incrementalParams "api_stats_hour_1, DAY");

create temporary table APIStatsDaySummary using CarbonAnalytics options (tableName "APIStatsDaySummary", schema "name STRING, count INT, _timestamp LONG, year INT, month INT, day INT", primaryKeys "name, day, month, year");

insert into table APIStatsMinuteSummary select name, sum(count) as count, getMinuteStartingTime(getYear(first(_timestamp)), getMonth(first(_timestamp)), getDay(first(_timestamp)), getHour(first(_timestamp)), getMinute(first(_timestamp))) as _timestamp, getYear(first(_timestamp)) as year, getMonth(first(_timestamp)) as month, getDay(first(_timestamp)) as day, getHour(first(_timestamp)) as hour, getMinute(first(_timestamp)) as min from APIStats group by name, getYear(_timestamp), getMonth(_timestamp), getDay(_timestamp), getHour(_timestamp), getMinute(_timestamp);

INCREMENTAL_TABLE_COMMIT api_stats_1;

insert into table APIStatsHourSummary select name, sum(count) as count, getHourStartingTime(year, month, day, hour) as _timestamp, year, month, day, hour from APIStatsMinuteSummary group by name, year, month, day, hour;

INCREMENTAL_TABLE_COMMIT api_stats_min_1;

insert into table APIStatsDaySummary select name, sum(count) as count, getDateStartingTime(year, month, day) as _timestamp, year, month, day from APIStatsHourSummary group by name, year, month, day;

INCREMENTAL_TABLE_COMMIT api_stats_hour_1;

The incrementalParams "api_stats_1, MINUTE" parameter applies incremental processing to the APIStats table, with MINUTE as the time period. When the script is run, the system identifies the last minute for which the summarization was done, and processes all events with a timestamp that is greater than the timestamp of the start of that minute.

To use the above sample, you need to add this UDF (download). The associated fully qualified class name entry for spark-udf-config.xml is org.wso2.das.sample.udf.DateTimeUDF.

For detailed instructions on how to add a UDF, see Creating Spark User Defined Functions.
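
The minute, hour and day roll-up performed by the Spark script in the sample above can be modelled in plain Python to make the cascade easier to follow. This is a sketch only: `summarize` and the `to_*` helpers are hypothetical stand-ins for the `group by` clauses and the date UDFs, the "login" rows are made-up sample data, and the incremental window handling is omitted.

```python
from collections import defaultdict
from datetime import datetime


def summarize(rows, floor):
    """Sum counts per (name, floored timestamp), like the group by in each insert."""
    totals = defaultdict(int)
    for name, ts, count in rows:
        totals[(name, floor(ts))] += count
    return [(name, ts, count) for (name, ts), count in sorted(totals.items())]


def to_minute(ts):
    return ts.replace(second=0, microsecond=0)


def to_hour(ts):
    return ts.replace(minute=0, second=0, microsecond=0)


def to_day(ts):
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


# Hypothetical raw events: (name, _timestamp, count).
raw = [
    ("login", datetime(2016, 5, 27, 10, 15, 5), 2),
    ("login", datetime(2016, 5, 27, 10, 15, 40), 3),
    ("login", datetime(2016, 5, 27, 10, 47, 0), 1),
    ("login", datetime(2016, 5, 27, 11, 2, 0), 4),
]

minute_rows = summarize(raw, to_minute)        # APIStats -> minute summary
hour_rows = summarize(minute_rows, to_hour)    # minute summary -> hour summary
day_rows = summarize(hour_rows, to_day)        # hour summary -> day summary
```

Each level reads only the output of the level below it, which is why the script commits each table's incremental ID right after the insert that consumes that table.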
