This documentation is for WSO2 Stream Processor 4.3.0.


...

Each Siddhi application is deployed in the available resource nodes of the distributed cluster. All these Siddhi applications communicate with each other via the messaging layer. The system interacts with the messaging layer to create topics representing each stream, and it configures the Siddhi applications to use these topics as required.

...

Job manager nodes handle all the management layer functionality. This layer contains two WSO2 SP manager instances configured to run in high availability mode. The manager parses the distributed Siddhi application provided by the user, partitions it into multiple Siddhi applications, wires them together using messaging layer topics, and deploys them in the available worker nodes. The management layer also handles the effects of worker nodes joining or leaving the distributed cluster by redistributing the Siddhi applications accordingly.


The processing layer (also known as the resource cluster) is represented by multiple WSO2 SP instances that are configured as workers. Each WSO2 SP worker instance in this layer registers itself with the manager cluster when it starts, and periodically sends heartbeats to the manager cluster. This allows the managers to distinguish the active worker nodes from the inactive ones. The worker nodes (resource nodes) run the Siddhi applications assigned to them by the manager nodes. They are also capable of handling network partitions gracefully, as depicted in the following diagram.


As depicted above, a worker node periodically synchronizes its configurations and Siddhi applications with the manager node. If the network gets partitioned or the manager becomes unreachable, the worker node undeploys the Siddhi applications deployed in it. This allows those Siddhi applications to be rescheduled on other worker nodes that still maintain their connections with the manager nodes.

A fully distributed SP cluster requires Apache Kafka or NATS (NATS is only supported in the WUM-updated product for SP 4.3.0) as the messaging layer. The persistence stores of the persistence layer can be RDBMS databases that store both configuration and system state data. Identity and access management of all the WSO2 Stream Processor nodes can be handled by any SCIM-supported identity provider, such as WSO2 Identity and Access Management (WSO2 IAM).

There are no restrictions on the environment in which WSO2 Stream Processor can run in distributed mode. It can run in distributed mode on bare metal, VMs, and containers. Here, the manager nodes are grouped in a single cluster backed by a database for coordination. Similarly, dashboard nodes can also be deployed in a separate cluster. The worker nodes, on the other hand, are not aware of each other. They are synchronized with the manager nodes from which they receive instructions.

...

The messaging cluster holds all the topics used by distributed Siddhi applications for communication between the nodes in it. Publishing data to and receiving data from distributed Siddhi applications can also be done via the same messaging cluster. To use the messaging layer as the event entry point, you need to define sources based on the type of the messaging layer. The generated partial Siddhi applications then consume from those predefined topics. Topics used to send data across the generated partial applications are created automatically.

Configuring a distributed cluster

...

  • A WSO2 SP binary pack must be available for each node in the cluster.
  • Each SP node must have a distinct ID under the wso2.carbon section in the <SP_HOME>/conf/manager/deployment.yaml or <SP_HOME>/conf/worker/deployment.yaml file, depending on the type of node being configured, as shown in the example below.
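
    The following is an illustrative sketch of setting such an ID (the ID value here is hypothetical; any other properties already present under wso2.carbon should be kept as they are).

    Code Block
    wso2.carbon:
      # Unique ID of this node; every manager and worker node in the cluster must use a different value.
      id: wso2-sp-manager-1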
  • A working RDBMS instance to be used for clustering of the manager nodes. Currently, H2, MySQL, Oracle, PostgreSQL, and MSSQL databases are supported.

  • Add the database driver corresponding to the DB system used to the <SP_HOME>/lib directory.

    Info

    If the driver JAR is not an OSGi bundle, you need to convert the JAR into a bundle using the <SP_HOME>/bin/jartobundle.sh script. For more information, see Configuring Datasources.

  • The messaging cluster based on Kafka or NATS must be started, and the hosts and ports of the cluster must be known. You also need a ZooKeeper cluster to facilitate the Kafka cluster. The following versions of each product are supported.
    • ZooKeeper version: 3.4.6
    • Kafka version: 2.11-0.10.0.0
    • NATS streaming server version: 0.11.x
  • The following tasks need to be carried out, depending on the messaging layer, in order to make WSO2 SP compatible with it.
NATS dependencies

To enable all the manager and resource nodes to communicate with the NATS streaming server, follow the steps below:

  1. Download the following dependencies, convert them into OSGi bundles (explained below), and place them in the <SP_HOME>/lib directory.
     • jnats 2.3.0: https://mvnrepository.com/artifact/io.nats/jnats/
     • java-nats-streaming 2.1.2: https://mvnrepository.com/artifact/io.nats/java-nats-streaming/
  2. Download the following JARs and place them directly (without converting) in the <SP_HOME>/lib directory.
     • siddhi-io-nats 1.0.4: https://mvnrepository.com/artifact/org.wso2.extension.siddhi.io.nats/siddhi-io-nats/1.0.4
     • Protocol buffers (protobuf-java) 3.6.1: https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java/
Kafka dependencies

To enable all the manager and resource nodes to communicate with Kafka, follow the instructions below.

Find the following dependencies in the <KAFKA_HOME>/libs directory, convert them to OSGi bundles (explained below), and copy them to the <SP_HOME>/lib directory.

  • kafka_2.11-0.10.0.0.jar

  • kafka-clients-0.10.0.0.jar

  • metrics-core-2.2.0.jar

  • scala-parser-combinators_2.11-1.0.4.jar

  • scala-library-2.11.8.jar

  • zkclient-0.8.jar

  • zookeeper-3.4.6.jar

Converting JARs to OSGi bundles

To convert JARs to OSGi bundles, follow the steps below:

  1. Create the source directory (e.g., named jars) and copy the required JARs into the created directory.
  2. Create another directory (e.g., named osgi). This is the destination directory to which the converted OSGi bundles are to be added.
  3. To convert the JARs, navigate to the <SP_HOME>/bin directory and issue one of the following commands.
    • For Linux: ./jartobundle.sh <path_to_source_directory> <path_to_destination_directory>
    • For Windows: ./jartobundle.bat <path_to_source_directory> <path_to_destination_directory>

    If the JARs are successfully converted, the following message appears in the terminal.

    Panel

    - INFO: Created the OSGi bundle <jar-name>.jar for JAR file <absolute_path>/jars/<jar-name>.jar

  4. The converted OSGi bundles are now available in the destination directory. Copy them and place them in the <SP_HOME>/lib directory.

Configuring the cluster

To configure a fully distributed HA cluster, follow the procedure below:

...

  1. In the cluster.config section, make the following changes.
    1. To enable the cluster mode, set the enabled property to true.
    2. In order to cluster all the manager nodes together, enter the same cluster ID as the group ID for all the nodes (e.g., groupId: group-1).
    3. Enter the ID of the class that defines the coordination strategy for the cluster as shown in the example below.
      e.g., coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
  2. In the strategyConfig section of cluster.config, enter information for the required parameters as follows.
    1. Enter the ID of the datasource shared by the nodes in the cluster as shown in the example below. Data handled by the cluster are persisted here.
      datasource: SP_MGT_DB

      Info

      The SP_MGT_DB datasource is configured to an H2 database by default. You must create a MySQL database and then configure this datasource in the <SP_HOME>/conf/manager/deployment.yaml file of the required manager node. The following is a sample configuration.

      Code Block
        - name: SP_MGT_DB
          description: The MySQL datasource used for Cluster Coordination
          # JNDI mapping of a data source
          jndiConfig:
             name: jdbc/WSO2ClusterDB
          # data source definition
          definition:
             # data source type
             type: RDBMS
             # data source configuration
             configuration:
               jdbcUrl: 'jdbc:mysql://<host>:<port>/<database_name>?useSSL=false'
               username: <Username_Here>
               password: '<Password_Here>'
               driverClassName: com.mysql.jdbc.Driver
               maxPoolSize: 50
               idleTimeout: 60000
               connectionTestQuery: SELECT 1
               validationTimeout: 30000
               isAutoCommit: false
    2. Specify the time interval (in milliseconds) at which heartbeat pulse should occur within the manager cluster to indicate that a manager is in an active state as shown in the example below.
      heartbeatInterval: 500
    3. Specify the number of times the heartbeat pulse must be unavailable at the specified time interval in order to consider a manager node as inactive, as shown in the example below. A value of four means that if a manager node fails to send four consecutive heartbeat pulses, it is identified as unresponsive, and another manager node acts as the active node in the manager cluster.
      heartbeatMaxRetry: 4
    4. Specify the time interval (in milliseconds) at which each node should listen for changes that occur in the cluster as shown in the example below.
      eventPollingInterval: 1000
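
      The following is an example of a cluster.config section configured as described above, using the sample values given in the preceding steps.

      Code Block
      # Cluster coordination configuration of a manager node
      cluster.config:
        enabled: true                       # enables the cluster mode
        groupId: group-1                    # same group ID in all manager nodes
        coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
        strategyConfig:
          datasource: SP_MGT_DB             # datasource shared by the nodes in the cluster
          heartbeatInterval: 500
          heartbeatMaxRetry: 4
          eventPollingInterval: 1000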
  3. In the deployment.config section, enter information as follows:
    1. In the type field, enter the type of the cluster as distributed
      type: distributed    
    2. For the httpsInterface parameter, specify the host and the port of the node.

      Info

      The host should be the IP of the network interface through which the nodes are connected (i.e., the LAN IP). Each node should have a separate port if deployed in the same physical machine.

      e.g., host:localhost, port:9543

    3. Specify the time interval (in milliseconds) at which resource nodes connected to this manager should send heartbeat pulses to indicate that they are in a working state as shown in the example below.
      e.g., heartbeatInterval: 2000
    4. Specify the number of times a resource node's heartbeat can be unavailable before the manager node identifies the resource node as unresponsive. For example, according to the configuration below, if a resource node fails to send 4 consecutive heartbeat pulses, it is recognized as unresponsive, and the Siddhi applications deployed in that node are rescheduled to another available resource node.
      e.g., heartbeatMaxRetry: 4

    5. In the minResourceCount parameter, specify the minimum number of resource nodes required to operate the distributed setup. Siddhi applications are not deployed when the number of available resource nodes is less than the number specified here. The default value is 1.

    6. If you are using NATS as the messaging layer, specify the NATS server URLs used by the cluster via the natsServerUrl parameter. You can enter multiple URLs as a comma-separated list in the format <host_1>:<port_1>, <host_2>:<port_2>. You also need to provide the ID of the cluster created in the NATS server via the clusterId parameter.
    7. If you are using Kafka as the messaging layer, specify the Kafka server URLs used by the cluster via the bootstrapURLs parameter as a comma-separated list in the format <host_1>:<port_1>, <host_2>:<port_2>.

    8. For the zooKeeperURLs parameter under the zooKeeperConfig section, specify the server URL of the ZooKeeper of the cluster in the format given below.
      <ZOOKEEPER_HOST>:<ZOOKEEPER_PORT>

      The following is an example of a deployment.config section configured as described above.

      NATS Config
      Code Block
      # Deployment Configuration for Distributed Deployment
      deployment.config:
        type: distributed
        httpsInterface:
          host: localhost
          port: 9543
        heartbeatInterval: 2000
        heartbeatMaxRetry: 4
        datasource: SP_MGT_DB           # define a mysql datasource in datasources and refer it from here.
        minResourceCount: 1
        natsServerUrl: nats://localhost:4222
        clusterId: test-cluster
        appCreatorClass: org.wso2.carbon.sp.jobmanager.core.appcreator.NatsSiddhiAppCreator
      Kafka Config
      Code Block
      # Deployment Configuration for Distributed Deployment
      deployment.config:
        type: distributed
        httpsInterface:
          host: localhost
          port: 9543
        heartbeatInterval: 2000
        heartbeatMaxRetry: 4
        datasource: SP_MGT_DB           # define a mysql datasource in datasources and refer it from here.
        minResourceCount: 1
        bootstrapURLs: localhost:9092   # kafka urls
        zooKeeperConfig:
          zooKeeperURLs: localhost:2181   # zookeeper urls
          connectionTimeout: 10000
          sessionTimeout: 10000

...

To configure the resource nodes for a fully distributed HA cluster, edit the <SP_HOME>/conf/worker/deployment.yaml file as follows. You have to uncomment (i.e., remove the # in front of each line of) the section under # Sample of deployment.config for Distributed deployment, and then perform the following steps under the deployment.config section.

  1. Uncomment the deployment.config section of the deployment.yaml file. Then configure the resource nodes to communicate with the manager node by following the steps below.
    1. In the type field, enter the type of the cluster as distributed.
      type: distributed 
    2. For the httpsInterface parameter, specify the host, port, and user credentials of the resource node being configured.

      Info

      The host must be the IP of the network interface through which the nodes are connected (i.e., LAN IP). If all the nodes are deployed in the same physical machine, each node must have a separate port.

      e.g., host:localhost, port:9090, username:admin, password:admin

    3. In the leaderRetryInterval parameter, enter the number of milliseconds for which the resource node must keep retrying to connect with a manager node. If the time specified for this parameter elapses without the resource node connecting to a manager node, the resource node is shut down.
      e.g., leaderRetryInterval: 5000 
    4. In the resourceManagers parameter, specify the hosts, ports and user credentials of the manager nodes to which the resource node must try to connect. If there are multiple managers, a sequence must be specified. 

      Following is a sample deployment configuration for a resource node.

      Code Block
      deployment.config:
        type: distributed           # required in both manager / resource
        httpsInterface:              # required in both manager / resource
          host: 192.168.1.3
          port: 9090
          username: admin			  # username of current resource node
          password: admin			  # password of current resource node
        leaderRetryInterval: 10000  # only required in worker
        resourceManagers:           # only required in worker
          - host: 192.168.1.1
            port: 9543
            username: admin		  # username of manager node
            password: admin		  # password of manager node
          - host: 192.168.1.2
            port: 9543
            username: admin		  # username of manager node
            password: admin		  # password of manager node
      Tip

      If you want to configure this node as a receiver node instead of a resource node, you need to add the following parameter below the type parameter.

      isReceiverNode : true

      This is the only difference between a resource node and a receiver node. The following is the sample configuration of a receiver node.

      Code Block
      deployment.config:
       type: distributed       
       isReceiverNode : true
       httpsInterface:          
         host: 192.168.1.3
         port: 9090
         username: admin     
         password: admin     
       leaderRetryInterval: 10000
       resourceManagers:
         - host: 192.168.1.1
           port: 9543
           username: admin      # username of manager node
           password: admin      # password of manager node
         - host: 192.168.1.2
           port: 9543
           username: admin      # username of manager node
           password: admin      # password of manager node
  2. In order to retrieve the state of the Siddhi applications deployed in the system in the event of a node failure, state persistence must be enabled for all worker nodes via database state persistence, sharing a common database. To do this, you first need to define a new datasource for the persistence information under the datasources section of the deployment.yaml file. The following is a sample datasource configuration based on a MySQL database.

    Code Block
     - name: WSO2_PERSISTENCE_DB
       description: The datasource used for state persistence
       jndiConfig:
         name: jdbc/WSO2_PERSISTENCE_DB
       definition:
         type: RDBMS
         configuration:
           jdbcUrl: jdbc:mysql://localhost:3306/WSO2_PERSISTENCE_DB?useSSL=false
           username: root
           password: root
           driverClassName: com.mysql.jdbc.Driver
           maxPoolSize: 50
           idleTimeout: 60000
           connectionTestQuery: SELECT 1
           validationTimeout: 30000
           isAutoCommit: false

    After this, you need to enable state persistence pointing to this database, using org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore as the persistence store. The following is a sample configuration. For detailed instructions, see Configuring Database and File System State Persistence.

    Code Block
    state.persistence:
      enabled: true
      intervalInMin: 3
      revisionsToKeep: 2
      persistenceStore: org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore
      config:
        datasource: WSO2_PERSISTENCE_DB
        table: PERSISTENCE_TABLE

...