
...

Deployed Siddhi applications communicate among themselves via the messaging layer.

...

Messaging Cluster

Info

It is required to install either Kafka (together with ZooKeeper) or a NATS broker as the messaging layer to configure a fully distributed deployment.

A Kafka messaging cluster holds all the topics used by the distributed Siddhi applications, and all communication between execution groups takes place via these topics. Publishing data to and receiving data from the distributed Siddhi applications can also be done via Kafka or other Siddhi sources as follows:

  • Via Kafka
    To use Kafka for publishing and receiving data, you can either define a Kafka source in the initial distributed Siddhi application or use the Kafka source created by the distributed implementation. 
  • Via Other Siddhi Sources
    This involves defining the source in the initial distributed Siddhi application.

    Info

    At least one receiver worker node should be configured in the cluster.

To use the messaging layer as the event entry point, you have to define sources based on the type of the messaging layer. The generated partial Siddhi applications then consume from those predefined topics. The topics used to send data across the generated partial applications are created automatically.

Configuring a distributed cluster

This section explains how to configure a distributed WSO2 SP cluster.

Prerequisites

In order to configure a fully distributed HA cluster, the following prerequisites must be completed:

  • A WSO2 SP binary pack must be available for each node in the cluster.
  • Each SP node must have an ID configured under wso2.carbon in the <SP_HOME>/conf/manager/deployment.yaml or <SP_HOME>/conf/worker/deployment.yaml file, depending on the cluster node being configured (see the sample fragment after this list).
  • A working RDBMS instance to be used for clustering of the manager nodes. Currently, H2, MySQL, Oracle, PostgreSQL and MSSQL databases are supported.
  • The datasource to be shared by the nodes in the manager cluster must be already defined in the <SP_HOME>/conf/manager/deployment.yaml file (see the sample fragment after this list).
  • The database driver corresponding to the DB system used must be added to the <SP_HOME>/lib directory. Note that if the driver JAR is not an OSGi bundle, you have to convert it to a bundle using the jartobundle.sh script included in <SP_HOME>/bin. For more information, see the Configuring Datasources documentation.
  • A ZooKeeper cluster and a Kafka cluster should already be started, and their hosts and ports should be known.
    • Supported ZooKeeper version - 3.4.6
    • Supported Kafka version - 2.11-0.10.0.0
  • For all manager and resource nodes to communicate with the Kafka broker, the following Kafka libraries found in <KAFKA_HOME>/libs should be converted to OSGi bundles and added to the <SP_HOME>/lib directory:
    • kafka_2.11-0.10.0.0.jar
    • kafka-clients-0.10.0.0.jar
    • metrics-core-2.2.0.jar
    • scala-parser-combinators_2.11-1.0.4.jar
    • scala-library-2.11.8.jar
    • zkclient-0.8.jar
    • zookeeper-3.4.6.jar

  • To convert the Kafka libraries to OSGi bundles:

    • Create a source folder (e.g., kafka) and copy the above Kafka libraries to it.

    • Create another folder (e.g., kafka-osgi) as the destination folder to which the converted OSGi bundles are added.

    • Navigate to <SP_HOME>/bin and issue the following command:
      - For Linux: ./jartobundle.sh <path_to_source_folder> <path_to_destination_folder>
      - For Windows: jartobundle.bat <path_to_source_folder> <path_to_destination_folder>

    • If the conversion is successful, the following message is displayed on the terminal for each library:

    Code Block
    - INFO: Created the OSGi bundle <kafka-lib-name>.jar for JAR file <absolute_path>/kafka/<kafka-lib-name>.jar
    
    • The converted OSGi bundles are created in the destination folder. Copy them to the <SP_HOME>/lib directory.
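For illustration, the following is a minimal sketch of the node ID and shared datasource prerequisites referenced above, written as a hypothetical fragment of a manager node's <SP_HOME>/conf/manager/deployment.yaml. The ID value, the datasource name WSO2_CLUSTER_DB, the JDBC URL and the credentials are placeholder assumptions, not values mandated by the product; use whatever names your own cluster configuration actually refers to.

Code Block
# Hypothetical sketch only - the key layout follows the standard deployment.yaml structure,
# but the ID, datasource name, URL and credentials below are placeholder values.
wso2.carbon:
  id: wso2-sp-manager-1            # unique ID for this node

wso2.datasources:
  dataSources:
    - name: WSO2_CLUSTER_DB        # placeholder name for the datasource shared by the manager cluster
      description: Shared datasource used for clustering of the manager nodes
      jndiConfig:
        name: jdbc/WSO2_CLUSTER_DB
      definition:
        type: RDBMS
        configuration:
          jdbcUrl: jdbc:mysql://localhost:3306/WSO2_CLUSTER_DB?useSSL=false
          username: root
          password: root
          driverClassName: com.mysql.jdbc.Driver
          maxPoolSize: 10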

...

To configure the resource nodes for a fully distributed HA cluster, edit the <SP_HOME>/conf/worker/deployment.yaml file as follows. First uncomment the section under # Sample of deployment.config for Distributed deployment (i.e., remove the # in front of each line), and then carry out the following steps under the deployment.config section.

  1. In the type field, enter the type of the cluster as distributed.
    type: distributed 
  2. For the httpsInterface parameter, specify the host, port and the user credentials of the resource node being configured.

    Info

    The host must be the IP of the network interface through which the nodes are connected (i.e., LAN IP). If all the nodes are deployed in the same physical machine, each node must have a separate port.

    e.g., host:localhost, port:9090, username:admin, password:admin

  3. In the leaderRetryInterval parameter, enter the number of milliseconds for which the resource node must keep retrying to connect with a manager node. If the time specified for this parameter elapses without the resource node connecting to a manager node, the resource node is shut down.
    e.g., leaderRetryInterval: 5000 
  4. In the resourceManagers parameter, specify the hosts, ports and user credentials of the manager nodes to which the resource node must try to connect. If there are multiple managers, a sequence must be specified.

    Following is a sample deployment configuration for a resource node.

      Code Block
      deployment.config:
        type: distributed           # required in both manager / resource
        httpsInterface:              # required in both manager / resource
          host: 192.168.1.3
          port: 9090
          username: admin			  # username of current resource node
          password: admin			  # password of current resource node
        leaderRetryInterval: 10000  # only required in worker
        resourceManagers:           # only required in worker
          - host: 192.168.1.1
            port: 9543
            username: admin		  # username of manager node
            password: admin		  # password of manager node
          - host: 192.168.1.2
            port: 9543
            username: admin		  # username of manager node
            password: admin		  # password of manager node
      Tip

      If you want to configure this node as a receiver node instead of a resource node, you need to add the following parameter below the type parameter.

      isReceiverNode : true

      This is the only difference between a resource node and a receiver node. The following is the sample configuration of a receiver node.

      Code Block
      deployment.config:
        type: distributed           # required in both manager / resource
        isReceiverNode : true
        httpsInterface:             # required in both manager / resource
          host: 192.168.1.3
          port: 9090
          username: admin           # username of current resource node
          password: admin           # password of current resource node
        leaderRetryInterval: 10000  # only required in worker
        resourceManagers:           # only required in worker
          - host: 192.168.1.1
            port: 9543
            username: admin         # username of manager node
            password: admin         # password of manager node
          - host: 192.168.1.2
            port: 9543
            username: admin         # username of manager node
            password: admin         # password of manager node
  5. In order to retrieve the state of the Siddhi applications deployed in the system in case of node failure, state persistence must be enabled for all worker nodes using DB state persistence with a shared common database. To do this, first define a new datasource to be used for persistence information under the datasources section of the deployment.yaml file. The following is a sample datasource configuration based on a MySQL database.


    Code Block
    - name: WSO2_PERSISTENCE_DB
      description: The datasource used for test database
      jndiConfig:
        name: jdbc/WSO2_PERSISTENCE_DB
      definition:
        type: RDBMS
        configuration:
          jdbcUrl: jdbc:mysql://localhost:3306/WSO2_PERSISTENCE_DB?useSSL=false
          username: root
          password: root
          driverClassName: com.mysql.jdbc.Driver
          maxPoolSize: 50
          idleTimeout: 60000
          connectionTestQuery: SELECT 1
          validationTimeout: 30000
          isAutoCommit: false

    Info

    State persistence must be enabled for all worker nodes using a common database. For detailed instructions, see Configuring Database and File System State Persistence.

    After this, you need to enable state persistence pointing to this database by setting the persistence store to 'org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore', as shown in the sample configuration below.

    Code Block
    state.persistence:
      enabled: true
      intervalInMin: 3
      revisionsToKeep: 2
      persistenceStore: org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore
      config:
        datasource: WSO2_PERSISTENCE_DB
        table: PERSISTENCE_TABLE

Starting the cluster

To start the distributed SP cluster, follow the procedure below:

...