This documentation is for WSO2 Stream Processor 4.3.0.

...

WSO2 Stream Processor has a component named Dashboard in the User Interface and Dashboard layer. The Dashboard allows users to view the output of analytics in an interactive manner. It also conveys observability information about the cluster, the status of the applications (i.e., Siddhi applications) currently submitted, and the status of each Stream Processor node. JVM metrics, as well as Siddhi application level metrics, can be viewed through this dashboard.

...

Job Manager nodes handle all the Management layer related functionalities. This layer contains two WSO2 SP Manager instances configured to run in high availability mode. Here, the Manager parses the distributed Siddhi application provided by the user, partitions it into multiple Siddhi applications, wires them using Kafka topics, and deploys them in the available worker nodes. The Management layer also handles the effects of worker nodes joining or leaving the distributed cluster by re-distributing the Siddhi applications accordingly.


The processing layer (also known as the resource cluster) is represented by multiple WSO2 SP instances that are configured as workers. Each WSO2 SP worker instance in this layer registers itself with the Manager cluster when it starts. These workers periodically send their heartbeats to the Manager cluster. This allows the Managers to identify the active worker nodes and the inactive ones. The worker nodes (resource nodes) run the Siddhi applications assigned to them by their Manager nodes. In addition, they are also capable of handling network partitions in a graceful manner, as depicted in the following diagram.


As depicted above, a worker node periodically synchronizes its configurations and Siddhi applications with the manager node. If the network gets partitioned or if the manager becomes unreachable, the worker node undeploys the applications deployed in it. By doing so, it allows those Siddhi applications to be rescheduled on other worker nodes that still maintain their connections with the manager nodes.

It is required to use Apache Kafka and Apache Zookeeper to configure a fully distributed SP cluster. As explained above, a Kafka cluster, which is part of the Persistence layer and the Transport layer, holds all the topics that are used for communication by distributed Siddhi applications. Persistence stores of the Persistence layer can be RDBMS databases that store both configuration and system state data. Identity and access management of all the WSO2 Stream Processor nodes is handled by any OAuth identity provider such as WSO2 Identity and Access Management (WSO2 IAM).

There are no environment restrictions for running WSO2 Stream Processor in distributed mode. It can run in distributed mode on bare metal, VMs, and containers. Here, the manager nodes are grouped in a single cluster backed by a database for correlation. Similarly, the dashboard nodes can also be deployed in a separate cluster. The worker nodes, on the other hand, are not aware of each other. They are synchronized with the manager nodes from which they receive instructions.

...

The manager cluster contains two or more WSO2 SP instances configured to run in high availability mode. The manager cluster is responsible for parsing a user-defined distributed Siddhi application, dividing it into multiple Siddhi applications, creating the required topics, and then deploying them in the available resource nodes. The manager cluster also handles resource nodes that join/leave the distributed cluster, and re-schedules the Siddhi applications accordingly. Since manager nodes are deployed in high availability mode, if and when the active manager node goes down, another node in the manager cluster is elected as the leader to handle the resource cluster.
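
Manager-side clustering is configured in the <SP_HOME>/conf/manager/deployment.yaml file. The following is a minimal sketch of the kind of cluster coordination configuration this involves, assuming the RDBMS coordination strategy; the group ID and datasource name used here (sp, SP_MGT_DB) are illustrative placeholders, so verify the exact parameter names and values against the manager configuration steps of this guide.

Code Block
cluster.config:
  enabled: true
  groupId: sp                     # manager nodes sharing this group ID form one cluster
  coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
  strategyConfig:
    datasource: SP_MGT_DB         # datasource defined under wso2.datasources
    heartbeatInterval: 1000       # interval (in ms) between coordination heartbeats
    heartbeatMaxRetry: 2          # missed heartbeats before a node is considered inactive
    eventPollingInterval: 1000    # interval (in ms) between membership event polls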

...

A resource cluster contains multiple WSO2 SP instances. Each instance sends a periodic heartbeat to the manager cluster so that the managers at any given time can identify the resource nodes that are active in the cluster. The resource nodes are responsible for running Siddhi applications assigned to them by the manager nodes. A resource node continues to run its Siddhi applications until a manager node undeploys them, or until it is no longer able to reach a manager node to send its heartbeat. If a manager node is unreachable for a specified amount of time, the resource node stops operating, removes its deployed Siddhi applications and waits until it can reach a manager node again.

...

The resource cluster can include both receiver workers and resource workers. You can specify the minimum number of receiver worker nodes to be included. However, you need to ensure that the minimum number specified is greater than zero. This is because, if one or more distributed Siddhi applications contain a user-defined source such as HTTP or Thrift, those Siddhi applications cannot be deployed in a resource worker node. Therefore, at least one receiver worker node needs to be available in the resource cluster to ensure that distributed Siddhi applications are deployed successfully.

Deployed Siddhi applications communicate among themselves via Kafka topics.

Kafka cluster

Info

It is required to install Kafka and Zookeeper to configure a fully distributed deployment.


A Kafka cluster holds all the topics used by distributed Siddhi applications. All communications between execution groups take place via Kafka.

Publishing and receiving data from distributed Siddhi applications can be done via Kafka or other Siddhi sources as follows:

  • Via Kafka
    To use Kafka for publishing and receiving data, you can either define a Kafka source in the initial distributed Siddhi application or use the Kafka source created by the distributed implementation. 
  • Via Other Siddhi Sources
    This involves defining the source in the initial distributed Siddhi application.

    Info

    At least one receiver worker node should be configured in the cluster.

Configuring a distributed cluster

This section explains how to configure a distributed WSO2 SP cluster.

Prerequisites

In order to configure a fully distributed HA cluster, the following prerequisites must be completed:

  • A WSO2 SP binary pack must be available for each node in the cluster.
  • Each SP node should have an ID under wso2.carbon in the <SP_HOME>/conf/manager/deployment.yaml or <SP_HOME>/conf/worker/deployment.yaml file, depending on the cluster node being configured (see the sketch after this list).
  • A working RDBMS instance to be used for clustering of the manager nodes. 

    Info

    We currently support only MySQL. Support for other databases will be added soon.

  • The datasource to be shared by the nodes in the manager cluster must be already defined in the <SP_HOME>/conf/manager/deployment.yaml file.
  • For MySQL to work with the Stream Processor, download the MySQL connector from here, extract it, and find the mysql-connector-java-5.*.*-bin.jar file. Drop the JAR into the <SP_HOME>/lib directory of both manager nodes.
  • In order to retrieve the state of the Siddhi applications deployed in the system in a scenario where both nodes fail, state persistence must be enabled for all worker nodes. For detailed instructions, see Configuring Database and File System State Persistence.
  • A Zookeeper cluster and a Kafka cluster should already be started, and their hosts and ports should be known.
    • Supported Zookeeper version - 3.4.6
    • Supported Kafka version - 2.11-0.10.0.0
  • For all manager and resource nodes to communicate with the Kafka broker, the following Kafka libs found in <KAFKA_HOME>/libs should be converted to OSGi bundles and added to the <SP_HOME>/lib directory:
    • kafka_2.11-0.10.0.0.jar

    • kafka-clients-0.10.0.0.jar

    • metrics-core-2.2.0.jar

    • scala-parser-combinators_2.11-1.0.4.jar
    • scala-library-2.11.8.jar

    • zkclient-0.8.jar

    • zookeeper-3.4.6.jar

  • To convert the Kafka libs to OSGi bundles:

    • Create the source folder (e.g., kafka) and copy the respective Kafka libs to the created folder.

    • Create another folder (e.g., kafka-osgi) as the destination folder to which the converted OSGi bundles will be added.

    • Navigate to <SP_HOME>/bin and issue the following command:
      - For Linux: ./jartobundle.sh <path_to_source_folder> <path_to_destination_folder>
      - For Windows: jartobundle.bat <path_to_source_folder> <path_to_destination_folder>

    • If the conversion is successful, the following message is displayed on the terminal for each lib:

    Code Block
    - INFO: Created the OSGi bundle <kafka-lib-name>.jar for JAR file <absolute_path>/kafka/<kafka-lib-name>.jar
    
    • You can find the OSGi-converted libs in the destination folder. Copy them to the <SP_HOME>/lib directory.
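
The deployment.yaml entries referred to in the prerequisites above (the node ID and the shared datasource) take roughly the following form. This is a minimal sketch; the node ID, datasource name, database URL, and credentials shown here (wso2-sp-manager-1, SP_MGT_DB, root/root) are illustrative placeholders and must be replaced with your own values.

Code Block
wso2.carbon:
  id: wso2-sp-manager-1            # unique ID of this node within the cluster

wso2.datasources:
  dataSources:
    - name: SP_MGT_DB              # shared datasource referenced by the manager cluster
      description: Shared MySQL datasource for manager clustering
      definition:
        type: RDBMS
        configuration:
          jdbcUrl: 'jdbc:mysql://localhost:3306/WSO2_SP_MGT_DB?useSSL=false'
          username: root
          password: root
          driverClassName: com.mysql.jdbc.Driver
          maxPoolSize: 10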

...

  1. In the type field, enter the type of the cluster as distributed. 
    type: distributed            
  2. For the httpsInterface parameter, specify the host, port, and user credentials of the resource node being configured.

    Info

    The host must be the IP of the network interface through which the nodes are connected (i.e., LAN IP). If all the nodes are deployed in the same physical machine, each node must have a separate port.

    e.g., host:localhost, port:9090, username:admin, password:admin

  3. In the leaderRetryInterval parameter, enter the number of milliseconds for which the resource node must keep retrying to connect with a manager node. If the time specified for this parameter elapses without the resource node connecting to a manager node, the resource node is shut down.
    e.g., leaderRetryInterval: 5000
  4. In the resourceManagers parameter, specify the hosts, ports and user credentials of the manager nodes to which the resource node must try to connect. If there are multiple managers, a sequence must be specified.

    Following is a sample deployment configuration for a resource node.

    Code Block
    deployment.config:
      type: distributed           # required in both manager / resource
      httpsInterface:              # required in both manager / resource
        host: 192.168.1.3
        port: 9090
        username: admin			  # username of current resource node
        password: admin			  # password of current resource node
      leaderRetryInterval: 10000  # only required in worker
      resourceManagers:           # only required in worker
        - host: 192.168.1.1
          port: 9543
          username: admin		  # username of manager node
          password: admin		  # password of manager node
        - host: 192.168.1.2
          port: 9543
          username: admin		  # username of manager node
          password: admin		  # password of manager node
    Tip

    If you want to configure this node as a receiver node instead of a resource node, you need to add the following parameter below the type parameter.

    isReceiverNode : true

    This is the only difference between a resource node and a receiver node. The following is the sample configuration of a receiver node.

    Code Block
    deployment.config:
      type: distributed
      isReceiverNode : true
      httpsInterface:
        host: 192.168.1.3
        port: 9090
        username: admin
        password: admin
      leaderRetryInterval: 10000
      resourceManagers:
        - host: 192.168.1.1
          port: 9543
          username: admin           # username of manager node
          password: admin           # password of manager node
        - host: 192.168.1.2
          port: 9543
          username: admin           # username of manager node
          password: admin           # password of manager node
    Info

    State persistence must be enabled for all worker nodes using a common database. For detailed instructions, see Configuring Database and File System State Persistence.
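
    The state persistence section of the <SP_HOME>/conf/worker/deployment.yaml file takes roughly the following form. This is a minimal sketch assuming the database persistence store; the datasource and table names (PERSISTENCE_DB, PERSISTENCE_TABLE) are illustrative placeholders, so see Configuring Database and File System State Persistence for the exact options.

    Code Block
    state.persistence:
      enabled: true
      intervalInMin: 1                 # how often a state snapshot is taken
      revisionsToKeep: 2               # number of snapshot revisions to retain
      persistenceStore: org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore
      config:
        datasource: PERSISTENCE_DB     # datasource defined under wso2.datasources of each worker
        table: PERSISTENCE_TABLE       # table in which snapshots are stored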

Starting the cluster

To start the distributed SP cluster, follow the procedure below:

    1. Start each manager by navigating to the <SP_HOME>/bin directory and issuing the following command:
      For Windows: manager.bat
      For Linux: ./manager.sh
    2. Start each worker by navigating to the <SP_HOME>/bin directory and issuing the following command:
      For Windows: worker.bat
      For Linux: ./worker.sh
    3. When both the manager and resource nodes are successfully started, the following log entry is printed.

      Panel

      INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - WSO2 Stream Processor started in x sec

    4. Siddhi applications should be deployed to the manager cluster using one of the following methods.

      1. Dropping the .siddhi file into the <SP_HOME>/wso2/manager/deployment/siddhi-files/ directory before or after starting the manager node.

      2. Sending a "POST" request to  http://<host>:<port>/siddhi-apps, with the Siddhi App attached as a file in the request as shown in the example below. Refer Stream Processor REST API Guide for more information on using WSO2 Strean Processor APIs.

        Code Block
        Sample cURL request to deploy a Siddhi application
        curl -X POST "https://localhost:9543/siddhi-apps" -H "accept: application/json" -H "Content-Type: text/plain" -d @TestSiddhiApp.siddhi -u admin:admin -k
      Info
      To deploy Siddhi applications in a distributed deployment, it is recommended to use a content synchronization mechanism because Siddhi applications must be deployed to both manager nodes. You can use a common shared file system such as Network File System (NFS) or any other shared file system that is available. You need to mount the <SP_HOME>/wso2/manager/deployment/siddhi-files/ directory of the two nodes to the shared file system.