Introduction
This sample demonstrates how to persist WSO2 CEP state across server restarts in a single-node deployment. It uses the following window query to show how a query can resume from its previous state, even after the server stops or crashes. The custom text output events are published using a logger event publisher. Custom events are events with custom mappings that do not adhere to the default event formats. For more information on event formats, see Event Formats.
from loginEvents#window.length(50) select count(ipAddress) as ipCount, sum(frequency) as totalCount insert into loginCount;
The above query calculates the count of IP addresses and the sum of the frequencies over the last 50 events that arrived.
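The window semantics described above can be sketched in plain Python. This is only an illustration of how a length window behaves, not how Siddhi implements it; the class name `LengthWindowCounter` and its method are hypothetical.

```python
from collections import deque


class LengthWindowCounter:
    """Hypothetical model of a length window tracking count and sum."""

    def __init__(self, length=50):
        # deque(maxlen=...) discards the oldest event once the window is full
        self.window = deque(maxlen=length)

    def add(self, ip_address, frequency):
        """Add one event and return (ipCount, totalCount) for the window."""
        self.window.append((ip_address, frequency))
        ip_count = len(self.window)
        total_count = sum(freq for _, freq in self.window)
        return ip_count, total_count
```

Until the window fills, the count grows by one per event; after that it stays at the window length while the sum reflects only the events still inside the window.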
Prerequisites
Follow the steps below to set up the prerequisites for this sample.
- Set up the prerequisites required for all samples.
- Set up a node (ignore the presenter node configurations for this sample) as described in the Configuring High Availability in CEP nodes section. The steps are summarised below.
Make the following changes in the <CEP_HOME>/repository/conf/axis2/axis2.xml file to enable clustering.
<axisconfig name="AxisJava2.0">
    ...
    <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
        ...
        <parameter name="membershipScheme">wka</parameter>
        ...
        <!-- The host name or IP address of this member other than localhost/127.0.0.1 -->
        <parameter name="localMemberHost">127.0.0.1</parameter>
        ...
        <members>
            <member>
                <hostName>127.0.0.1</hostName>
                <port>4000</port>
            </member>
        </members>
        ...
    </clustering>
</axisconfig>
Make the following changes in the <CEP_HOME>/repository/conf/event-processor.xml file to enable HA mode. Make sure that <persistence/> is enabled.
<eventProcessorConfiguration>
    <!-- HA Mode Config -->
    <mode name="HA" enable="true">
        <nodeType>
            <worker enable="true"/>
            <presenter enable="true"/>
        </nodeType>
        ...
        ...
        <persistence enable="true">
            <persistenceIntervalInMinutes>1</persistenceIntervalInMinutes>
            <persisterSchedulerPoolSize>10</persisterSchedulerPoolSize>
            <persister class="org.wso2.carbon.event.processor.core.internal.persistence.FileSystemPersistenceStore">
                <property key="persistenceLocation">cep_persistence</property>
            </persister>
        </persistence>
    </mode>
    <!-- Distributed Mode Config -->
    <mode name="Distributed" enable="false">
        ...
    </mode>
</eventProcessorConfiguration>
With the above changes, CEP takes a snapshot every minute and stores it in the <CEP_HOME>/cep_persistence/ directory. It uses FileSystemPersistenceStore, the default persistence store of WSO2 CEP, to persist the states.
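To make the idea of a file-based persistence store more concrete, here is a minimal sketch in Python. It is not the FileSystemPersistenceStore implementation; the class `FileSnapshotStore`, the file naming scheme, and the use of `pickle` are all assumptions chosen for illustration.

```python
import os
import pickle
import tempfile


class FileSnapshotStore:
    """Hypothetical file-backed store: one file per snapshot, newest wins."""

    def __init__(self, directory):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def save(self, state):
        # Number snapshots sequentially so that file names sort by age
        revision = f"{len(os.listdir(self.directory)):06d}.snapshot"
        with open(os.path.join(self.directory, revision), "wb") as f:
            pickle.dump(state, f)
        return revision

    def load_latest(self):
        revisions = sorted(os.listdir(self.directory))
        if not revisions:
            return None  # nothing persisted yet, so start from empty state
        with open(os.path.join(self.directory, revisions[-1]), "rb") as f:
            return pickle.load(f)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        store = FileSnapshotStore(os.path.join(tmp, "cep_persistence"))
        store.save({"ipCount": 6})
        print(store.load_latest())
```

In this sketch a scheduler would call `save` once per persistence interval, and the server would call `load_latest` once at startup.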
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0502. For instructions, see Starting sample CEP configurations.
This sample configuration creates the following.
- Two streams with the IDs org.wso2.sample.login.info:1.0.0 and org.wso2.sample.login.count:1.0.0
- An event receiver named loginInfoReceiver
- An event publisher named loginCountPublisher
- An execution plan named LoginCountExecutionPlan
Executing the sample
Follow the steps below to execute the sample.
- Navigate to the <CEP_HOME>/samples/cep/producers/http/ directory, and execute the following Ant command using another tab in the CLI:
ant -Durl=http://localhost:9763/endpoints/loginInfoReceiver -Dsn=0502
This builds the HTTP client and publishes the events defined in the <CEP_HOME>/samples/cep/artifacts/0502/loginInfoReceiver.txt file to the loginInfoReceiver endpoint.
Do not close this terminal. It is required to keep the server running and receiving events.
The CLI logs of the CEP server show the output events it receives.
In these output events, ipCount is incremented by one for each new event, and totalCount increases by the frequency of each event.
- Wait for the CEP server to take a snapshot and persist it.
- Restart the server and repeat step 1.
The CLI logs of the CEP server again show the output events it receives.
Event processing continues from the previous state even after you restart the server. The ipCount of the first event is displayed as 7 because you already sent 6 events in step 1. The totalCount is likewise calculated with the previous events taken into account.
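The arithmetic behind this result can be reproduced with a small Python sketch. It only models the behaviour described above; the frequencies are made up (the actual values in loginInfoReceiver.txt are not shown here), and the `process` helper and the use of `pickle` for the snapshot are assumptions.

```python
import pickle
from collections import deque

WINDOW_LENGTH = 50  # matches #window.length(50) in the query


def process(window, frequency):
    """Add one event's frequency and return (ipCount, totalCount)."""
    window.append(frequency)
    return len(window), sum(window)


# First run: six events with made-up frequencies.
window = deque(maxlen=WINDOW_LENGTH)
before_restart = [process(window, f) for f in [3, 1, 4, 1, 5, 9]]

# Snapshot taken before shutdown (serialized window contents).
snapshot = pickle.dumps(list(window))

# "Restart": the in-memory window is gone, so rebuild it from the snapshot.
del window
restored = deque(pickle.loads(snapshot), maxlen=WINDOW_LENGTH)
after_restart = process(restored, 2)

print(before_restart[-1])  # (6, 23)
print(after_restart)       # (7, 25)
```

Without the restore step, the first event after the restart would report an ipCount of 1, since the window would start empty.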