This documentation is for WSO2 Stream Processor 4.1.0. View documentation for the latest release.

Introduction

The most common deployment pattern for WSO2 SP is the Minimum High Availability Deployment that offers high availability with the minimum amount of resources. However, there are a few user scenarios where the HA (High Availability) deployment is not sufficient to handle the throughput. The Fully Distributed Deployment pattern is introduced to cater to such scenarios.

Distributed Siddhi applications

In a distributed Siddhi application, an execution group is a single unit of execution. For each execution group, a specified number of parallel Siddhi application instances are created. This is done via the @dist annotation.

For example, the following distributed Siddhi application contains two execution groups named group1 and group2. No number of parallel instances is specified for group1, so only one instance is created for it at runtime by default. Two parallel instances are specified for group2.

@App:name('wso2-app')

@info(name = 'query1')
@dist(execGroup='group1')
from TempStream#window.time(2 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;

@info(name = 'query3')
@dist(execGroup='group1')
from every( e1=TempStream ) ->
e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp ] within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;

@info(name = 'query4')
@dist(execGroup='group2', parallel='2')
from TempStream [(roomNo >= 100 and roomNo < 110) and temp > 40 ]
select roomNo, temp
insert into HighTempStream;
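
To make the instance counts concrete, the following Python sketch reads the @dist annotations from a Siddhi application's text and reports how many parallel instances each execution group would get. It is an illustration only, not part of WSO2 SP: the function name instances_per_group is hypothetical, the shortened queries are placeholders, and taking the largest parallel value when a group is annotated more than once is an assumption.

```python
import re

# Matches the parameter list of each @dist(...) annotation.
DIST_PATTERN = re.compile(r"@dist\s*\(([^)]*)\)")
# Matches key='value' pairs inside the annotation.
PARAM_PATTERN = re.compile(r"(\w+)\s*=\s*'([^']*)'")


def instances_per_group(siddhi_app):
    """Return {execGroup: number of parallel instances} for a Siddhi app."""
    groups = {}
    for match in DIST_PATTERN.finditer(siddhi_app):
        params = dict(PARAM_PATTERN.findall(match.group(1)))
        group = params["execGroup"]
        # One instance is created when 'parallel' is not specified.
        parallel = int(params.get("parallel", "1"))
        # Assumption: keep the largest value seen for a group.
        groups[group] = max(groups.get(group, 1), parallel)
    return groups


# Shortened placeholder queries that reuse the annotations from the example.
app = """
@dist(execGroup='group1')
from TempStream select roomNo insert into OutA;
@dist(execGroup='group1')
from TempStream select temp insert into OutB;
@dist(execGroup='group2', parallel='2')
from TempStream select roomNo, temp insert into HighTempStream;
"""

print(instances_per_group(app))  # {'group1': 1, 'group2': 2}
```

For the example application, this gives one instance for group1 and two for group2, that is, three Siddhi applications in total.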


The following is an illustration of how each parallel instance is created as a separate Siddhi application.



Each Siddhi application is deployed in the available resource nodes of the distributed cluster. All these Siddhi applications communicate with each other using Kafka topics. The system creates Kafka topics representing each stream and configures the Siddhi applications to use these topics as required.

For detailed information, see Converting to a Distributed Streaming Application.

Architecture


Manager cluster

The manager cluster contains two or more WSO2 SP instances configured to run in high availability mode. The manager cluster is responsible for parsing a user-defined distributed Siddhi application, dividing it into multiple Siddhi applications, creating the required topics, and then deploying the applications in the available resource nodes. The manager cluster also handles resource nodes that join or leave the distributed cluster, and reschedules the Siddhi applications accordingly. Because the manager nodes are deployed in high availability mode, if the active manager node goes down, another node in the manager cluster is elected as the leader to handle the resource cluster.

Resource cluster

A resource cluster contains multiple WSO2 SP instances. Each instance sends a periodic heartbeat to the manager cluster so that the managers at any given time can identify the resource nodes that are active in the cluster. The resource nodes are responsible for running Siddhi applications assigned to them by the manager nodes. A resource node continues to run its Siddhi applications until a manager node undeploys them, or until it is no longer able to reach a manager node to send its heartbeat. If a manager node is unreachable for a specified amount of time, the resource node stops operating, removes its deployed Siddhi applications and waits until it can reach a manager node again.
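
The heartbeat check can be pictured with a small Python model. This is a simplified sketch, not the WSO2 SP implementation: the class name HeartbeatPolicy is hypothetical, its two fields mirror the heartbeatInterval and heartbeatMaxRetry settings described under Configure manager nodes, and treating the allowed silence as interval multiplied by retries is an assumption based on the "consecutive heartbeat pulses" wording used there.

```python
from dataclasses import dataclass


@dataclass
class HeartbeatPolicy:
    heartbeat_interval_ms: int  # mirrors heartbeatInterval
    heartbeat_max_retry: int    # mirrors heartbeatMaxRetry

    def timeout_ms(self):
        # Assumption: a node may stay silent for interval x retries.
        return self.heartbeat_interval_ms * self.heartbeat_max_retry

    def is_unresponsive(self, last_heartbeat_ms, now_ms):
        # True once the silence has lasted at least the full timeout.
        return now_ms - last_heartbeat_ms >= self.timeout_ms()


policy = HeartbeatPolicy(heartbeat_interval_ms=2000, heartbeat_max_retry=4)
print(policy.timeout_ms())              # 8000
print(policy.is_unresponsive(0, 6000))  # False
print(policy.is_unresponsive(0, 8000))  # True
```

With a 2000 ms interval and four retries, a resource node in this model is treated as unresponsive after 8 seconds without a heartbeat.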

Deployed Siddhi applications communicate among themselves via Kafka topics.

Kafka cluster

A Kafka cluster holds all the topics used by distributed Siddhi applications. All communication between execution groups takes place via Kafka. You can publish data to and receive data from distributed Siddhi applications only via Kafka. To do so, you can either define a Kafka source in the initial distributed Siddhi application or use the Kafka source created by the distributed implementation. Note that installing Kafka and Zookeeper is a prerequisite for configuring a distributed deployment.

Configuring a distributed cluster

This section explains how to configure a distributed WSO2 SP cluster.

Prerequisites

In order to configure a fully distributed HA cluster, the following prerequisites must be completed:

  • A WSO2 SP binary pack must be available for each node in the cluster.
  • Each SP node should have an ID under wso2.carbon in the <SP_HOME>/conf/manager/deployment.yaml or <SP_HOME>/conf/worker/deployment.yaml file depending on the cluster node being configured.
  • A working RDBMS instance to be used for clustering of the manager nodes. 

    We currently support only MySQL. Support for other databases will be added soon.

  • The datasource to be shared by the nodes in the manager cluster must be already defined in the <SP_HOME>/conf/manager/deployment.yaml file.
  • For MySQL to work with the Stream Processor, download the MySQL connector, extract it, and locate mysql-connector-java-5.*.*-bin.jar. Copy the JAR to the <SP_HOME>/lib directory of both manager nodes.
  • In order to retrieve the state of the Siddhi Applications deployed in the system in case of a scenario where both the nodes fail, state persistence must be enabled for all worker nodes. For detailed instructions, see Configuring Database and File System State Persistence.
  • A Zookeeper cluster and Kafka cluster should already be started and hosts and ports should be known.
    • Supported Zookeeper version - 3.4.6
    • Supported Kafka version - 2.11-0.10.0.0
  • For all manager and resource nodes to communicate with the Kafka broker, the following Kafka libraries found in <KAFKA_HOME>/libs must be converted to OSGi bundles and added to the <SP_HOME>/lib directory:
    • kafka_2.11-0.10.0.0.jar
    • kafka-clients-0.10.0.0.jar
    • metrics-core-2.2.0.jar
    • scala-parser-combinators_2.11-1.0.4.jar
    • scala-library-2.11.8.jar
    • zkclient-0.8.jar
    • zookeeper-3.4.6.jar

  • To convert the Kafka libraries to OSGi bundles:
    • Create a source folder (e.g., kafka) and copy the Kafka libraries listed above into it.
    • Create another folder (e.g., kafka-osgi) as the destination folder to which the converted OSGi bundles are written.
    • Navigate to <SP_HOME>/bin and issue the following command:
      - For Linux: ./jartobundle.sh <path_to_source_folder> <path_to_destination_folder>
      - For Windows: jartobundle.bat <path_to_source_folder> <path_to_destination_folder>
    • If the conversion succeeds, a message similar to the following is shown on the terminal for each library:
      - INFO: Created the OSGi bundle <kafka-lib-name>.jar for JAR file <absolute_path>/kafka/<kafka-lib-name>.jar
    • The converted OSGi bundles are available in the destination folder. Copy them to the <SP_HOME>/lib directory.
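
Before running the conversion, it can help to confirm that every required JAR is present in the source folder. The following Python sketch is not part of WSO2 SP: missing_kafka_libs and jartobundle_command are hypothetical helper names, while the JAR names and the script arguments are taken from the prerequisites above.

```python
from pathlib import Path

# JAR names as listed in the prerequisites.
REQUIRED_KAFKA_LIBS = [
    "kafka_2.11-0.10.0.0.jar",
    "kafka-clients-0.10.0.0.jar",
    "metrics-core-2.2.0.jar",
    "scala-parser-combinators_2.11-1.0.4.jar",
    "scala-library-2.11.8.jar",
    "zkclient-0.8.jar",
    "zookeeper-3.4.6.jar",
]


def missing_kafka_libs(source_folder):
    """Return the required JAR names that are not in source_folder."""
    folder = Path(source_folder)
    return [name for name in REQUIRED_KAFKA_LIBS
            if not (folder / name).is_file()]


def jartobundle_command(source_folder, destination_folder, windows=False):
    """Build the conversion command to run from <SP_HOME>/bin."""
    script = "jartobundle.bat" if windows else "./jartobundle.sh"
    return [script, str(source_folder), str(destination_folder)]


# 'kafka' and 'kafka-osgi' are the example folder names used in the steps.
print(missing_kafka_libs("kafka"))
print(" ".join(jartobundle_command("kafka", "kafka-osgi")))
```

An empty list from missing_kafka_libs means the source folder is complete. The command list can then be passed to subprocess.run with cwd set to the <SP_HOME>/bin directory.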


Configuring the cluster

To configure a fully distributed HA cluster, follow the procedure below:

Configure manager nodes

To configure a node as a manager node, update the <SP_HOME>/conf/manager/deployment.yaml file as follows. The fully distributed cluster can have one or more manager nodes. For more information on how to set up cluster coordination, see Configuring Cluster Coordination.

  1. In the cluster.config section, make the following changes.
    1. To enable the cluster mode, set the enabled property to true.
    2. In order to cluster all the manager nodes together, enter the same cluster ID as the group ID for all the nodes (e.g., groupId: group-1).
    3. Enter the ID of the class that defines the coordination strategy for the cluster as shown in the example below.
      e.g., coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
  2. In the strategyConfig section of cluster.config, enter information for the required parameters as follows.
    1. Enter the ID of the datasource shared by the nodes in the cluster as shown in the example below. Data handled by the cluster are persisted here.
      datasource: SP_MGT_DB

      The SP_MGT_DB datasource is configured to use an H2 database by default. You must create a MySQL database and then configure this datasource in the <SP_HOME>/conf/manager/deployment.yaml file of the required manager node. The following is a sample configuration.

        - name: SP_MGT_DB
          description: The MySQL datasource used for Cluster Coordination
          # JNDI mapping of a data source
          jndiConfig:
             name: jdbc/WSO2ClusterDB
          # data source definition
          definition:
             # data source type
             type: RDBMS
             # data source configuration
             configuration:
               jdbcUrl: 'jdbc:mysql://<host>:<port>/<database_name>?useSSL=false'
               username: <Username_Here>
               password: '<Password_Here>'
               driverClassName: com.mysql.jdbc.Driver
               maxPoolSize: 50
               idleTimeout: 60000
               connectionTestQuery: SELECT 1
               validationTimeout: 30000
               isAutoCommit: false
    2. Specify the time interval (in milliseconds) at which heartbeat pulses should occur within the manager cluster to indicate that a manager is in an active state, as shown in the example below.
      heartbeatInterval: 500
    3. Specify the number of consecutive heartbeat pulses that must be missed at the specified time interval for a manager node to be considered inactive, as shown in the example below. A value of four means that if a manager node fails to send four consecutive heartbeat pulses, it is identified as unresponsive and another manager node takes over as the active node in the manager cluster.
      heartbeatMaxRetry: 4
    4. Specify the time interval (in milliseconds) at which each node should listen for changes that occur in the cluster as shown in the example below.
      eventPollingInterval: 1000
  3. In the deployment.config section, enter information as follows:
    1. In the type field, enter the type of the cluster as distributed.
      type: distributed
    2. For the httpInterface parameter, specify the host and the port of the node.

      The host must be the IP of the network interface through which the nodes are connected (i.e., LAN IP). If multiple nodes are deployed on the same physical machine, each node must have a separate port.

      e.g., host:localhost, port:9190

    3. Specify the time interval (in milliseconds) at which resource nodes connected to this manager should send heartbeat pulses to indicate that they are in a working state as shown in the example below.
      e.g., heartbeatInterval: 2000
    4. Specify the number of consecutive heartbeat pulses that a resource node must miss for the manager node to identify the resource node as unresponsive. In the example below, if the resource node fails to send four consecutive heartbeat pulses, it is recognized as unresponsive and the Siddhi applications deployed in that node are rescheduled to another available resource node.
      e.g., heartbeatMaxRetry: 4

    5. In the minResourceCount parameter, specify the minimum number of resource nodes required to operate the distributed setup. Siddhi applications are not deployed when the number of available resource nodes is less than the number specified here. The default value is 1.

    6. In the bootstrapURLs parameter, specify the Kafka server URLs used by the cluster as a comma-separated list.
      e.g., <host_1>:<port_1>,<host_2>:<port_2>

    7. In the zooKeeperURLs parameter, specify the server URL of the zookeeper of the cluster in the format given below:
      <ZOOKEEPER_HOST>:<ZOOKEEPER_PORT>

      The following is an example of a deployment.config section configured as described above.

      deployment.config:
        type: distributed
        httpInterface:
          host: 192.168.1.1
          port: 9190
        heartbeatInterval: 2000
        heartbeatMaxRetry: 2
        datasource: SP_MGT_DB    # define a mysql datasource in datasources and refer it from here.
        minResourceCount: 1
        bootstrapURLs: 192.168.1.10:9092, 192.168.1.11:9092 # only required in manager (kafka urls)
        zooKeeperURLs: 192.168.1.10:2181

Configure resource nodes

To configure the resource nodes for a fully distributed HA cluster, edit the <SP_HOME>/conf/worker/deployment.yaml file as follows. First, uncomment the section under # Sample of deployment.config for Distributed deployment by removing the # in front of each line. Then perform the following steps in the deployment.config section.

  1. In the type field, enter the type of the cluster as distributed.
    type: distributed
  2. For the httpInterface parameter, specify the host, port and user credentials of the resource node being configured.

    The host must be the IP of the network interface through which the nodes are connected (i.e., LAN IP). If all the nodes are deployed in the same physical machine, each node must have a separate port.

    e.g., host:localhost, port:9090, username:admin, password:admin

  3. In the leaderRetryInterval parameter, enter the number of milliseconds for which the resource node must keep retrying to connect with a manager node. If the time specified for this parameter elapses without the resource node connecting to a manager node, the resource node is shut down.
    e.g., leaderRetryInterval: 5000 
  4. In the resourceManagers parameter, specify the hosts, ports and user credentials of the manager nodes to which the resource node must try to connect. If there are multiple managers, a sequence must be specified. 

    Following is a sample deployment configuration for a resource node.

    deployment.config:
      type: distributed           # required in both manager / resource
      httpInterface:              # required in both manager / resource
        host: 192.168.1.3
        port: 9090
        username: admin           # username of current resource node
        password: admin           # password of current resource node
      leaderRetryInterval: 10000  # only required in worker
      resourceManagers:           # only required in worker
        - host: 192.168.1.1
          port: 9190
          username: admin         # username of manager node
          password: admin         # password of manager node
        - host: 192.168.1.2
          port: 9190
          username: admin         # username of manager node
          password: admin         # password of manager node

    Important

    State persistence must be enabled for all worker nodes using a common database. For detailed instructions, see Configuring Database and File System State Persistence.

Starting the cluster

To start the distributed SP cluster, follow the procedure below:

  1. Start each manager by navigating to the <SP_HOME>/bin directory and issuing the following command:
    For Windows: manager.bat
    For Linux : ./manager.sh
  2. Start each worker by navigating to the <SP_HOME>/bin directory and issuing the following command:
    For Windows: worker.bat
    For Linux : ./worker.sh
  3. When the manager and resource nodes have started successfully, a log entry similar to the following is printed.

    INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - WSO2 Stream Processor started in x sec

  4. Siddhi applications should be deployed to the manager cluster using one of the following methods.

    1. Dropping the .siddhi file into the <SP_HOME>/wso2/manager/deployment/siddhi-files/ directory before or after starting the manager node.

    2. Sending a POST request to http://<host>:<port>/siddhi-apps with the Siddhi application attached as a file in the request, as shown in the example below. For more information on using the WSO2 Stream Processor APIs, see the Stream Processor REST API Guide.

      Sample CURL request to deploy Siddhi application
      curl -X POST "https://localhost:9543/siddhi-apps" -H "accept: application/json" -H "Content-Type: text/plain" -d @TestSiddhiApp.siddhi -u admin:admin -k
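
The same request can be prepared from a script. The following Python sketch uses only the standard library and mirrors the headers and basic authentication of the curl sample above. It is an illustration under stated assumptions: the function name build_deploy_request is hypothetical, the application text is a placeholder for the contents of the .siddhi file, and the request is built but not sent.

```python
import base64
import urllib.request


def build_deploy_request(base_url, siddhi_app_text, username, password):
    """Build (but do not send) a POST request that deploys a Siddhi app."""
    credentials = (username + ":" + password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/siddhi-apps",
        data=siddhi_app_text.encode("utf-8"),
        method="POST",
        headers={
            "Accept": "application/json",
            "Content-Type": "text/plain",
            "Authorization": "Basic " + token,
        },
    )


# Placeholder app text; in practice, read it from the .siddhi file.
request = build_deploy_request(
    "https://localhost:9543", "@App:name('TestSiddhiApp')", "admin", "admin"
)
print(request.get_method(), request.full_url)
```

To send the request, pass it to urllib.request.urlopen. The -k flag in the curl sample disables certificate verification; the equivalent here is to supply an ssl.SSLContext with verification turned off through the context argument, which is suitable for test setups only.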

    Important

    To deploy Siddhi applications in a distributed deployment, it is recommended to use a content synchronization mechanism because Siddhi applications must be deployed to both manager nodes. You can use a shared file system such as Network File System (NFS) or any other shared file system that is available. Mount the <SP_HOME>/wso2/manager/deployment/siddhi-files/ directory of the two nodes to the shared file system.