
WSO2 Stream Processor (SP) is a lightweight and lean, streaming SQL-based stream processing platform that allows you to collect events, analyze them in real time, identify patterns, map their impacts, and communicate the results within milliseconds. It is powered by Siddhi, which makes it extremely high performing.

First, let's understand the following concepts that are used in this guide:

Stream Processing and Complex Event Processing Overview

Let's understand what an event is through an example. If we consider the transactions carried out via an ATM as a data stream, one withdrawal from it can be considered an event. This event contains data about the amount, time, account number etc. Many such transactions form a stream.

Stream processing engines allow you to create a processing graph and inject events into it. Each operator in the graph processes the events it receives and sends them to the next operator.

A complex event is an event that summarizes, represents, or denotes a set of other events. Complex Event Processing is a subset of Stream Processing that involves analyzing multiple streams of events in real time, recognizing particular sequences or patterns across streams, and inferring a business-significant event from the correlated events.
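To make the idea concrete, here is a minimal Python sketch (not Siddhi) of pattern recognition over the ATM example above: it emits a complex event whenever two consecutive withdrawals both exceed a threshold. The event fields and the threshold are hypothetical, chosen only for illustration.

```python
def detect_large_withdrawal_pairs(events, threshold=500):
    """Yield a complex event whenever two consecutive withdrawal
    events both exceed the threshold."""
    previous = None
    for event in events:
        if (previous is not None
                and previous["amount"] > threshold
                and event["amount"] > threshold):
            # The complex event summarizes the two underlying events.
            yield {"pattern": "two-large-withdrawals",
                   "accounts": (previous["account"], event["account"])}
        previous = event

stream = [
    {"account": "A1", "amount": 200},
    {"account": "A1", "amount": 700},
    {"account": "A2", "amount": 900},
]
alerts = list(detect_large_withdrawal_pairs(stream))
```

In a real deployment, Siddhi's pattern and sequence queries express this kind of logic declaratively instead of in hand-written loops.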

The stream processing capabilities of WSO2 SP allow you to capture high-volume data flows, process them in real time, and present the results in a streaming manner, while its complex event processing capabilities detect patterns and trends for decision making via the patterns and sequences supported by Siddhi.

Siddhi overview

WSO2 SP uses the Siddhi query language to write the processing logic for its Siddhi applications. Siddhi can:

  • Accept event inputs from many different types of sources

  • Process them to generate insights

  • Publish them to many types of sinks.

To use Siddhi, you need to write the processing logic as a Siddhi Application in the Siddhi Streaming SQL language. After you write and start a Siddhi application, it:

  • Takes data one-by-one as events

  • Processes the data in each event

  • Generates new high level events based on the processing done so far

  • Sends newly generated events as the output to streams.


Before you begin:

  1. Install Oracle Java SE Development Kit (JDK) version 1.8.
  2. Set the JAVA_HOME environment variable.
  3. Download the latest WSO2 Stream Processor.
  4. Extract the downloaded zip and navigate to the <SP_HOME>/bin directory (<SP_HOME> is the extracted directory).
  5. Issue one of the following commands to start the WSO2 Stream Processor Studio.
    • For Windows: editor.bat
    • For Linux: ./editor.sh

Once the WSO2 SP server is successfully started, a log similar to the following is printed in the CLI. The start-up logs include the Stream Processor Studio URL as shown below.

Scenario

In this scenario, you are creating an application for Shipping Wave, a fictitious large-scale shipping company. Smith, the cargo manager, needs to keep track of the total weight of cargo loaded onto a ship at any given time. Measuring the weight of a cargo box when it is loaded onto the ship is considered an event.


Let's get started! You can write a simple Siddhi application to calculate the total weight with each cargo box loaded to the ship by following the steps below.

Step 1: Create a Siddhi application


Smith needs to calculate the total weight of the cargo loaded into a ship with every cargo box added. In order to generate this output for him, let's create a Siddhi application as follows:
  1. Access the Stream Processor Studio via the http://<HOST_NAME>:<EDITOR_PORT>/editor URL. 

    The default URL is http://localhost:9390/editor

    The Stream Processor Studio opens as shown below.

  2. Enter a name for your Siddhi application. In this scenario, let's name the application CargoWeightApp as shown below.

    @App:name("CargoWeightApp")
  3. Define the input stream. The stream needs to have a name and a schema defining the data that each incoming event should contain. The event data attributes are expressed as name and type pairs. In this example:

    • The name of the input stream: CargoStream

    • A name to refer to the data in each event: weight

    • Type of the data received as weight: int


    define stream CargoStream (weight int);
  4. Define an output stream. This definition has the same information as the previous one, with an additional totalWeight attribute that contains the total weight calculated so far. Here, we also need to add a sink configuration to log the events from the OutputStream so that we can observe the output values.

    A sink specifies the method of publishing streams to external systems via Siddhi. In this scenario, the sink added is of the log type, and it publishes output streams as logs in the CLI.

    @sink(type='log', prefix='LOGGER')
    define stream OutputStream(weight int, totalWeight long);
  5. Enter a Siddhi query that defines the following.
    • A name for the query (i.e., CargoWeightQuery)
    • The input stream from which the events to be processed are taken (i.e., CargoStream)
    • The data that needs to be sent to the output stream (i.e., weight and totalWeight)
    • How the output needs to be calculated (i.e., by calculating the sum of the weight of all the events)
    • The stream to which the output needs to be sent (i.e., OutputStream)


    This query is as follows:

    @info(name='CargoWeightQuery')
    from CargoStream
    select weight, sum(weight) as totalWeight
    insert into OutputStream;

    The completed Siddhi file is as follows:

    @App:name("CargoWeightApp")
    
    define stream CargoStream (weight int);
    
    @sink(type='log', prefix='LOGGER')
    define stream OutputStream(weight int, totalWeight long);
    
    @info(name='CargoWeightQuery')
    from CargoStream
    select weight, sum(weight) as totalWeight
    insert into OutputStream;
  6. To save the Siddhi file, click File => Save. This opens the Save to Workspace dialog box. Click Save to save this file in the <SP_HOME>/wso2/editor/deployment/workspace directory (which is the default location where Siddhi applications are saved).
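Before simulating events, it may help to see what CargoWeightQuery computes. The following is a minimal Python sketch (not Siddhi) of the same logic: for each event on CargoStream, it emits the event's weight together with the running sum of all weights seen so far. Without a window, the only state kept in memory is that single running total.

```python
def cargo_weight_app(weights):
    """Mimic CargoWeightQuery: emit weight and running total per event."""
    total = 0
    output = []
    for weight in weights:
        total += weight  # sum(weight) over all events received so far
        output.append({"weight": weight, "totalWeight": total})
    return output

events = cargo_weight_app([1000, 2000, 1500])
```

Here the last emitted event carries totalWeight 4500, just as the log sink would show for the third simulated event.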

Step 2: Simulate events

The Stream Processor Studio has in-built support to simulate events. To test whether the CargoWeightApp you created works as expected, let's simulate some events by following the steps given below.

  1. To start the CargoWeightApp Siddhi application, click the play button.

    If the application is successfully started, the following is logged in the Stream Processor Studio console.
  2. Click the following icon in the Stream Processor Studio to open the event simulation panel.
  3. In the Single Simulation tab of the panel, select values as follows:

    Field              Value
    Siddhi App Name    CargoWeightApp
    Stream Name        CargoStream

    As a result, the weight attribute of the CargoStream stream is displayed as follows:

  4. In the weight field, enter 1000 and click Start to start the Siddhi application. Then click Send to send the event. The event is logged in the CLI as shown below:

  5. Send five more events with the following values.

    Event No    Weight
    1           2000
    2           1500
    3           2000
    4           3000
    5           1000

    The events are logged as follows.

    Each new weight is added to the running total. Therefore, after all six events are sent, the total weight is 10500.
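The running totals for the six simulated events can be verified with a short Python check (illustrative only, not part of the Siddhi application):

```python
# The six simulated weights: 1000 from step 4, then the five from step 5.
weights = [1000, 2000, 1500, 2000, 3000, 1000]

running = []
total = 0
for w in weights:
    total += w          # each new weight is added to the running total
    running.append(total)
# running ends at 10500, matching the final logged totalWeight
```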

Step 3: Edit Siddhi application to perform temporal processing

This section demonstrates how to carry out temporal window processing with Siddhi.

In the previous scenario, you carried out processing by having only the running sum value in-memory. No events were stored during this process.

Window processing is a method that allows us to store some events in memory, either for a given time period or up to a specific number of events, so that we can perform operations such as calculating the average, maximum, etc., of the events within the window.

Let's consider that Smith, the cargo manager, has an additional requirement: to balance the weight across the ship, he needs to calculate the average weight of the last three cargo boxes loaded each time a new cargo box is loaded. Here, we are considering a window that consists of three events as shown in the image below.

To achieve this, edit the Siddhi application by following the steps below:

  1. Add a new attribute named averageWeight to the definition of the OutputStream stream so that each output event presents the average weight in addition to the weight of the new box loaded and the total weight.

    define stream OutputStream(weight int, totalWeight long, averageWeight double);
  2. To specify how to calculate the average weight, apply the avg Siddhi function to the weight attribute in the select statement as shown below. This indicates that the average is calculated for the weight attribute of incoming events.

    select weight, sum(weight) as totalWeight, avg(weight) as averageWeight
  3. To specify that the calculations performed by this query with each event must be applied only to the last three events received, apply a length window to the input stream as shown below.

    from CargoStream#window.length(3)

    This window applies to all the calculations performed for the events taken from the CargoStream stream. Therefore, adding this window also results in the total weight being calculated based on the last three events.

The completed query is as follows.

@info(name='CargoWeightQuery') 
from CargoStream#window.length(3)
select weight, sum(weight) as totalWeight, avg(weight) as averageWeight
insert into OutputStream;

The complete CargoWeightApp Siddhi application is as follows.

@App:name("CargoWeightApp")

define stream CargoStream (weight int);

@sink(type='log', prefix='LOGGER')
define stream OutputStream(weight int, totalWeight long, averageWeight double);

@info(name='CargoWeightQuery') 
from CargoStream#window.length(3)
select weight, sum(weight) as totalWeight, avg(weight) as averageWeight
insert into OutputStream;
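The semantics of the length(3) window can be sketched in Python (again, illustrative rather than Siddhi's actual implementation): at most the last three events are kept in memory, and both aggregates are computed over that sliding window.

```python
from collections import deque

def cargo_weight_windowed(weights, length=3):
    """Mimic CargoWeightQuery with a length window: aggregates are
    computed over at most the last `length` events."""
    window = deque(maxlen=length)   # oldest event expires automatically
    output = []
    for weight in weights:
        window.append(weight)
        output.append({
            "weight": weight,
            "totalWeight": sum(window),
            "averageWeight": sum(window) / len(window),
        })
    return output

out = cargo_weight_windowed([1000, 2000, 1500, 2000, 3000, 1000])
```

After the sixth event, the window holds only 2000, 3000, and 1000, so totalWeight is 6000 and averageWeight is 2000.0, no longer the overall total of 10500.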

Step 4: Simulate events for the edited Siddhi application

In this step, let's start the edited Siddhi application and simulate the same six events that you simulated in Step 2.

Event No    Weight
1           1000
2           2000
3           1500
4           2000
5           3000
6           1000

The output generated is logged as shown below.
 

