This documentation is for WSO2 Stream Processor 4.2.0. View documentation for the latest release.

All docs This doc
||
Skip to end of metadata
Go to start of metadata

When streaming events with WSO2 Stream Processor (SP), there is a possibility of the node running the SP instance failing due to several unforeseeable reasons. This leads to a loss in the events being streamed until the node can be restarted. A solution for this is to use the WSO2 SP in a High Availability (HA) environment where the processing of events is not halted at an unexpected failover scenario. The recommended HA deployment for WSO2 SP is the two node minimum HA deployment, where two instances of WSO2 SP are running in parallel as depicted in the diagram below.

In this minimum HA setup, one node is assigned as the active node while the other node is assigned as the passive node. Both nodes will process the incoming events but only the active node will be used to publish the outgoing events. In a failover scenario of the active node, the passive node will activate and start publishing events from where the active node left off. Restarting of the terminated node will make it operate in the passive state by syncing with the active node to maintain the HA status. 

In order for the 2 node HA to work, both active and passive node must receive the same (duplicate) events. To achieve this you can follow one of the strategies listed below:

  • In each node, you deploy a Siddhi application with a distributed sink only to deliver duplicated events, and another Siddhi application to process and publish events.
  • If WSO2 products are used, WSO2 data bridge can publish events to both nodes
  • If you use a message broker such as Kafka, both nodes can subscribe to the same topic to receive duplicate events.

Prerequisites

In order to configure a minimum HA cluster, the following prerequisites must be completed:

  • It is recommended to run this setup with two CPUs. Each CPU should have four cores, and 4GB memory.
  • Two binary packs of WSO2 SP must be available.
  • A working RDBMS instance to be used for clustering of the 2 nodes. 

  • The datasource to be shared by the nodes in the cluster must be already defined in the <SP_HOME>/conf/worker/deployment.yaml file.
  • Download the MySQL connector from here. Extract and find the mysql-connector-java-5.*.*-bin.jar. Drop the jar to the <SP_HOME>/lib directory in both nodes. 
  • In order to retrieve the state of the Siddhi Applications deployed in the system in case of a scenario where both the nodes fail, state persistence must be enabled for both nodes by specifying the same datasource/file location. For detailed instructions, see Configuring Database and File System State Persistence.
  • A Siddhi client must be available to publish events to both the nodes in a synchronized manner where publishing of events is not stopped when one receiver node goes down.


Configuring a minimum HA cluster

To configure a minimum HA cluster, follow the steps below:

  • Note that the following configurations need to be done in the <SP_HOME>/conf/worker/deployment.yaml file for both the WSO2 SP nodes in the cluster.
  • If you need to run both SP instances in the same host, make sure that you do a port offset to change the default ports in one of the hosts. For more information about the default ports, see Configuring Default Ports.
  1. For each node, enter a unique ID for the id property under the wso2.carbon section. (e.g., id: wso2-sp). This is used to identify each node within a cluster.
  2. To allow the two nodes in the cluster to coordinate effectively, configure carbon coordination by updating the cluster.config section of the <SP_HOME>/conf/worker/deployment.yaml as follows:

    For more information on how to set up cluster coordination refer Configuring Cluster Coordination


    1. To enable the cluster mode, set the enabled property to true.
    2. In order to cluster the two nodes together, enter the same ID as the group ID for both nodes (e.g., groupId: group-1).
    3. Enter the ID of the class that defines the coordination strategy for the cluster as shown in the example below.
      e.g., coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
    4. In the strategyConfig section, enter information as follows:
      1. For clustering of the two nodes to take place
      2. Enter the name of the configured datasource shared by the nodes in the cluster as shown in the example below. Data handled by the cluster are persisted here.
        datasource: WSO2_CLUSTER_DB (A datasource with this name should be configured)

        Following is a sample datasource configuration for a MySQL datasource that should appear under the dataSources section of the wso2.datasources section in the  <SP_HOME>/conf/worker/deployment.yaml .  For detailed instructions of how to configure a datasource, see Configuring Datasources.

        Sample MySQL datasource
        - name: WSO2_CLUSTER_DB
          description: The MySQL datasource used for Cluster Coordination
          jndiConfig:
            name: jdbc/WSO2ClusterDB
          definition:
            type: RDBMS
            configuration:
              jdbcUrl: 'jdbc:mysql://localhost:3306/WSO2_CLUSTER_DB?useSSL=false'
              username: root
              password: root
              driverClassName: com.mysql.jdbc.Driver
              maxPoolSize: 50
              idleTimeout: 60000
              connectionTestQuery: SELECT 1
              validationTimeout: 30000
              isAutoCommit: false
      3. Specify the time interval (in milliseconds) at which  heartbeat pulse should occur for each node to indicate that it is in an active state as shown in the example below.
        heartbeatInterval: 1000
      4. Specify the number of times the heartbeat pulse should be unavailable at the specified time interval in order to consider a node inactive as shown in the example below. A value of two means that if a node fails to send two consecutive heart beat pulses, it must be identified as inactive and removed from the cluster as a result.
        heartbeatMaxRetry: 2
      5. Specify the time interval (in milliseconds) at which each node should listen for changes that occur in the cluster as shown in the example below.
        eventPollingInterval: 1000
  3. Next add the deployment.config section to the <SP_HOME>/conf/worker/deployment.yaml file with following configurations:
    1. To enable 2 node minimum HA, set the type property to ha as shown below.
      type: ha
    2. Add a section named liveSync and enter the following information under it:
      1. Set the enabled parameter to true to enable direct communication between the two nodes via REST API calls. This ensures that the communication between the nodes is instantanious. 
      2. To specify the address to which the HTTP requests by the external client must be directed, enter the relevant host and port in the advertisedHost and advertisedPort parameters respectively as shown in the example below. 
        advertisedHost: localhost
        advertisedPort: 9090

        • These are optional parameters that you need to configure only in scenarios where the host and/or port is different from the advertised host and/or port (e.g., container based scenarios).

        • Listener ports are configured under the listenerConfigurations section of the wso2.transport.http namespace in the <SP_HOME>/conf/worker/deployment.yaml file. Note that in a Docker environment the host and port that is mapped to these listener configurations should be given in the liveSync section.

      3. To enable direct communication between the nodes the user credientials of the other node should be given as follows:
        username: <username> # if this is node 1's configs enter node 2's username
        password: <password> # if this is node 1's configs enter node 2's password


        The default username set for a node is admin with password admin

    3. Specify the time interval (in milliseconds) at which the passive node must call the active node to update the last published events as shown in the example below.
      outputSyncInterval: 60000

      This ensures that events are not lost if a failover takes place and the currently passive node needs to start publishing events. If liveSync is enabled, these calls are made via REST API. If liveSync is disabled the database is used to communicate the information. When the time interval specified is shorter, the passive node is updated more frequently, and as a result, the number of events queued as well as the number of duplicates published are reduced.

    4. Specify the time interval (in milliseconds) within which the passive node should synchronize its state with the active node as shown in the example given below. 

      stateSyncGracePeriod: 120000

      If liveSync is disabled, the state is synchronized using the persisted state. For this purpose, a value must be specified for the intervalInMin parameter in the state.persistence section to enable periodic state persistence.

    5. Specify the maximum number of events that can be queued in the sinks of the passive node as shown in the example below.
      sinkQueueCapacity: 20000

      When events are queued in the sinks of the passive node, it ensures that those events are published at least once.

    6. Specify the maximum number of events that can be queued in the sources of the passive node as shown in the example below. 
      sourceQueueCapacity: 20000
      The purpose of queueing events at sources is to ensure no events are dropped during state synchronization. 

    7. Specify the time (in milliseconds) given to the passive node to retry a failed state synchronization for each Siddhi application being deployed as shown in the example below. 
      retryAppSyncPeriod: 60000

      An application is not deployed unless the active node provides a valid state for that application.

      deployment.config:
        type: ha
        liveSync:
          enabled: true
          advertisedHost: localhost
          advertisedPort: 9090
          username: admin
          password: admin
        outputSyncInterval: 60000
        stateSyncGracePeriod: 120000
        sinkQueueCapacity: 20000
        sourceQueueCapacity: 20000
        retryAppSyncPeriod: 60000

Publishing events to the cluster

For your SP minimum HA deployment to function, both active and passive node must receive the same (duplicate) events. To achieve this you can follow one of the strategies listed below:

  • In each node, you deploy a Siddhi application with a distributed sink only to deliver duplicated events, and another Siddhi application to process and publish events.
  • If WSO2 products are used, WSO2 data bridge can publish events to both nodes
  • If you use a message broker such as Kafka, both nodes can subscribe to the same topic to receive duplicate events.

Publishing to both nodes via a distributed sink

In a minimum HA cluster with two nodes that uses sources such as HTTP, the nodes an be set up with distributed sinks as shown below.

In the above setup, the following Siddhi applications are deployed in each node:

  • Distributor.siddhi:
    The single source to which the distributor listens can be an HTTP source, a JMS queue etc. that only sends one event to one consumer. In such sources, an event is removed from the source once it is sent to one consumer. As a result, another consumer cannot receive the same data. To address this, a distributor Siddhi application functions as the single consumer to receive a specific event and then resend it to multiple other sources.

    A distributor Siddhi application contains a distributed sink configuration. Events are resent to multiple other sources via this sink. These sinks are configured with the @distribution annotation.

     

  • Executor.siddhi:   The purpose of this Siddhi application is to publish processed events.

As illustrated in the above diagram, the distributor Siddhi as distributes events to two other sources. However, because passive nodes do not publish events, the distributor in the passive node directs the events it consumes to the buffer. This results in the events received by the distributor of the passive node being lost. To avoid this you can enable force publishing for the distributed sink by setting the forcePublish property to true.

The following is a sample distributed sink configuration with force publishing enabled.

The forcePublish parameter can be used only in a distributed sink configured in a minimum HA cluster.


@sink(type='tcp',
      sync='true',
      @map(type='json'),
      @distribution(
	forcePublish= 'true',
        strategy='broadcast',
        @destination(url='tcp://0.0.0.0:5511/sourceStreamOne'),
        @destination(url='tcp://0.0.0.0:5512/sourceStreamOne')
      ))
define stream DistributionStream (name string, amount double);

When force publishing is enabled for the distributor Siddhi application of the passive node, events received by each distributor Siddhi appliction are resent to the two other sources. This ensures that each node receives all the events sent to the cluster.

Starting the cluster

  1. Save the required Siddhi applications in the  <SP_HOME>/deployment/siddhi-files directory in both nodes. In order to ensure that the Siddhi applications are completely synchronized between the active and the passive node, they must be added to the siddhi-files  directory before the server startup. However, the synchronization can take place effectively even if the Siddhi applications are added while the server is running.

    In deploying Siddhi applications in a two node minimum HA cluster, it is recommended to use a content synchronization mechanism since Siddhi applications must be deployed to both worker nodes. You can use a common shared file system such as Network File System (NFS) or any other shared file system that is available. You need to mount the <SP_HOME>/deployment/siddhi-files directory of the two nodes to the shared file system.
  2. Start both servers by navigating to  <SP_HOME>/bin and issuing the following command:
    For Windows: worker.bat 
    For Linux : ./worker.sh

    To start two WSO2 SP Nodes in the same machine, the listenerConfigurations under the wso2.transport.http namespace in the <SP_HOME>/conf/worker/deployment.yaml file must be updated to listen to different ports and corresponding values changed in the liveSync section. The offset property under the ports section of the wso2.carbon section found in <SP_HOME>/conf/worker/deployment.yaml should also be changed in one SP instance to avoid conflicts when starting both servers.


    If the cluster is correctly configured, the following CLI logs can be viewed without any error logs:

    In the active node:

    [2017-10-23 08:56:57,380]  INFO {org.wso2.carbon.stream.processor.core.internal.ServiceComponent} - WSO2 Stream Processor Starting in Two Node Minimum HA Deployment
    [2017-10-23 08:56:57,392]  INFO {org.wso2.carbon.stream.processor.core.ha.HAManager} - HA Deployment: Starting up as Active Node

    In the passive node:

    [2017-10-23 08:57:39,775]  INFO {org.wso2.carbon.stream.processor.core.internal.ServiceComponent} - WSO2 Stream Processor Starting in Two Node Minimum HA Deployment
    [2017-10-23 08:57:39,787]  INFO {org.wso2.carbon.stream.processor.core.ha.HAManager} - HA Deployment: Starting up as Passive Node


  • No labels