
This guide explains how to configure Cassandra data purging in WSO2 BAM using an example use case.

Example use case

In this use case, WSO2 API Manager uses WSO2 BAM to collect and summarize its API invocation information. For examples of viewing API usage and performance statistics through the WSO2 API Manager dashboard, go to Viewing API Statistics. This guide uses an example of viewing overall API usage statistics. The statistical view of the example API usage in the WSO2 API Manager dashboard will be as follows.

The above dashboard shows the number of API calls invoked per API during the period from 2014-09-04 to 2014-11-11. The following scenarios describe different ways of writing Hive analytic queries to produce this summarized result, and whether data deletion can be applied in each of them:

Calculating minute wise stats from analytic queries

This technique is used in the WSO2 API Manager toolbox. Minute-wise stats are calculated from the raw data and inserted into the RDBMS using Hive analytic queries. When the start and end dates are provided from the dashboard, the aggregated sum is calculated from the records in the RDBMS table for the given period.
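
For example, the dashboard-level aggregation over the summary table could be similar to the following sketch. This is a hedged illustration that runs against the RDBMS (for example, MySQL); the table and column names are the ones defined by the Hive queries below, but the actual query used by the API Manager dashboard may differ.

-- Hedged sketch: sum the minute-wise summary rows per API for a given period.
SELECT api, SUM(total_request_count) AS request_count
FROM API_REQUEST_SUMMARY
WHERE time BETWEEN '2014-09-04 00:00:00' AND '2014-11-11 23:59:00'
GROUP BY api;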

The Hive analytic queries that are used to do the above summarization are as follows. For information on the WSO2 API Manager toolbox related artifacts, go to API Manager Analytics.

CREATE EXTERNAL TABLE IF NOT EXISTS APIRequestData 
    (key STRING, api STRING, api_version STRING, consumerKey STRING, userId STRING, context STRING, version STRING, request INT, requestTime BIGINT, resourcePath STRING, method STRING, hostName STRING, apiPublisher STRING) 
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' 
WITH SERDEPROPERTIES 
    ("wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
     "cassandra.cf.name" = "org_wso2_apimgt_statistics_request",
     "cassandra.columns.mapping" = ":key, payload_api, payload_api_version, payload_consumerKey, payload_userId, payload_context, payload_version, payload_request, payload_requestTime, payload_resourcePath, payload_method, payload_hostName, payload_apiPublisher");   
 
CREATE EXTERNAL TABLE IF NOT EXISTS APIRequestSummaryData 
    (api STRING, api_version STRING, version STRING, apiPublisher STRING, consumerKey STRING, userId STRING, context STRING, max_request_time BIGINT, total_request_count INT, hostName STRING, year SMALLINT, month SMALLINT, day SMALLINT, time STRING) 
STORED BY 
    'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' TBLPROPERTIES ('wso2.carbon.datasource.name'='WSO2AM_STATS_DB',
    'hive.jdbc.update.on.duplicate' = 'true',
    'hive.jdbc.primary.key.fields'='api,api_version,version,apiPublisher,consumerKey,userId,context,hostName,time',
    'hive.jdbc.table.create.query' = 'CREATE TABLE API_REQUEST_SUMMARY 
          (api VARCHAR(100), api_version VARCHAR(100), version VARCHAR(100), apiPublisher VARCHAR(100), consumerKey VARCHAR(100), userId VARCHAR(100), context VARCHAR(100), max_request_time BIGINT, total_request_count INT, hostName VARCHAR(100), year SMALLINT, month SMALLINT, day SMALLINT, time VARCHAR(30),
PRIMARY KEY(api,api_version,apiPublisher,consumerKey,userId,context,hostName,time))');
 
INSERT OVERWRITE TABLE APIRequestSummaryData 
SELECT api, api_version,version, apiPublisher, COALESCE(consumerKey,''), userId, context, max(requestTime) as max_request_time, sum(request) as total_request_count,hostName, 
                 year(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as year,     
                 month(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as month,
                 day(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as day,
CONCAT(substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,16),':00') as time FROM APIRequestData 
WHERE context IS NOT NULL GROUP BY api,api_version,version,apiPublisher,consumerKey,userId,context,hostName,
                 year(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )), 
                 month(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
                 day(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
                 hour(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
                 minute(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
                 substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,16);            

Descriptions of the above queries are as follows:

Query: CREATE EXTERNAL TABLE IF NOT EXISTS APIRequestData
Purpose: Creates a virtual Hive table named APIRequestData that corresponds to the physical org_wso2_apimgt_statistics_request Cassandra column family, which contains the raw data.

Query: CREATE EXTERNAL TABLE IF NOT EXISTS APIRequestSummaryData
Purpose: Creates a virtual Hive table named APIRequestSummaryData that corresponds to a physical RDBMS table named API_REQUEST_SUMMARY. You do not have to match the column names in the Hive table to those in the RDBMS table; the query maps each column in Hive to a column in the RDBMS table based on the order in which it is defined. The primary key fields are api, api_version, version, apiPublisher, consumerKey, userId, context, hostName and time.

Query: INSERT OVERWRITE TABLE APIRequestSummaryData
Purpose: Processes the data in the virtual APIRequestData Hive table (sums the API request calls), and then transfers the summarized information to the API_REQUEST_SUMMARY RDBMS table.

In the GROUP BY clause of the above insert overwrite query, raw records are grouped by API-specific information such as the API version and API name, and most importantly by the year, month, day, hour and minute of the invocation. This means that minute-wise stats of API invocations are calculated and stored in the RDBMS.
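
The coarse-graining of the time field is done with the substring length: keeping the first 16 characters of the formatted timestamp retains the value up to the minute. A hedged illustration (assuming the default server time zone) of how a raw epoch-millisecond requestTime maps to the minute-level time key is shown below.

-- Hedged illustration: truncate requestTime to the minute-level key stored in the summary table.
-- For example, an invocation at '2014-11-11 09:27:43.123' becomes '2014-11-11 09:27:00'.
SELECT requestTime,
       CONCAT(substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,16),':00') AS minute_level_time
FROM APIRequestData LIMIT 5;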

A snapshot of the API_REQUEST_SUMMARY table will be as follows. It holds the aggregated results for each and every minute, and the TIME field is coarse-grained up to its minute value.

Applying data purging 

Minute-wise stats of API invocations are calculated by the Hive queries and stored in the above RDBMS table. If you run the same Hive query repeatedly, it processes the same data set again and again and overwrites the data in the RDBMS table. Since minute-wise stats are calculated, the INSERT OVERWRITE command (similar to the SQL UPDATE command) used in the above query updates the records based on the primary key field values. However, there is no need to process the entire data set repeatedly. As the summarized results are already in the RDBMS table, deleting old raw data from the Cassandra column families has no impact on them. However, you cannot delete the raw records that were inserted in the current minute: if more records are inserted in the same minute after the deletion, the Hive queries will update the RDBMS table with an incorrect aggregated result, because the deleted records of that minute are no longer counted.

Configuring data purging

You can use both the data archiving and the data deleting features for purging. However, the most suitable method for this use case is to configure deletion through the toolbox. For example, you can configure deletion for the API request stream definition (the stream definition corresponding to this use case) as follows. This configuration deletes data that is older than 90 days and runs the deletion job every midnight (the Quartz cron expression 0 0 0 * * ? fires at 00:00:00 every day).

streams.definitions.defn2.filename=requestStreamDefn
streams.definitions.defn2.username=admin
streams.definitions.defn2.password=admin
streams.definitions.defn2.description=This is the datastream published from APIManager Stats Client
streams.definitions.defn2.enablePurge=true
streams.definitions.defn2.purgeAfterDays=90
streams.definitions.defn2.purgeCron=0 0 0 * * ?

Cassandra deletes data in a different way from a traditional relational database. For more information on deleting Cassandra data, go to the Cassandra Wiki. When you delete records from Cassandra, it does not immediately remove the data from the column families; instead it converts them to tombstone values. These values are removed only after their GC grace period has passed. Therefore, you have to configure the Hive analytic queries so that they do not pick up those tombstone records. You can do that by updating the Hive script so that it does not pick any null value columns. In this example, the clause below has been added close to the insert overwrite query. Generally, context cannot be null; if it is null, the record is a tombstone record, which can be filtered out as follows:

where context is not NULL

Calculating hourly stats from analytic queries

In this scenario, hourly stats are calculated from the raw data, instead of the minute-wise stats calculated in calculating minute wise stats. Below is the corresponding insert overwrite query for this use case.

INSERT OVERWRITE TABLE APIRequestSummaryData 
SELECT api, api_version, version, apiPublisher, COALESCE(consumerKey,''), userId, context, max(requestTime) as max_request_time, sum(request) as total_request_count, hostName,  
       year(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as year, 
       month(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as month,
       day(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as day,
       concat(substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,13),':00:00') as time 
FROM APIRequestData WHERE context is not NULL GROUP BY api, api_version, version, apiPublisher, consumerKey, userId, context, hostName,
       year(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )), 
       month(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
       day(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
       hour(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
       substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,13);

In the above query, grouping by the minute value is removed from the GROUP BY clause, and the time stamp field is coarse-grained up to its hour value. The snapshot of the new API_REQUEST_SUMMARY table will be as follows.

You can configure data purging as explained in calculating minute wise stats. However, you cannot purge the data of the current hour.

Calculating daily stats from analytic queries

In this scenario, daily stats are calculated from raw data. The corresponding insert overwrite query will be as follows:

INSERT OVERWRITE TABLE APIRequestSummaryData 
SELECT api, api_version, version, apiPublisher, COALESCE(consumerKey,''), userId, context, max(requestTime) as max_request_time, sum(request) as total_request_count, hostName,  
       year(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as year, 
       month(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as month,
       day(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as day,
       substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,10) as time 
FROM APIRequestData WHERE context is not NULL GROUP BY 
       api, api_version, version, apiPublisher, consumerKey, userId, context, hostName,
       year(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )), 
       month(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
       day(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
       substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,10);

In the above query, grouping by both the minute and hour values is removed from the GROUP BY clause, and the time stamp field is coarse-grained up to its day value. The snapshot of the new API_REQUEST_SUMMARY table will be as follows.

You can configure data purging as explained in calculating minute wise stats. However, you cannot purge today's data.

Calculating monthly stats from analytic queries

In this scenario, monthly stats are calculated from the raw data. The corresponding queries, modified from the ones in calculating minute wise stats, are as follows.

You can't use this scenario in the API Manager use case, because the API Manager statistics dashboard requires stats for a period specified in days.

CREATE EXTERNAL TABLE IF NOT EXISTS APIRequestSummaryData 
    (api STRING, api_version STRING, version STRING, apiPublisher STRING, consumerKey STRING, userId STRING, context STRING, max_request_time BIGINT, total_request_count INT, hostName STRING, year SMALLINT, month SMALLINT, time STRING) 
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' 
TBLPROPERTIES 
    ('wso2.carbon.datasource.name'='WSO2AM_STATS_DB',
     'hive.jdbc.update.on.duplicate' = 'true',
     'hive.jdbc.primary.key.fields'='api, api_version, version, apiPublisher, consumerKey, userId, context, hostName, time',
     'hive.jdbc.table.create.query' = 'CREATE TABLE API_REQUEST_SUMMARY 
             (api VARCHAR(100), api_version VARCHAR(100), version VARCHAR(100), apiPublisher VARCHAR(100), consumerKey VARCHAR(100), userId VARCHAR(100), context VARCHAR(100), max_request_time BIGINT, total_request_count INT, hostName VARCHAR(100), year SMALLINT, month SMALLINT, time VARCHAR(30),
PRIMARY KEY(api, api_version, apiPublisher, consumerKey, userId, context, hostName, time))' );
INSERT OVERWRITE TABLE APIRequestSummaryData 
SELECT api, api_version, version, apiPublisher, COALESCE(consumerKey,''), userId, context, max(requestTime) as max_request_time, sum(request) as total_request_count, hostName,  
    year(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as year, 
    month(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )) as month,
    substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,7) as time 
    from APIRequestData where context is not NULL 
    group by api, api_version, version,apiPublisher, consumerKey, userId, context, hostName,
       year(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )), 
       month(from_unixtime(cast(requestTime/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS' )),
       substring(from_unixtime(cast(requestTime/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'),0,7);

In the above query, grouping by the minute, hour and day values is removed from the GROUP BY clause, and the time stamp field is coarse-grained up to its month value. The snapshot of the new API_REQUEST_SUMMARY table will be as follows.

For this scenario, you cannot schedule data deletion through a toolbox or the Management Console unless incremental analysis is configured. The reason is that with those methods you can only configure to keep data for the last n number of days (for example, the last 90 days), so the monthly stats can become incorrect, because some of the data of the month being considered may have already been deleted. However, you can still use the BAM Management Console to purge data manually by specifying proper date ranges. Here, the start date has to be the first day of a month, and the end date has to be the last day of a month (and cannot be in the current month). For example: 01/02/2014 to 30/04/2014.
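
Before manually purging the raw data of a past month, you can verify that its summarized stats already exist in the RDBMS. The following is a hedged sketch of such a check, run against the RDBMS (for example, MySQL) using the API_REQUEST_SUMMARY table defined above.

-- Hedged check: list the months for which summarized totals are already available.
SELECT year, month, SUM(total_request_count) AS request_count
FROM API_REQUEST_SUMMARY
GROUP BY year, month
ORDER BY year, month;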

Calculating stats without grouping by time

In all the above scenarios, the aggregated results are calculated by grouping by time. In this scenario, the aggregate functions are applied to the whole data set, as shown in the following example Hive query.

INSERT OVERWRITE TABLE APIRequestSummaryData 
SELECT api, api_version, version, apiPublisher, COALESCE(consumerKey,''), userId, context, max(requestTime) as max_request_time,sum(request) as total_request_count, hostName 
from APIRequestData where context is not NULL group by api, api_version, version, apiPublisher, consumerKey, userId, context, hostName;

For this scenario, you cannot use any data purging method unless incremental analysis is configured. If incremental data analysis is configured, the data is processed incrementally, so data purging will not have any effect on the aggregated results.
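
For reference, the following is a hedged sketch of enabling incremental processing for this scenario. It assumes the @Incremental annotation used in WSO2 BAM Hive scripts for incremental analysis; the marker name apiRequestIncremental is an arbitrary example, and the exact annotation syntax and parameters should be verified against the incremental analysis documentation of your BAM version.

@Incremental(name="apiRequestIncremental", tables="APIRequestData")
INSERT OVERWRITE TABLE APIRequestSummaryData 
SELECT api, api_version, version, apiPublisher, COALESCE(consumerKey,''), userId, context, max(requestTime) as max_request_time, sum(request) as total_request_count, hostName 
from APIRequestData where context is not NULL group by api, api_version, version, apiPublisher, consumerKey, userId, context, hostName;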

When configuring data purging, you should mainly consider whether it affects the end results. For example, in this use case, since you have the correct summarized results in your RDBMS tables, data purging does not have any impact on the displayed end results. However, you must keep backups of your summarized RDBMS tables, because if you lose the summarized data, you cannot recalculate the same results from the analytics jobs once the raw data has been purged.
