The latest version for DAS is WSO2 Data Analytics Server 3.1.0. View documentation for the latest release.
WSO2 Data Analytics Server is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.

All docs This doc

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The properties of the above event stream definition are described below.

PropertyDescription
Event Stream NameName of the event stream.
Event Stream VersionVersion of the event stream. (Default value is 1.0.0.)
Event Stream DescriptionDescription of the events stream. (This is optional.)
Event Stream Nick-NameNick-names of an event streams separated by commas.(This is optional.)
Stream Attributes

Stream attributes contains the data of the event. Data is divided into the following 3 logical categories for maintenance and usability. It is not required to

to

have attributes for all 3 categories, but there should be at least one category with at least one attribute defined. The attribute names should be unique within each category.

  • Meta Data: This refers to meta information of the events (e.g., timestamp, host, IP, etc.). They are received with the meta tag (i.e., in the meta_<attribute name> format).
  • Correlation Data: This refers to information that is used to correlate multiple events (e.g., correlation_id,temporal_id etc.). They are received with the correlation tag (i.e., in the correlation_<attribute name> format).
  • Payload Data: Contains the actual data that the event intends to have. (Referred to as <attribute name>.)

e.g., The following attributes exist in a single event.

  • event_timestamp
  • request_IP_address
  • correlation_Id
  • price
  • symbol

These attributes can be logically categorized as follows.

CategoryAttributesNotes
Meta Data
  • event_timestamp
  • request_IP_address
These attributes provide information about the events themselves.
Correlation Data
  • correlation_Id
This attribute correlates the event with other events from other streams, and it is useful when performing join operations on a stream.
Payload Data
  • price
  • symbol
These are information provided via the event.
Info
  • Note that the internal system does not differentiate the attributes based on this categorization. However, it is recommended to categorize the attributes for the purpose of organizing them in a logical manner within the stream definition.
  • If you edit an attribute in an event stream or add a new attribute to it, other artifacts that are associated with the stream will be inactive. Therefore, it is recommended to redefine the stream with a new version including the additional/changed attribute.

Adding an event stream 
Anchor
Add
Add

...

Excerpt

WSO2 CEP/DAS facilitates the following default and custom event formats.

Default event formats
Anchor
Default
Default

By default, WSO2 CEP/DAS represents an event as a WSO2Event object. Furthermore, WSO2 CEP/DAS supports events in XML, JSON, Text and Map formats. The default event formats of the XML, JSON, Text and Map representations for the following sample event stream definition are as follows.

Sample event stream definition
Code Block
languagejs
{
  "streamId": "org.wso2.test:1.0.0",
  "name": "org.wso2.test",
  "version": "1.0.0",
  "nickName": "TestStream",
  "description": "Test Stream",
  "metaData": [
    {
      "name": "ip",
      "type": "STRING"
    }
  ],
  "correlationData": [
    {
      "name": "id",
      "type": "LONG"
    }
  ],
  "payloadData": [
    {
      "name": "testMessage",
      "type": "STRING"
    }
  ]
}
Default XML format
Code Block
<events>
    <event>
        <metaData>
            <ip>data4</ip>
        </metaData>
        <correlationData>
            <id>56783</id>
        </correlationData>
        <payloadData>
            <testMessage>data1</testMessage>
        </payloadData>
    </event>
</events>
Default JSON format
Code Block
{
    "event": {
        "metaData": {
            "ip": "data4"
        },
        "correlationData": {
            "id": "545455"
        },
        "payloadData": {
            "testMessage": "data1"
        }
    }
}
Default text format
Code Block
languagetext
meta_ip:data1,
correlation_id:323232,
testMessage:data2
Default map format
KeyValue

meta_ip

data1

correlation_id

323232

testMessage

data2

Custom event formats
Anchor
Custom
Custom

If you receive and publish events with a different format than the default format, you need to provide appropriate mappings for the system to interpret the events.

...

Code Block
languagesql
@from(eventtable = 'analytics.table' , table.name = <analytics_table_name>, primary.keys = <primary_keys>, indices = <indices>, wait.for.indexing = <wait_for_indexing_flag>, merge.schema = <merge_schema_flag>)
define table <EventTableName> (<schema>);
Field NameDescriptionRequiredDefault Value
table.nameThe name of the analytics table, this can be an existing table, or else, a new one will be created.Yes
 

primary.keys

The list of fields to be used as the primary keys of the table, this can be useful, if the lookup operations are done only using primary key values,

which is the most efficient to execute.

No
 

indices

The list of index fields separated by commas, each entry consists of the format "<index_column_name> -sp", where "-sp" is optional property to say,

if this index column should be treated as a score param.

No
 

wait.for.indexing

The indexing operations in the analytics tables happens asynchronously, if events coming from a specific stream changing the event table's data needs

to be finalized, setting this flag to 'true' would wait till the background indexing to finish and continue with the execution of the flow.

Notrue
merge.schema

In the case of an existing table given to the analytics event table, if this flag is set to 'true', the existing schema and the given schema will be merged together.

That is, the merging of columns and its indexing information, if set to 'false', the schema given by the analytics event table will overwrite the existing one.

Notrue
caching

Enables caching for the analytics table. This will store any looked up data from the analytics table and store the most recently accessed data,

with the given capacity and timeout restrictions of the cache.

Nofalse
cache.timeout.secondsThe timeout of the cache entries in secondsNo10
cache.size.bytesThe maximum capacity of the cache in bytesNo10485760
Code Block
languagesql
titleSample
@from(eventtable = 'analytics.table' , table.name = 'stocks', primary.keys = 'symbol', indices = 'price, volume -sp', wait.for.indexing = 'true', merge.schema = 'false')
define table StockTable (symbol string, price float, volume long);