WSO2 Complex Event Processor is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.

Introduction

This sample demonstrates how to persist WSO2 CEP state across server restarts in a single-node deployment. It uses the following window query to demonstrate how a query resumes from its previous state, even after the server stops or crashes. The custom text output events are published using a logger event publisher. Custom events are events with custom mappings that do not adhere to the default event formats. For more information on event formats, see Event Formats.

from loginEvents#window.length(50)
select count(ipAddress)  as ipCount, sum(frequency) as totalCount
insert into loginCount;

The above query calculates the count of the IP addresses and the sum of the frequencies over the last 50 events that arrived.
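The behaviour of a length window can be sketched in a few lines of Python. This is only an illustration of the sliding-window idea, not Siddhi's implementation: the `LengthWindow` class, the sample IP addresses, and the frequencies are hypothetical, and the window size is reduced from 50 to 3 so that eviction is visible.

```python
from collections import deque


class LengthWindow:
    """Sliding window over the last `size` events (illustrative only)."""

    def __init__(self, size):
        # A bounded deque drops the oldest event once `size` is exceeded.
        self.events = deque(maxlen=size)

    def add(self, ip_address, frequency):
        self.events.append((ip_address, frequency))
        ip_count = len(self.events)                          # count(ipAddress)
        total_count = sum(freq for _, freq in self.events)   # sum(frequency)
        return ip_count, total_count


window = LengthWindow(size=3)  # 3 instead of 50, to make eviction visible
outputs = [
    window.add(ip, freq)
    for ip, freq in [
        ("192.168.1.1", 2),
        ("192.168.1.2", 5),
        ("192.168.1.3", 1),
        ("192.168.1.4", 4),  # evicts the first event from the window
    ]
]
print(outputs)
```

Once the window is full, the count stays at the window size while the sum reflects only the events still inside the window.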

Prerequisites

Follow the steps below to set up the prerequisites for this sample.

  1. Set up the prerequisites required for all samples.
  2. Set up a node (ignore the presenter node configurations for this sample) as mentioned in the configuring High Availability in CEP nodes section. The summarized steps are as follows.

    1. Make the following changes in the <CEP_HOME>/repository/conf/axis2/axis2.xml file to enable clustering.

      <axisconfig name="AxisJava2.0">
          ...
          <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
              ...     
              <parameter name="membershipScheme">wka</parameter>
              ...
              <!-- The host name or IP address of this member other than localhost/127.0.0.1 -->
              <parameter name="localMemberHost">127.0.0.1</parameter>
              ...
              <members>
                  <member>
                      <hostName>127.0.0.1</hostName>
                      <port>4000</port>
                  </member>
              </members>
          ...
          </clustering>
      </axisconfig>
    2. Make the following changes in the <CEP_HOME>/repository/conf/event-processor.xml file to enable HA mode. Make sure that <persistence/> is enabled.

      <eventProcessorConfiguration>
      
          <!-- HA Mode Config -->
          <mode name="HA" enable="true">
              <nodeType>
                  <worker enable="true"/>
                  <presenter enable="true"/>
              </nodeType>
              ...
              ...
              <persistence enable="true">
                  <persistenceIntervalInMinutes>1</persistenceIntervalInMinutes>
                  <persisterSchedulerPoolSize>10</persisterSchedulerPoolSize>
                  <persister class="org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore">
                      <property key="persistenceLocation">cep_persistence</property>
                  </persister>
              </persistence>
          </mode>
      
          <!-- Distributed Mode Config -->
          <mode name="Distributed" enable="false">
              ...
          </mode>
      
      </eventProcessorConfiguration>

      With the above changes, CEP takes a snapshot every minute and stores it in the <CEP_HOME>/cep_persistence/ directory. It uses FileSystemPersistenceStore, the default persistence store of WSO2 CEP, to persist the state.
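To make the idea of a file-based persistence store more concrete, the following is a minimal Python sketch. It is not how FileSystemPersistenceStore is implemented. The `FileSnapshotStore` class, the JSON file format, and the revision naming scheme are all assumptions made for illustration. A scheduler that calls `save` once per persistence interval is left out.

```python
import json
import tempfile
from pathlib import Path


class FileSnapshotStore:
    """Hypothetical stand-in for a file-system persistence store."""

    def __init__(self, directory):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)

    def save(self, plan_name, state):
        # One file per revision; a zero-padded sequence number orders them.
        revision = len(list(self.directory.glob(f"*_{plan_name}.json"))) + 1
        path = self.directory / f"{revision:06d}_{plan_name}.json"
        path.write_text(json.dumps(state))
        return path

    def load_latest(self, plan_name):
        # Return the most recent snapshot, or None if nothing was persisted.
        files = sorted(self.directory.glob(f"*_{plan_name}.json"))
        if not files:
            return None
        return json.loads(files[-1].read_text())


with tempfile.TemporaryDirectory() as tmp:
    store = FileSnapshotStore(Path(tmp) / "cep_persistence")
    nothing_yet = store.load_latest("LoginCountExecutionPlan")
    store.save("LoginCountExecutionPlan", {"events": [["10.0.0.1", 3]]})
    store.save("LoginCountExecutionPlan",
               {"events": [["10.0.0.1", 3], ["10.0.0.2", 1]]})
    restored = store.load_latest("LoginCountExecutionPlan")

print(restored)
```

In this sketch, only the state captured by the most recent `save` call is recoverable, so anything that changes between two snapshots would not survive a crash.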

Building the sample

Start the WSO2 CEP server with the sample configuration numbered 0502. For instructions, see Starting sample CEP configurations.

This sample configuration creates the following.

  • Two streams with the IDs org.wso2.sample.login.info:1.0.0 and org.wso2.sample.login.count:1.0.0 

  • An event receiver named loginInfoReceiver

  • An event publisher named loginCountPublisher

  • An execution plan named LoginCountExecutionPlan

Executing the sample

Follow the steps below to execute the sample.

  1. Navigate to the <CEP_HOME>/samples/cep/producers/http/ directory, and execute the following Ant command using another tab in the CLI:

    ant -Durl=http://localhost:9763/endpoints/loginInfoReceiver -Dsn=0502

    This builds the HTTP client, and publishes the events defined in the <CEP_HOME>/samples/cep/artifacts/0502/loginInfoReceiver.txt file to the loginInfoReceiver endpoint.

    Do not close this terminal. It is required to keep the server running and receiving events.

    You can view the output events in the CEP server logs in the CLI.

    In this output, the ipCount is incremented by one for each new event, and the totalCount is increased by the frequency of each event.

  2. Wait for the CEP server to take a snapshot and persist it. With the persistence interval configured above, this happens once every minute.
  3. Restart the server and repeat step 1.

    You can view the output events in the CEP server logs in the CLI.

    Event processing continues from the previous state even after you restart the server. The ipCount of the first event is displayed as 7 because you already sent 6 events in step 1. The totalCount also takes the previous events into account.
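The arithmetic behind this result can be reproduced with a small Python sketch. It assumes six events were sent before the restart, as described above. The `LoginCounter` class and the frequency values are hypothetical and are not taken from the sample's loginInfoReceiver.txt file.

```python
from collections import deque


class LoginCounter:
    """Illustrative length window that can be restored from a snapshot."""

    def __init__(self, size=50, events=None):
        self.events = deque(events or [], maxlen=size)

    def add(self, frequency):
        self.events.append(frequency)
        return len(self.events), sum(self.events)  # (ipCount, totalCount)

    def snapshot(self):
        return list(self.events)


# First run: six events (made-up frequencies), then a snapshot is taken.
first_run = LoginCounter()
for frequency in [2, 1, 4, 3, 5, 1]:
    first_run.add(frequency)
saved = first_run.snapshot()

# After the restart: restore from the snapshot, then receive a new event.
restarted = LoginCounter(events=saved)
after_restart = restarted.add(2)

# Without persistence, the window would start empty after the restart.
without_persistence = LoginCounter().add(2)

print(after_restart, without_persistence)
```

With the restored state, the first event after the restart yields an ipCount of 7 and a totalCount that includes the six earlier frequencies. Without it, the counts would start again from 1.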
