This documentation is for WSO2 Stream Processor 4.4.0 (the latest version of WSO2 SP). View documentation for the Streaming Integrator, the successor of WSO2 SP.


In this deployment, both data centers function as active nodes. Therefore, both data centers process and publish events simultaneously.

Configuring the multi DC setup

To configure a multi DC setup, follow the steps below:

  1. Set up two working SP clusters. Each cluster can be either a minimum HA cluster or a fully distributed cluster.
  2. Download and install a message broker to achieve guaranteed delivery and other QoS (Quality of Service) characteristics, such as processing each event exactly once and preserving the order of events. Deploy two instances of this server in each datacenter. When sending messages to this WSO2 SP setup, the publisher should send events to both brokers so that both datacenters build their state simultaneously. This ensures that if one datacenter fails, the messages lost as a result can be fetched from the broker of the other datacenter. In this example, a Kafka message broker is set up. For detailed instructions to set up a Kafka message broker, see Kafka Documentation.
  3. To create a Siddhi application that sends messages to both the clusters, follow the steps below.
    1. Add a name for the Siddhi application as shown in the example below.

      @App:name('SweetTotalApp')
    2. Let's consider a simple scenario where information relating to the production in a factory is recorded. The sum of the production amount for each product is recorded and published as the output. For this, add the definitions for the required input and output streams, and the query as shown below.

      @App:name('SweetTotalApp')

      define stream SweetProductionStream (name string, amount long);
      define stream SweetTotalStream (name string, totalProduction long);

      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;
    3. To receive the events to be processed, add a KafkaMultiDC event source to the input stream definition (in this example, SweetProductionStream) as shown below.

      @App:name('SweetTotalApp')

      @source(type='kafkaMultiDC', bootstrap.servers='host1:9092, host2:9092')
      define stream SweetProductionStream (name string, amount long);
      define stream SweetTotalStream(name string, totalProduction long);
       
      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;

      In this multi datacenter scenario, the input events need to be received by both clusters. This is facilitated via the kafkaMultiDC source type. Once you select this source type, you can specify the two hosts from which the input events are received via the bootstrap.servers parameter. For more information about this source type, see Siddhi Extensions Documentation - kafkaMultiDC Source.
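
      The source annotation above lists only the source type and the brokers. As a minimal sketch, a fuller configuration might also name the topic to consume from and the mapping used to decode messages. The topic name 'myTopic', the partition number, and the XML mapping are assumptions, chosen here to mirror the kafkaMultiDC sink that is configured later in this guide. Check the kafkaMultiDC source documentation for the parameters supported by your version.

      ```siddhi
      @App:name('SweetTotalApp')

      -- Assumption: 'myTopic' and the XML mapping match the publishing sink.
      -- Assumption: partition 0 is the partition that the sink writes to.
      @source(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092,host2:9092', partition.no='0', @map(type='xml'))
      define stream SweetProductionStream (name string, amount long);
      ```

      With default XML mapping, the element names in the incoming messages are expected to match the attribute names of the stream, so the publisher and this stream definition should agree on them.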

    4. To publish the output, connect a sink of the required type. In this example, let's assume that they are published as logs.

      @App:name('SweetTotalApp')

      @source(type='kafkaMultiDC', bootstrap.servers='host1:9092, host2:9092')
      define stream SweetProductionStream (name string, amount long);

      @sink(type='log', prefix='Sweet Totals:')
      define stream SweetTotalStream(name string, totalProduction long);

      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;


    5. Save the changes. The completed Siddhi application is as follows.

      @App:name('SweetTotalApp')
      
      @source(type='kafkaMultiDC', bootstrap.servers='host1:9092, host2:9092')
      define stream SweetProductionStream (name string, amount long);
      
      @sink(type='log', prefix='Sweet Totals:')
      define stream SweetTotalStream(name string, totalProduction long);
      
      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;
    6. Deploy this Siddhi application in the production environment for each datacenter as follows.
      1. Place the Siddhi application in the <SP_HOME>/wso2/worker/deployment/siddhi-files directory of all the required nodes.
      2. Start all the worker nodes of both the clusters. For more information, see Deploying Streaming Applications.
  4. If a failure occurs, the datacenter that is still active must be able to start publishing messages from the point at which the failed datacenter stopped. To allow this, open the <SP_HOME>/conf/worker/deployment.yaml file, and under the state.persistence section, set the enabled parameter to true. Then specify the required information about the data store in which the state is persisted. For detailed instructions, see Configuring Database and File System State Persistence.
  5. If you are including aggregation queries in your Siddhi applications, they need to be partitioned because the processing is carried out by both data centers. To do this, follow the steps below.
    1. In this scenario, assume that the sweet totals need to be calculated per hour, day, month, and year. To do this, let's define an aggregation as follows.

      @App:name('SweetTotalApp')

      @source(type='kafkaMultiDC', bootstrap.servers='host1:9092, host2:9092')
      define stream SweetProductionStream (name string, amount long);

      @sink(type='log', prefix='Sweet Totals:')
      define stream SweetTotalStream(name string, totalProduction long);

      define aggregation SweetProductionAggregation
      from SweetProductionStream
      select name, sum(amount) as totalAmount
      group by name
      aggregate every hour...year;

      @info(name='SweetTotalQuery')
      from SweetProductionStream
      select name, sum(amount) as totalProduction
      group by name
      insert into SweetTotalStream;

      For more information about aggregations, see Incremental Analysis.
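
      To illustrate how the aggregated totals could be read back, the following is a minimal sketch of a retrieval query that joins a request stream with the aggregation. The TotalRequestStream and DailyTotalStream names, the query name, and the date range are hypothetical and are not part of the original application.

      ```siddhi
      -- Hypothetical request stream: each event asks for the totals of one product.
      define stream TotalRequestStream (name string);

      @info(name='DailyTotalQuery')
      from TotalRequestStream as r join SweetProductionAggregation as a
          on r.name == a.name
          -- Assumed example range; adjust it to the period you need.
          within "2019-01-01 00:00:00 +00:00", "2019-02-01 00:00:00 +00:00"
          per "days"
      select a.name, a.totalAmount
      insert into DailyTotalStream;
      ```

      The per clause selects the granularity of the results, and it needs to fall within the range covered by the aggregate every clause (hour to year in this example).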

    2. To store the time-based aggregations calculated via the aggregation you added above, define a data store as follows. 

      Both data centers need to be connected to the same database.

      @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/TestDB", username="root", password="root", jdbc.driver.name="com.mysql.jdbc.Driver")
      define aggregation SweetProductionAggregation
      from SweetProductionStream
      select name, sum(amount) as totalAmount
      group by name
      aggregate every hour...year;

    3. To allow the aggregation to be partitioned, set the @PartitionById(enable='true') property as follows.

      @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/TestDB", username="root", password="root", jdbc.driver.name="com.mysql.jdbc.Driver")
      @PartitionById(enable='true')
      define aggregation SweetProductionAggregation
      from SweetProductionStream
      select name, sum(amount) as totalAmount
      group by name
      aggregate every hour...year;

      To make it possible for the aggregations to be partitioned by the node ID, a unique ID needs to be defined for each node in each cluster of the multi datacenter setup. This ID is defined in the <SP_HOME>/conf/<PROFILE>/deployment.yaml file. For more information, see Incremental Analysis - Scaling through distributed aggregations.

  6. The input information to be processed needs to be received by both clusters. To enable this, create another Siddhi application that functions in the same manner as an external client by following the steps below.

    This step is optional. You can alternatively configure your external client to publish events to both the datacenters.


    1. Add a name for the Siddhi application as shown in the example below.

      @App:name('ProductionTotalReportingApp')
    2. To capture the input information (in this example, the name of the item produced and the quantity produced), add an input stream definition as shown below.

      @App:name('ProductionTotalReportingApp')
      define stream GetProductionAmountStream(name string, totalProduction long);

      Let's assume that input data is received by this stream via HTTP. Connect a source to this stream as follows.

      @App:name('ProductionTotalReportingApp')

      @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type='json'))
      define stream GetProductionAmountStream(name string, totalProduction long);
    3. This Siddhi application needs to publish the information it receives without any further processing. Therefore, add an output stream definition with the same schema as shown below.

      @App:name('ProductionTotalReportingApp')

      @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type='json'))
      define stream GetProductionAmountStream(name string, totalProduction long);
      define stream SweetProductionStream(name string, totalProduction long);
    4. Now add a Siddhi query to direct the data from the input stream to the output stream as follows.

      @App:name('ProductionTotalReportingApp')

      @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type='json'))
      define stream GetProductionAmountStream(name string, totalProduction long);

      @sink(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', sequence.id='sink1', @map(type='xml'))
      define stream SweetProductionStream(name string, totalProduction long);

      from GetProductionAmountStream
      select name, totalProduction
      insert into SweetProductionStream;
    5. The information needs to be published to Kafka topics to which the nodes of the two datacenters have subscribed. To do this, connect a sink of the kafkaMultiDC type to the output stream (in this example, SweetProductionStream) as shown below.

      In this configuration, kafkaMultiDC is selected as the sink type because it supports the following:

      • As mentioned before, the multi DC setup uses a message broker to achieve guaranteed delivery and other QoS (Quality of Service) characteristics. The sink of the kafkaMultiDC type allows you to refer to the topic of the message broker you configured via the topic parameter.
      • The servers to which the events are published are specified as a comma-separated list via the bootstrap.servers parameter.
      • A unique sequence ID can be assigned to each event via the sequence.id parameter. These are assigned for the following purposes.
        • No duplicated events must be created in the recovering datacenter when it fetches the lost events from the broker in the other datacenter. Assigning a unique sequence ID to each event allows the recovering node to identify duplicated events, and ensure that each event is processed only once to avoid unnecessary system overhead.
        • When one datacenter fails, the other datacenter must be able to identify the messages that are not already published. This can be achieved by assigning each event a unique sequence ID.

      For more information about this sink type, see Siddhi Extensions Documentation - kafkaMultiDC Sink.

      The completed Siddhi application is as follows.

      @App:name('ProductionTotalReportingApp')
      
      @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json'))
      
      define stream GetProductionAmountStream(name string, totalProduction long);
      
      
      @sink(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', sequence.id='sink1', @map(type='xml'))
      define stream SweetProductionStream(name string, totalProduction long);
      
      
      from GetProductionAmountStream
      select name, totalProduction
      insert into SweetProductionStream;
    6. Save this Siddhi Application. Then deploy it in a separate SP pack by placing it in the <SP_HOME>/wso2/worker/deployment/siddhi-files directory. 

      This SP pack should be a separate pack that can be run externally to the multi DC cluster.

    7. Run the SP instance in which you deployed the Siddhi application by issuing one of the following commands from the <SP_HOME>/bin directory.

      • For Windows: worker.bat
      • For Linux: ./worker.sh
    8. To process information via this setup, collect the required events via the ProductionTotalReportingApp Siddhi application. For more information, see Collecting Events.
