Incremental processing is a processing method that processes only the data newly added to a dataset since the existing data was last processed, instead of re-processing the complete dataset. This improves efficiency by eliminating the overhead of re-processing data that has already been processed.
e.g., The data in an Analytics table needs to be summarized per day. The following table illustrates how the dataset is processed when the summarization script is run.
| | Without Incremental Processing | With Incremental Processing |
| --- | --- | --- |
| 1st run | The complete dataset is processed to summarize the data. | The complete dataset is processed to summarize the data. |
| 2nd run (next day) | The complete dataset is processed to summarize the data. | Only the updates made to the dataset during the previous day are processed to summarize the data. |
Once the summarization is complete, you need to commit the status to indicate that the data was successfully processed. This is done via the following command.
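A sketch of this commit command, assuming the WSO2 DAS Spark SQL syntax, is shown below; the ID orders is a hypothetical value for the unique incremental-processing ID defined in the table definition (described later in this section).

```sql
-- Commit the processed status; "orders" is the unique ID given in the
-- incrementalParams attribute of the table definition (see below).
INCREMENTAL_TABLE_COMMIT orders;
```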
Incremental analytics uses the timestamps of the events being sent when retrieving the data for processing. When defining event streams for incremental analytics, you can add an extra attribute named _timestamp of the LONG attribute type to the event payload. This allows a specific timestamp to be specified for each event. For more information, see Understanding Event Streams and Event Tables - Adding an event stream.
If the _timestamp attribute is not specified in the event stream definition, the timestamp of each event is derived from the system date at the time the event was persisted in the database.
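The following is a sketch of the sample syntax for enabling incremental processing on a table, assuming the WSO2 DAS Spark SQL table definition format; the table and attribute names are hypothetical.

```sql
-- Hypothetical table definition; incrementalParams takes a unique ID and a time period,
-- i.e., incrementalParams "<uniqueID>, <timePeriod>".
CREATE TEMPORARY TABLE orders USING CarbonAnalytics OPTIONS (
    tableName "ORDERS_TABLE",
    schema "orderID STRING, noOfItems INT, _timestamp LONG",
    incrementalParams "orders, DAY"
);

-- Once the processing script has run, the status is committed using the same unique ID.
INCREMENTAL_TABLE_COMMIT orders;
```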
In order to apply incremental processing to an Analytics table, the incrementalParams attribute should be added to the table definition as shown in the extract above. If this parameter is not added, the table is considered a typical Analytics table, and the complete table is processed for each query. Note that the incrementalParams attribute should be added to the definition of the table from which the data is read.
The following parameters are configured to support incremental processing as shown in the above sample syntax.
| Parameter | Description |
| --- | --- |
| uniqueID | This is the unique ID of the incremental analytics definition. This ID should be used in the incremental table commit command as shown in the sample syntax. |
| timePeriod | The duration of the time period that you are processing. This can be a time unit such as MINUTE or DAY (as used in the examples below). e.g., If you specify DAY, the next run processes all events received since the start of the day during which the last processed event was received. |
As an example, consider a set of order events, received over several days, from which the number of orders placed during each day must be calculated.
The summarization script is run on 27th May 2016, at 12.00 noon. The last order processed in this run is the one received most recently before noon on that day, and its timestamp is recorded when the run is committed. The resulting summarization table contains one entry per day with the number of orders placed during that day. As a result, the entry for 27th May 2016 counts only one order (because the other orders that arrived on 27th May 2016 were received after 12.00 noon).
The next time the script is run, WSO2 DAS checks the timestamp of the event that was processed last. In this example, the last processed event is the single order received on 27th May 2016 before noon, so the new run processes all events received from the start of that day onwards. This updates the entry for 27th May 2016 with the total number of orders received during that day.
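A minimal sketch of how this per-day summarization might be written, reusing the hypothetical orders table defined in the sample syntax above; the write-side table, attribute names, and the assumption that _timestamp is in milliseconds are illustrative only.

```sql
-- Hypothetical write-side table; primaryKeys makes a re-processed day update its existing row.
CREATE TEMPORARY TABLE orders_per_day USING CarbonAnalytics OPTIONS (
    tableName "ORDERS_PER_DAY",
    schema "order_date STRING, order_count INT",
    primaryKeys "order_date"
);

-- Count orders per day; because the orders table defines incrementalParams "orders, DAY",
-- only events from the start of the day of the last processed event onwards are read.
INSERT INTO TABLE orders_per_day
SELECT from_unixtime(CAST(_timestamp / 1000 AS BIGINT), 'yyyy-MM-dd') AS order_date,
       COUNT(*) AS order_count
FROM orders
GROUP BY from_unixtime(CAST(_timestamp / 1000 AS BIGINT), 'yyyy-MM-dd');

-- Commit so that the next run resumes from the last processed time period.
INCREMENTAL_TABLE_COMMIT orders;
```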
The following sample further demonstrates how incremental processing is carried out.
An event stream is defined to publish the data that is to be summarized; its payload can include the optional _timestamp attribute described above. For more information about event streams, see Understanding Event Streams and Event Tables.
A Spark script is then written to process the data received by this event stream, applying incremental processing per minute; a sketch of such a script is shown below.
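In this sketch, the stream, table, and attribute names are hypothetical; only the incrementalParams value "api_stats_1, MINUTE" is taken from the description that follows.

```sql
-- Hypothetical table mapped to the data received from the event stream.
-- incrementalParams "api_stats_1, MINUTE" enables per-minute incremental processing.
CREATE TEMPORARY TABLE api_stats USING CarbonAnalytics OPTIONS (
    tableName "ORG_WSO2_SAMPLE_API_STATS",
    schema "api_name STRING, _timestamp LONG",
    incrementalParams "api_stats_1, MINUTE"
);

-- Hypothetical per-minute summary table.
CREATE TEMPORARY TABLE api_stats_per_minute USING CarbonAnalytics OPTIONS (
    tableName "API_STATS_PER_MINUTE",
    schema "api_name STRING, minute_window STRING, request_count INT",
    primaryKeys "api_name, minute_window"
);

-- Count the requests received per API per minute.
INSERT INTO TABLE api_stats_per_minute
SELECT api_name,
       from_unixtime(CAST(_timestamp / 1000 AS BIGINT), 'yyyy-MM-dd HH:mm') AS minute_window,
       COUNT(*) AS request_count
FROM api_stats
GROUP BY api_name, from_unixtime(CAST(_timestamp / 1000 AS BIGINT), 'yyyy-MM-dd HH:mm');

-- Commit the processed status for this incremental definition.
INCREMENTAL_TABLE_COMMIT api_stats_1;
```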
incrementalParams "api_stats_1, MINUTE" parameter specifies incremental processing to be applied. When the script is run, the system identifies the last minute during which the summarization was done, and processes all events with a timestamp that is greater than the timestamp of the start of that minute.