This documentation is for WSO2 Stream Processor 4.3.0. View documentation for the latest release.

All docs This doc
Skip to end of metadata
Go to start of metadata

This section is currently under construction!

This section explains how you can upgrade to SP 4.3.0 from SP 4.2.0 or from DAS/CEP. Click on the relevant tab based on the product that you are currently using.

You cannot rollback the upgrade process. However, it is possible to restore a backup of the previous database so that you can restart the upgrade progress.

This section explains how to upgrade to WSO2 SP 4.3.0 from WSO2 SP 4.2.0.

Preparing to upgrade

The following prerequisites should be completed before upgrading.

  • Make a backup of the SP 4.2.0 database and copy the <SP_HOME_4.2.0> directory in order to backup the product configurations.
  • Download WSO2 SP 4.3.0 from here.

Migrating databases

To migrate the databases from SP 4.2.0 to SP 4.3.0, follow the steps below:

  1. To connect WSO2 SP 4.3.0 to the same databases as WSO2 SP 4.2.0 so that the persisted data can be accessed, configure the datasources in <SP_HOME>/conf/<runtime>/deployment.yaml  files similar to how they were configured in WSO2 SP 4.2.0. For the complete list of datasources configured for WSO2 SP, see Configuring Datasources.
  2. To update the aggregation tables as required due to the changes in the Incremental Aggregation fuctionality of SP 4.3.0, run the <SCRIPT_NAME> script. This is can be done by issuing the following commands.

Migrating configurations

It is required to migrate configurations in the <SP_HOME>/conf/<runtime>/deployment.yaml files from SP 4.2.0 to SP 4.3.0. The following are the changes in the configurations included in these files between the two releases:

  • The clustering configurations included under the  #deployment.config:  section, and under  type: ha. For more information about the parameters you need to configure in this section for SP 4.3.0, see Minimum High Availability Deployment - Configuring a minimum HA cluster.

    If you are deployong SP as a minimum HA cluster, note that the mechanism for sending requests to this setup has changed between the two releases. In SP 4.3.0, requests should be sent only to the active node unlike in SP 4.2.0 where you are required to send the requests to both the nodes. For more details about this, see Minimum High Availability Deployment - Publishing events to the cluster.

  • The Siddhi Store Query API in WSO2 SP uses the HTTP/HTTPS transport to allow users to send requests to its endpoint. In SP 4.3.0, a new section named was siddhi.stores.query.api: added in both the <SP_HOME>/conf/worker/deployment.yaml file and the <SP_HOME>/conf/editor/deployment.yaml file. For more information about these parameters, see Managing Stored Data via REST APIs.

In addition to the above, compare the deployment.yaml files for both versions, and check whether you need to do the same changes to the other configuration parameters you may have made in the SP 4.2.0 version in the SP 4.3.0 version as well.

The deployment.yaml files must not be copied directly between servers due to certain differences in the parameters included in the two WSO2 SP versions.

Migrating Siddhi applications

To migrate the Siddhi applications that you have deployed in WSO2 SP 4.2.0, follow the procedure below:

In SP 4.3.0, data stored in tables for incremental aggregation queries are automatically purged by default. If you do not want the data to be purged or if you want to retain the data for a time period longer than the default data purging interval, you need to update the relevant aggregation queries with the required data purging related configurations. For more information, see Incremental Analysis - @purge.

  1. Copy all the Siddhi applications in <SP_HOME_4.2.0>/wso2/worker/deployment/siddhi-files directory.
  2. Place the Siddhi applications you copied to the <SP_HOME_4.3.0>/wso2/worker/deployment/siddhi-files directory.

Migrating dashboards

To migrate the dashboards you were using in SP 4.2.0 with all their contents, follow the procedure below:

  1. Check the datasources configurations that you have added in the <SP_4.2.0_HOME>conf/dashboards/deployment.yaml file for the WSO2_DASHBOARD_DB datasource, and do a database dump.
  2. Import the database dump according to the dashboard datasource configuration that you have added in the <SP_4.3.0_HOME>conf/dashboards/deployment.yaml file.
  3. If you have created any custom widgets in SP 4.2.0, copy the relevant subdirectory for that widget from the <SP_HOME_4.2.0>/wso2/dashboard/deployment/web-ui-apps/portal/extensions/widgets directory, and place it in the <SP_HOME_4.3.0>/wso2/dashboard/deployment/web-ui-apps/portal/extensions/widgets directory.

Testing the migration

Simulate a few events to the Siddhi applications deployed in SP 4.3.0 to test whether they are generating the expected results.

This section provides information on how you can upgrade from DAS/CEP to Product SP. 


WSO2 DAS 3.1.0 is the predecessor of WSO2 SP 4.0.0. Similar to SP, DAS processed events via an event flow that consisted of event streams, receivers, publishers, and execution plans. These elements of the event flow are defined separate from each other via the DAS Management Console.

WSO2 SP defines the complete event flow within a single application created via a Siddhi file. The application is then deployed in a SP worker node and executed at runtime. Due to this architectural difference between the two products, configurations cannot be directly migrated from DAS to SP. Instead, they need to be recreated as explained in this section.

With WSO2 SP's streaming SQL capabilities and its  inbuilt editor that has event simulation and debugging support, it can help you to create real-time applications much faster than before. The WSO2 SP is a new product focusing on solving stream processing and complex event processing use cases. 

WSO2 SP uses a single Siddhi Streaming SQL file for script data collection, processing, and notification logic. The batch analytics aspect is handled via Siddhi aggregations.

For more information about the key capabilities of WSO2 SP, see About This Release.

Deployable Artifacts

Siddhi applications are the deployable artifact type of the Stream Processor.

To use Siddhi, you need to write the processing logic as a Siddhi application in the Siddhi Streaming SQL language.Once a Siddhi application is created and started, it does the following:

  1. Takes data one-by-one as events
  2. Processes the data per each event
  3. Generates new high level events based on the processing carried out up to the current time
  4. Sends newly generated events as the output to streams.

An element of Siddhi SQL can be composed together as a script in a Siddhi application. Here, each construct must be separated by a semicolon ( ; ) as shown in the syntax below.

<siddhi app>  : 
       <app annotation> * 
       ( <stream definition> | <table definition> | ... ) + 
       ( <query> | <partition> ) +

The following is a sample Siddhi application named Temperature-Analytics that includes a stream named TempStream and a query named 5minAvgQuery to process the events handled by it.


define stream TempStream (deviceID long, roomNo int, temp double);

from TempStream#window.time(5 min)
select roomNo, avg(temp) as avgTemp
 group by roomNo
insert into OutputStream;
The following are Siddhi SQL element types in your DAS setup that you can redefine in an Siddhi application so that you can reuse them with WSO2 SP.

Event Streams -> Stream Definition

A stream such as  the TempStream stream unifies common types of events together. This enables them to be processed via queries using their defined attributes in a streaming manner, and allow sinks and sources to map events to/from various data formats.

When migrating event stream definitions in WSO2 DAS, you must rewrite them in the syntax followed in Siddhi applications as illustrated in the table below.

Configuration in WSO2 CEP/DAS
{  "streamId": "TempStream:1.0.0",
  "name": "TempStream",
  "version": "1.0.0",
  "metaData": [
      "name": "ip",
      "type": "STRING"
  "correlationData": [
      "name": "id",
      "type": "LONG"
  "payloadData": [
      "name": "deviceID",
      "type": "LONG"
      "name": "roomNo",
      "type": "int"

{      "name": "temp",
      "type": "DOUBLE"

Configuration in Siddhi file
define stream TempStream (deviceID long, roomNo int, temp double);

Event Reciever -> Source

In WSO2 CEP/DAS, events are received by event receivers that manage the event retrieval process. In Siddhi files deployed in WSO2SP, you need to configure sources instead of event receivers to receive events.

To configure a stream that consumes events via a source, add the source configuration to a stream definition by adding the @source annotation with the required parameter values.

Configuration in WSO2 CEP/DAS
<eventReceiver name="test_event_receiver" statistics="disable"    trace="disable" xmlns="">
    <from eventAdapterType="event_type">
        <property name="static.option.key1">static_option_value1</property>
        <property name="static.option.keyN">static_option_valueN</property>
    <mapping customMapping="disable" type="map_type"/>
    <to streamName="testEventStream" version="1.0.0"/>
Configuration in Siddhi file
@source(type='source_type', static.option.key1='static_option_value1', static.option.keyN='static_option_valueN',
    @map(type='map_type', static.option_key1='static_option_value1', static.option.keyN='static_option_valueN',
        @attributes( attributeN='attribute_mapping_N', attribute1='attribute_mapping_1')
define stream testEventStream (attribute1 Type1, attributeN TypeN);

The type parameter of @source defines the source type that receives events. The other parameters to be configured depends on the source type selected. Some of the the parameters are optional. 

For detailed information about the parameters see the documentation for the relevant source.

The following is the list of source types that are currently supported:

Event Publisher -> Sink

In WSO2 CEP/DAS, events are published via event publishers that manage the event publishing process. In Siddhi files deployed in WSO2SP, you need to configure sinks instead of event publishers to publish events.

To configure a stream that provides events via a sink, add the sink configuration to a stream definition by adding the @sink annotation with the required parameter values.

Configuration in WSO2 CEP/DAS
<eventPublisher name="httpLogger"
  statistics="disable" trace="disable" xmlns="">
  <from streamName="" version="1.0.0"/>
  <mapping customMapping="disable" type="text"/>
  <to eventAdapterType="logger">
    <property name="uniqueId">org.wso2.event.statistics.logger</property>

Configuration in Siddhi file
@Sink(type = 'log', @map(type = 'text'))
define stream sensorStream (sensorId int, temperature double)

For detailed information about the parameters see the documentation for the relevant sink.

The following is a list of currently supported sink types.

Sink and Source Mappers

In WSO2 CEP/DAS server  the supported default format for the Message Format property is configured under Mapping Configuration when creating event receivers. In the Siddhi files, this is replaced with the type parameter of the @map annotation that defines the map type to be used to map the data.

Each @source and @sink configuration has a mapping denoted by the @map annotation that converts the incoming messages format to Siddhi events.

Configuration in WSO2 CEP/DAS
<mapping customMapping="disable" type="map_type"/>
Configuration in Siddhi file

The other parameters to be configured depends on the mapper selected. Some of these parameters are optional. For detailed information about the parameters see the documentation for the relevant mapper.

The following is a list of currently supported source mapping types:

Map Attributes

@attributes is an optional annotation used with the @map annotation to define custom mapping that replaces <mapping customMapping> in the CEP/DAS server. When the @attributes annotation is not provided, each mapper assumes that the incoming events adhere to its own default data format. By adding the @attributes annotation, you can configure mappers to extract data from the incoming message selectively, and assign them to attributes.

There are two ways you can configure map attributes.

  1. Defining attributes as keys and mapping content as values in the following format.
    @attributes( attributeN='mapping_N', attribute1='mapping_1')
  2. Defining the mapping content of all attributes in the same order as how the attributes are defined in stream definition.
    @attributes( 'mapping_1', 'mapping_N')


This query receives events via the HTTP source in the JSON data format, and directs them to the InputStream stream for processing. Here the HTTP source is configured to receive events on all network interfaces at the 8080 port and on the foo context. The source is also secured via basic authentication.

Configuration in WSO2 CEP/DAS
<eventReceiver ... xmlns="">    <from ... />
    <mapping customMapping="enable" type="json">
        <from streamName="" version="1.0.6"/>
            <from dataType="meta" name="time"/>
            <to name="meta_timestamp" type="long"/>
            <from dataType="meta" name="meta_ispowerServed"/>
            <to name="meta_isPowerSaverEnabled" type="bool"/>
            <from dataType="meta" name="id"/>
            <to name="meta_sensorId" type="int"/>
    <to ... />
Configuration in Siddhi file
@source(type='http', receiver.url='', is.basic.auth.enabled='true', 
define stream InputStream (name string, age int, country string);

Event Store -> Table

In CEP/DAS, event streams are persisted by creating a corresponding table in the WSO2 Data Access Layer for batch analysis. In WSO2 Stream Processor, this functionality is replaced by Table which is a stored version of an stream or a table of events. Its schema is defined via the table definition that is similar to a stream definition.

These events are by default stored in-memory, but Siddhi also provides store extensions to work with data/events stored in various data stores through the table abstraction. 

Tables allow Siddhi to work with stored events. By defining a schema for tables, Siddhi allows them to be processed by queries using their defined attributes with the streaming data. You can also interactively query the state of the stored events in the table.

define table StockTable (symbol string, price float, volume long);


Indexes allow tables to be searched/modified much faster.

Indexes are configured by including the @Index( 'key1', 'key2' ) annotation to the table definition. Each event table configuration can have 0-1 @Index annotations. Support for the @Index annotation and the number of attributes supported differ based on the table implementations. When more then one attribute is used for index, each one of them is used to index the table for fast access of data. Indexes can be configured together with primary keys.


This query creates an indexed event table named RoomTypeTable with the roomNo attribute as the index key.

define table RoomTypeTable (roomNo int, type string);

Execution Plan -> Queries

Queries used in DAS/CEP execution plans and Stream Processor are almost same. There are a few newly introduced features for Siddhi 4.0.0 that are used with WSO2 Stream Processor. These features are listed below.

  • I ncremental Aggregation

    Incremental aggregation allows user to retrieve the aggregate value for different time durations. That is, it allows user to obtain aggregates such as sumcountavgminmax, and count) of stream attributes for durations such as secminhour, etc. 

    Following is an example query.

    define stream TradeStream (symbol string, price double, volume long, timestamp long);
    define aggregation TradeAggregation
      from TradeStream
      select symbol, avg(price) as avgPrice, sum(price) as total
        group by symbol
        aggregate by timestamp every sec ... year;
  • Set key word

    Set keyword allows you to update selected attributes from the table.

    Here, for each assignment, the attribute specified in the left must be the table attribute, and the one specified in the right can be a stream/table attribute a mathematical operation, or other. 

    When the set clause is not provided, all the attributes in the table are updated. This works with the update and update or insert operations.

    The following is a sample query.

    FROM fooStream
    SELECT roomNo, time: timestampInMilliseconds () as ts
    UPDATE barTable
        SET barTable.timestamp = ts
        ON  barTable.room_no == roomNo AND roomNo > 2
  • Pattern to identify non-occurence of events

    Patterns and sequences are the key features of a complex event processor to define a new complex event based on the order of one or more raw events. A pattern can be an atomic pattern that detects the arrival of a specific event, or a complex pattern that detects the arrival of more events in a defined order. Although patterns generally define complex events based on the order of events that arrive, sometimes a complex event may depend on an event that should not arrive.

    Usually, Siddhi pattern processors wait for the events until they arrive. Once an event arrives, the pattern processor starts looking for the next event. When detecting events that have not arrived, the pattern processor must not wait for an infinite time period to declare the non-arrival of the event. Therefore, a time interval to wait for the event must be defined with absent pattern operators with an exception to the logical AND pattern combining an event that is expected to arrive, and an event that must not arrive beforehand.

    Following is a sample query.

    from TemperatureStream[temp > 60] -> not FireAlarmStream[active == true] for 5 sec
    select 'Fire alarm not working' as message
    insert into AlertStream;
  • Defined Window

    A defined window is a window that can be shared across multiple queries. 

    Events can be inserted to a defined window from one or more queries and it can produce output events based on the defined window type.

    The following is a sample query.

    define stream TempStream(tempId string, temp double);
    define window OneMinTempWindow(tempId string, temp double) time(1 min);
    from TempStream
    select *
    insert into OneMinTempWindow;

Upgrading the database

WSO2 SP stores product-specific data in H2 databases by default. Those databases are located in the <PRODUCT_HOME>/wso2/<Profile>/database directory.

This embedded H2 database is suitable for development, testing, and for some production environments. However, we recommend that you use an industry-standard RDBMS such as Oracle, PostgreSQL, MySQL, MS SQL, etc., because they are more suitable for most production environments. Most table schemas are self generated by the feature itself. For the other table schemas you can use the scripts provided with WSO2 SP in the <SP_HOME>/wso2/<Profile>/dbscripts directory to install and configure databases. This directory includes scripts for Oracle, PostgreSQL, MySQL and MS SQL. 

CEP database migration is direct as it only uses RDBMS tables without any properietary encoding.

DAS supports data persistance in two types of databases.

  1. RDBMS Event tables

  2. Analytics Tables (which also include Analytics Event Tables)

RDBMS Event table migration is straightforward because WSO2 SP suppports RDBMS. 

Analytics tables migration is an indirect process where we need to convert the Analytics Tables into RDBMS tables by running Spark scripts using the CarbonJDBC provider packed with DAS. 

The following is an sample query that can be run on DAS to migrates the ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA event table into  an RDBMS table.

USING CarbonAnalytics OPTIONS (tableName "ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA", schema "house_id INT, metro_area STRING, state STRING, device_id INT, power_reading FLOAT, is_peak BOOLEAN");

metro_area STRING, state STRING, device_id INTEGER, power_reading FLOAT, is_peak BOOLEAN");

INSERT INTO TABLE SMART_HOME_DATA_RDBMS SELECT * FROM SMART_HOME_DATA;                                           
  • No labels