This documentation is for WSO2 Stream Processor 4.4.0 (the latest version of WSO2 SP). View documentation for the Streaming Integrator, the successor of WSO2 SP.


The recommended deployment for WSO2 Stream Processor is the Minimum HA Deployment. However, that deployment pattern uses only two nodes and cannot be scaled beyond that. If you want to configure WSO2 Stream Processor as a scalable deployment, you can use the Active-Active deployment pattern. For an overview of the Active-Active deployment pattern and instructions to configure it, see the following topics.


The above diagram represents a deployment where you are not limited to two nodes. You can scale the event processing horizontally by adding more Stream Processor worker nodes to the deployment. In this deployment, it is recommended to configure the client application to publish events to multiple SP worker nodes in a round-robin manner to ensure better fault tolerance. Events can be published by one or more clients.
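The round-robin publishing described above can be sketched on the client side as follows. This is a minimal illustration, not part of WSO2 SP: the `send` callable is a hypothetical stand-in for whatever transport the client actually uses, and the endpoint strings are placeholders. The sketch assumes that a failed delivery raises `ConnectionError`, in which case the event is offered to the next node in the rotation.

```python
from itertools import cycle
from typing import Callable, Iterable, List


def publish_round_robin(
    events: Iterable[dict],
    endpoints: List[str],
    send: Callable[[str, dict], None],
) -> List[str]:
    """Send each event to the next endpoint in turn; on failure, try the others.

    Returns the endpoint that accepted each event, in order.
    `send` is a hypothetical transport function supplied by the caller.
    """
    if not endpoints:
        raise ValueError("at least one endpoint is required")
    rotation = cycle(endpoints)
    delivered_to = []
    for event in events:
        last_error = None
        # Try every node at most once per event, starting from the next in rotation.
        for _ in range(len(endpoints)):
            endpoint = next(rotation)
            try:
                send(endpoint, event)
            except ConnectionError as err:
                last_error = err
                continue
            delivered_to.append(endpoint)
            break
        else:
            # Every node refused this event.
            raise RuntimeError("no worker node accepted the event") from last_error
    return delivered_to
```

Because each event goes to exactly one node, no node sees the full stream, which is why the aggregations discussed next have to be combined across nodes.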

To perform aggregations in a distributed manner and achieve scalability, this setup uses distributed aggregations.

With distributed aggregations, each node processes a part of an aggregation. This allows you to assign one node to process only a part of an aggregation (for example, for regional scaling). To do this, all the aggregations must have a physical database and must be linked to the same database.

Aggregation partitioning can be enabled at the aggregation level and also at a global level. To enable it at the global level, add the following section, with partitionById set to true, to the <SP_WORKER_HOME>/conf/worker/deployment.yaml file.

   partitionById: true
   shardId: wso2-sp

If you want to enable it for a specific aggregation only, add the @PartitionById annotation immediately before the aggregation definition.


To understand how an active-active cluster processes aggregations when they are partitioned and assigned to different nodes, consider the following Siddhi query. To learn more about Siddhi queries, see Siddhi Query Guide.

-- Input stream: one event per trade.
define stream TradeStream (symbol string, price double, quantity long, timestamp long);
-- All nodes must point the aggregation at the same physical database.
@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/TestDB", username="root", password="root", jdbc.driver.name="com.mysql.jdbc.Driver")
define aggregation TradeAggregation
from TradeStream
select symbol, avg(price) as avgPrice, sum(quantity) as total
group by symbol
aggregate by timestamp every sec ... year;

This query captures the information relating to a trade. Each transaction represents an event, and the information captured includes the symbol of the product, the price at which it is sold, the quantity sold during the transaction, and the timestamp of the transaction. Each node stores this information in the TEST_DB data store defined in the <SP_WORKER_HOME>/conf/worker/deployment.yaml file.

Now let's assume that the following input events were generated for the two nodes during a specific hour.

Node 1


Node 2


Here, node 1 calculates an hourly total of 30, and node 2 calculates an hourly total of 40. When you retrieve the total for this hour via a retrieval query, the result is 70.
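The arithmetic behind this retrieval can be sketched as follows. This is only an illustration of how partial results combine, not WSO2 SP's internal implementation: the `PartialAggregate` type and its field names are hypothetical, and the sketch assumes each shard keeps a sum and a count for `avg(price)` so that the average can be recombined correctly.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class PartialAggregate:
    """One shard's partial result for a single symbol and time bucket (hypothetical)."""
    shard_id: str
    price_sum: float   # sum of price over the events this shard processed
    event_count: int   # number of events this shard processed
    total: int         # sum of quantity over the events this shard processed


def combine(partials: Iterable[PartialAggregate]) -> dict:
    """Merge per-shard partial results into one result for the bucket."""
    partials = list(partials)
    count = sum(p.event_count for p in partials)
    price_sum = sum(p.price_sum for p in partials)
    return {
        "total": sum(p.total for p in partials),
        # Divide the combined sum by the combined count. Averaging the
        # per-shard averages would be wrong when shards saw different counts.
        "avgPrice": price_sum / count if count else None,
    }
```

With hourly totals of 30 for node 1 and 40 for node 2, `combine` returns a total of 70, matching the result described above. The price sums and event counts needed for the average are not given in the text, so any values supplied for them are illustrative.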

The steps to enable aggregation partitioning are provided under Configuring an active-active cluster.

Configuring an active-active cluster

To configure the Stream Processor worker nodes to deploy them as an active-active cluster, edit the <SP_WORKER_HOME>/conf/worker/deployment.yaml file as follows:

Before you begin:

  • Download two binary packs of WSO2 Stream Processor.
  • Set up a working RDBMS instance to be used by the WSO2 Stream Processor cluster.

  1. For each node, enter a unique ID for the id property under the wso2.carbon section. This is used to identify each node within a cluster. For example, you can add IDs as shown below.
    • For node 1:

        id: wso2-sp-1
    • For node 2:

        id: wso2-sp-2
  2. Enable partitioning aggregations for each node, and assign a unique shard ID for each node. To do this, set the partitionById and shardId parameters as Siddhi properties as shown below. 

    Assigning shard IDs to nodes allows the system to identify each unique node when assigning parts of the aggregation. If shard IDs are not assigned, the system uses the unique node IDs (defined in step 1) for this purpose.

    • For node 1:

          partitionById: true
          shardId: wso2-sp-1
    • For node 2:

          partitionById: true
          shardId: wso2-sp-2
      • To maintain data consistency, do not change the shard IDs after the first configuration.
      • When you enable the aggregation partitioning feature, a new column named SHARD_ID is introduced to the aggregation tables. Therefore, after enabling this feature, you need to do one of the following to avoid errors caused by differences in the table schema.
        • Delete all the aggregation tables for SECONDS, MINUTES, HOURS, DAYS, MONTHS, and YEARS.
        • Edit the aggregation tables by adding a new column named SHARD_ID, and add that column to the existing primary key list of the table.
  3. Configure a database, and then update the default configuration for the TEST_DB data source with parameter values suitable for your requirements.
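Before starting the nodes, it can help to check the settings from steps 1 and 2 against each other. The sketch below is a hypothetical helper, not a WSO2 SP tool: it assumes you have already read each node's `id`, `partitionById`, and `shardId` values out of its deployment.yaml into a plain dictionary, and it only checks the rules stated in the steps above.

```python
from typing import Dict, List


def check_cluster_settings(nodes: List[Dict[str, object]]) -> List[str]:
    """Return a list of problems found in the per-node settings (empty if none).

    Each dictionary is a flattened, hypothetical view of one node's settings.
    """
    problems = []
    seen_ids = set()
    seen_shards = set()
    for node in nodes:
        node_id = node.get("id")
        # Fall back to the node ID when no shard ID is given, as described in step 2.
        shard_id = node.get("shardId", node_id)
        if node_id is None:
            problems.append("a node has no id")
        elif node_id in seen_ids:
            problems.append(f"duplicate node id: {node_id}")
        if node.get("partitionById") is not True:
            problems.append(f"partitionById is not enabled on {node_id}")
        if shard_id in seen_shards:
            problems.append(f"duplicate shard id: {shard_id}")
        seen_ids.add(node_id)
        seen_shards.add(shard_id)
    return problems
```

An empty list means every node has a unique ID, a unique shard ID, and partitioning enabled.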

As explained above, events are processed in multiple active nodes. Even though this is usually a stateful operation, you can overcome the node-dependent calculations via distributed aggregation. This allows WSO2 Stream Processor to execute scripts that depend on incremental distributed aggregation.

However, an active-active deployment can affect alerts because alerts also depend on some in-memory stateful operations such as windows. As a result, alerts are generated based only on the events received by a specific node. Thus the alerts are node-dependent, and you need to disable them to run scripts with distributed incremental aggregation.
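The node dependence of window-based alerts can be illustrated with a small simulation. This is a sketch under stated assumptions rather than Siddhi's window semantics: the alert rule (at least `threshold` events within a sliding time window) and the function name are hypothetical, and each node is assumed to see only the events published to it.

```python
from collections import deque
from typing import Iterable, List


def alerts_for_node(timestamps: Iterable[int], window_ms: int, threshold: int) -> List[int]:
    """Return the timestamps at which one node would raise an alert.

    The node alerts when it has seen at least `threshold` events within the
    last `window_ms` milliseconds, using only its own in-memory window.
    """
    window = deque()
    alerts = []
    for ts in timestamps:
        window.append(ts)
        # Expire events that have fallen out of the sliding time window.
        while window and window[0] <= ts - window_ms:
            window.popleft()
        if len(window) >= threshold:
            alerts.append(ts)
    return alerts
```

If six events arrive 100 ms apart and the rule is five events per second, a single node that receives all of them raises alerts. When the same events are split evenly between two nodes, each node sees only three events and neither raises an alert.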
