
Distributed deployment of WSO2 Complex Event Processor (CEP) enables it to achieve high availability and scalability. There are three types of distributed deployment for CEP listed as follows.

Each deployment type can be configured by making changes to the <CEP_HOME>/repository/conf/event-processor.xml file.

High availability deployment

Pros: Zero downtime and no data loss during system failure.

Cons: Requires a minimum of two nodes.

CEP supports a deployment scenario that focuses on high availability (HA) together with HA processing. To enable HA processing, you must have two CEP servers in a cluster. Optionally, you can extend this to a three-node deployment by adding a third node as a Presenter, which can be used to display dashboards and to publish pollable endpoints for retrieving data.

For this deployment, both CEP nodes must receive all events. Clients can achieve this either by sending every request to both nodes, or by sending each request to just one of the two nodes (for example, through a load balancer or a failover mechanism). If clients send every request to both nodes, you must specify that events are duplicated in the cluster (i.e., the same event arrives at every member of the cluster). If a client instead sends a request to only one node, that node internally forwards the request to the other node, so both CEP nodes still receive all the requests.
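
As a rough sketch of how the "events are duplicated in the cluster" setting might look, the event receiver below marks its input as duplicated. It assumes that the receiver's input adapter exposes a property named events.duplicated.in.cluster, as WSO2Event receivers in CEP 4.x generally do. The receiver name and stream name are hypothetical, so verify the property against the event receiver documentation for your version.

```xml
<!-- Hypothetical receiver definition; the names are placeholders -->
<eventReceiver name="SensorEventReceiver" statistics="disable" trace="disable"
               xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="wso2event">
        <!-- true: clients send every event to both nodes, so the receiving
             node does not need to forward the event to the other node -->
        <property name="events.duplicated.in.cluster">true</property>
    </from>
    <mapping customMapping="disable" type="wso2event"/>
    <to streamName="org.example.sensor.stream" version="1.0.0"/>
</eventReceiver>
```

With the property set to false, the clients would send each event to only one node and rely on that node to pass it on to the other member.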

In this scenario, one CEP node works in active mode and the other works in passive mode. Both nodes process all the data, but only the active node sends out notifications through the publisher.

If the notifying node fails, the other node becomes active and starts sending notifications through the publisher in place of the failed node. 

When the failed node is up again, it syncs with the current active node and fetches all of its internal state.

The newly arrived node then becomes the passive node and starts processing all the incoming messages to keep its state aligned with the active node, so that it can become active if the current active node fails.

Adding a Presenter node to the HA cluster

When certain users only need to view the processed data in a dashboard, the solution is to configure another CEP node as a Presenter. If a Presenter node is configured alongside the two nodes above, the active node always publishes the processed data to the Presenter node, provided that a polling publisher is available.

These scenarios can be configured as described below. Setting up a Presenter node is optional; when one is set up, only the active node sends the data to be presented to the Presenter node.

Configuring high availability in CEP nodes

Follow the steps below to configure the two worker nodes, and optionally the Presenter node, in HA mode.

Note: In the following configurations, the "host-ip-address" must be replaced with the IP address of the host machines.

  1. Do the clustering configurations in the <PRODUCT_HOME>/repository/conf/axis2/axis2.xml file for worker node 1, worker node 2 and the Presenter node. The configurations below are for these three nodes, in that order.

    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                    enable="true">
            ...    	
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- The host name or IP address of this member, other than localhost/127.0.0.1 -->
            <parameter name="localMemberHost">host-ip-address</parameter>
            ...
            <parameter name="localMemberPort">4000</parameter>
            ...
            <!-- Add all the members (including the local member) with their IP addresses, other than localhost/127.0.0.1 -->
            <members>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4000</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4001</port>
                </member>
                <!-- if the presenter node is available
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4002</port>
                </member>
                -->
            </members>
        ...
        </clustering>
    </axisconfig>

    Worker node 2:

    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                    enable="true">
            ...    	
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- The host name or IP address of this member, other than localhost/127.0.0.1 -->
            <parameter name="localMemberHost">host-ip-address</parameter>
            ...
            <parameter name="localMemberPort">4001</parameter>
            ...
            <!-- Add all the members (including the local member) with their IP addresses, other than localhost/127.0.0.1 -->
            <members>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4000</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4001</port>
                </member>
                <!-- if the presenter node is available
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4002</port>
                </member>
                -->
            </members>
        ...
        </clustering>
    </axisconfig>

    Presenter node:

    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                    enable="true">
            ...    	
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- The host name or IP address of this member, other than localhost/127.0.0.1 -->
            <parameter name="localMemberHost">host-ip-address</parameter>
            ...
            <parameter name="localMemberPort">4002</parameter>
            ...
            <!-- Add all the members (including the local member) with their IP addresses, other than localhost/127.0.0.1 -->
            <members>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4000</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4001</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4002</port>
                </member>
            </members>
        ...
        </clustering>
    </axisconfig>

    Enable Hazelcast clustering on all the nodes. The following are the changes required.

    • Enable wka mode on all the nodes. See About Membership Schemes for more information on wka mode.

    • Define the two CEP worker nodes, or all three nodes if a Presenter node is used, as well-known members of the cluster. That is, worker node 1, worker node 2 and the Presenter node (if available) are all defined under the members tag in each node.

    • Set each server's own IP address as its localMemberHost value.
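
    As an illustration of replacing the placeholders, the fragment below sketches the relevant part of worker node 1's axis2.xml for a two-worker cluster. The addresses 192.0.2.11 and 192.0.2.12 are hypothetical (taken from a documentation-only range) and stand in for the real IP addresses of worker node 1 and worker node 2.

    ```xml
    <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                enable="true">
        ...
        <parameter name="membershipScheme">wka</parameter>
        ...
        <!-- This node's own address (hypothetical value) -->
        <parameter name="localMemberHost">192.0.2.11</parameter>
        ...
        <parameter name="localMemberPort">4000</parameter>
        ...
        <members>
            <!-- worker node 1 (this node) -->
            <member>
                <hostName>192.0.2.11</hostName>
                <port>4000</port>
            </member>
            <!-- worker node 2 -->
            <member>
                <hostName>192.0.2.12</hostName>
                <port>4001</port>
            </member>
        </members>
        ...
    </clustering>
    ```

    Worker node 2 would carry the same members list, with its own address and port in localMemberHost and localMemberPort.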
  2. Enable HA processing mode for worker node 1, worker node 2 and the Presenter node by making the following changes in the <CEP_HOME>/repository/conf/event-processor.xml file of each node. These changes disable the SingleNode and Distributed processing modes, and allow you to specify the host and ports that the worker nodes use for the snapshot server. They also configure the event syncing server to start. In the Presenter node configuration, you specify the host and port on which it receives processed events from the worker nodes. The configurations below are for worker node 1, worker node 2, and the Presenter node, in that order.

    <!-- set enable to false -->
    <mode name="SingleNode" enable="false">
        ...
    </mode>
    
    <!-- HA Mode Config -->
    <!-- set enable to true -->
    <mode name="HA" enable="true">
        <nodeType>
            <worker enable="true"/>
            <presenter enable="false"/>
        </nodeType>
        <checkMemberUpdateInterval>10000</checkMemberUpdateInterval>
        <eventSync>
            <hostName>host-ip-address</hostName>
            <port>11224</port>
            <reconnectionInterval>20000</reconnectionInterval>
            <serverThreads>20000</serverThreads>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <!--Number of threads to be used by TCPEvent Server to publish events to the CEP nodes-->
            <receiverThreads>10</receiverThreads>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
            <!--Number of events that could be queued at receiver before they are synced between CEP nodes-->
            <receiverQueueSize>1000000</receiverQueueSize>
            <!--Number of events that could be queued at publisher to sync output between CEP nodes-->
            <publisherQueueSize>1000000</publisherQueueSize>
        </eventSync>
        <management>
            <hostName>host-ip-address</hostName>
            <port>10005</port>
            <tryStateChangeInterval>15000</tryStateChangeInterval>
            <stateSyncRetryInterval>10000</stateSyncRetryInterval>
        </management>
        <presentation>
            <hostName>0.0.0.0</hostName>
            <port>11000</port>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <!--Number of threads to be used by TCPEvent Server to publish events to the CEP nodes-->
            <receiverThreads>10</receiverThreads>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
        </presentation>
    </mode>
    
    <!-- Distributed Mode Config -->
    <!-- set enable to false -->
    <mode name="Distributed" enable="false">
        ...
    </mode>

    Worker node 2:

    <!-- set enable to false -->
    <mode name="SingleNode" enable="false">
        ...
    </mode>
    
    <!-- HA Mode Config -->
    <!-- set enable to true -->
    <mode name="HA" enable="true">
        <nodeType>
            <worker enable="true"/>
            <presenter enable="false"/>
        </nodeType>
        <checkMemberUpdateInterval>10000</checkMemberUpdateInterval>
        <eventSync>
            <hostName>host-ip-address</hostName>
    		<!-- if the server started with a port offset, it should be added to 11224 -->
            <port>11224</port>
            <reconnectionInterval>20000</reconnectionInterval>
            <serverThreads>20000</serverThreads>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <!--Number of threads to be used by TCPEvent Server to publish events to the CEP nodes-->
            <receiverThreads>10</receiverThreads>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
            <!--Number of events that could be queued at receiver before they are synced between CEP nodes-->
            <receiverQueueSize>1000000</receiverQueueSize>
            <!--Number of events that could be queued at publisher to sync output between CEP nodes-->
            <publisherQueueSize>1000000</publisherQueueSize>
        </eventSync>
        <management>
            <hostName>host-ip-address</hostName>
            <!-- if the server started with a port offset, it should be added to 10005 -->
            <port>10005</port>
            <tryStateChangeInterval>15000</tryStateChangeInterval>
            <stateSyncRetryInterval>10000</stateSyncRetryInterval>
        </management>
        <presentation>
            <hostName>0.0.0.0</hostName>
            <port>11000</port>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <!--Number of threads to be used by TCPEvent Server to publish events to the CEP nodes-->
            <receiverThreads>10</receiverThreads>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
        </presentation>
    </mode>
    <!-- Distributed Mode Config -->
    <mode name="Distributed" enable="false">
        ...
    </mode>

    Presenter node:

    <mode name="SingleNode" enable="false">
        ...
    </mode>
    
    <!-- HA Mode Config -->
    <mode name="HA" enable="true">
        <nodeType>
            <worker enable="false"/>
            <presenter enable="true"/>
        </nodeType>
        <checkMemberUpdateInterval>10000</checkMemberUpdateInterval>
        <eventSync>
            <hostName>0.0.0.0</hostName>
            <port>11224</port>
            <reconnectionInterval>20000</reconnectionInterval>
            <serverThreads>20000</serverThreads>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <!--Number of threads to be used by TCPEvent Server to publish events to the CEP nodes-->
            <receiverThreads>10</receiverThreads>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
            <!--Number of events that could be queued at receiver before they are synced between CEP nodes-->
            <receiverQueueSize>1000000</receiverQueueSize>
            <!--Number of events that could be queued at publisher to sync output between CEP nodes-->
            <publisherQueueSize>1000000</publisherQueueSize>
        </eventSync>
        <management>
            <hostName>0.0.0.0</hostName>
            <port>10005</port>
            <tryStateChangeInterval>15000</tryStateChangeInterval>
            <stateSyncRetryInterval>10000</stateSyncRetryInterval>
        </management>
        <presentation>
            <hostName>localhost</hostName>
    		<!-- if the server started with a port offset, it should be added to 11002 -->
            <port>11002</port>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <!--Number of threads to be used by TCPEvent Server to publish events to the CEP nodes-->
            <receiverThreads>10</receiverThreads>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
        </presentation>
    </mode>
    <!-- Distributed Mode Config -->
    <mode name="Distributed" enable="false">
        ...
    </mode>

    Note: Enable only the high availability mode configuration here and disable all the other modes. You must configure the host name and port according to the settings of your server.
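
    The comments in the worker configuration say that a port offset must be added to the default ports. As a worked sketch, assume a worker that was started with a port offset of 1 (an illustrative value, for example because two workers share one host). Following those comments, its event syncing and management ports would become 11224 + 1 and 10005 + 1:

    ```xml
    <mode name="HA" enable="true">
        ...
        <eventSync>
            <hostName>host-ip-address</hostName>
            <!-- default 11224 + assumed port offset 1 -->
            <port>11225</port>
            ...
        </eventSync>
        <management>
            <hostName>host-ip-address</hostName>
            <!-- default 10005 + assumed port offset 1 -->
            <port>10006</port>
            ...
        </management>
        ...
    </mode>
    ```

    A node started without an offset keeps the default values 11224 and 10005.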

  3. Share the registry databases. You may need to set up the databases first by following the instructions in the Setting up the Database topic. The following datasource configurations, made in <CEP_HOME>/repository/conf/datasources/master-datasource.xml in each node, keep the default H2 database and use a MySQL database for the shared registry and for user management.

    <datasources-configuration xmlns:svns="http://org.wso2.securevault/configuration">
        ...
        <datasources>
            ...
            <datasource>
                <name>WSO2_CARBON_DB</name>
                <description>The datasource used for registry and user manager</description>
                <jndiConfig>
                    <name>jdbc/WSO2CarbonDB</name>
                </jndiConfig>
                <definition type="RDBMS">
                    <configuration>
                        <url>jdbc:h2:repository/database/WSO2CARBON_DB;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=60000</url>
                        <username>wso2carbon</username>
                        <password>wso2carbon</password>
                        <driverClassName>org.h2.Driver</driverClassName>
                        <maxActive>50</maxActive>
                        <maxWait>60000</maxWait>
                        <testOnBorrow>true</testOnBorrow>
                        <validationQuery>SELECT 1</validationQuery>
                        <validationInterval>30000</validationInterval>
                        <defaultAutoCommit>false</defaultAutoCommit>
                    </configuration>
                </definition>
            </datasource>
            <datasource>
                <name>WSO2_SHARED_REG</name>
                <description>The datasource used for registry</description>
                <jndiConfig>
                    <name>jdbc/WSO2SharedReg</name>
                </jndiConfig>
                <definition type="RDBMS">
                    <configuration>
                        <url>jdbc:mysql://localhost:3306/WSO2_SHARED_REG</url>
                        <username>root</username>
                        <password>root</password>
                        <driverClassName>com.mysql.jdbc.Driver</driverClassName>
                        <maxActive>50</maxActive>
                        <maxWait>60000</maxWait>
                        <testOnBorrow>true</testOnBorrow>
                        <validationQuery>SELECT 1</validationQuery>
                        <validationInterval>30000</validationInterval>
                        <defaultAutoCommit>false</defaultAutoCommit>
                    </configuration>
                </definition>
            </datasource>
            <datasource> 
                <name>WSO2_USER_DB</name> 
                <description>The datasource used for user management</description> 
                <jndiConfig> 
                    <name>jdbc/WSO2UMDB</name> 
                </jndiConfig> 
                <definition type="RDBMS"> 
                    <configuration> 
                        <url>jdbc:mysql://localhost:3306/WSO2_USER_DB</url> 
                        <username>root</username> 
                        <password>root</password> 
                        <driverClassName>com.mysql.jdbc.Driver</driverClassName> 
                        <maxActive>50</maxActive> 
                        <maxWait>60000</maxWait> 
                        <testOnBorrow>true</testOnBorrow> 
                        <validationQuery>SELECT 1</validationQuery> 
                        <validationInterval>30000</validationInterval> 
                    </configuration> 
                </definition> 
            </datasource>
            ...
        </datasources>
        ...
    </datasources-configuration>

    Add the following configurations to the <CEP_HOME>/repository/conf/registry.xml file in every node as well.

    <wso2registry>
        ...
        <dbConfig name="remote_registry"> 
            <dataSource>jdbc/WSO2SharedReg</dataSource> 
        </dbConfig>
        <remoteInstance url="https://localhost:9443/registry"> 
          	<id>instanceid</id>
          	<dbConfig>remote_registry</dbConfig>
            <cacheId>root@jdbc:mysql://localhost:3306/WSO2_SHARED_REG</cacheId>
          	<readOnly>false</readOnly>
          	<enableCache>true</enableCache>
          	<registryRoot>/</registryRoot>
        </remoteInstance>
        <mount path="/_system/config" overwrite="true">
          	<instanceId>instanceid</instanceId>
          	<targetPath>/_system/nodes</targetPath>
        </mount>
        <mount path="/_system/governance" overwrite="true">
          	<instanceId>instanceid</instanceId>
          	<targetPath>/_system/governance</targetPath>
        </mount>
    	...
    </wso2registry>

    To configure the datasource, update the dataSource property found in <PRODUCT_HOME>/repository/conf/user-mgt.xml of the nodes as shown below:

    <Property name="dataSource">jdbc/WSO2UMDB</Property>

    Note: Since this configuration uses a MySQL database, refer to the Setting up MySQL topic.

  4. Enable the SVN synchroniser for both nodes. This ensures that both nodes have exactly the same artefacts; if the nodes do not have the same artefacts deployed, the results are unpredictable.

    Note: When enabling Deployment Synchronization for CEP in HA mode, both nodes are manager nodes, so make sure that only one CEP node is configured with auto commit set to 'true'.
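
    A minimal sketch of what this could look like is given below, assuming the synchroniser is configured through the DeploymentSynchronizer section of <CEP_HOME>/repository/conf/carbon.xml, as in other Carbon-based products. The SVN URL and credentials are placeholders, not values from this guide.

    ```xml
    <DeploymentSynchronizer>
        <Enabled>true</Enabled>
        <!-- Set to true on exactly one CEP node; set to false on the other -->
        <AutoCommit>true</AutoCommit>
        <AutoCheckout>true</AutoCheckout>
        <RepositoryType>svn</RepositoryType>
        <!-- Placeholder repository location and credentials -->
        <SvnUrl>http://svn.example.com/repos/cep</SvnUrl>
        <SvnUser>svn-user</SvnUser>
        <SvnPassword>svn-password</SvnPassword>
        <SvnUrlAppendTenantId>true</SvnUrlAppendTenantId>
    </DeploymentSynchronizer>
    ```

    The second node would use the same block with AutoCommit set to false, so that it only checks out artefacts committed by the first node.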

Distributed mode deployment

Pros: Supports scalability with Apache Storm.

Cons: The current deployment does not support HA. 

The following image provides a brief overview of how Distributed CEP works. 

The CEP Event Receivers component receives events from an external source and converts them into a format that the Siddhi engine can understand. This component then sends these events to the Event Streams component, which acts as the central hub for all events and handles all event streams in the system. The Event Processor handles the actual event processing and is the core event processing unit of the CEP. It manages different execution plans and processes events based on logic, with the help of the different Siddhi engines available in a Storm topology. The Event Processor receives a set of event streams from the Event Streams component, processes them using the Siddhi engine in Storm, and triggers new events on different event streams back to the Event Streams component. The CEP Event Publisher receives processed events from the Event Streams component and sends them to the external event sink in the configured form. The Storm topology runs Siddhi and has receivers and publishers to communicate with the CEP receivers and publishers.

This section explains how to configure WSO2 CEP to run with Apache Storm in the distributed mode. There are four main components that are involved when running CEP with Storm.

  • CEP Workers: Receive events from an external source, convert them into a format that the Siddhi engine can understand, and send them to Storm. They also receive processed events from the Storm engine and send them to the external event sink in the configured form.
  • Siddhi Storm Topology: The Storm topology runs Siddhi Core in a distributed manner. The topology itself has receivers and publishers to communicate with CEP.
  • CEP Manager Service: Deploys the topology, keeps track of the CEP/Storm Receivers/Publishers, and enables those components to discover and communicate with each other.
  • CEP Presenters: Nodes designated to aggregate event streams from multiple nodes so that they can be presented through "Analytics Dashboards". Dashboards should be created in these nodes.

The following image depicts the high level architecture for the distributed mode.

Related links

For more information on the Apache Storm topology, see the following links.

In addition to following the steps below, you must set up an Apache Storm cluster. Refer to the Storm documentation for detailed information on how to set up a Storm cluster.

Note: The distributed setup was tested on Apache Storm 0.9.5.

Configuring the event-processor.xml file

The event-processor.xml file contains the configurations of all three CEP event processing modes. To run with Apache Storm, disable the SingleNode and HA processing modes. The Distributed mode contains the configurations of both the CEP Receivers/Publishers and the Storm Receivers/Publishers. This file is located in the <CEP_HOME>/repository/conf directory. The contents of the file for distributed mode are shown below for each type of node: manager, worker, and presenter, in that order.

 

<eventProcessorConfiguration>
	<mode name="SingleNode" enable="false">
        ..
    </mode>
	
    <mode name="HA" enable="false">
       ..
    </mode>

	<mode name="Distributed" enable="true">
        <nodeType>
            <worker enable="false"/>
            <manager enable="true">
                <hostName>172.17.42.1</hostName>
                <port>8904</port>
            </manager>
            <presenter enable="false">
                <hostName>localhost</hostName>
                <port>11000</port>
            </presenter>
        </nodeType>
        <management>
            <managers>
                <manager>
                    <hostName>172.17.42.1</hostName>
                    <port>8904</port>
                </manager>
                <manager>
                    <hostName>172.17.42.2</hostName>
                    <port>8904</port>
                </manager>
            </managers>
            <!--Connection re-try interval to connect to Storm Manager service in case of a connection failure-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Heart beat interval (in ms) for event listeners in "Storm Receivers" and "CEP Publishers" to acknowledge their
            availability for receiving events"-->
            <heartbeatInterval>5000</heartbeatInterval>
            <!--Storm topology re-submit interval in case of a topology submission failure-->
            <topologyResubmitInterval>10000</topologyResubmitInterval>
        </management>

		<transport>
            <!--Port range to be used for events listener servers in "Storm Receiver Spouts" and "CEP Publishers"-->
            <portRange>
                <min>15000</min>
                <max>15100</max>
            </portRange>
            <!--Connection re-try interval (in ms) for connection failures between "CEP Receiver" to "Storm Receiver" connections
            and "Storm Publisher" to "CEP Publisher" connections-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Size of the output queue of each "CEP Receiver" which stores events to be published into "Storm Receivers" .
            This must be a power of two-->
            <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
            <!--Size of the output queue of each "Storm Publisher" which stores events to be published into "CEP Publisher" .
           This must be a power of two-->
            <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </transport>
        <presentation>
            <presentationOutputQueueSize>1024</presentationOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </presentation>
        <statusMonitor>
            <lockTimeout>60000</lockTimeout>
            <updateRate>60000</updateRate>
        </statusMonitor>
        <stormJar>org.wso2.cep.storm.dependencies.jar</stormJar>
        <distributedUIUrl></distributedUIUrl>
        <memberUpdateCheckInterval>2000</memberUpdateCheckInterval>
    </mode>
 
</eventProcessorConfiguration>

Worker node:

<eventProcessorConfiguration>
	<mode name="SingleNode" enable="false">
        ..
    </mode>
	
    <mode name="HA" enable="false">
       ..
    </mode>

	<mode name="Distributed" enable="true">
        <nodeType>
            <worker enable="true"/>
            <manager enable="false">
                <hostName>localhost</hostName>
                <port>8904</port>
            </manager>
            <presenter enable="false">
                <hostName>localhost</hostName>
                <port>11000</port>
            </presenter>
        </nodeType>
        <management>
            <managers>
                <manager>
                    <hostName>172.17.42.1</hostName>
                    <port>8904</port>
                </manager>
                <manager>
                    <hostName>172.17.42.2</hostName>
                    <port>8904</port>
                </manager>
            </managers>
            <!--Connection re-try interval to connect to Storm Manager service in case of a connection failure-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Heart beat interval (in ms) for event listeners in "Storm Receivers" and "CEP Publishers" to acknowledge their
            availability for receiving events"-->
            <heartbeatInterval>5000</heartbeatInterval>
            <!--Storm topology re-submit interval in case of a topology submission failure-->
            <topologyResubmitInterval>10000</topologyResubmitInterval>
        </management>

		<transport>
            <!--Port range to be used for events listener servers in "Storm Receiver Spouts" and "CEP Publishers"-->
            <portRange>
                <min>15000</min>
                <max>15100</max>
            </portRange>
            <!--Connection re-try interval (in ms) for connection failures between "CEP Receiver" to "Storm Receiver" connections
            and "Storm Publisher" to "CEP Publisher" connections-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Size of the output queue of each "CEP Receiver" which stores events to be published into "Storm Receivers" .
            This must be a power of two-->
            <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
            <!--Size of the output queue of each "Storm Publisher" which stores events to be published into "CEP Publisher" .
           This must be a power of two-->
            <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </transport>
        <presentation>
            <presentationOutputQueueSize>1024</presentationOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </presentation>
        <statusMonitor>
            <lockTimeout>60000</lockTimeout>
            <updateRate>60000</updateRate>
        </statusMonitor>
        <stormJar>org.wso2.cep.storm.dependencies.jar</stormJar>
        <distributedUIUrl></distributedUIUrl>
        <memberUpdateCheckInterval>2000</memberUpdateCheckInterval>
    </mode>
 
</eventProcessorConfiguration>
<eventProcessorConfiguration>
    <mode name="SingleNode" enable="false">
        ..
    </mode>

    <mode name="HA" enable="false">
       ..
    </mode>

    <mode name="Distributed" enable="true">
        <nodeType>
            <worker enable="false"/>
            <manager enable="false">
                <hostName>localhost</hostName>
                <port>8904</port>
            </manager>
            <presenter enable="true">
                <hostName>172.17.42.3</hostName>
                <port>11000</port>
            </presenter>
        </nodeType>
        <management>
            <managers>
                <manager>
                    <hostName>172.17.42.1</hostName>
                    <port>8904</port>
                </manager>
                <manager>
                    <hostName>172.17.42.2</hostName>
                    <port>8904</port>
                </manager>
            </managers>
            <!--Connection re-try interval to connect to Storm Manager service in case of a connection failure-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Heart beat interval (in ms) for event listeners in "Storm Receivers" and "CEP Publishers" to acknowledge their
            availability for receiving events-->
            <heartbeatInterval>5000</heartbeatInterval>
            <!--Storm topology re-submit interval in case of a topology submission failure-->
            <topologyResubmitInterval>10000</topologyResubmitInterval>
        </management>

        <transport>
            <!--Port range to be used for events listener servers in "Storm Receiver Spouts" and "CEP Publishers"-->
            <portRange>
                <min>15000</min>
                <max>15100</max>
            </portRange>
            <!--Connection re-try interval (in ms) for connection failures between "CEP Receiver" to "Storm Receiver" connections
            and "Storm Publisher" to "CEP Publisher" connections-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Size of the output queue of each "CEP Receiver" which stores events to be published into "Storm Receivers" .
            This must be a power of two-->
            <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
            <!--Size of the output queue of each "Storm Publisher" which stores events to be published into "CEP Publisher" .
           This must be a power of two-->
            <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </transport>
        <presentation>
            <presentationOutputQueueSize>1024</presentationOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </presentation>
        <statusMonitor>
            <lockTimeout>60000</lockTimeout>
            <updateRate>60000</updateRate>
        </statusMonitor>
        <stormJar>org.wso2.cep.storm.dependencies.jar</stormJar>
        <distributedUIUrl></distributedUIUrl>
        <memberUpdateCheckInterval>2000</memberUpdateCheckInterval>
    </mode>
 
</eventProcessorConfiguration>

Keep the following in mind when configuring this file.

  • Enable only the Distributed mode here and make sure that all other modes are set to false.
  • A single node can act as a manager, worker and presenter at the same time.

  • To start a node as a CEP manager only, set the enable attribute of the manager element to true and that of the worker and presenter elements to false. Similarly, setting enable to true in the worker or presenter element makes the node act as a worker or presenter. If all three are disabled, the server starts as a normal CEP node.

  • When starting CEP as a manager node, specify the hostName/IP and port for the Storm manager service. Similarly, when starting CEP as a presenter node, specify the hostName/IP and port on which the event syncing service is to be hosted.

    <nodeType>
       <worker enable="true"/>
       <manager enable="true">
             <hostName>172.17.42.1</hostName>
             <port>8904</port>
       </manager>
       <presenter enable="true">
             <hostName>172.17.42.1</hostName>
             <port>11000</port>
       </presenter>
     </nodeType>
  • Specify the portRange to define the range of ports that can be opened to receive events in both CEP and Storm when communicating with the Storm topology. This can be any range of ports.

    <portRange>
          <min>15000</min>
          <max>15100</max>
    </portRange>
  • Specify CEP managers by listing their hostNames and ports.

    <managers>
          <manager>
              <hostName>172.17.42.1</hostName>
              <port>8904</port>
          </manager>
          <manager>
              <hostName>172.17.42.2</hostName>
              <port>8904</port>
          </manager>
    </managers>
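
The manager-only case described in the list above can be sketched as follows. This is a minimal illustration rather than a complete configuration: it reuses the nodeType structure shown above, and the IP address and ports are placeholder values taken from the earlier examples.

```xml
<!-- Sketch: nodeType for a node that acts only as a CEP manager.
     Host and port values are placeholders reused from the examples above. -->
<nodeType>
    <worker enable="false"/>
    <manager enable="true">
        <hostName>172.17.42.1</hostName>
        <port>8904</port>
    </manager>
    <presenter enable="false">
        <hostName>172.17.42.1</hostName>
        <port>11000</port>
    </presenter>
</nodeType>
```

A worker-only or presenter-only node would flip the corresponding enable attributes in the same way.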

Enabling clustering

Enable clustering in all managers, workers and presenters. All nodes must belong to the same cluster domain under the wka membership scheme.

  1. Open the <CEP_HOME>/repository/conf/axis2/axis2.xml file and scroll down to the 'Clustering' section.
  2. Set the 'enable' attribute of the <clustering> element to true. This enables clustering for that node.
    <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
  3. Change the 'membershipScheme' parameter to 'wka'. For more information on membership schemes and which to choose, read the section about membership schemes.
    <parameter name="membershipScheme">wka</parameter>
  4. Provide a domain for the cluster. The domain, which is ‘wso2.carbon.domain’ in this particular example configuration, must be the same for all the managers and workers. All the nodes having the domain as ‘wso2.carbon.domain’ will be part of the same cluster.
    <parameter name="domain">wso2.carbon.domain</parameter>
  5. Specify the 'localMemberHost' and 'localMemberPort' parameters. The localMemberHost is the IP address of the CEP instance you’re configuring. The localMemberPort is the port on which this CEP instance will be listening for incoming cluster messages. If you are running multiple CEP instances on the same host, make sure that this port value does not conflict with the localMemberPort value of the other CEP instances.

    <parameter name="localMemberHost">172.17.42.1</parameter>
    <parameter name="localMemberPort">4000</parameter>
  6. Specify the well-known members for the cluster. At least two well-known address (WKA) members are needed for the cluster to work correctly and to recover if a single WKA member fails. All the managers and workers qualify to be well-known members. For more information on well-known members, read the section about membership schemes. The following is an example where the cluster has three well-known members. The port provided for each member must be equal to the 'localMemberPort' of that well-known member.

    <members>
    	<member>
    		<hostName>172.17.42.1</hostName>
    		<port>4000</port>
    	</member>
    	<member>
    		<hostName>172.17.42.2</hostName>
    		<port>4000</port>
    	</member>
    	<member>
    		<hostName>172.17.42.3</hostName>
    		<port>4000</port>
    	</member>
     </members>
  7. Save and close the file and restart the servers (if running) for the changes to take effect.
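
Put together, the clustering section edited in steps 2 to 6 might look like the sketch below for the node at 172.17.42.1. It only assembles the fragments shown in the steps above; any other parameters that ship in the default axis2.xml are omitted here and are assumed to be left unchanged.

```xml
<!-- Sketch: combined result of steps 2 to 6 for one node.
     Other parameters present in the default axis2.xml are omitted. -->
<clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
    <parameter name="membershipScheme">wka</parameter>
    <parameter name="domain">wso2.carbon.domain</parameter>
    <!-- IP address and cluster port of this CEP instance -->
    <parameter name="localMemberHost">172.17.42.1</parameter>
    <parameter name="localMemberPort">4000</parameter>
    <!-- Well-known members; each port equals that member's localMemberPort -->
    <members>
        <member>
            <hostName>172.17.42.1</hostName>
            <port>4000</port>
        </member>
        <member>
            <hostName>172.17.42.2</hostName>
            <port>4000</port>
        </member>
    </members>
</clustering>
```

On the other nodes only localMemberHost (and localMemberPort, if several instances share a host) would differ; the domain and the member list stay the same.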

Configure storm.yaml

This file is located in <CEP_HOME>/repository/conf/cep/storm. Configurations related to the Storm topology have to be specified in this file. Importantly, when the Storm cluster does not reside on the same machine as the CEP manager node, the IP address of the Storm Nimbus must be specified in this file for each CEP manager as follows.

nimbus.host: "10.100.5.42"

Other configurations related to Storm topologies can also be specified here (e.g., topology.workers: 2). Configurations specified in this file are applied to all Storm topologies submitted to the Storm cluster by the CEP manager. Refer to defaults.yaml for more information.

Share the registry database

Create the database in a DB server and point to it from all nodes. The scripts in <CEP_HOME>/dbscripts can be used to create the database. Refer to Setting up the Database for detailed information on how to create the DB.

After creating the DB, change the WSO2_CARBON_DB datasource as follows in <CEP_HOME>/repository/conf/datasources/master-datasources.xml in all CEP managers, CEP workers and CEP presenters.

<datasource>
     <name>WSO2_CARBON_DB</name>
     <description>The datasource used for registry and user manager</description>
     <jndiConfig>
        <name>jdbc/WSO2CarbonDB</name>
     </jndiConfig>
     <definition type="RDBMS">
        <configuration>
           <url>jdbc:mysql://<mysql-server-ip>:3306/<database-name></url>
           <username>root</username>
           <password>root</password>
           <driverClassName>com.mysql.jdbc.Driver</driverClassName>
           <maxActive>50</maxActive>
           <maxWait>60000</maxWait>
           <testOnBorrow>true</testOnBorrow>
           <validationQuery>SELECT 1</validationQuery>
           <validationInterval>30000</validationInterval>
        </configuration>
     </definition>
</datasource>

For a single node deployment, the default H2 database can be used. However, using the H2 DB in a production environment is not recommended.

Set up deployment synchronizer

Set up the deployment synchronizer to synchronize artifacts across the cluster. CEP managers must be configured as manager nodes, and CEP worker and CEP presenter nodes must be configured as worker nodes. Refer to the SVN-Based Deployment Synchronizer documentation for more information.

Note that artifacts must be deployed only through CEP manager nodes.
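
As a rough sketch, and assuming the SVN-based synchronizer is configured through the DeploymentSynchronizer section of <CEP_HOME>/repository/conf/carbon.xml as in other Carbon-based products, a CEP manager node might use settings along the following lines. The SVN URL and credentials are placeholders, and the SVN-Based Deployment Synchronizer documentation remains the authoritative source for the exact elements.

```xml
<!-- Sketch for a CEP manager node (assumed location: carbon.xml).
     SvnUrl, SvnUser and SvnPassword are placeholder values. -->
<DeploymentSynchronizer>
    <Enabled>true</Enabled>
    <!-- Manager nodes commit artifacts to the repository. On worker and
         presenter nodes, AutoCommit would be set to false instead. -->
    <AutoCommit>true</AutoCommit>
    <AutoCheckout>true</AutoCheckout>
    <RepositoryType>svn</RepositoryType>
    <SvnUrl>http://svn.example.com/repos/cep</SvnUrl>
    <SvnUser>svn-user</SvnUser>
    <SvnPassword>svn-password</SvnPassword>
    <SvnUrlAppendTenantId>true</SvnUrlAppendTenantId>
</DeploymentSynchronizer>
```

Keeping AutoCommit enabled only on manager nodes matches the rule that artifacts are deployed only through CEP managers, while the other nodes just check the artifacts out.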

Running the CEP server

Ensure that you have disabled the HA and SingleNode modes in the event-processor.xml file so that the CEP server starts in distributed mode and attempts to connect to the receivers and publishers of the Storm topology. Also, make sure that the Apache Storm cluster is running. Then start up the CEP servers.

Persistence mode deployment 

Pros: Can be implemented with one live node.

Cons: For non-polling receivers, data received between the last state persistence and the server restart is lost on failure.

Using persistence mode enables CEP queries to span lifetimes much greater than the server uptime. This is achieved by taking periodic snapshots and storing all state information and windows in a scalable persistence store. CEP uses a file system-based persistence store in order to achieve high scalability and low latency when persisting data. In the file system-based persistence store, snapshots are saved in the <CEP_HOME>/repository/cep_persistence directory, which is created automatically. In this mode, if the server crashes, the same server, or another CEP server that has the same execution plans and is connected to the same file system, should be started; the data in the persistence store is then restored from the last available snapshot.

Use the following steps to enable persistence mode processing for CEP.

  1. Enable the SingleNode processing mode and disable the other processing modes. Within the SingleNode mode, also set the enable attribute of the persistence element to true. Make the following changes to the <CEP_HOME>/repository/conf/event-processor.xml file.

    event-processor.xml
    <eventProcessingConfig>
    
      <mode name="SingleNode"  enable="true">
            <persistence enable="true">
                <persistenceIntervalInMinutes>15</persistenceIntervalInMinutes>
                <persisterSchedulerPoolSize>10</persisterSchedulerPoolSize>
                <persister class="org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore">
                    <property key="persistenceLocation">cep_persistence</property>
                </persister>
            </persistence>
        </mode>
    
    
        <!-- HA Mode Config -->
        <mode name="HA" enable="false">
            <eventSync>
                <hostName>localhost</hostName>
                <port>11224</port>
                <reconnectionInterval>20000</reconnectionInterval>
                <serverThreads>20000</serverThreads>
            </eventSync>
            <management>
                <hostName>localhost</hostName>
                <port>10005</port>
            </management>
        </mode>
    
    
        <!-- Distributed Mode Config -->
    
        <mode name="Distributed" enable="false">
    
            <nodeType>
                <worker enable="true"/>
                <manager enable="true">
                    <hostName>localhost</hostName>
                    <port>8904</port>
                </manager>
            </nodeType>
    
            <management>
                <managers>
                    <manager>
                        <hostName>localhost</hostName>
                        <port>8904</port>
                    </manager>
                    <manager>
                        <hostName>localhost</hostName>
                        <port>8905</port>
                    </manager>
                </managers>
                <reconnectionInterval>20000</reconnectionInterval>
                <heartbeatInterval>5000</heartbeatInterval>
                <topologyResubmitInterval>10000</topologyResubmitInterval>
            </management>
    
            <transport>
                <portRange>
                    <min>15000</min>
                    <max>15100</max>
                </portRange>
                <reconnectionInterval>20000</reconnectionInterval>
                <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
                <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
                <tcpEventPublisherMode>blocking</tcpEventPublisherMode>
                <tcpEventPublisherOutputQueueSize>8192</tcpEventPublisherOutputQueueSize>
                <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
                <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
                <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            </transport>
    
            <stormJar>org.wso2.carbon.event.processor.storm.jar</stormJar>
    
            <distributedUIUrl></distributedUIUrl>
    
        </mode>
    
    </eventProcessingConfig>

    Note: Enable only the SingleNode mode and its persistence configuration here, and disable all the other modes.

    Tip: The default persistence store used in CEP is org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore. The values of persistenceIntervalInMinutes and persisterSchedulerPoolSize can be changed according to your requirements.

  2. To persist states at shutdown, shut down the CEP server gracefully. The steps are as follows:

    1. Sign in. Enter your user name and password to log on to the Complex Event Processor Management Console.

    2. Click Shutdown/Restart Server under the Manage section of the Management Console. This redirects you to the Shutdown/Restart Server page.
    3. Click Graceful Restart and click Yes to confirm the server restart when prompted.

Backup recovery process

The backup recovery procedure of WSO2 CEP is as follows.

Prerequisites for the recovery procedure

The following prerequisites must be in place on the currently used CEP server to carry out the backup recovery procedure.

  • Have the CEP artifacts and configurations backed up.
  • Have an externally shared governance registry mounted. For more information on mounting governance registry, see Remote Instance and Mount Configuration Details.
  • Have the runtime state persisted through the persistence mode deployment.

Backup recovery procedure

After the above prerequisites are satisfied, follow the steps below for backup recovery of the CEP server in a VM crash scenario:

  1. Copy the backed up artifacts to the recovered CEP server instance.
  2. Point the recovered CEP server instance to the same externally shared governance registry, which was mounted in the previous CEP instance. For more information on mounting governance registry, see Remote Instance and Mount Configuration Details.
  3. Copy the snapshots stored in the previous CEP instance to the <CEP_HOME>/repository/cep_persistence directory of the current CEP instance.