This documentation is for WSO2 Stream Processor 4.1.0. View documentation for the latest release.

A datacenter refers to any clustered setup (i.e., a Minimum High Availability Deployment or a Fully Distributed Deployment) that has a separate message broker configured for it.

This is a WSO2 SP deployment pattern where two identical SP setups run in two different datacenters for disaster recovery. The setups are typically maintained at two different physical locations to minimize the risk of both datacenters failing at the same time.

Out of the two datacenters, only one publishes events. This datacenter is referred to as the active datacenter, and the other is referred to as the passive datacenter. If the active datacenter fails, the previously passive datacenter becomes the active datacenter and starts publishing events from where the failed datacenter stopped.

To run WSO2 SP in this setup, the following are required:

Requirement and purpose

State Synchronization

To enable the passive datacenter to start publishing messages from where the active datacenter stopped before failing, both datacenters must receive the same events and process them simultaneously. Therefore, the publisher that sends messages to this WSO2 SP setup must be configured to publish events to both datacenters.

A message broker

To achieve guaranteed delivery and other QoS (Quality of Service) characteristics, such as processing each event exactly once and preserving the order of events, a message broker must be used. A separate instance of the broker is deployed in each datacenter. When sending messages to this WSO2 SP setup, the publisher should send events to both brokers so that both datacenters build their state simultaneously. This ensures that if one datacenter fails, the messages lost as a result can be fetched from the broker of the other datacenter.

A sequence number for each message

If the active datacenter fails and the currently passive datacenter has to start publishing events, you need to ensure the following:

  • No duplicate events are created in the recovering datacenter when it fetches the lost events from the broker in the other datacenter. Assigning a unique sequence ID to each event allows the recovering node to identify duplicate events and ensure that each event is processed only once, which avoids unnecessary system overhead.
  • When the active datacenter fails, the passive datacenter must be able to identify the messages that have not yet been published. This can be achieved by assigning each event a unique sequence ID.
A sink

This is configured to send the input events to be processed by a Siddhi application to both the brokers in the two WSO2 SP datacenters.

The following conditions should be met in the event sink configuration:

  • The type parameter must have kafkaMultiDC as the value. This is a sink type that publishes events to brokers configured for datacenters.
  • The hosts of the two message brokers to which the published events should be sent must be listed for the bootstrap.servers parameter.
  • A sequence ID must be configured so that if the active datacenter fails, the passive datacenter can identify the events that were already published and avoid publishing them again. The sequence number assigned to each event is always greater by one than that of the previous event, even if the previous event was not successfully published.

e.g.,

@sink(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', sequence.id='sink1', @map(type='xml'))
define stream BarStream (symbol string, price float, volume long);

For more information see kafkaMultiDC extension.

A source

This is configured for each node in each datacenter to receive events. Each node receives the events sent to the message broker of the datacenter to which it belongs.

 The following conditions must be met in this configuration:

  • The type parameter must have kafkaMultiDC as the value.
  • The hosts of the two message brokers from which events are received must be listed for the bootstrap.servers parameter.

e.g.,

@source(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', @map(type='xml'))
define stream Foo (attribute1 string, attribute2 int);

For more information see kafkaMultiDC extension.
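
The role of the sequence number on the receiving side can be illustrated with a minimal Python sketch. This is a conceptual illustration only, not the kafkaMultiDC implementation: the class name `SequenceDeduplicator` and the `(sequence number, payload)` event shape are hypothetical, and the sketch assumes sequence numbers start at 0 and increase over time.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple

# Hypothetical event shape: (sequence number, payload).
Event = Tuple[int, str]


@dataclass
class SequenceDeduplicator:
    """Lets each sequence number through at most once."""

    last_seen: int = -1  # highest sequence number already processed

    def accept(self, events: Iterable[Event]) -> List[Event]:
        """Return only the events that have not been processed before."""
        fresh: List[Event] = []
        # Sort by sequence number so that out-of-order batches are handled.
        for seq, payload in sorted(events):
            if seq <= self.last_seen:
                continue  # already processed, so this is a duplicate
            self.last_seen = seq
            fresh.append((seq, payload))
        return fresh
```

If a node has already processed events 0 and 1 and then fetches events 0, 1, and 2 from the broker of the other datacenter, only event 2 is passed on for processing.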

Configuring a multi DC setup

To configure a multi DC setup, follow the steps below:

  1. Set up two working SP clusters. Each cluster can be either a minimum HA cluster or a fully distributed cluster.
  2. Download and install a message broker to achieve guaranteed delivery and other QoS (Quality of Service) characteristics, such as processing each event exactly once and preserving the order of events. Deploy a separate instance of the broker in each datacenter. When sending messages to this WSO2 SP setup, the publisher should send events to both brokers so that both datacenters build their state simultaneously. This ensures that if one datacenter fails, the messages lost as a result can be fetched from the broker of the other datacenter. In this example, a Kafka message broker is set up. For detailed instructions to set up a Kafka message broker, see Kafka Documentation.
  3. To create a Siddhi application that sends messages to both the clusters, follow the steps below.
    1. Add a name for the Siddhi application as shown in the example below.

      @App:name('SweetTotalApp')
    2. Let's consider a simple scenario where information relating to the production in a factory is recorded. The sum of the production amount for each product is recorded and published as the output. For this, add the definitions for the required input and output streams, and the query as shown below.

      @App:name('SweetTotalApp')

      define stream SweetProductionStream (name string, amount long);
      define stream SweetTotalStream(name string, totalProduction long);

      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;
    3. To receive the events to be processed, add a kafkaMultiDC event source to the input stream definition (in this example, SweetProductionStream) as shown below.

      @App:name('SweetTotalApp')

      @source(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', @map(type='xml'))
      define stream SweetProductionStream (name string, amount long);
      define stream SweetTotalStream(name string, totalProduction long);
       
      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;

      In this multi datacenter scenario, the input events need to be received by both clusters. This is facilitated via the kafkaMultiDC source type. Once you select this source type, you can specify the hosts of the two message brokers from which the input events are received via the bootstrap.servers parameter. For more information about this source type, see Siddhi Extensions Documentation - kafkaMultiDC Source.

    4. To publish the output, connect a sink of the required type. In this example, let's assume that they are published as logs.

      @App:name('SweetTotalApp')

      @source(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', @map(type='xml'))
      define stream SweetProductionStream (name string, amount long);

      @sink(type='log', prefix='Sweet Totals:')
      define stream SweetTotalStream(name string, totalProduction long);

      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;


    5. Save the changes. The completed Siddhi application is as follows.

      @App:name('SweetTotalApp')
      
      @source(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', @map(type='xml'))
      define stream SweetProductionStream (name string, amount long);
      
      @sink(type='log', prefix='Sweet Totals:')
      define stream SweetTotalStream(name string, totalProduction long);
      
      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;
    6. Deploy this Siddhi application in the production environment for each datacenter as follows.
      1. Place the Siddhi application in the <SP_HOME>/wso2/worker/deployment/siddhi-files directory of all the required nodes.
      2. Start all the worker nodes of both the clusters. For more information, see Deploying Streaming Applications.
  4. If a failure occurs, the passive datacenter must be able to start publishing messages from where the active datacenter stopped before failing. To allow this, open the <SP_HOME>/conf/worker/deployment.yaml file, and under the state.persistence section, set the enabled parameter to true. Then specify the required information about the data store in which the messages are persisted. For detailed instructions, see Configuring Database and File System State Persistence.
  5. The input information to be processed needs to be received by both the clusters. To enable this, create another Siddhi application that functions in the same manner as an external client by following the steps below.

    This step is optional. You can alternatively configure your external client to publish events to both the datacenters.


    1. Add a name for the Siddhi application as shown in the example below.

      @App:name('ProductionTotalReportingApp')
    2. To capture the input information (in this example, the name of the item produced and the quantity produced), add an input stream definition as shown below.

      @App:name('ProductionTotalReportingApp')
      define stream GetProductionAmountStream(name string, amount long);

      Assume that input data is received by this stream via HTTP. Connect a source to this stream as follows.

      @App:name('ProductionTotalReportingApp')

      @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type='json'))
      define stream GetProductionAmountStream(name string, amount long);

    3. This Siddhi application needs to publish the information it receives without any further processing. Therefore, add an output stream definition with the same schema as shown below.

      @App:name('ProductionTotalReportingApp')

      @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type='json'))
      define stream GetProductionAmountStream(name string, amount long);

      define stream SweetProductionStream(name string, amount long);
    4. Now add a Siddhi query to direct the data from the input stream to the output stream as follows.

      @App:name('ProductionTotalReportingApp')

      @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type='json'))
      define stream GetProductionAmountStream(name string, amount long);

      @sink(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', sequence.id='sink1', @map(type='xml'))
      define stream SweetProductionStream(name string, amount long);

      from GetProductionAmountStream
      select name, amount
      insert into SweetProductionStream;
    5. The information needs to be published to Kafka topics to which the nodes of the two datacenters have subscribed. To do this, connect a sink of the kafkaMultiDC type to the output stream (in this example, SweetProductionStream) as shown below.

      In this configuration, kafkaMultiDC is selected as the sink type because it supports the following:

      • As mentioned before, the multi DC setup uses a message broker to achieve guaranteed delivery and other QoS (Quality of Service) characteristics. The sink of the kafkaMultiDC type allows you to refer to the topic of the message broker you configured via the topic parameter.
      • The servers to which the events are published are specified as a comma-separated list via the bootstrap.servers parameter.
      • A unique sequence ID can be assigned to each event via the sequence.id parameter. These are assigned for the following purposes.
        • No duplicate events are created in the recovering datacenter when it fetches the lost events from the broker in the other datacenter. Assigning a unique sequence ID to each event allows the recovering node to identify duplicate events and ensure that each event is processed only once, which avoids unnecessary system overhead.
        • When the active datacenter fails, the passive datacenter must be able to identify the messages that have not yet been published. This can be achieved by assigning each event a unique sequence ID.

      For more information about this sink type, see Siddhi Extensions Documentation - kafkaMultiDC Sink.
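
The publishing behaviour listed above (one record sent to every configured broker, tagged with a sequence number that advances whether or not the publish succeeds) can be sketched in Python. This is a simplified model under stated assumptions, not the kafkaMultiDC sink itself: the `DualBrokerPublisher` class, the record layout, and the use of plain callables in place of Kafka producers are all hypothetical.

```python
from typing import Callable, Dict, List

# A broker is modelled as a callable that returns True when it accepts a record.
Broker = Callable[[Dict], bool]


class DualBrokerPublisher:
    """Sends every record to all brokers with a shared sequence number."""

    def __init__(self, sequence_id: str, brokers: List[Broker]) -> None:
        self.sequence_id = sequence_id  # plays the role of sequence.id
        self.brokers = brokers
        self.next_seq = 0

    def publish(self, payload: str) -> List[bool]:
        """Publish one payload and report per-broker success."""
        record = {"source": self.sequence_id, "seq": self.next_seq, "payload": payload}
        # The sequence number advances even if a publish attempt fails.
        self.next_seq += 1
        results: List[bool] = []
        for broker in self.brokers:
            try:
                results.append(bool(broker(dict(record))))
            except Exception:
                results.append(False)  # the broker was unreachable or rejected the record
        return results
```

Because the number advances on every attempt, a broker that missed a record shows a gap in its sequence numbers, which is what lets a consumer detect that an event has to be fetched from the other datacenter.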

      The completed Siddhi application is as follows.

      @App:name('ProductionTotalReportingApp')
      
      @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type='json'))
      define stream GetProductionAmountStream(name string, amount long);

      @sink(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', sequence.id='sink1', @map(type='xml'))
      define stream SweetProductionStream(name string, amount long);
      
      
      from GetProductionAmountStream
      select name, amount
      insert into SweetProductionStream;
    6. Save this Siddhi Application. Then deploy it in a separate SP pack by placing it in the <SP_HOME>/wso2/worker/deployment/siddhi-files directory. 

      This SP pack should be a separate pack that can be run externally to the multi DC cluster.

    7. Run the SP instance in which you deployed the Siddhi application by issuing one of the following commands from the <SP_HOME>/bin directory.

      • For Windows: worker.bat
      • For Linux: ./worker.sh
    8. To process information via this setup, collect the required events via the ProductionTotalReportingApp Siddhi application. For more information, see Collecting Events.
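
As a rough sketch of how an external client might send an event to the HTTP source of ProductionTotalReportingApp, the following Python snippet posts a JSON payload to the receiver URL used in the example. It assumes that the JSON mapper is left at its default format, which wraps the stream attributes in an "event" object. The helper names `build_event_payload` and `send_event` are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict

# Receiver URL taken from the http source in ProductionTotalReportingApp.
RECEIVER_URL = "http://localhost:5005/SweetProductionEP"


def build_event_payload(attributes: Dict[str, Any]) -> bytes:
    """Wrap stream attributes in an "event" object (assumed default JSON mapping)."""
    return json.dumps({"event": attributes}).encode("utf-8")


def send_event(attributes: Dict[str, Any], url: str = RECEIVER_URL) -> int:
    """POST one event to the HTTP source and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=build_event_payload(attributes),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Call `send_event` with a dictionary whose keys match the attribute names declared for GetProductionAmountStream. The call succeeds only while the SP instance that hosts the application is running.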
