The latest version for DAS is WSO2 Data Analytics Server 3.1.0. View documentation for the latest release.
WSO2 Data Analytics Server is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.


Introduction

This sample demonstrates how to set up an execution plan with a sequence-based query that can be used to detect trends from a stock trades stream. This sample uses the Event Simulator for inputs and the logger publisher for logging the outputs to the DAS console.

The query used in this sample is as follows:

from every e1=FilteredStockStream[price>20], 
	e2=FilteredStockStream[((e2[last].price is null) and price>=e1.price) or ((not (e2[last].price is null)) and price>=e2[last].price)]+, 
	e3=FilteredStockStream[price<e2[last].price] 
select e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak 
insert into PeakStream ;

The above query does the following:

  • Processes the events received through the FilteredStockStream.
  • First it looks for an event e1 with the condition where the price is greater than 20.
  • Then it looks for one or more events, e2, that satisfy the following condition:
    • ((e2[last].price is null) and price>=e1.price) handles the first event after e1. Here (e2[last].price is null) returns true because e2 does not yet contain any event, so the query checks whether the price of the current event is greater than or equal to the e1 price.

    • ((not (e2[last].price is null)) and price>=e2[last].price) handles any subsequent events. In this case the last event of e2 is not null, so the query checks whether the price of the current event is greater than or equal to the price of the last e2 event. Taken together, the whole condition looks for one or more consecutive events whose prices never drop.

  • Then it looks for another event, e3, whose price is lower than that of the last e2 event, using the condition e3=FilteredStockStream[price<e2[last].price].
  • From the select clause we select the attributes e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak.
  • Finally the event is output to the PeakStream.
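
The matching steps above can be sketched outside DAS as a small Python simulation. This is only an illustration under stated assumptions, not Siddhi's engine: the function name find_peaks, the threshold parameter, and the use of a plain list of prices in place of FilteredStockStream are all hypothetical, and the way overlapping matches are reported by a real execution plan may differ.

```python
from typing import List, Tuple


def find_peaks(prices: List[float], threshold: float = 20.0) -> List[Tuple[float, float, float]]:
    """Simplified model of the sequence query (hypothetical helper, not a DAS API).

    Returns one (priceInitial, pricePeak, priceAfterPeak) tuple per match.
    """
    matches = []
    # "every e1=...": try to start a new match at every event.
    for start, initial in enumerate(prices):
        # e1: the price must be greater than the threshold (20 in the sample).
        if initial <= threshold:
            continue
        last = initial   # price the next e2 candidate is compared against
        run_length = 0   # number of events collected into e2 so far
        for price in prices[start + 1:]:
            if price >= last:
                # e2: price is at least the previous one (e1 for the first e2 event).
                last = price
                run_length += 1
            else:
                # The price dropped. This is e3 only if e2 holds at least one event;
                # otherwise the attempt that started at e1 fails.
                if run_length >= 1:
                    matches.append((initial, last, price))
                break
    return matches


if __name__ == "__main__":
    print(find_peaks([21.0, 22.0, 23.0, 22.0]))
```

With the input 21, 22, 23, 22 this sketch reports two matches, (21, 23, 22) and (22, 23, 22), because a new attempt starts at every qualifying event. The attempt that starts at 23 fails, since the next price drops before e2 has collected any event.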

Prerequisites

Set up the prerequisites required for all samples.

Building the sample

Start the WSO2 DAS server with the sample configuration numbered 0109. For instructions, see Starting sample CEP configurations.

This sample configuration points the default Axis2 repo to <DAS_HOME>/samples/cep/artifacts/0109 (by default, the Axis2 repo is <DAS_HOME>/repository/deployment/server).

Executing the sample

  1. Log into the DAS Management Console.

  2. Go to Tools -> Event Simulator. Under the Multiple Events section, the events.csv file is listed. This file contains some sample data. Click Play to start sending sample events from the file.
  3. View the output events in the DAS console. This sample uses the logger adaptor to log output events to the console.

    [Screenshot: example output events logged to the DAS console]
