
Distributed deployment of WSO2 Complex Event Processor (CEP) enables it to achieve high availability and scalability. There are three types of distributed deployment for CEP listed as follows.

Each deployment type can be configured by making changes to the <CEP_HOME>/repository/conf/event-processor.xml file.

High availability deployment

Pros: Zero downtime and no data loss during system failure.

Cons: Requires a minimum of two nodes.

CEP supports a deployment scenario that focuses on high availability (HA) through HA processing. To enable HA processing, you must have at least two CEP servers in a cluster (an Active node and a Passive node). Optionally, you can have a three-node deployment by adding a third node as a Presenter, which can be used to display dashboards and to publish pollable endpoints for retrieving data. From CEP 4.1.0 onwards, a further node type called the Backup node is available, which acts as a failover for the existing Passive node.

For this deployment, all the CEP nodes must be configured to receive all events. To achieve this, clients can either send all the requests to all the available nodes or each request to any one of the nodes (i.e., using load balancing or failover mechanisms). If clients send all the requests to all the nodes, then the user has to specify that events are duplicated in the cluster (i.e., the same event comes to all the members of the cluster). Alternatively, if a client sends a request to one node, internally it will send that particular request to the other node as well. This way, even if the clients send requests to only one node, all CEP nodes will receive all the requests.
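
As a minimal sketch of the first option, where clients send every event to all the nodes, an event receiver can be flagged as receiving duplicated events. The receiver name and stream name below are hypothetical, and the events.duplicated.in.cluster property name is recalled from CEP 4.x receiver samples, so treat it as an assumption and confirm it against the receiver configuration in your installation.

```xml
<eventReceiver name="SensorEventReceiver" statistics="disable" trace="disable"
               xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="wso2event">
        <!-- true: clients publish the same event to every node in the cluster.
             false: clients publish to one node, which forwards the event to the other node. -->
        <property name="events.duplicated.in.cluster">true</property>
    </from>
    <mapping customMapping="disable" type="wso2event"/>
    <!-- Hypothetical stream name and version -->
    <to streamName="org.example.sensor.stream" version="1.0.0"/>
</eventReceiver>
```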

In this scenario, one CEP node works in active mode, one node works in passive mode, and optionally one works in backup mode. All the nodes process all the data, but only one node sends out notifications through the publisher.

If the notifying node fails, the other node becomes active and starts sending notifications through the publisher in place of the failed node.

When the failed node is up again, through syncing, it will fetch all the internal states of the current active node.

The newly arrived node then becomes the passive node and starts processing all the incoming messages to keep its state aligned with the active node, so that it can become active if the current active node fails.

Adding a Presenter node to the HA cluster

In a scenario where certain users only need to view the processed data in a dashboard, the solution is to configure another CEP node (the Presenter). If a Presenter node is configured alongside the two nodes described above, the active node always publishes the processed data to the Presenter node, provided a polling publisher is available.

Adding Backup nodes to the HA cluster

From CEP 4.1.0 onwards, a Backup node is available, which acts as a failover for the existing passive node. Without it, if the active node goes down, the available passive node becomes the new active node, but if that node also goes down, there is no failover mechanism. The Backup node fills this gap by acting as a failover for the current passive node. Setting up a Backup node is optional, but it improves the availability of the HA deployment.

The scenarios described above can be configured as follows. Setting up the Presenter node is optional; when one is configured, only the active node sends it the data that needs to be presented.

Configuring high availability in CEP nodes

Follow the steps below to configure the Active node, Passive node, Backup node (optional), and Presenter node (optional) in HA mode.

Note: In the following configurations, the "host-ip-address" must be replaced with the IP address of the host machines.

  1. Configure clustering in the <CEP_HOME>/repository/conf/axis2/axis2.xml file of the Active node, Passive node, Backup node, and Presenter node. The sample configurations below, one per node, differ mainly in the localMemberPort value (4000 to 4003).

    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                    enable="true">
            ...     
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- IP address of this member -->
            <parameter name="localMemberHost">host-ip-address</parameter>
            ...
            <parameter name="localMemberPort">4000</parameter>
            ...
            <!-- Add All the members (including local member) with their ip addresses, other than localhost/127.0.0.1 -->
            <members>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4000</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4001</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4002</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4003</port>
                </member>
            </members>
        ...
        </clustering>
    </axisconfig>
    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                    enable="true">
            ...     
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- IP address of this member -->
            <parameter name="localMemberHost">host-ip-address</parameter>
            ...
            <parameter name="localMemberPort">4001</parameter>
            ...
            <!-- Add All the members (including local member) with their ip addresses, other than localhost/127.0.0.1 -->
            <members>
                <!-- if the presenter node is available
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4000</port>
                </member>
                -->
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4001</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4002</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4003</port>
                </member>
            </members>
        ...
        </clustering>
    </axisconfig>
    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                    enable="true">
            ...     
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- IP address of this member -->
            <parameter name="localMemberHost">host-ip-address</parameter>
            ...
            <parameter name="localMemberPort">4002</parameter>
            ...
            <!-- Add All the members (including local member) with their ip addresses, other than localhost/127.0.0.1 -->
            <members>
                <!-- if the presenter node is available
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4000</port>
                </member>
                -->
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4001</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4002</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4003</port>
                </member>
            </members>
        ...
        </clustering>
    </axisconfig>
    <axisconfig name="AxisJava2.0">
        ...
        <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                    enable="true">
            ...     
            <parameter name="membershipScheme">wka</parameter>
            ...
            <!-- IP address of this member -->
            <parameter name="localMemberHost">host-ip-address</parameter>
            ...
            <parameter name="localMemberPort">4003</parameter>
            ...
            <!-- Add All the members (including local member) with their ip addresses, other than localhost/127.0.0.1 -->
            <members>
                <!-- if the presenter node is available
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4000</port>
                </member>
                -->
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4001</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4002</port>
                </member>
                <member>
                    <hostName>host-ip-address</hostName>
                    <port>4003</port>
                </member>
            </members>
        ...
        </clustering>
    </axisconfig>

    Enable Hazelcast clustering on all the nodes. The following are the changes required.

    • Enable wka mode on all the nodes. See About Membership Schemes for more information on wka mode.

    • Define the CEP nodes (three or four, including the Backup node and the Presenter node) as well-known members of the cluster, so that worker node 1 (active node), worker node 2 (passive node), worker node 3 (backup node), and the Presenter node (if available) are listed under the members tag in each node.

    • Set the server's IP address as the localMemberHost value.
  2. Enable HA processing mode for the Active node, Passive node, Backup node, and Presenter node by making the following changes in the <CEP_HOME>/repository/conf/event-processor.xml file of each node. These changes enable the HA processing mode and query state persistence in CEP. They also specify the host and ports that the worker nodes use for the snapshot server, configure the event syncing server to start, and, through the presentation configuration, specify the host and port on which the Presenter node receives processed events from the worker nodes.

    From CEP 4.1.0 onwards, HA processing mode uses state persistence so that the lifetime of a CEP query can extend well beyond the uptime of the cluster. This is achieved by taking periodic snapshots and storing all state information and windows in a scalable persistence store. With state persistence enabled, even if the whole cluster crashes, it can recover and restore its last state from the last available snapshot.

    The default persistence store used in CEP is org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore (a file system-based persistence store). The persistence interval, scheduler pool size, and persistence location (the persistenceIntervalInMinutes and persisterSchedulerPoolSize elements and the persistenceLocation property in the configuration below) can be changed according to your requirements.

    If you want to persist the state in a database instead of the file system, you can specify org.wso2.carbon.event.processor.core.internal.persistence.DBPersistenceStore as the persistence store. The parameters configured for this are DataSource, TableName, persistenceInterval, and schedulerPoolSize.
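
    As a rough sketch of the database-backed alternative described above, the persistence element might look like the following. The data source name CEP_PERSISTENCE_DS and the table name CEP_PERSISTENCE_TABLE are hypothetical placeholders, and the property keys are taken from the parameter names listed above, so verify them against your CEP version before use.

    ```xml
    <persistence enable="true">
        <persistenceIntervalInMinutes>15</persistenceIntervalInMinutes>
        <persisterSchedulerPoolSize>10</persisterSchedulerPoolSize>
        <persister class="org.wso2.carbon.event.processor.core.internal.persistence.DBPersistenceStore">
            <!-- Hypothetical data source name; it must match a data source defined for the node -->
            <property key="DataSource">CEP_PERSISTENCE_DS</property>
            <!-- Hypothetical table name used to store the snapshots -->
            <property key="TableName">CEP_PERSISTENCE_TABLE</property>
        </persister>
    </persistence>
    ```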

    <!-- enable HA config -->
    <mode name="HA" enable="true">
        <nodeType>
            <worker enable="false"/>
            <presenter enable="true"/>
        </nodeType>
        <checkMemberUpdateInterval>10000</checkMemberUpdateInterval>
        <eventSync>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 11224 -->
            <port>11224</port>
            <reconnectionInterval>20000</reconnectionInterval>
            <serverThreads>20000</serverThreads>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
            <!--Number of events that could be queued at receiver before they are synced between CEP/DAS nodes-->
            <receiverQueueSize>1000000</receiverQueueSize>
            <!--Max total size of events that could be queued at receiver before they are synced between CEP/DAS nodes-->
            <receiverQueueMaxSizeMb>10</receiverQueueMaxSizeMb>
            <!--Number of events that could be queued at publisher to sync output between CEP/DAS nodes-->
            <publisherQueueSize>1000000</publisherQueueSize>
            <!--Max total size of events that could be queued at publisher to sync output between CEP/DAS nodes-->
            <publisherQueueMaxSizeMb>10</publisherQueueMaxSizeMb>
        </eventSync>
        <management>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 10005 -->
            <port>10005</port>
            <tryStateChangeInterval>15000</tryStateChangeInterval>
            <stateSyncRetryInterval>10000</stateSyncRetryInterval>
        </management>
        <presentation>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 11000 -->
            <port>11000</port>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
        </presentation>
        <persistence enable="true">
            <persistenceIntervalInMinutes>15</persistenceIntervalInMinutes>
            <persisterSchedulerPoolSize>10</persisterSchedulerPoolSize>
            <persister class="org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore">
                <property key="persistenceLocation">cep_persistence</property>
            </persister>
        </persistence>
    </mode>
    <!-- disable distributed mode config -->
    <mode name="Distributed" enable="false">
        ...
    </mode>
    <!-- enable HA config -->
    <mode name="HA" enable="true">
        <nodeType>
            <worker enable="true"/>
            <presenter enable="false"/>
        </nodeType>
        <checkMemberUpdateInterval>10000</checkMemberUpdateInterval>
        <eventSync>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 11224 -->
            <port>11225</port>
            <reconnectionInterval>20000</reconnectionInterval>
            <serverThreads>20000</serverThreads>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
            <!--Number of events that could be queued at receiver before they are synced between CEP/DAS nodes-->
            <receiverQueueSize>1000000</receiverQueueSize>
            <!--Max total size of events that could be queued at receiver before they are synced between CEP/DAS nodes-->
            <receiverQueueMaxSizeMb>10</receiverQueueMaxSizeMb>
            <!--Number of events that could be queued at publisher to sync output between CEP/DAS nodes-->
            <publisherQueueSize>1000000</publisherQueueSize>
            <!--Max total size of events that could be queued at publisher to sync output between CEP/DAS nodes-->
            <publisherQueueMaxSizeMb>10</publisherQueueMaxSizeMb>
        </eventSync>
        <management>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 10005 -->
            <port>10006</port>
            <tryStateChangeInterval>15000</tryStateChangeInterval>
            <stateSyncRetryInterval>10000</stateSyncRetryInterval>
        </management>
        <presentation>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 11000 -->
            <port>11001</port>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
        </presentation>
        <persistence enable="true">
            <persistenceIntervalInMinutes>15</persistenceIntervalInMinutes>
            <persisterSchedulerPoolSize>10</persisterSchedulerPoolSize>
            <persister class="org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore">
                <property key="persistenceLocation">cep_persistence</property>
            </persister>
        </persistence>
    </mode>
    <!-- disable distributed mode config -->
    <mode name="Distributed" enable="false">
        ...
    </mode>
    <!-- enable HA config -->
    <mode name="HA" enable="true">
        <nodeType>
            <worker enable="true"/>
            <presenter enable="false"/>
        </nodeType>
        <checkMemberUpdateInterval>10000</checkMemberUpdateInterval>
        <eventSync>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 11224 -->
            <port>11226</port>
            <reconnectionInterval>20000</reconnectionInterval>
            <serverThreads>20000</serverThreads>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
            <!--Number of events that could be queued at receiver before they are synced between CEP/DAS nodes-->
            <receiverQueueSize>1000000</receiverQueueSize>
            <!--Max total size of events that could be queued at receiver before they are synced between CEP/DAS nodes-->
            <receiverQueueMaxSizeMb>10</receiverQueueMaxSizeMb>
            <!--Number of events that could be queued at publisher to sync output between CEP/DAS nodes-->
            <publisherQueueSize>1000000</publisherQueueSize>
            <!--Max total size of events that could be queued at publisher to sync output between CEP/DAS nodes-->
            <publisherQueueMaxSizeMb>10</publisherQueueMaxSizeMb>
        </eventSync>
        <management>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 10005 -->
            <port>10007</port>
            <tryStateChangeInterval>15000</tryStateChangeInterval>
            <stateSyncRetryInterval>10000</stateSyncRetryInterval>
        </management>
        <presentation>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 11000 -->
            <port>11002</port>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
        </presentation>
        <persistence enable="true">
            <persistenceIntervalInMinutes>1</persistenceIntervalInMinutes>
            <persisterSchedulerPoolSize>10</persisterSchedulerPoolSize>
            <persister class="org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore">
                <property key="persistenceLocation">cep_persistence</property>
            </persister>
        </persistence>
    </mode>
    <!-- disable distributed mode config -->
    <mode name="Distributed" enable="false">
        ...
    </mode>
    <!-- enable HA config -->
    <mode name="HA" enable="true">
        <nodeType>
            <worker enable="true"/>
            <presenter enable="false"/>
        </nodeType>
        <checkMemberUpdateInterval>10000</checkMemberUpdateInterval>
        <eventSync>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 11224 -->
            <port>11227</port>
            <reconnectionInterval>20000</reconnectionInterval>
            <serverThreads>20000</serverThreads>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
            <!--Number of events that could be queued at receiver before they are synced between CEP/DAS nodes-->
            <receiverQueueSize>1000000</receiverQueueSize>
            <!--Max total size of events that could be queued at receiver before they are synced between CEP/DAS nodes-->
            <receiverQueueMaxSizeMb>10</receiverQueueMaxSizeMb>
            <!--Number of events that could be queued at publisher to sync output between CEP/DAS nodes-->
            <publisherQueueSize>1000000</publisherQueueSize>
            <!--Max total size of events that could be queued at publisher to sync output between CEP/DAS nodes-->
            <publisherQueueMaxSizeMb>10</publisherQueueMaxSizeMb>
        </eventSync>
        <management>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 10005 -->
            <port>10008</port>
            <tryStateChangeInterval>15000</tryStateChangeInterval>
            <stateSyncRetryInterval>10000</stateSyncRetryInterval>
        </management>
        <presentation>
            <hostName>host-ip-address</hostName>
            <!-- If the server started with a port offset, it should be added to 11000 -->
            <port>11003</port>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <publisherTcpSendBufferSize>5242880</publisherTcpSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <publisherCharSet>UTF-8</publisherCharSet>
            <publisherBufferSize>1024</publisherBufferSize>
            <publisherConnectionStatusCheckInterval>30000</publisherConnectionStatusCheckInterval>
        </presentation>
        <persistence enable="true">
            <persistenceIntervalInMinutes>1</persistenceIntervalInMinutes>
            <persisterSchedulerPoolSize>10</persisterSchedulerPoolSize>
            <persister class="org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore">
                <property key="persistenceLocation">cep_persistence</property>
            </persister>
        </persistence>
    </mode>
    <!-- disable distributed mode config -->
    <mode name="Distributed" enable="false">
        ...
    </mode>
    • Enable only the HA mode configuration here and disable all the other modes.
    • Configure the host name and ports according to the settings of your server. The ports specified for each node in the <CEP_HOME>/repository/conf/event-processor.xml file are not updated automatically by the port offset, so you must adjust them manually.
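
    To illustrate the manual port adjustment, the fragments below sketch a node started with a port offset of 1. The Offset element in <CEP_HOME>/repository/conf/carbon.xml is the standard Carbon port offset setting. The offset value and the resulting ports are only an example, chosen to match the second sample configuration above.

    ```xml
    <!-- carbon.xml: start this node with a port offset of 1 -->
    <Ports>
        <Offset>1</Offset>
        ...
    </Ports>

    <!-- event-processor.xml: add the same offset to each default port by hand -->
    <mode name="HA" enable="true">
        ...
        <eventSync>
            <hostName>host-ip-address</hostName>
            <!-- 11224 + 1 -->
            <port>11225</port>
            ...
        </eventSync>
        <management>
            <hostName>host-ip-address</hostName>
            <!-- 10005 + 1 -->
            <port>10006</port>
            ...
        </management>
        <presentation>
            <hostName>host-ip-address</hostName>
            <!-- 11000 + 1 -->
            <port>11001</port>
            ...
        </presentation>
        ...
    </mode>
    ```
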
  3. Share the registry databases. You may need to set up the databases first by following the instructions in the Setting up the Database topic. The following datasource configurations, made in the <CEP_HOME>/repository/conf/datasources/master-datasources.xml file of each node, keep the default H2 database for WSO2_CARBON_DB and use a MySQL database for the shared registry and user management.

    <datasources-configuration xmlns:svns="http://org.wso2.securevault/configuration">
        ...
        <datasources>
            ...
            <datasource>
                <name>WSO2_CARBON_DB</name>
                <description>The datasource used for registry and user manager</description>
                <jndiConfig>
                    <name>jdbc/WSO2CarbonDB</name>
                </jndiConfig>
                <definition type="RDBMS">
                    <configuration>
                        <url>jdbc:h2:repository/database/WSO2CARBON_DB;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=60000</url>
                        <username>wso2carbon</username>
                        <password>wso2carbon</password>
                        <driverClassName>org.h2.Driver</driverClassName>
                        <maxActive>50</maxActive>
                        <maxWait>60000</maxWait>
                        <testOnBorrow>true</testOnBorrow>
                        <validationQuery>SELECT 1</validationQuery>
                        <validationInterval>30000</validationInterval>
                        <defaultAutoCommit>false</defaultAutoCommit>
                    </configuration>
                </definition>
            </datasource>
            <datasource>
                <name>WSO2_SHARED_REG</name>
                <description>The datasource used for registry</description>
                <jndiConfig>
                    <name>jdbc/WSO2SharedReg</name>
                </jndiConfig>
                <definition type="RDBMS">
                    <configuration>
                        <url>jdbc:mysql://localhost:3306/WSO2_SHARED_REG</url>
                        <username>root</username>
                        <password>root</password>
                        <driverClassName>com.mysql.jdbc.Driver</driverClassName>
                        <maxActive>50</maxActive>
                        <maxWait>60000</maxWait>
                        <testOnBorrow>true</testOnBorrow>
                        <validationQuery>SELECT 1</validationQuery>
                        <validationInterval>30000</validationInterval>
                        <defaultAutoCommit>false</defaultAutoCommit>
                    </configuration>
                </definition>
            </datasource>
            <datasource> 
                <name>WSO2_USER_DB</name> 
                <description>The datasource used for user management</description> 
                <jndiConfig> 
                    <name>jdbc/WSO2UMDB</name> 
                </jndiConfig> 
                <definition type="RDBMS"> 
                    <configuration> 
                        <url>jdbc:mysql://localhost:3306/WSO2_USER_DB</url> 
                        <username>root</username> 
                        <password>root</password> 
                        <driverClassName>com.mysql.jdbc.Driver</driverClassName> 
                        <maxActive>50</maxActive> 
                        <maxWait>60000</maxWait> 
                        <testOnBorrow>true</testOnBorrow> 
                        <validationQuery>SELECT 1</validationQuery> 
                        <validationInterval>30000</validationInterval> 
                    </configuration> 
                </definition> 
            </datasource>
            ...
        </datasources>
        ...
    </datasources-configuration>

    The following configurations should also be added to the <CEP_HOME>/repository/conf/registry.xml file in every node:

    <wso2registry>
        ...
        <dbConfig name="remote_registry"> 
            <dataSource>jdbc/WSO2SharedReg</dataSource> 
        </dbConfig>
        <remoteInstance url="https://localhost:9443/registry"> 
            <id>instanceid</id>
            <dbConfig>remote_registry</dbConfig>
            <cacheId>root@jdbc:mysql://localhost:3306/CEP_DB</cacheId>
            <readOnly>false</readOnly>
            <enableCache>true</enableCache>
            <registryRoot>/</registryRoot>
        </remoteInstance>
        <mount path="/_system/config" overwrite="true">
            <instanceId>instanceid</instanceId>
            <targetPath>/_system/nodes</targetPath>
        </mount>
        <mount path="/_system/governance" overwrite="true">
            <instanceId>instanceid</instanceId>
            <targetPath>/_system/governance</targetPath>
        </mount>
        ...
    </wso2registry>

    To configure the user management datasource, update the dataSource property in the <CEP_HOME>/repository/conf/user-mgt.xml file of each node as shown below:

    <Property name="dataSource">jdbc/WSO2UMDB</Property>

    Note: Since this configuration uses a MySQL database, refer to the Setting up MySQL topic.

  4. Enable the SVN-based deployment synchronizer on all three worker nodes (active, passive, and backup). This ensures that all the nodes have exactly the same artifacts deployed; nodes with differing artifacts lead to unpredictable results.

    Note: When enabling deployment synchronization for CEP in HA mode, all the nodes are manager nodes, so make sure that only one CEP node is configured with auto commit set to 'true'.
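
    A minimal sketch of the corresponding DeploymentSynchronizer section in <CEP_HOME>/repository/conf/carbon.xml is given below, assuming the standard Carbon SVN-based deployment synchronizer. The repository URL and credentials are placeholders. Following the note above, AutoCommit is set to true on one node only; the remaining nodes would use false and rely on AutoCheckout.

    ```xml
    <DeploymentSynchronizer>
        <Enabled>true</Enabled>
        <!-- Set to true on exactly one node; use false on all the other nodes -->
        <AutoCommit>true</AutoCommit>
        <AutoCheckout>true</AutoCheckout>
        <RepositoryType>svn</RepositoryType>
        <!-- Placeholder repository location and credentials -->
        <SvnUrl>https://svn.example.com/repos/cep</SvnUrl>
        <SvnUser>svn-user</SvnUser>
        <SvnPassword>svn-password</SvnPassword>
        <SvnUrlAppendTenantId>true</SvnUrlAppendTenantId>
    </DeploymentSynchronizer>
    ```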

Distributed mode deployment

Pros: Supports scalability with Apache Storm.

Cons: The current deployment does not support HA. 

The following image provides a brief overview of how Distributed CEP works. 

The CEP Event Receivers component receives events from an external source and converts them into a format that the Siddhi engine can understand. This component then sends these events to the Event Streams component, which acts as the central hub for all events and handles all event streams in the system. The Event Processor handles the actual event processing and is the core event processing unit of the CEP. It manages different execution plans and processes events based on logic, with the help of the Siddhi engines available in a Storm topology. The Event Processor receives a set of event streams from the Event Streams component, processes them using the Siddhi engine in Storm, and triggers new events on different event streams back to the Event Streams component. The CEP Event Publisher receives processed events from the Event Streams component and sends them to the external event sink in the configured form. The Storm topology runs Siddhi and has receivers and publishers to communicate with the CEP receivers and publishers.

This section explains how to configure WSO2 CEP to run with Apache Storm in the distributed mode. There are four main components that are involved when running CEP with Storm.

  • CEP Workers: Receive events from an external source, convert them into a format that the Siddhi engine can understand, and send them to Storm. They also receive processed events from the Storm engine and send them to the external event sink in the configured form.
  • Siddhi Storm Topology: The Storm topology runs Siddhi Core in a distributed manner. The topology itself has receivers and publishers to communicate with CEP.
  • CEP Manager Service: Deploys the topology, keeps track of the CEP and Storm receivers and publishers, and enables those components to discover and communicate with each other.
  • CEP Presenters: Nodes designated to aggregate the event streams from multiple nodes so that they can be presented through Analytics Dashboards. Dashboards should be created in these nodes.

The following image depicts the high level architecture for the distributed mode.

Related links

For more information on the Apache Storm topology, see the following links.

In addition to following the steps below, you must set up an Apache Storm cluster. Refer to the Storm documentation for detailed information on how to set up a Storm cluster.

Note: The distributed setup was tested on Apache Storm 0.9.5.

Configuring the event-processor.xml file

The event-processor.xml file contains the configurations for all three CEP event processing modes. To run with Apache Storm, make sure to disable the HA processing mode. The distributed mode section contains the configurations of both the CEP Receivers/Publishers and the Storm Receivers/Publishers. This file is located in the <CEP_HOME>/repository/conf directory. The contents of the file for each type of node in distributed mode are as follows.


Manager node:

<eventProcessorConfiguration>
    <mode name="HA" enable="false">
       ..
    </mode>

    <mode name="Distributed" enable="true">
        <nodeType>
            <worker enable="false"/>
            <manager enable="true">
                <hostName>172.17.42.1</hostName>
                <port>8904</port>
            </manager>
            <presenter enable="false">
                <hostName>localhost</hostName>
                <port>11000</port>
            </presenter>
        </nodeType>
        <management>
            <managers>
                <manager>
                    <hostName>172.17.42.1</hostName>
                    <port>8904</port>
                </manager>
                <manager>
                    <hostName>172.17.42.2</hostName>
                    <port>8904</port>
                </manager>
            </managers>
            <!--Connection re-try interval to connect to Storm Manager service in case of a connection failure-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Heart beat interval (in ms) for event listeners in "Storm Receivers" and "CEP Publishers" to acknowledge their
            availability for receiving events-->
            <heartbeatInterval>5000</heartbeatInterval>
            <!--Storm topology re-submit interval in case of a topology submission failure-->
            <topologyResubmitInterval>10000</topologyResubmitInterval>
        </management>

        <transport>
            <!--Port range to be used for events listener servers in "Storm Receiver Spouts" and "CEP Publishers"-->
            <portRange>
                <min>15000</min>
                <max>15100</max>
            </portRange>
            <!--Connection re-try interval (in ms) for connection failures between "CEP Receiver" to "Storm Receiver" connections
            and "Storm Publisher" to "CEP Publisher" connections-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Size of the output queue of each "CEP Receiver" which stores events to be published into "Storm Receivers" .
            This must be a power of two-->
            <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
            <!--Size of the output queue of each "Storm Publisher" which stores events to be published into "CEP Publisher" .
           This must be a power of two-->
            <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </transport>
        <presentation>
            <presentationOutputQueueSize>1024</presentationOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </presentation>
        <statusMonitor>
            <lockTimeout>60000</lockTimeout>
            <updateRate>60000</updateRate>
        </statusMonitor>
        <stormJar>org.wso2.cep.storm.dependencies.jar</stormJar>
        <distributedUIUrl></distributedUIUrl>
        <memberUpdateCheckInterval>2000</memberUpdateCheckInterval>
    </mode>
 
</eventProcessorConfiguration>

Worker node:

<eventProcessorConfiguration>
	
    <mode name="HA" enable="false">
       ..
    </mode>

    <mode name="Distributed" enable="true">
        <nodeType>
            <worker enable="true"/>
            <manager enable="false">
                <hostName>localhost</hostName>
                <port>8904</port>
            </manager>
            <presenter enable="false">
                <hostName>localhost</hostName>
                <port>11000</port>
            </presenter>
        </nodeType>
        <management>
            <managers>
                <manager>
                    <hostName>172.17.42.1</hostName>
                    <port>8904</port>
                </manager>
                <manager>
                    <hostName>172.17.42.2</hostName>
                    <port>8904</port>
                </manager>
            </managers>
            <!--Connection re-try interval to connect to Storm Manager service in case of a connection failure-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Heart beat interval (in ms) for event listeners in "Storm Receivers" and "CEP Publishers" to acknowledge their
            availability for receiving events-->
            <heartbeatInterval>5000</heartbeatInterval>
            <!--Storm topology re-submit interval in case of a topology submission failure-->
            <topologyResubmitInterval>10000</topologyResubmitInterval>
        </management>

        <transport>
            <!--Port range to be used for events listener servers in "Storm Receiver Spouts" and "CEP Publishers"-->
            <portRange>
                <min>15000</min>
                <max>15100</max>
            </portRange>
            <!--Connection re-try interval (in ms) for connection failures between "CEP Receiver" to "Storm Receiver" connections
            and "Storm Publisher" to "CEP Publisher" connections-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Size of the output queue of each "CEP Receiver" which stores events to be published into "Storm Receivers" .
            This must be a power of two-->
            <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
            <!--Size of the output queue of each "Storm Publisher" which stores events to be published into "CEP Publisher" .
           This must be a power of two-->
            <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </transport>
        <presentation>
            <presentationOutputQueueSize>1024</presentationOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </presentation>
        <statusMonitor>
            <lockTimeout>60000</lockTimeout>
            <updateRate>60000</updateRate>
        </statusMonitor>
        <stormJar>org.wso2.cep.storm.dependencies.jar</stormJar>
        <distributedUIUrl></distributedUIUrl>
        <memberUpdateCheckInterval>2000</memberUpdateCheckInterval>
    </mode>
 
</eventProcessorConfiguration>

Presenter node:

<eventProcessorConfiguration>

    <mode name="HA" enable="false">
       ..
    </mode>

    <mode name="Distributed" enable="true">
        <nodeType>
            <worker enable="false"/>
            <manager enable="false">
                <hostName>localhost</hostName>
                <port>8904</port>
            </manager>
            <presenter enable="true">
                <hostName>172.17.42.3</hostName>
                <port>11000</port>
            </presenter>
        </nodeType>
        <management>
            <managers>
                <manager>
                    <hostName>172.17.42.1</hostName>
                    <port>8904</port>
                </manager>
                <manager>
                    <hostName>172.17.42.2</hostName>
                    <port>8904</port>
                </manager>
            </managers>
            <!--Connection re-try interval to connect to Storm Manager service in case of a connection failure-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Heart beat interval (in ms) for event listeners in "Storm Receivers" and "CEP Publishers" to acknowledge their
            availability for receiving events-->
            <heartbeatInterval>5000</heartbeatInterval>
            <!--Storm topology re-submit interval in case of a topology submission failure-->
            <topologyResubmitInterval>10000</topologyResubmitInterval>
        </management>

        <transport>
            <!--Port range to be used for events listener servers in "Storm Receiver Spouts" and "CEP Publishers"-->
            <portRange>
                <min>15000</min>
                <max>15100</max>
            </portRange>
            <!--Connection re-try interval (in ms) for connection failures between "CEP Receiver" to "Storm Receiver" connections
            and "Storm Publisher" to "CEP Publisher" connections-->
            <reconnectionInterval>20000</reconnectionInterval>
            <!--Size of the output queue of each "CEP Receiver" which stores events to be published into "Storm Receivers" .
            This must be a power of two-->
            <cepReceiverOutputQueueSize>8192</cepReceiverOutputQueueSize>
            <!--Size of the output queue of each "Storm Publisher" which stores events to be published into "CEP Publisher" .
           This must be a power of two-->
            <stormPublisherOutputQueueSize>8192</stormPublisherOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </transport>
        <presentation>
            <presentationOutputQueueSize>1024</presentationOutputQueueSize>
            <!--Size of TCP event publishing client's send buffer in bytes-->
            <tcpEventPublisherSendBufferSize>5242880</tcpEventPublisherSendBufferSize>
            <!--Character encoding of TCP event publishing client-->
            <tcpEventPublisherCharSet>UTF-8</tcpEventPublisherCharSet>
            <!--Number of threads to be used by event receiving servers in "Storm Receivers" and "CEP Publishers"-->
            <tcpEventReceiverThreadCount>10</tcpEventReceiverThreadCount>
            <connectionStatusCheckInterval>20000</connectionStatusCheckInterval>
        </presentation>
        <statusMonitor>
            <lockTimeout>60000</lockTimeout>
            <updateRate>60000</updateRate>
        </statusMonitor>
        <stormJar>org.wso2.cep.storm.dependencies.jar</stormJar>
        <distributedUIUrl></distributedUIUrl>
        <memberUpdateCheckInterval>2000</memberUpdateCheckInterval>
    </mode>
 
</eventProcessorConfiguration>

Keep the following in mind when configuring this file.

  • Enable only the distributed mode here, and ensure that the enable attribute of every other mode is set to false.
  • A single node can act as a manager, worker, and presenter at the same time.

  • To start the node as a CEP manager only, set enable to true in the manager element and to false in the worker and presenter elements. Similarly, setting enable to true in the worker or presenter element makes the node act as a worker or presenter. If all three are disabled, the server starts as a normal CEP node.

  • When starting CEP as a manager node, specify the hostName/IP and port for the Storm manager service. Similarly, when starting CEP as a presenter node, specify the hostName/IP and port on which the event syncing service is hosted.

    <nodeType>
        <worker enable="true"/>
        <manager enable="true">
            <hostName>172.17.42.1</hostName>
            <port>8904</port>
        </manager>
        <presenter enable="true">
            <hostName>172.17.42.1</hostName>
            <port>11000</port>
        </presenter>
    </nodeType>
  • Specify the portRange to list out the range of ports that can be opened to receive events in both CEP and Storm when communicating with the Storm topology. This can be any range of ports.

    <portRange>
          <min>15000</min>
          <max>15100</max>
    </portRange>
  • Specify CEP managers by listing their hostName and ports.

    <managers>
          <manager>
              <hostName>172.17.42.1</hostName>
              <port>8904</port>
          </manager>
          <manager>
              <hostName>172.17.42.2</hostName>
              <port>8904</port>
          </manager>
    </managers>

Enabling clustering

Enable clustering in all managers, workers, and presenters. All nodes must belong to the same cluster domain under the WKA (well-known address) membership scheme.

  1. Open the <CEP_HOME>/repository/conf/axis2/axis2.xml file and scroll down to the 'Clustering' section.
  2. Set the 'enable' attribute of the <clustering> element to true. This enables clustering for that node.
    <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
  3. Change the 'membershipScheme' parameter to 'wka'. For more information on membership schemes and which to choose, read the section about membership schemes.
    <parameter name="membershipScheme">wka</parameter>
  4. Provide a domain for the cluster. The domain, which is ‘wso2.carbon.domain’ in this particular example configuration, must be the same for all the managers and workers. All the nodes having the domain as ‘wso2.carbon.domain’ will be part of the same cluster.
    <parameter name="domain">wso2.carbon.domain</parameter>
  5. Specify the 'localMemberHost' and 'localMemberPort' parameters. The localMemberHost is the IP address of the CEP instance you’re configuring. The localMemberPort is the port on which this CEP instance will be listening for incoming cluster messages. If you are running multiple CEP instances on the same host, make sure that this port value does not conflict with the localMemberPort value of the other CEP instances.

    <parameter name="localMemberHost">172.17.42.1</parameter>
    <parameter name="localMemberPort">4000</parameter>
  6. Specify the well-known members for the cluster. At least two well-known address (WKA) members are needed for the cluster to work correctly and to recover if a single WKA member fails. All the managers and workers qualify to be well-known members. For more information on well-known members, read the section about membership schemes. The following example lists three well-known members. The port provided for each member must be equal to the 'localMemberPort' of that well-known member.

    <members>
        <member>
            <hostName>172.17.42.1</hostName>
            <port>4000</port>
        </member>
        <member>
            <hostName>172.17.42.2</hostName>
            <port>4000</port>
        </member>
        <member>
            <hostName>172.17.42.3</hostName>
            <port>4000</port>
        </member>
    </members>
  7. Save and close the file and restart the servers (if running) for the changes to take effect.
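
Put together, the clustering section of axis2.xml for the node at 172.17.42.1 could look roughly like the sketch below. It only combines the values used in the steps above. Any other parameters in the shipped clustering section are assumed to stay at their defaults and are omitted here.

```xml
<!-- <CEP_HOME>/repository/conf/axis2/axis2.xml on the node at 172.17.42.1 -->
<clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
    <!-- Steps 3 and 4: same membership scheme and domain on every node -->
    <parameter name="membershipScheme">wka</parameter>
    <parameter name="domain">wso2.carbon.domain</parameter>

    <!-- Step 5: address and port of this particular node -->
    <parameter name="localMemberHost">172.17.42.1</parameter>
    <parameter name="localMemberPort">4000</parameter>

    <!-- Step 6: well-known members; each port matches that member's localMemberPort -->
    <members>
        <member>
            <hostName>172.17.42.1</hostName>
            <port>4000</port>
        </member>
        <member>
            <hostName>172.17.42.2</hostName>
            <port>4000</port>
        </member>
        <member>
            <hostName>172.17.42.3</hostName>
            <port>4000</port>
        </member>
    </members>

    <!-- Remaining default parameters of the clustering section omitted -->
</clustering>
```

On the other nodes, only localMemberHost (and localMemberPort, if several instances share a host) would differ. The domain and the member list stay the same.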

Configure storm.yaml

This file is located in <CEP_HOME>/repository/conf/cep/storm. Configurations related to the Storm topology must be specified in this file. In particular, when the Storm cluster does not reside on the same machine as the CEP manager node, the IP address of the Storm Nimbus must be specified in this file for each CEP manager, as follows.

nimbus.host: "10.100.5.42"

Other configurations related to Storm topologies can also be specified here (e.g., topology.workers: 2). The configurations specified in this file are applied to all the Storm topologies that the CEP manager submits to the Storm cluster. Refer to defaults.yaml for more information.

Share the registry database

Create the database in a DB server and point to it from all nodes. The scripts in <CEP_HOME>/dbscripts can be used to create the database. Refer to Setting up the Database for detailed information on how to create the DB.

After creating the DB, change the WSO2_CARBON_DB datasource as follows in <CEP_HOME>/repository/conf/datasources/master-datasources.xml on all CEP managers, CEP workers, and CEP presenters.

<datasource>
     <name>WSO2_CARBON_DB</name>
     <description>The datasource used for registry and user manager</description>
     <jndiConfig>
        <name>jdbc/WSO2CarbonDB</name>
     </jndiConfig>
     <definition type="RDBMS">
        <configuration>
           <url>jdbc:mysql://<mysql-server-ip>:3306/<database-name></url>
           <username>root</username>
           <password>root</password>
           <driverClassName>com.mysql.jdbc.Driver</driverClassName>
           <maxActive>50</maxActive>
           <maxWait>60000</maxWait>
           <testOnBorrow>true</testOnBorrow>
           <validationQuery>SELECT 1</validationQuery>
           <validationInterval>30000</validationInterval>
        </configuration>
     </definition>
</datasource>

For a single node deployment, the default H2 database can be used. However, it's not recommended to use H2 DB for a production environment.

Setup deployment synchronizer

Set up the deployment synchronizer to synchronize artifacts across the cluster. CEP managers must be configured as manager nodes, and CEP workers and CEP presenters must be configured as worker nodes. Refer to the SVN-Based Deployment Synchronizer documentation for more information.

Note that artifacts must be deployed only through the CEP manager nodes.

Running the CEP server

Ensure that you have disabled HA mode in the event-processor.xml file so that the CEP server starts in distributed mode and attempts to connect to the receivers and publishers of the Storm topology. Also, make sure that the Apache Storm cluster is running. Then start the CEP servers.
