The latest version for DAS is WSO2 Data Analytics Server 3.1.0. View documentation for the latest release.
WSO2 Data Analytics Server is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.

WSO2 DAS supports several RDBMS types for its underlying Data Access Layer (DAL). To use the RDBMS DAL component, a pre-configured installation of an RDBMS is required. The RDBMS implementation for the DAS DAL contains the implementation of the Analytics Record Store. The following sections describe how to configure the RDBMS DAL component.

The RDBMS datasource supports both pagination and record count. Both are disabled by default. To enable them, set the paginationSupported and recordCountSupported properties to true in the <DAS_HOME>/repository/conf/analytics/rdbms-config.xml file.
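
As a minimal sketch of this change, the fragment below shows the two properties set to true inside one database entry of the rdbms-config.xml file. The h2.* entry is used here only as an illustration. The remaining elements of the entry are omitted and are assumed to stay as they are in your file.

```xml
<!-- <DAS_HOME>/repository/conf/analytics/rdbms-config.xml (fragment) -->
<database name="h2.*" minVersion="1.0" maxVersion="10.0">
    <!-- allow the record store to report record counts -->
    <recordCountSupported>true</recordCountSupported>
    <!-- allow results to be retrieved in pages -->
    <paginationSupported>true</paginationSupported>
    <!-- the remaining elements of this entry are unchanged and omitted here -->
</database>
```

Apply the same change to the entry that matches the database you actually use, since each database entry carries its own copy of these properties.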

Enabling the RDBMS datasource provider

In order to use RDBMS as the record store, ensure that the RDBMS datasource provider is enabled by following the steps below.

RDBMS is the default database type of the record store. Therefore, the RDBMS datasource provider is uncommented by default.

  1. Open the <DAS_HOME>/repository/conf/datasources/analytics-datasources.xml file.
  2. If the datasource provider is commented out, uncomment it as shown below.

    <provider>org.wso2.carbon.ndatasource.rdbms.RDBMSDataSourceReader</provider>

Configurations for the Analytics Record Store

The Analytics Record Store consists of two datasource configurations as follows.

Configuring the Event Store

To configure RDBMS as the underlying datasource implementation for the Event Store of the Analytics Record Store, specify the RDBMS configurations in the <DAS_HOME>/repository/conf/analytics/analytics-config.xml file as shown in the example below.

<analytics-record-store name="EVENT_STORE">
    <implementation>org.wso2.carbon.analytics.datasource.rdbms.RDBMSAnalyticsRecordStore</implementation>
    <properties>
        <!-- the data source name mentioned in data sources configuration -->
        <property name="datasource">WSO2_ANALYTICS_EVENT_STORE_DB</property>
    </properties>
</analytics-record-store>

The properties of the above configuration are described below.

Property | Description
<implementation> | The implementation class of the Analytics Record Store for RDBMS, which is org.wso2.carbon.analytics.datasource.rdbms.RDBMSAnalyticsRecordStore.
<property name="datasource"> | The name of the Carbon datasource of the RDBMS type. This name is used to look up the associated RDBMS datasource.

Configuring the Processed Data Store

To configure RDBMS as the underlying datasource implementation for the Processed Data Store of the Analytics Record Store, specify the RDBMS configurations in the <DAS_HOME>/repository/conf/analytics/analytics-config.xml file as shown in the example below.

<analytics-record-store name="PROCESSED_DATA_STORE">
    <implementation>org.wso2.carbon.analytics.datasource.rdbms.RDBMSAnalyticsRecordStore</implementation>
    <properties>
        <property name="datasource">WSO2_ANALYTICS_PROCESSED_DATA_STORE_DB</property>
    </properties>
</analytics-record-store>

The properties of the above configuration are described below.

Property | Description
<implementation> | The implementation class of the Analytics Record Store for RDBMS, which is org.wso2.carbon.analytics.datasource.rdbms.RDBMSAnalyticsRecordStore.
<property name="datasource"> | The name of the Carbon datasource of the RDBMS type. This name is used to look up the associated RDBMS datasource.

RDBMS query configuration

The Analytics Record Store described above depends on a query configuration to execute its operations. The query configuration contains SQL query templates for each RDBMS server that it interfaces with. Using this configuration, you can fine-tune the existing queries or create new query configurations for an RDBMS that is not configured out of the box. You can find these configurations in the <DAS_HOME>/repository/conf/analytics/rdbms-config.xml file, as shown in the example below.

    <database name = "h2.*" minVersion = "1.0" maxVersion = "10.0">
        <recordCountSupported>false</recordCountSupported>
        <paginationSupported>true</paginationSupported>
        <paginationMode>MODE1</paginationMode>
        <recordTableCheckQuery>SELECT 1 FROM {{TABLE_NAME}} LIMIT 1</recordTableCheckQuery>
        <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordCountQuery>
        <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordDeletionQuery>
        <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordDeletionWithIdsQuery>
        <recordMergeQuery>MERGE INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) KEY (record_id) VALUES (?, ?, ?, ?)</recordMergeQuery>
        <recordRetrievalQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? and partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ? LIMIT ?,?</recordRetrievalQuery>
        <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordRetrievalWithIdsQuery>
        <recordTableDeleteQueries>
            <query>DROP INDEX IF EXISTS {{TABLE_NAME}}_TIMESTAMP</query>
            <query>DROP INDEX IF EXISTS {{TABLE_NAME}}_PARTITION_KEY</query>
            <query>DROP TABLE IF EXISTS {{TABLE_NAME}}</query>
        </recordTableDeleteQueries>
        <recordTableInitQueries>
            <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR(50), timestamp BIGINT, data BINARY, partition_key INT, PRIMARY KEY(record_id))</query>
            <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
            <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
        </recordTableInitQueries>
    </database>
    <database name = "mysql" category = "large_dataset_optimized">
        <recordCountSupported>false</recordCountSupported>
        <paginationSupported>true</paginationSupported>
        <paginationMode>MODE1</paginationMode>
        <recordTableCheckQuery>SELECT 1 FROM {{TABLE_NAME}} LIMIT 1</recordTableCheckQuery>
        <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordCountQuery>
        <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordDeletionQuery>
        <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordDeletionWithIdsQuery>
        <recordMergeQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE partition_key=VALUES(partition_key), timestamp=VALUES(timestamp), data=VALUES(data)</recordMergeQuery>
        <forwardOnlyReadEnabled>true</forwardOnlyReadEnabled>
        <fetchSize>-2147483648</fetchSize>        
        <recordRetrievalQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? and partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ? LIMIT ?,?</recordRetrievalQuery>
        <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordRetrievalWithIdsQuery>
        <recordTableDeleteQueries>
            <query>DROP INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}}</query>
            <query>DROP INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}}</query>
            <query>DROP TABLE IF EXISTS {{TABLE_NAME}}</query>
        </recordTableDeleteQueries>
        <recordTableInitQueries>
            <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR(50), timestamp BIGINT, data LONGBLOB, partition_key INT, PRIMARY KEY(record_id)) ENGINE='MyISAM'</query>
            <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
            <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
        </recordTableInitQueries>
    </database>
    <database name="mysql" category="write_optimized">
        <recordCountSupported>false</recordCountSupported>
        <paginationSupported>true</paginationSupported>
        <paginationMode>MODE1</paginationMode>
        <recordTableCheckQuery>SELECT 1 FROM {{TABLE_NAME}} LIMIT 1</recordTableCheckQuery>
        <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordCountQuery>
        <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordDeletionQuery>
        <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordDeletionWithIdsQuery>
        <recordMergeQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE partition_key=VALUES(partition_key), timestamp=VALUES(timestamp), data=VALUES(data)</recordMergeQuery>
        <forwardOnlyReadEnabled>true</forwardOnlyReadEnabled>
        <fetchSize>-2147483648</fetchSize>        
        <recordRetrievalQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? and partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ? LIMIT ?,?</recordRetrievalQuery>
        <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordRetrievalWithIdsQuery>
        <recordTableDeleteQueries>
            <query>DROP INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}}</query>
            <query>DROP INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}}</query>
            <query>DROP TABLE IF EXISTS {{TABLE_NAME}}</query>
        </recordTableDeleteQueries>
        <recordTableInitQueries>
            <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR(128), timestamp BIGINT, data LONGBLOB, partition_key
                INT, PRIMARY KEY(record_id)) ENGINE='MyISAM'</query>
            <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
            <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
        </recordTableInitQueries>     
    </database>
    <database name="mysql" category="read_write_optimized">
        <recordCountSupported>false</recordCountSupported>
        <paginationSupported>true</paginationSupported>
        <paginationMode>MODE1</paginationMode>
        <recordTableCheckQuery>SELECT 1 FROM {{TABLE_NAME}} LIMIT 1</recordTableCheckQuery>
        <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?
        </recordCountQuery>
        <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?
        </recordDeletionQuery>
        <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})
        </recordDeletionWithIdsQuery>
        <recordMergeQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?) ON
            DUPLICATE KEY UPDATE partition_key=VALUES(partition_key), timestamp=VALUES(timestamp), data=VALUES(data)
        </recordMergeQuery>
        <forwardOnlyReadEnabled>true</forwardOnlyReadEnabled>
        <fetchSize>-2147483648</fetchSize>
        <recordRetrievalQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? and
            partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ? LIMIT ?,?
        </recordRetrievalQuery>
        <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN
            ({{RECORD_IDS}})
        </recordRetrievalWithIdsQuery>
        <recordTableDeleteQueries>
            <query>DROP INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}}</query>
            <query>DROP INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}}</query>
            <query>DROP TABLE IF EXISTS {{TABLE_NAME}}</query>
        </recordTableDeleteQueries>
        <recordTableInitQueries>
            <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR(128), timestamp BIGINT, data LONGBLOB, partition_key
                INT, PRIMARY KEY(record_id)) ENGINE='InnoDB'
            </query>
            <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
            <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
        </recordTableInitQueries>
    </database>
    <database name = "oracle">
        <recordCountSupported>false</recordCountSupported>
        <paginationSupported>true</paginationSupported>
        <paginationMode>MODE2</paginationMode>
        <recordTableCheckQuery>SELECT 1 FROM {{TABLE_NAME}} WHERE rownum=1</recordTableCheckQuery>
        <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordCountQuery>
        <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordDeletionQuery>
        <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordDeletionWithIdsQuery>
        <recordInsertQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?)</recordInsertQuery>
        <recordUpdateQuery>UPDATE {{TABLE_NAME}} SET partition_key = ?, timestamp = ?, data = ? WHERE record_id = ?</recordUpdateQuery>
        <recordMergeQuery>MERGE INTO {{TABLE_NAME}} dest USING( SELECT  ? partition_key, ? timestamp, ? data, ? record_id FROM dual) src ON(dest.record_id = src.record_id) WHEN NOT MATCHED THEN INSERT(partition_key, timestamp, data, record_id) VALUES(src.partition_key, src.timestamp, src.data, src.record_id) WHEN MATCHED THEN UPDATE SET dest.partition_key = src.partition_key, dest.timestamp = src.timestamp, dest.data = src.data</recordMergeQuery>
        <recordRetrievalQuery>SELECT record_id, timestamp, data from (SELECT rownum RNUM, record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? and partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ? and rownum &lt;= ?) where RNUM &gt; ?</recordRetrievalQuery>
        <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordRetrievalWithIdsQuery>
        <recordTableDeleteQueries>
            <query>DROP INDEX {{TABLE_NAME}}_PARTITION_KEY</query>
            <query>DROP INDEX {{TABLE_NAME}}_TIMESTAMP</query>
            <query>DROP TABLE {{TABLE_NAME}}</query>
        </recordTableDeleteQueries>
        <recordTableInitQueries>
            <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR2(50), timestamp NUMBER(19), data BLOB, partition_key NUMBER(10), PRIMARY KEY(record_id))</query>
            <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
            <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
        </recordTableInitQueries>
    </database>
    <database name = "Microsoft SQL Server">
        <recordCountSupported>false</recordCountSupported>
        <paginationSupported>true</paginationSupported>
        <paginationMode>MODE2</paginationMode>
        <blobLengthRequired>true</blobLengthRequired>
        <recordTableCheckQuery>SELECT TOP 1 1 from {{TABLE_NAME}}</recordTableCheckQuery>
        <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordCountQuery>
        <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordDeletionQuery>
        <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordDeletionWithIdsQuery>
        <!--recordMergeQuery>MERGE {{TABLE_NAME}} AS dest USING (SELECT ?, ?, ?, ?) AS src (partition_key, timestamp, data, record_id) ON (dest.record_id = src.record_id) WHEN MATCHED THEN UPDATE SET partition_key = src.partition_key, timestamp = src.timestamp, data = src.data WHEN NOT MATCHED THEN INSERT(partition_key, timestamp, data, record_id) VALUES (src.partition_key, src.timestamp, src.data, src.record_id);</recordMergeQuery-->
        <recordInsertQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?)</recordInsertQuery>
        <recordUpdateQuery>UPDATE {{TABLE_NAME}} SET partition_key = ?, timestamp = ?, data = ? WHERE record_id = ?</recordUpdateQuery>
        <recordRetrievalQuery>SELECT record_id, timestamp, data FROM (SELECT ROW_NUMBER() OVER(ORDER BY record_id) AS rownumber, record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? and partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ?) AS A WHERE A.rownumber &lt;= ? AND A.rownumber &gt; ?</recordRetrievalQuery>
        <!--recordRetrievalQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ? OFFSET ? ROWS FETCH NEXT ? ROWS ONLY</recordRetrievalQuery-->
        <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordRetrievalWithIdsQuery>
        <recordTableDeleteQueries>
            <query>DROP INDEX {{TABLE_NAME}}_PARTITION_KEY</query>
            <query>DROP INDEX {{TABLE_NAME}}_TIMESTAMP</query>
            <query>DROP TABLE {{TABLE_NAME}}</query>
        </recordTableDeleteQueries>
        <recordTableInitQueries>
            <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR(50), timestamp BIGINT, data VARBINARY(max), partition_key INTEGER, PRIMARY KEY(record_id))</query>
            <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
            <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
        </recordTableInitQueries>
    </database>
    <database name = "PostgreSQL">
        <recordCountSupported>false</recordCountSupported>
        <paginationSupported>true</paginationSupported>
        <paginationMode>MODE1</paginationMode>
        <recordTableCheckQuery>SELECT 1 FROM {{TABLE_NAME}} LIMIT 1</recordTableCheckQuery>
        <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordCountQuery>
        <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordDeletionQuery>
        <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordDeletionWithIdsQuery>
        <recordInsertQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?)</recordInsertQuery>
        <recordUpdateQuery>UPDATE {{TABLE_NAME}} SET partition_key = ?, timestamp = ?, data = ? WHERE record_id = ?</recordUpdateQuery>
        <!--recordMergeQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?) ON CONFLICT DO UPDATE partition_key=VALUES(partition_key), timestamp=VALUES(timestamp), data=VALUES(data)</recordMergeQuery-->
        <forwardOnlyReadEnabled>true</forwardOnlyReadEnabled>
        <fetchSize>1000</fetchSize>
        <recordRetrievalQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? and partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ? OFFSET ? LIMIT ?</recordRetrievalQuery>
        <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordRetrievalWithIdsQuery>
        <recordTableDeleteQueries>
            <query>DROP INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}}</query>
            <query>DROP INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}}</query>
            <query>DROP TABLE IF EXISTS {{TABLE_NAME}}</query>
        </recordTableDeleteQueries>
        <recordTableInitQueries>
            <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR(50), timestamp BIGINT, data BYTEA, partition_key INTEGER, PRIMARY KEY(record_id))</query>
            <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
            <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
        </recordTableInitQueries>
    </database>
    <database name = "DB2.*">
        <recordCountSupported>false</recordCountSupported>
        <paginationSupported>true</paginationSupported>
        <paginationMode>MODE1</paginationMode>
        <blobLengthRequired>true</blobLengthRequired>
        <recordTableCheckQuery>SELECT 1 FROM {{TABLE_NAME}} FETCH FIRST 1 ROWS ONLY</recordTableCheckQuery>
        <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordCountQuery>
        <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordDeletionQuery>
        <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordDeletionWithIdsQuery>
        <recordMergeQuery>MERGE INTO {{TABLE_NAME}} AS dest USING (VALUES(?, ?, ?, ?)) AS src (partition_key, timestamp, data, record_id) ON dest.record_id = src.record_id WHEN MATCHED THEN UPDATE SET dest.partition_key = src.partition_key, dest.timestamp = src.timestamp, dest.data = src.data WHEN NOT MATCHED THEN INSERT (partition_key, timestamp, data, record_id) VALUES (src.partition_key, src.timestamp, src.data, src.record_id)</recordMergeQuery>
        <recordInsertQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?)</recordInsertQuery>
        <recordUpdateQuery>UPDATE {{TABLE_NAME}} SET partition_key = ?, timestamp = ?, data = ? WHERE record_id = ?</recordUpdateQuery>
        <recordRetrievalQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? and partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ? LIMIT ?,?</recordRetrievalQuery>
        <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordRetrievalWithIdsQuery>
        <recordTableDeleteQueries>
            <query>DROP INDEX {{TABLE_NAME}}_PARTITION_KEY</query>
            <query>DROP INDEX {{TABLE_NAME}}_TIMESTAMP</query>
            <query>DROP TABLE {{TABLE_NAME}}</query>
        </recordTableDeleteQueries>
        <recordTableInitQueries>
            <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR(50) NOT NULL, timestamp BIGINT, data BLOB(2G) NOT LOGGED, partition_key INTEGER, PRIMARY KEY(record_id))</query>
            <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
            <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
        </recordTableInitQueries>
    </database>

 

The above configuration properties are described below.

Property | Description
database name | The target RDBMS to which this query template applies. The value can be a regular expression that matches the database product name. For example, DB2 reports different product name strings on Windows and on Unix-based operating systems, so a regular expression such as "DB2.*" matches all DB2 database server environments.
minVersion | The minimum version of the database server that this configuration matches, in the format "majorVersion.minorVersion".
maxVersion | The maximum version of the database server that this configuration matches, in the format "majorVersion.minorVersion".
recordCountSupported | Specifies whether it is possible to take a count of all the records in the record store.
paginationSupported | Specifies whether the output of the record store can be divided into manageable chunks.
paginationMode | Specifies the pagination mode. The sample configurations above use the values MODE1 and MODE2.
blobLengthRequired | Specifies whether a length must be assigned to the data blocks saved in the record store.
recordCountQuery | The query template to take a count of the records in the record store.
recordDeletionQuery | The query template to delete the records within a timestamp range from the record store.
recordDeletionWithIdsQuery | The query template to delete records with specific IDs from the record store.
recordMergeQuery | The query template to insert a record into a table of the record store, or to update the existing row if a record with the same record_id already exists.
forwardOnlyReadEnabled | If this property is set to true, the cursor can only move forward on the result set when retrieving records from the record store.
fetchSize | The number of rows to fetch from the database at a time when more rows are needed for the result set while retrieving records from the record store.
recordInsertQuery | The query template to insert new rows of data into a table in the record store.
recordUpdateQuery | The query template to modify existing table rows in the record store.
recordRetrievalQuery | The query template to retrieve records from the record store.
recordRetrievalWithIdsQuery | The query template to retrieve records with specific IDs from the record store.
recordTableCheckQuery | The query template to check whether a table exists in the record store.
recordTableDeleteQueries | The query templates to delete a table and its indexes from the record store.
recordTableInitQueries | The query templates to initialize a table and its indexes in the record store.
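
To create a query configuration for an RDBMS that is not configured out of the box, one possible approach is to copy the closest existing entry and adjust the name pattern and the queries. The sketch below assumes a hypothetical database whose product name starts with "ExampleDB" and that accepts the generic SQL shown. Both the name and the SQL dialect are assumptions made for illustration, and the queries are adapted from the entries above, so verify each one against your database before use.

```xml
<!-- Hypothetical entry: "ExampleDB" is a placeholder product name, not a real database -->
<database name="ExampleDB.*">
    <recordCountSupported>false</recordCountSupported>
    <paginationSupported>true</paginationSupported>
    <!-- MODE1 is the mode used by the entries above whose retrieval query takes an offset followed by a row limit -->
    <paginationMode>MODE1</paginationMode>
    <recordTableCheckQuery>SELECT 1 FROM {{TABLE_NAME}} LIMIT 1</recordTableCheckQuery>
    <recordCountQuery>SELECT COUNT(*) FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordCountQuery>
    <recordDeletionQuery>DELETE FROM {{TABLE_NAME}} WHERE timestamp &gt;= ? AND timestamp &lt; ?</recordDeletionQuery>
    <recordDeletionWithIdsQuery>DELETE FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordDeletionWithIdsQuery>
    <!-- separate insert and update templates, as in the entries above that do not define a merge query -->
    <recordInsertQuery>INSERT INTO {{TABLE_NAME}} (partition_key, timestamp, data, record_id) VALUES (?, ?, ?, ?)</recordInsertQuery>
    <recordUpdateQuery>UPDATE {{TABLE_NAME}} SET partition_key = ?, timestamp = ?, data = ? WHERE record_id = ?</recordUpdateQuery>
    <recordRetrievalQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE partition_key &gt;= ? AND partition_key &lt; ? AND timestamp &gt;= ? AND timestamp &lt; ? OFFSET ? LIMIT ?</recordRetrievalQuery>
    <recordRetrievalWithIdsQuery>SELECT record_id, timestamp, data FROM {{TABLE_NAME}} WHERE record_id IN ({{RECORD_IDS}})</recordRetrievalWithIdsQuery>
    <recordTableDeleteQueries>
        <query>DROP INDEX {{TABLE_NAME}}_PARTITION_KEY</query>
        <query>DROP INDEX {{TABLE_NAME}}_TIMESTAMP</query>
        <query>DROP TABLE {{TABLE_NAME}}</query>
    </recordTableDeleteQueries>
    <recordTableInitQueries>
        <!-- the BLOB column type is an assumption; use the binary type of your database -->
        <query>CREATE TABLE {{TABLE_NAME}} (record_id VARCHAR(50), timestamp BIGINT, data BLOB, partition_key INTEGER, PRIMARY KEY(record_id))</query>
        <query>CREATE INDEX {{TABLE_NAME}}_TIMESTAMP ON {{TABLE_NAME}} (timestamp)</query>
        <query>CREATE INDEX {{TABLE_NAME}}_PARTITION_KEY ON {{TABLE_NAME}} (partition_key)</query>
    </recordTableInitQueries>
</database>
```

The order of the ? placeholders in each template follows the existing entries, so keeping that order when adapting a query is the safer choice.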

Configuring the datasources

Change the configurations of the WSO2_ANALYTICS_EVENT_STORE_DB datasource and the WSO2_ANALYTICS_PROCESSED_DATA_STORE_DB datasource in the <DAS_HOME>/repository/conf/datasources/analytics-datasources.xml file accordingly. For information on the datasource configurations, see Configuring an RDBMS Datasource.
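
As a rough illustration, a datasource entry for the Event Store pointing to a MySQL database might look like the sketch below. The JDBC URL, database name, credentials, driver class, and pool settings are placeholder assumptions and not values taken from this page. Replace them with the values for your environment, and define the WSO2_ANALYTICS_PROCESSED_DATA_STORE_DB datasource in the same way.

```xml
<!-- <DAS_HOME>/repository/conf/datasources/analytics-datasources.xml (fragment) -->
<datasource>
    <!-- must match the datasource name referenced in analytics-config.xml -->
    <name>WSO2_ANALYTICS_EVENT_STORE_DB</name>
    <description>The datasource used for the analytics event store</description>
    <definition type="RDBMS">
        <configuration>
            <!-- placeholder host, port, and database name -->
            <url>jdbc:mysql://localhost:3306/ANALYTICS_EVENT_STORE</url>
            <!-- placeholder credentials -->
            <username>db_user</username>
            <password>db_password</password>
            <driverClassName>com.mysql.jdbc.Driver</driverClassName>
            <!-- illustrative pool settings -->
            <maxActive>50</maxActive>
            <maxWait>60000</maxWait>
            <validationQuery>SELECT 1</validationQuery>
        </configuration>
    </definition>
</datasource>
```

The JDBC driver JAR for the chosen database also needs to be available to the server for a definition like this to work.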

Note that the <defaultAutoCommit> configuration in the analytics-datasources.xml file (for RDBMS datasources) is not applicable for WSO2 DAS. This is because auto committing is handled at the code level for the product, i.e., the default auto commit configuration specified for the RDBMS driver will be effective. Typically, auto committing is enabled for RDBMS drivers by default.

When auto committing is enabled, each SQL statement will be committed to the database as an individual transaction, as opposed to committing multiple statements as a single transaction.
