This documentation is for WSO2 Stream Processor 4.2.0.

Introduction

The previous tutorials covered basic Siddhi functionality, including ingesting data, preprocessing, data store integration, and publishing events. In all of those tutorials, the events arrived from a single source (either data sent by the SweetBots on the factory floor or data from the suppliers of raw material).

In a real-world scenario, data from various sources needs to be analyzed in tandem. These sources can range from multiple disparate entities to previously received data that is stored elsewhere. Hence, you need a way to correlate events from multiple sources. Siddhi offers this functionality through stream and store joins. A join matches events arriving from different sources based on given criteria and then processes them together.
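
As a rough illustration of what a join does, the following Python sketch (not Siddhi code) pairs events from two in-memory lists whenever they share the same `name`. The event dictionaries, the sample values, and the `join_on_name` helper are hypothetical and exist only for this example.

```python
def join_on_name(left_events, right_events):
    """Pair every left event with every right event that has the same name."""
    matches = []
    for left in left_events:
        for right in right_events:
            if left["name"] == right["name"]:
                matches.append((left, right))
    return matches


# Hypothetical sample events from two different sources.
consumption = [{"name": "flour", "amount": 40.0}, {"name": "sugar", "amount": 10.0}]
supply = [{"name": "flour", "amount": 50.0}, {"name": "honey", "amount": 5.0}]

pairs = join_on_name(consumption, supply)
print(pairs)
```

Only the flour events are paired, because flour is the only name that appears in both lists. Events without a partner on the other side produce no output, which is how an inner join behaves.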

To understand this concept, let's consider a scenario in the sweet factory where events are sent to denote the following:

  • The raw materials arriving at the factory (name of the material and the amount)

  • The raw materials consumed per production run by all SweetBots (name and amount)

For sustainability reasons, the factory needs to ensure that production does not consume more than 95% of the flour and sugar supplied in the past hour. Any material that exceeds this limit should be recorded.
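
The threshold rule can be checked with simple arithmetic. The following Python sketch is only an illustration of the 95% rule stated above; the `exceeds_threshold` helper and the sample amounts are assumptions made for this example.

```python
THRESHOLD_RATIO = 0.95  # 95% of the supplied amount, as stated in the scenario


def exceeds_threshold(supply_amount, consumption_amount, ratio=THRESHOLD_RATIO):
    """Return True when consumption is greater than ratio * supply."""
    return supply_amount * ratio < consumption_amount


# With a supply of 100 units, the limit is 95 units.
over_limit = exceeds_threshold(100.0, 96.0)
within_limit = exceeds_threshold(100.0, 90.0)
print(over_limit, within_limit)
```

With a supply of 100 units, consuming 96 units crosses the limit while consuming 90 units does not.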

Tutorial steps

Let's get started!

  1. To create your Siddhi application, start the editor and log in to WSO2 Stream Processor Studio. Then open a new Siddhi file in which to write the application.
  2. Now let's define two input streams as follows.

    define stream MaterialConsumptionStream(name string, user string, amount double);


    define stream MaterialSupplyStream(name string, supplier string, amount double);
  3. The output must include the name of the raw material, the supplied and consumed amounts, and the names of the user and the supplier of the raw material whenever the threshold is exceeded. Let's define an output stream based on these details:

    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);
  4. To select events from the MaterialConsumptionStream for analysis, write a query as follows.

    from MaterialConsumptionStream
    select name, amount, user
  5. Let's define an alias for the MaterialConsumptionStream so that it can be identified more easily once you have used the join statement.

    from MaterialConsumptionStream as c select c.name, c.amount, c.user

    The same needs to be done for the MaterialSupplyStream.

    from MaterialSupplyStream as s select s.name, s.amount, s.supplier

  6. Now that we have both input streams ready, let's join both inputs together using the join keyword.

    from MaterialConsumptionStream as c
    join MaterialSupplyStream as s

  7. To correlate the two streams, you need an attribute that is common to both of them. In this scenario, both streams carry the name of the raw material. Therefore, name can be used as the join attribute as shown below.

    from MaterialConsumptionStream as c
    join MaterialSupplyStream as s
    on c.name == s.name

  8. Because the time period to be considered is one hour, add a time window of one hour to both streams as shown below.

    from MaterialConsumptionStream#window.time(1 hour) as c
    join MaterialSupplyStream#window.time(1 hour) as s
    on c.name == s.name

  9. To detect when the consumption exceeds 95% of the supply, add a condition as follows.

    s.amount * 0.95 < c.amount

    The complete statement in a usable format looks as follows.

    from MaterialConsumptionStream#window.time(1 hour) as c
        join MaterialSupplyStream#window.time(1 hour) as s
        on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, c.user, s.supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream;

    The completed Siddhi application with source and sink configurations added looks as follows.

    @App:name('MaterialThresholdAlertApp')

    -- Receives consumption events as JSON over HTTP.
    @source(type = 'http', @map(type = 'json'))
    define stream MaterialConsumptionStream(name string, user string, amount double);

    -- Receives supply events as JSON over HTTP.
    @source(type = 'http', @map(type = 'json'))
    define stream MaterialSupplyStream(name string, supplier string, amount double);

    -- Logs every alert that the query below emits.
    @sink(type='log', prefix='Materials that go beyond sustainability threshold:')
    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);

    -- Joins the last hour of consumption and supply events by material name.
    from MaterialConsumptionStream#window.time(1 hour) as c
        join MaterialSupplyStream#window.time(1 hour) as s
        on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, c.user, s.supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream;
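
To make the interaction between the time windows and the join easier to picture, here is a simplified Python sketch of the same idea. It is not how Siddhi is implemented. The `TimeWindow` class, the `on_consumption` handler, the timestamps in seconds, and the sample events are all assumptions made for illustration. Only the consumption side is shown; a supply-side handler would mirror it.

```python
from collections import deque

WINDOW_SECONDS = 3600   # one hour, mirroring #window.time(1 hour)
THRESHOLD_RATIO = 0.95  # 95% of the supplied amount


class TimeWindow:
    """Keeps only the events that arrived within the last `length` seconds."""

    def __init__(self, length=WINDOW_SECONDS):
        self.length = length
        self.events = deque()  # (timestamp, event) pairs, oldest first

    def add(self, timestamp, event):
        self.events.append((timestamp, event))

    def current(self, now):
        # Drop events that are older than the window length.
        while self.events and now - self.events[0][0] >= self.length:
            self.events.popleft()
        return [event for _, event in self.events]


def on_consumption(event, now, consumption_window, supply_window):
    """Store a consumption event, then match it against the supply window."""
    consumption_window.add(now, event)
    alerts = []
    for supply in supply_window.current(now):
        same_material = supply["name"] == event["name"]
        if same_material and supply["amount"] * THRESHOLD_RATIO < event["amount"]:
            alerts.append({
                "name": supply["name"],
                "supplyAmount": supply["amount"],
                "consumptionAmount": event["amount"],
                "user": event["user"],
                "supplier": supply["supplier"],
            })
    return alerts


consumption_window = TimeWindow()
supply_window = TimeWindow()

# Hypothetical supply event arriving at t = 0 seconds.
supply_window.add(0, {"name": "flour", "supplier": "Acme Mills", "amount": 100.0})

# Ten minutes later the supply event is still in the window, so an alert is raised.
alerts = on_consumption({"name": "flour", "user": "bot-1", "amount": 97.0},
                        600, consumption_window, supply_window)

# After more than an hour the supply event has expired, so nothing matches.
expired = on_consumption({"name": "flour", "user": "bot-1", "amount": 99.0},
                         4000, consumption_window, supply_window)
print(alerts)
print(expired)
```

The second call returns no alerts even though 99 is above the limit, because the matching supply event is no longer inside the one-hour window.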


    To correlate stream events with previously stored data as well, you can use the store that holds that data in place of either stream in the join (but not both). The following example joins the MaterialConsumptionStream with a table named ShipmentDetailsTable.

    define table ShipmentDetailsTable(name string, supplier string, amount double);
    define stream MaterialConsumptionStream(name string, user string, amount double);

    from MaterialConsumptionStream#window.time(1 hour) as c
    join ShipmentDetailsTable as s
    on c.name == s.name
    select ...

    The following is the completed Siddhi application. It uses the same query as above, but joins the MaterialConsumptionStream with historical data from the ShipmentDetailsTable.

    @App:name('PersistentMaterialThresholdAlertApp')

    -- Receives consumption events as JSON over HTTP.
    @source(type = 'http', @map(type = 'json'))
    define stream MaterialConsumptionStream(name string, user string, amount double);

    -- Table backed by a MySQL database that holds previously stored shipment data.
    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root", jdbc.driver.name="com.mysql.jdbc.Driver")
    define table ShipmentDetailsTable(name string, supplier string, amount double);

    -- Logs every alert that the query below emits.
    @sink(type='log', prefix='Materials that go beyond sustainability threshold:')
    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);

    -- Joins the last hour of consumption events with the stored shipment rows by material name.
    from MaterialConsumptionStream#window.time(1 hour) as c
        join ShipmentDetailsTable as s
        on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, c.user, s.supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream;
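
The difference between a stream join and a store join can be sketched in Python as follows. This is an illustration only: a dictionary keyed by `name` stands in for the ShipmentDetailsTable and its primary key, and the `join_with_table` helper and the sample rows are hypothetical. In a join between a stream and a table, only events arriving on the stream trigger the join, so the sketch has a single handler.

```python
THRESHOLD_RATIO = 0.95  # 95% of the supplied amount

# Hypothetical stored rows, keyed by the primary key (name).
shipment_table = {
    "flour": {"name": "flour", "supplier": "Acme Mills", "amount": 100.0},
    "sugar": {"name": "sugar", "supplier": "Sweet Co", "amount": 200.0},
}


def join_with_table(consumption_event, table):
    """Look up the stored row for the event's material and apply the threshold."""
    row = table.get(consumption_event["name"])
    if row is None:
        return None  # no stored shipment for this material, so no match
    if row["amount"] * THRESHOLD_RATIO < consumption_event["amount"]:
        return {
            "name": row["name"],
            "supplyAmount": row["amount"],
            "consumptionAmount": consumption_event["amount"],
            "user": consumption_event["user"],
            "supplier": row["supplier"],
        }
    return None  # consumption is within the limit


alert = join_with_table({"name": "flour", "user": "bot-1", "amount": 98.0}, shipment_table)
no_alert = join_with_table({"name": "sugar", "user": "bot-2", "amount": 50.0}, shipment_table)
unknown = join_with_table({"name": "honey", "user": "bot-3", "amount": 5.0}, shipment_table)
print(alert, no_alert, unknown)
```

Only the flour event produces an alert. The sugar event stays within 95% of the stored amount, and the honey event has no matching row in the table.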