This documentation is for WSO2 Stream Processor 4.0.0. View documentation for the latest release.

All docs This doc
||
Skip to end of metadata
Go to start of metadata

Event streams can be persisted to store data in event tables of different types. When writing Siddhi queries, you can make references to data stored in event tables (i.e., via joins etc). This section explains how to define event tables and the operations that can be performed on them.

General prerequisites

In order to create and use an event table to store data, the following should be completed:

  • The required database (MySql, MongoDB, Oracle Database, etc) should be downloaded and installed. 
  • A database instance should be started.
  • The user IDs used to perform the required table operations should be granted the relevant privileges.
  • The relevant JDBC Driver must be downloaded and the jar must be put in the  <SP_HOME>/lib  directory.

Defining a table

This section explains how to configure an event table in which events can be stored. Click on the relevant tab to view information for the required database type.

Prerequisites

Before defining a table, you need to complete the General prerequisites.

Query syntax

The following is the query syntax to define a MongoDB event table.

@Store(type="mongodb", mongodb.uri="<MONGODB CONNECTION URI>")
@PrimaryKey("ATTRIBUTE_NAME")
@IndexBy("<ATTRIBUTE_NAME> <SORTING ORDER> <INDEX OPTIONS>")
define table <TABLE_NME> (<ATTRIBUTE1_NAME> <ATTRIBUTE1_TYPE>, <ATTRIBUTE2_NAME> <ATTRIBUTE2_TYPE>, <ATTRIBUTE3_NAME> <ATTRIBUTE3_TYPE>, ...);

The mongodb.uri parameter specifies the URI via which MongoDB user store is accessed.

In addition, the following annotations are used in the MongoDB definition.

  • @primary : This specifies a list of comma-separated values to be treated as unique fields in the table. Each record in the table must have a unique combination of values for the fields specified here.
  • @index : This specifies the fields that must be indexed at the database level. You can specify multiple values as a comma separated list.

Example

The following query defines a MongoDB table named FooTable with the symbolprice, and volume attributes. The symbol attribute is considered the primary key and it is also indexed.

@Store(type="mongodb", mongodb.uri="mongodb://admin:admin@localhost:27017/Foo?ssl=true")
@PrimaryKey("symbol")
@IndexBy("symbol 1 {background:true}")
define table FooTable (symbol string, price float, volume long);

Prerequisites

Before defining a table, you need to complete the General prerequisites.

Query syntax

The following is the syntax for an RDBMS event table configuration:

@store(type="rdbms", jdbc.url="<jdbc.url>", username="<username>", password="<password>",pool.properties="<prop1>:<val1>,<prop2>:<val2>")
@PrimaryKey("col1")
@IndexBy("col3")
define table <table_name>(col1 datatype1, col2 datatype2, col3 datatype3);
Parameters

The following parameters are configured in the definition of an RDBMS event table.

ParameterDescriptionRequired/Optional
jdbc.url The JDBC URL via which the RDBMS data store is accessed.Required
username The username to be used to access the RDBMS data store.Required
password The password to be used to access the RDBMS data store.Required
pool.properties Any pool parameters for the database connection must be specified as key value pairs.Required
jndi.resource The name of the JNDI resource through which the connection is attempted. If this is found, the pool properties described above are not taken into account.Optional
table.name The name of the RDBMS table created.Optional
field.length The number of characters that the values for fields of the STRING type in the table definition must contain. If this is not specified, the default number of characters specific to the database type is considered.Optional

In addition to the above parameters, you can add the @primary and @index annotations in the RDBMS table configuration.

  • @primary : This specifies a list of comma-separated values to be treated as unique fields in the table. Each record in the table must have a unique combination of values for the fields specified here.
  • @index : This specifies the fields that must be indexed at the database level. You can specify multiple values as a come-separated list.

Example

The following is an example of an RDBMS table definition:

@Store(type="rdbms", jdbc.url="jdbc:h2:repository/database/ANALYTICS_EVENT_STORE", username="root", password="root",field.length="symbol:254")
@PrimaryKey("symbol")
define table FooTable (symbol string, price float, volume long);

Prerequisites

Before defining a table, you need to complete the General prerequisites.

Query syntax

The query syntax to define an SOLR table is as follows.

@PrimaryKey("id")
@store(type=“solr”, url=<solr-cloud-zookeeper-url>, collection=<solr-collection-name>, base.config=<config-name>, shards=<no-of-shards>, replicas=<no-of-replicas>, schema=<schema-definition>, commit.async=true|false)
define table Footable (time long, date string);
Parameters

The following parameters are configured in an SOLR table definition.

ParameterDescriptionRequired/Optional
collectionThe name of the solr collection/table.Required
urlThe URL of the zookeeper master of SOLR cloud.Required

base.config

The default configuration that should be used for the SOLR schema.Optional
shardsThe number of shards.Optional
replicaThe number of replica.Optional
schemaThe SOLR schema definition.Optional
commit.async

If this is set to true, the results all the operations carried out for the table (described below) are applied at a specified time interval. If this is set to false, the results of the operations are applied soon after they are performed with the vent arrival.

e.g., If this is set to false, an event selected to be inserted into the table is inserted as soon as it arrives to the event stream.

N/A


Example

This query defines an SOLR table named FooTable in which a schema that consists of the two attributes time (of long type) and date (of the string type) is maintained. The values for both attributes are stored.

@store(type='solr', url='localhost:9983', collection='TEST1', base.config='gettingstarted', " +
"shards='2', replicas='2', schema='time long stored, date string stored', commit.async='true') " +
"define table Footable(time long, date string);

Prerequisites

Before defining a table, you need to complete the General prerequisites.

Query syntax

The query syntax to define an HBase table is as follows.

@Store(type="hbase", any.hbase.property="<STRING>", table.name="<STRING>", column.family.name="<STRING>")
@PrimaryKey("PRIMARY_KEY")
@Index("INDEX")

In order to make it possible to delete any record from an HBase store, the store definition needs to include at least one primary key.


Parameters

The following parameters are configured in the definition of an HBase event table:

ParameterDescriptionRequired/Optional

table.name

The name with which the table should be persisted in the store. If no table name is specified, the table in the store is assigned the same name as the corresponding Siddhi table.

Optional
column.family.nameThe name of the HBase column family from which data must be stored/referred to.Required
any.hbase.property

Any property that can be specified for HBase connectivity in hbase-site.xml is also accepted by the HBase Store implementation. The most frequently used properties are...

hbase.zookeeper.quorum - The hostname of the server in which the zookeeper node is run.

hbase.zookeeper.property.clientPort - The port of the zookeeper node.

Required

In addition, the following annotations are used in the HBase definition.

  • @primary : This specifies a list of comma-separated values to be treated as unique fields in the table. Each record in the table must have a unique combination of values for the fields specified here. Primary keys allow you to fetch records in a table by providing a unique reference for each record. Therefore, if you do not include one or more primary keys in the table definition, it is not possible to perform table operations such as searching, updating, deleting and joining.
  • @index : This specifies the fields that must be indexed at the database level. You can specify multiple values as a comma separated list.

Example

@Store(type=”hbase”, table.name=”SomeTestTable”, column.family.name=”SomeCF”, hbase.zookeeper.quorum=”localhost”, hbase.zookeeper.property.clientPort=”2181”)
@PrimaryKey(symbol)
define table FooTable (symbol string, price float, volume long);

Inserting events

Prerequisites

In order to insert events to a table:

  • General prerequisites should be completed.
  • The event stream from which the events to be inserted are taken should be defined.
  • The table to which events are to be inserted should be defined. For more information, see Defining a table.

Query syntax

The following is the syntax to insert events into a table.

from <STREAM_NAME>
select <ATTRIBUTE1_NAME>, <ATTRIBUTE2_NAME>, <ATTRIBUTE3_NAME> ...
insert into <TABLE_NAME>;


Example

The following query inserts events from the FooStream stream to the FooTable table with the symbolprice, and volume attributes.

from FooStream
select symbol, price, volume
insert into FooTable;


Retrieving events

Prerequisites

In order to retrieve events from a table:

Query syntax

The following is the query syntax to retrieve events from an existing table. For more information, please refer to Siddhi Query Guide - Join Table.

from <STREAM_NAME> join <TABLE_NAME>
    on <CONDITION>
select (<STREAM_NAME>|<TABLE_NAME>).<ATTRIBUTE1_NAME>, (<STREAM_NAME>|<TABLE_NAME>).<ATTRIBUTE2_NAME>, ...
insert into <OUTPUT_STREAM>

Example

The following query joins the FooStream events with the events stored in the StockTable table. An output event is created for each matching pair of events, and it is inserted into another stream named OutputStream

from FooStream#window.length(1) join StockTable
select FooStream.symbol as checkSymbol, StockTable.symbol as symbol, StockTable.volume as volume 
insert into OutputStream

The information inserted with the output event is as follows.

SourceValue ofAttribute name in the output event
FooStream streamsymbol attribute

checkSymbol

StockTable tablesymbol attributesymbol
StockTable tablevolume attributevolume

Updating a table

This section explains how to update the selected records of an existing table.

Prerequisites

In order to update events in a table:

  • General prerequisites should be completed.
  • The table to be updated should be already defined. For more information, see Defining a table.
  • The event stream with the events based on which the updates are made must be already defined.
  • One or more events should be inserted into the table. For more information, see Inserting events.

Query syntax

from <STREAM_NAME> 
select <ATTRIBUTE1_NAME>, <ATTRIBUTE2_NAME>, ...
update <TABLE_NAME> (for <OUTPUT_EVENT_TYPE>)? 
    set <TABLE_NAME>.<ATTRIBUTE_NAME> = (<ATTRIBUTE_NAME>|<EXPRESSION>)?, <TABLE_NAME>.<ATTRIBUTE_NAME> = (<ATTRIBUTE_NAME>|<EXPRESSION>)?, ...
    on <CONDITION>

Example

The following query updates the events in the FooTable table with values from the latest events of the FooStream event stream. The events in the table are updated only if the existing record in the table has the same value as the new event for the symbol attribute, and a value greater than 50 for the price attribute. 

from FooStream
select symbol, price, volume
update FooTable 
set FooTable.symbol = symbol, FooTable.price = price, FooTable.volume = volume 
on (FooTable.symbol == symbol and price > 50)

Methods of Updating the columns in a table

This section gives further information on methods of updating the columns in an existing table.

The value used for updating a table column can be any of the following:

  • A constant

    FROM sensorStream
    SELECT sensorId, temperature, humidity
    UPDATE sensorTable
        SET sensorTable.column_temp = 1 
        ON  sensorId < sensorTable.column_ID
  • One of the attributes specified in the SELECT clause

    FROM fooStream
    SELECT roomNo, time: timestampInMilliseconds() as ts
    UPDATE barTable
        SET barTable.timestamp = ts
        ON  barTable.room_no == roomNo AND roomNo > 2
  • A basic arithmetic operation applied on an output attribute

    FROM sensorStream
    SELECT sensorId, temperature, humidity
    UPDATE sensorTable
        SET sensorTable.column_temp = temperature + 10  
        ON  sensorId < sensorTable.column_ID
  • A basic arithmetic operation applied to a column value in the event table

    FROM sensorStream
    SELECT sensorId, temperature, humidity
    UPDATE sensorTable
        SET sensorTable.column_temp = sensorTable.column_temp + 10  
        ON  sensorId < sensorTable.column_ID

Deleting records

This section explains how to delete existing records in a table based on a specific condition.

Prerequisites

In order to delete selected events in a table:

  • General prerequisites should be completed.
  • The table with the records to be deleted should be already defined. For more information, see Defining a table.
  • The event stream with the events with which the records in the table are compared (i.e., to apply the condition based on which the events are deleted) must be already defined.
  • One or more events should be inserted into the table. For more information, see Inserting events.
  • If the database type is HBase, the store configuration needs to include at least one primary key.

Query syntax

from <STREAM_NAME> 
select <ATTRIBUTE1_NAME>, <ATTRIBUTE2_NAME>, ...
delete <TABLE_NAME> (for <OUTPUT_EVENT_TYPE>)?
    on <CONDITION>

Example

This query deletes the events in the RoomTypeTable table if its value for the roomNo attribute is equal to the roomNumber attribute value of DeleteStream.

from DeleteStream
delete RoomTypeTable
    on RoomTypeTable.roomNo == roomNumber;


Searching records

This section explains how to check whether a specific record exists in an event table.

Prerequisites

In order to search for a record in a table that matches a specific condition:

  • General prerequisites should be completed.
  • The table to be searched should be already defined. For more information, see Defining a table.
  • The event stream with the events with which the records in the table are compared (i.e., to apply the condition based on which the events are searched) must be already defined.
  • One or more events should be inserted into the table. For more information, see Inserting events.

Query syntax

from <STREAM_NAME>[<CONDITION> in <TABLE_NAME>]
select <ATTRIBUTE1_NAME>, <ATTRIBUTE2_NAME>, ...
insert into <OUTPUT_STREAM_NAME>

Example

The following query matches events arriving from the FooStream event stream with the existing recored stored in the StockTable table. If the symbol attribute of an event saved in the table has the same value as the event from the FooStream stream, that event is inserted into the OutputStream stream.

from FooStream[StockTable.symbol==symbol in StockTable]
insert into OutputStream;


Inserting/updating records

This section explains how to update a selection of records in a table based on the new events from a specific event stream. The selection is made based on a specific condition that matches events from the stream with events in the table. When the events from the stream have no matching events in the table, they are inserted into the table as new events.

Prerequisites

  • General prerequisites should be completed.
  • The table for which this operations is to be performed must be already defined. For more information, see Defining a table.
  • The event stream from which the events with which the records in the table are compared (i.e., to apply the condition based on which the events are inserted/updated) must be already defined.

Query syntax

The query syntax to perform the insert/update operation for a table is as follows.

from <STREAM_NAME> 
select <ATTRIBUTE1_NAME>, <ATTRIBUTE2_NAME>, ...
update or insert into <TABLE_NAME> (for <OUTPUT_EVENT_TYPE>)? 
    set <TABLE_NAME>.<ATTRIBUTE_NAME> = <EXPRESSION>, <TABLE_NAME>.<ATTRIBUTE_NAME> = <EXPRESSION>, ...
    on <CONDITION>

Example

This query matches events from the FooStream stream with the events stored in the StockTable table. When an event in the table has the same value for the symbol attribute as the matching new event from the event stream, it is updated based on the new event. If a new event from the event stream does not have a matching event in the table (i.e., an event with the same value for the symbol attribute), that event is inserted as a new event.

from FooStream
select *
update or insert into StockTable
on StockTable.symbol == symbol
  • No labels