This documentation is for WSO2 Stream Processor 4.4.0 (the latest version of WSO2 SP). The Streaming Integrator is the successor of WSO2 SP.

Introduction

A Key Performance Indicator (KPI) is a quantifiable metric that can be used to evaluate the success or failure of the object being measured. WSO2 SP, with its real-time stream processing engine, allows you to perform KPI analysis. In this tutorial, let's look at how to create a Siddhi application that performs KPI analysis.

In this scenario, the foreman wants the production per hour of each sweet not to fall below 5000 units so that customer demand is met. He also needs to know whether the production per hour of a sweet exceeds 10000 units so that he can identify surpluses. Let's see how to write queries that derive this information by analyzing the JSON data sent by the Sweet Bots on the factory floor.

This tutorial covers the following concepts:

  • Grouping and filtering

  • Siddhi functions

  • Conditional processing using ifThenElse()

Before you begin:

This tutorial reuses the following queries and configurations from other tutorials:


Tutorial steps

In this tutorial, there are two scenarios as follows:

  • Detecting sweets whose production per hour falls below 5000
  • Detecting sweets whose production per hour rises above 10000

Scenario 1: Detecting sweets whose production per hour falls below 5000

Let's get started!

  1. Start WSO2 SP in editor mode and log in to Stream Processor Studio. Then open a new Siddhi application.
  2. Let's revisit the following configurations that you created in Tutorial 5.

    define stream SweetProductionStream (name string, amount long);
     
    define stream LowProductionStream (name string, pastHourTotal long);
     
    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as pastHourTotal
    group by name
    insert into LowProductionStream;

    Add these configurations to your new Siddhi application so that you can modify them to address the requirements of this scenario.

    For this tutorial, the output stream, previously named PastHourProductionStream, is renamed to LowProductionStream.

  3. Only events with a per-hour production of less than 5000 need to be sent to the LowProductionStream output stream. Therefore, add a having clause to the query as follows. The pastHourTotal attribute is also renamed to hourlyTotal.

    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as hourlyTotal
    group by name
    having hourlyTotal < 5000
    insert into LowProductionStream;

    The having hourlyTotal < 5000 clause is added after the group by clause. It is applied to the aggregated output of the query, so only events for which the hourly total of a sweet is below 5000 are inserted into LowProductionStream.

  4. At present, the foreman who receives the output of this query cannot identify the hour to which an output event applies. Therefore, add an attribute named currentHour to the output stream definition as shown below.

    define stream LowProductionStream (name string, hourlyTotal long, currentHour int);

  5. The incoming events generated by the Sweet Bots do not specify the hour to which the reported production applies, so the Siddhi application needs to infer it. To do this, add the currentTimeMillis() function, which returns the current time in milliseconds, to the select clause as follows:

    select name, sum(amount) as hourlyTotal, currentTimeMillis() as currentHour

  6. The value returned by currentTimeMillis() is a timestamp in milliseconds, not an hour. To extract the hour from it, wrap it in the time:extract() function with HOUR as the unit, as shown below.

    select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour

  7. The working hours of the sweet factory are between 9.00 AM and 5.00 PM. However, the Siddhi application runs throughout the day and, as currently written, calculates the sweet production totals even for the hours during which the Sweet Bots are inactive. To avoid reporting low production outside working hours, add conditions that exclude non-working hours as follows:

    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour
    group by name
    having hourlyTotal < 5000 and currentHour >= 9 and currentHour < 17
    insert into LowProductionStream;

    Apply the same source and sink configurations that you created in Tutorial 2 to complete the Siddhi application. The completed Siddhi application looks as follows.

    @App:name('SweetProductionKPIApp')
    
    @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count')))
    define stream SweetProductionStream (name string, amount long);
    
    @sink(type='log', prefix='Low production alert:')
    define stream LowProductionStream (name string, hourlyTotal long, currentHour int);
    
    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour
    group by name
    having hourlyTotal < 5000 and currentHour >= 9 and currentHour < 17
    insert into LowProductionStream;

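  8. To check whether this Siddhi application works as expected, send the following four cURL commands. Send them during working hours and within one hour of each other, because the query ignores events outside working hours and only aggregates events from the past hour.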

    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Toffee",
      "batch": {
        "batch id": "batch1",
        "count": 6000
      }
    }'
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Gateau",
      "batch": {
        "batch id": "batch1",
        "count": 6200
      }
    }'
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Gingerbread",
      "batch": {
        "batch id": "batch1",
        "count": 4500
      }
    }'
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Gateau",
      "batch": {
        "batch id": "batch1",
        "count": 5800
      }
    }'

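    Only the third event (Gingerbread, with an hourly total of 4500) is below 5000, so a low production alert is logged only for Gingerbread. The fourth event raises the hourly total for Gateau to 12000, so Gateau is not reported.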
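The following Python sketch mimics the query's logic outside Siddhi, which may help when reasoning about its output. It covers a one-hour sliding window, a per-sweet total (group by name), the hour taken from each event's arrival time, and the having condition. It is only an illustration under stated assumptions, not how WSO2 SP evaluates the query. The function low_production_alerts, the (arrival time, name, amount) tuple layout, and the sample date and times are hypothetical. Working hours are treated as 9.00 AM to 5.00 PM, as described in step 7.

```python
from collections import deque
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)    # mirrors #window.time(1 hour)
LOW_THRESHOLD = 5000           # mirrors having hourlyTotal < 5000
WORK_START, WORK_END = 9, 17   # working hours: 9.00 AM to 5.00 PM


def low_production_alerts(events):
    """Hypothetical helper that mimics the Scenario 1 query.

    events: iterable of (arrival_time, name, amount) tuples in arrival order.
    Yields (name, hourlyTotal, currentHour) tuples, like LowProductionStream.
    """
    window = deque()  # events that arrived within the last hour
    for now, name, amount in events:
        # Sliding time window: drop events that arrived one hour or more ago.
        while window and window[0][0] <= now - WINDOW:
            window.popleft()
        window.append((now, name, amount))

        # group by name: total for this event's sweet within the window.
        hourly_total = sum(a for _, n, a in window if n == name)

        # Stand-in for time:extract(currentTimeMillis(), 'HOUR').
        current_hour = now.hour

        # having clause: low totals during working hours only.
        if hourly_total < LOW_THRESHOLD and WORK_START <= current_hour < WORK_END:
            yield (name, hourly_total, current_hour)


# Usage: the four cURL payloads, assumed to arrive one minute apart from 2.00 PM.
base = datetime(2024, 1, 1, 14, 0)
sample = [
    (base + timedelta(minutes=i), name, amount)
    for i, (name, amount) in enumerate(
        [("Toffee", 6000), ("Gateau", 6200), ("Gingerbread", 4500), ("Gateau", 5800)]
    )
]
print(list(low_production_alerts(sample)))  # [('Gingerbread', 4500, 14)]
```

Recomputing the total over the window for each event keeps the sketch short. A production engine would instead maintain running per-group aggregates.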

Scenario 2: Detecting sweets whose production per hour rises above 10000

In this scenario, events in which the per-hour production of a sweet exceeds 10000 must be marked with Possible surplus so that the foreman can identify any surpluses that occur.

Let's get started!

  1. In this scenario, the foreman needs to see the name of the sweet, the total produced during an hour, the hour to which the information applies, and a status that flags possible surpluses. Therefore, add an output stream definition with attributes that capture this information as follows.

    define stream ProductionStatusStream (name string, hourlyTotal long, currentHour int, status string);
  2. In this scenario, you receive events from the same SweetProductionStream input stream used in the previous scenario and in previous tutorials. Add a query that reads events from the SweetProductionStream input stream and inserts the results into the ProductionStatusStream output stream.

    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour, "Possible surplus" as status
    group by name
    having hourlyTotal > 10000 and currentHour >= 9 and currentHour < 17
    insert into ProductionStatusStream;

    Here, you use the having clause as you did in the previous scenario. This query only lets through events that are generated between 9.00 AM and 5.00 PM and for which the production during the past hour exceeds 10000.

  3. The query added in the previous step only sends the filtered events to the ProductionStatusStream output stream. However, this scenario requires all events generated during working hours to be sent to the ProductionStatusStream output stream, with only the events that meet the surplus criterion marked as Possible surplus. To achieve this, you can use the ifThenElse() function, which has the following syntax.

    ifThenElse(<condition>, <valueIfTrue>, <valueIfFalse>)

    Let's add a condition to the query using this function as shown below.

    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour, ifThenElse(sum(amount) > 10000, "Possible surplus", "OK") as status
    group by name
    having currentHour >= 9 and currentHour < 17
    insert into ProductionStatusStream;

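    Here, if the production within the past hour is more than 10000, Possible surplus is set as the value of the status attribute in the output event; otherwise, the value is OK. Because the having clause now only checks the working hours, events that do not exceed the threshold are also sent to ProductionStatusStream.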

    The completed Siddhi application looks as follows.

    @App:name('SweetProductionKPIApp')
    
    @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count')))
    define stream SweetProductionStream (name string, amount long);
    
    @sink(type='log', prefix='Low production alert:')
    define stream LowProductionStream (name string, hourlyTotal long, currentHour int);
    
    @sink(type='log', prefix='Sweet production status:')
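    define stream ProductionStatusStream (name string, hourlyTotal long, currentHour int, status string);
    
    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour
    group by name
    having hourlyTotal < 5000 and currentHour >= 9 and currentHour < 17
    insert into LowProductionStream;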
    
    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour, ifThenElse(sum(amount) > 10000, "Possible surplus", "OK") as status
    group by name
    having currentHour >= 9 and currentHour < 17
    insert into ProductionStatusStream;
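  4. After you save the Siddhi application, send the same four cURL commands that you issued in the previous scenario to check whether it works as expected. In the output logged with the Sweet production status: prefix, each event received during working hours carries a status. The fourth event raises the hourly total for Gateau to 12000, so it is marked Possible surplus, while the other events are marked OK.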
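The status logic can be sketched in Python in the same way. In this hypothetical sketch, if_then_else only mirrors the argument order of Siddhi's ifThenElse(<condition>, <valueIfTrue>, <valueIfFalse>). Unlike the having filter in step 2, it never drops an event; it only decides the value of status. The function names, the (arrival time, name, amount) tuple layout, and the sample date and times are assumptions for illustration, and working hours are treated as 9.00 AM to 5.00 PM.

```python
from collections import deque
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)    # mirrors #window.time(1 hour)
SURPLUS_THRESHOLD = 10000      # mirrors sum(amount) > 10000
WORK_START, WORK_END = 9, 17   # working hours: 9.00 AM to 5.00 PM


def if_then_else(condition, value_if_true, value_if_false):
    """Hypothetical stand-in with the same argument order as ifThenElse()."""
    return value_if_true if condition else value_if_false


def production_status(events):
    """Hypothetical helper that mimics the Scenario 2 query.

    events: iterable of (arrival_time, name, amount) tuples in arrival order.
    Yields (name, hourlyTotal, currentHour, status), like ProductionStatusStream.
    """
    window = deque()  # events that arrived within the last hour
    for now, name, amount in events:
        # Sliding time window: drop events that arrived one hour or more ago.
        while window and window[0][0] <= now - WINDOW:
            window.popleft()
        window.append((now, name, amount))
        hourly_total = sum(a for _, n, a in window if n == name)
        current_hour = now.hour
        # Every event gets a status instead of being filtered on the threshold.
        status = if_then_else(hourly_total > SURPLUS_THRESHOLD, "Possible surplus", "OK")
        # having clause: only the working-hours condition remains.
        if WORK_START <= current_hour < WORK_END:
            yield (name, hourly_total, current_hour, status)


# Usage: the four cURL payloads, assumed to arrive one minute apart from 2.00 PM.
base = datetime(2024, 1, 1, 14, 0)
sample = [
    (base + timedelta(minutes=i), name, amount)
    for i, (name, amount) in enumerate(
        [("Toffee", 6000), ("Gateau", 6200), ("Gingerbread", 4500), ("Gateau", 5800)]
    )
]
for row in production_status(sample):
    print(row)
```

For these four sample events, only the second Gateau event (hourly total 12000) is marked Possible surplus; the others are marked OK. A total of exactly 10000 is also marked OK, because the condition is a strict greater-than.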
