The latest version for DAS is WSO2 Data Analytics Server 3.1.0. View documentation for the latest release.
WSO2 Data Analytics Server is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.


Introduction

This sample demonstrates how to set up an execution plan with a pattern matching Siddhi query to detect non-occurrences. In this sample, we use a pattern to detect packages that a courier service fails to deliver within a predefined time period (for demonstration purposes, we assume the time limit for delivering a package is 2 minutes).

The queries used in this sample are as follows. 

from arrivals_stream#window.time(2 minutes)
select *
insert expired events into overdue_deliveries_stream;


from every arrivalEvent = arrivals_stream ->
deliveryEvent = deliveries_stream[arrivalEvent.trackingId == trackingId] 
    or overdue_delivery = overdue_deliveries_stream[arrivalEvent.trackingId == trackingId]
select arrivalEvent.trackingId as trackingId, arrivalEvent.customerName as customerName, arrivalEvent.telephoneNo as telephoneNo, deliveryEvent.trackingId as deliveryId 
insert into filter_stream;


from filter_stream [ (deliveryId is null)]  
select trackingId, customerName, telephoneNo
insert into alert_stream;  

 

The first query keeps each arrival event for 2 minutes and then publishes it to the overdue_deliveries_stream event stream. This event stream is then used in the second query.

The second query uses a pattern. It matches every arrival event (from the arrivals_stream event stream) that is followed either by a delivery event with the same trackingId (from the deliveries_stream event stream), or by an overdue delivery event with the same trackingId (i.e., the same arrival event, published to the overdue_deliveries_stream event stream once it is 2 minutes old). If an arrival event is followed by an overdue delivery event before any delivery event, the package was not delivered within the 2-minute time limit.

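The third query selects the events in filter_stream where deliveryId is null. The deliveryId is null when the delivery event is null, i.e., when the pattern was completed by an overdue delivery event instead of a delivery event. These events are published to the alert_stream event stream.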
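
To make the combined logic of the three queries easier to follow, here is a minimal plain-Python sketch of the same idea. It is not Siddhi and does not reproduce how the engine processes streams: it works on complete in-memory lists, and the `Arrival` and `Delivery` classes, the `timestamp` fields, and the `find_overdue` function are hypothetical names introduced only for this illustration. It also assumes that a delivery arriving exactly at the 2-minute mark counts as late.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

TIME_LIMIT_SECONDS = 120.0  # the sample's 2-minute delivery limit


@dataclass
class Arrival:
    timestamp: float  # seconds; hypothetical field used only by this sketch
    tracking_id: str
    customer_name: str
    telephone_no: str


@dataclass
class Delivery:
    timestamp: float  # seconds; hypothetical field used only by this sketch
    tracking_id: str


def find_overdue(arrivals: List[Arrival],
                 deliveries: List[Delivery]) -> List[Dict[str, str]]:
    """Return one alert per arrival with no matching delivery within the limit."""
    alerts = []
    for arrival in arrivals:
        # Query 1: the arrival leaves the time window at this instant.
        expiry = arrival.timestamp + TIME_LIMIT_SECONDS
        # Query 2: look for a delivery with the same tracking ID that follows
        # the arrival and comes before its expiry.
        delivery_id: Optional[str] = None
        for delivery in deliveries:
            if (delivery.tracking_id == arrival.tracking_id
                    and arrival.timestamp <= delivery.timestamp < expiry):
                delivery_id = delivery.tracking_id
                break
        # Query 3: a missing delivery ID means the package is overdue.
        if delivery_id is None:
            alerts.append({
                "trackingId": arrival.tracking_id,
                "customerName": arrival.customer_name,
                "telephoneNo": arrival.telephone_no,
            })
    return alerts


# Made-up events for illustration; they are not taken from the sample files.
arrivals = [
    Arrival(0.0, "T1", "Alice", "111"),
    Arrival(10.0, "T2", "Bob", "222"),
]
deliveries = [Delivery(60.0, "T1"), Delivery(200.0, "T2")]
print(find_overdue(arrivals, deliveries))
```

With these made-up events, only T2 is reported: T1 is delivered after 60 seconds, while the delivery for T2 arrives 190 seconds after its arrival, which is past the limit.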

Prerequisites

For general prerequisites, see CEP Samples Setup. (The apache-axiom.jar needs to be copied to the <DAS_HOME>/samples/cep/lib directory in order to send XML event types.)

Building the sample

Start the WSO2 DAS server with the sample configuration numbered 0111. For instructions, see Starting sample CEP configurations.

This sample configuration points the default Axis2 repo to <DAS_HOME>/samples/cep/artifacts/0111 (by default, the Axis2 repo is <DAS_HOME>/repository/deployment/server).

Executing the sample

  1. Open a new terminal, go to <DAS_HOME>/samples/cep/consumers/generic-log-service, and run the command below. It builds the sample log service web app and deploys it in the webapps repository that is relevant to the sample.

    ant -Dsn=0111
  2. View the logs in the DAS server while the logger service is deploying. Once it is deployed, the web app can receive messages sent from the DAS server.

  3. Navigate to <DAS_HOME>/samples/cep/producers/http and execute the following command.

    ant -Durl=http://localhost:9763/endpoints/packageArrivalsHTTPReceiver -DfilePath=../../artifacts/0111/arrivalEvents.txt


    This reads the arrivalEvents.txt file that contains a few sample arrival events and then sends them to the specified URL.

  4. Execute the following command:

    ant -Durl=http://localhost:9763/endpoints/packageDeliveryHTTPReceiver -DfilePath=../../artifacts/0111/deliveryEvents.txt

    This reads the deliveryEvents.txt file that contains some sample delivery events and sends them to the specified URL.

    After 2 minutes, the output is logged in the console.
