This documentation is for WSO2 Stream Processor 4.4.0 (the latest version of WSO2 SP). View documentation for the Streaming Integrator, the successor of WSO2 SP.

Introduction

In the previous tutorials, you worked with events arriving in real time. Those tutorials only analyzed live events received over a specific period of time.

In this tutorial, let's understand how live data can work in conjunction with historic data, as is often required in real-world scenarios.

Let's consider the scenario for purchasing raw materials for the Sweet Factory. Each time a consignment of raw materials is delivered, the supplier sends an event in JSON format to the WSO2 SP instance of the Sweet Factory. The manager wants the latest event from the supplier recorded for report generation.

This tutorial covers the following topics:

  • Storing real-time data for later processing
  • Correlating real-time data with historic data
  • Manipulating static data based on real-time data

Before you begin:

  • The user scenarios covered in this tutorial are supported by a set of extensions implemented for WSO2 SP called stores.

    A store can be defined as any structure that acts as a data store for both data definition and data manipulation. Supported store implementations in Stream Processor 4.0 include the following:
    • RDBMS store (which in turn supports the following)

      • H2

      • MySQL

      • Oracle Database

      • MS SQL Server

      • PostgreSQL

      • IBM DB2

    • Apache HBase

    • Apache Solr

    • MongoDB

For more information on integrating data stores, please see the page on Storage Integration.

  • In this scenario, the events generated when the supplier delivers raw materials are stored in a MySQL database named SweetFactoryDB. You need to download and install MySQL and create this database before you try the tutorial steps.

     To configure the database, follow the steps below.
    1. Download and install MySQL Server.

    2. Download the MySQL JDBC driver.

    3. Unzip the downloaded MySQL driver zipped archive, and copy the MySQL JDBC driver JAR (mysql-connector-java-x.x.xx-bin.jar) into the <SP_HOME>/lib directory.

    4. Enter the following command in a terminal/command window, where username is the username you want to use to access the databases.
      mysql -u username -p 
    5. When prompted, specify the password you are using to access the databases with the username you specified.
    6. Add the following configuration under the Data Sources Configuration section of the <SP_HOME>/conf/editor/deployment.yaml file.

      You need to change the values for the username and password parameters to the username and password that you are using to access the MySQL database.

          - name: SweetFactoryDB
            description: Datasource used for Sweet Factory Supply Records
            jndiConfig:
              name: jdbc/SweetFactoryDB
              useJndiReference: true
            definition:
              type: RDBMS
              configuration:
                jdbcUrl: 'jdbc:mysql://localhost:3306/SweetFactoryDB'
                username: root
                password: root
                driverClassName: com.mysql.jdbc.Driver
                maxPoolSize: 50
                idleTimeout: 60000
                connectionTestQuery: SELECT 1
                validationTimeout: 30000
                isAutoCommit: false 
    7. To create a database named SweetFactoryDB, issue the following commands from the terminal.
      mysql> create database SweetFactoryDB;
      mysql> use SweetFactoryDB;
      mysql> source <SP_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
      mysql> grant all on SweetFactoryDB.* TO username@localhost identified by "password";
       

Tutorial steps

Let's get started!

  1. Start the editor, and log in to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application. You can name it ShipmentHistoryApp.
  2. The information captured from the event sent by the supplier must include the name of the raw material, the name of the supplier and the amount of material purchased. Let's add an input stream definition as follows to capture this information.

    define stream RawMaterialStream(name string, supplier string, amount double);
  3. The incoming event from the supplier is in JSON format. Therefore, to convert it to the Siddhi format so that it can be processed by WSO2 Stream Processor, let's add a source configuration that includes a JSON mapping as follows.

    @source(type = 'http', @map(type = 'json'))
    define stream RawMaterialStream(name string, supplier string, amount double);
  4. Let's define an output stream as follows.

    define stream ShipmentDetailsStream(name string, supplier string, amount double);
  5. To forward events from the input stream to the output stream, let's add a simple query as follows.

    from RawMaterialStream
    select name, amount
    insert into ShipmentDetailsStream;

    Now the Siddhi application looks as follows.

    @App:name('ShipmentHistoryApp')
    
    @source(type = 'http', @map(type = 'json'))
    define stream RawMaterialStream(name string, supplier string, amount double);
    
    define stream ShipmentDetailsStream(name string, supplier string, amount double);
    
    from RawMaterialStream
    select name, amount
    insert into ShipmentDetailsStream;

    The Stream Processor Studio indicates a syntax error at this stage because ShipmentDetailsStream defines an attribute (supplier) that is not included in the select clause. Ignore this error because the Siddhi application is still incomplete. The error is corrected in the next steps.

  6.  The ShipmentHistoryApp in its current state can only forward all the events in the RawMaterialStream to the ShipmentDetailsStream. However, you need to save the incoming events in a store instead of passing them to a stream. Therefore, let's convert the output stream definition to a store definition by changing the define stream syntax to define table.

    define table ShipmentDetails(name string, supplier string, amount double);

    At this stage, events directed to the table are not persisted. They are held in an in-memory table for later retrieval, so the stored information is no longer available once the server is restarted.

  7. Compared to a stream, a table supports a set of additional annotations that enable it to use additional functionality offered by data stores, such as unique keys and indexes.

    A unique key (also referred to as a primary key) is useful for uniquely identifying records. Fields set as unique keys are not allowed to have duplicate values within the data store. Let's set a unique key using the @PrimaryKey annotation, as shown below.

    @primaryKey('name')
    define table ShipmentDetails(name string, supplier string, amount double);

  8. The purchase records stored in the table need to be indexed by the supplier. To do this, you need to specify that the supplier attribute is an index attribute as shown below.

    The @index annotation is used for specifying secondary indexes. This is useful for data retrieval scenarios. If the underlying data storage mechanism (e.g., RDBMS) supports secondary indexes, the fields specified here are indexed at the data store level itself.

    @primaryKey('name')
    @index('supplier')
    define table ShipmentDetails(name string, supplier string, amount double);

  9. Let's specify a store of the RDBMS type for the ShipmentDetails table to bind an RDBMS data storage mechanism to it.

    To do this, you need to use the @store annotation. Similar to the @sink and the @source annotations that you have already used in these tutorials, the properties used within the annotation vary depending on the type of the store.

    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms')
    define table ShipmentDetails(name string, supplier string, amount double);

  10. The RDBMS store has a set of properties that are required to be set in order to create the connection with the underlying DB instance. These include the JDBC URL of the database, the username, password and such. Let's add values to these properties so that the store definition becomes syntactically complete.

    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root", jdbc.driver.name="com.mysql.jdbc.Driver")
    define table ShipmentDetails(name string, supplier string, amount double);

    Now the updated Siddhi application is as follows:

    @App:name('ShipmentHistoryApp')
    
    @source(type = 'http', @map(type = 'json'))
    define stream RawMaterialStream(name string, supplier string, amount double);
    
    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
    define table ShipmentDetails(name string, supplier string, amount double);
    
    from RawMaterialStream
    select name, supplier, amount
    insert into ShipmentDetails;
    
    
  11. The factory manager needs to see the latest shipment for each raw material. If a record for a particular raw material does not already exist in the database, it must be added as a new entry. If it already exists, the previous record must be overwritten by the new incoming record.

    This can be considered as an update or insert scenario. In Siddhi stores, the update or insert into directive can be used for this purpose in place of the normal insert into command. In order to carry out this operation, let's update the query as follows:

    from RawMaterialStream select name, supplier, amount update or insert into ShipmentDetails;

  12. The above query does not specify the criterion by which Siddhi identifies whether a record already exists in the database. In this scenario, we can specify that a record needs to be updated when the name attribute (which is the primary key of the table) matches that of a pre-existing record. This can be done by adding a condition to the query as shown below.

    from RawMaterialStream
    select name, supplier, amount
    update or insert into ShipmentDetails
    on ShipmentDetails.name == name;


    The completed Siddhi application is as follows.

    @App:name('ShipmentHistoryApp')
     
    @source(type = 'http', @map(type = 'json'))
    define stream RawMaterialStream(name string, supplier string, amount double);
     
    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
    define table ShipmentDetails(name string, supplier string, amount double);
     
    from RawMaterialStream
    select name, supplier, amount
    update or insert into ShipmentDetails
    on ShipmentDetails.name == name;
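
The effect of update or insert into combined with the on ShipmentDetails.name == name condition can be pictured outside Siddhi as a keyed overwrite. The following Python sketch only illustrates that behaviour; it is not how Siddhi or the RDBMS store implements it. The Shipment type, the update_or_insert function, and the second Flour amount of 500.0 are hypothetical names and values introduced for this example.

```python
from typing import Dict, NamedTuple


class Shipment(NamedTuple):
    """One row of the ShipmentDetails table (hypothetical model)."""
    name: str
    supplier: str
    amount: float


def update_or_insert(table: Dict[str, Shipment], event: Shipment) -> None:
    """Overwrite the row whose key equals event.name, or add a new row.

    The dictionary key plays the role of the primary key, which mirrors
    the `on ShipmentDetails.name == name` condition.
    """
    table[event.name] = event


table: Dict[str, Shipment] = {}
update_or_insert(table, Shipment("Flour", "Acme", 460.0))
update_or_insert(table, Shipment("Sugar", "Indigo6", 272.0))
# A second Flour delivery (assumed amount) replaces the first row
# instead of adding a new one.
update_or_insert(table, Shipment("Flour", "Acme", 500.0))
```

After these three calls the model holds two rows, one per raw material, and the Flour row carries the most recent amount.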
  13. To store some data in the ShipmentDetails table, issue the following cURL commands:

    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{
      "event": {
        "name": "Flour",
        "supplier": "Acme",
        "amount": 460.0
      }
    }'
    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{
      "event": {
        "name": "Sugar",
        "supplier": "Indigo6",
        "amount": 272.0
      }
    }'
    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{
      "event": {
        "name": "Honey",
        "supplier": "The BeeGees",
        "amount": 9.0
      }
    }'
    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{
      "event": {
        "name": "Food Coloring",
        "supplier": "Wadjet Food Products",
        "amount": 30.0
      }
    }'
    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{
      "event": {
        "name": "Chocolate Chip",
        "supplier": "Larkspur Landing",
        "amount": 34.0
      }
    }'
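
If you prefer to send the same events from a script rather than cURL, something like the following Python sketch could be used. It assumes the endpoint URL and the "event" wrapper shown in the cURL commands above. The build_event and send_event helpers are hypothetical names introduced for illustration and are not part of WSO2 SP.

```python
import json
import urllib.request

# Endpoint copied from the cURL commands in this step.
ENDPOINT = "http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream"


def build_event(name: str, supplier: str, amount: float) -> bytes:
    """Wrap the three stream attributes in the "event" object used above."""
    payload = {"event": {"name": name, "supplier": supplier, "amount": amount}}
    return json.dumps(payload).encode("utf-8")


def send_event(name: str, supplier: str, amount: float, url: str = ENDPOINT) -> int:
    """POST one event to the HTTP source and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=build_event(name, supplier, amount),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling send_event("Flour", "Acme", 460.0) while the Siddhi application is running would post the first event from this step.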
  14. Once the events are sent, the event table is updated to be similar to the example shown below.

    Name            Supplier              Amount
    Flour           Acme                  460
    Sugar           Indigo6               272
    Honey           The BeeGees           9
    Food Coloring   Wadjet Food Products  30
    Chocolate Chip  Larkspur Landing      34


    The details in the store can be retrieved through the Store API when the Siddhi application is deployed in a worker node. To get the above details, issue the following cURL command.

    Before you issue this command, the server must be started in the worker node.

    curl -X POST \
     http://localhost:9090/stores/query \
     -u admin:admin \
     -H 'content-type: application/json' \
     -d '{
    "appName" : "ShipmentHistoryApp",
    "query" : "from ShipmentDetails select *"
    }'

    The body of this request contains two parameters: appName and query.

    • appName: This must be the same as the name of the deployed Siddhi application. In this scenario, it is ShipmentHistoryApp.
    • query: Here, a valid select Siddhi query that includes the table name must be specified. In this scenario, the following query which includes the ShipmentDetails table created in this scenario is specified.

      from ShipmentDetails select *

      This retrieves all the data in the ShipmentDetails store. Further, Siddhi filters such as having can be applied in order to filter data.
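
The same Store API request can also be issued from a script. The Python sketch below mirrors the cURL command above and assumes the same URL, the admin:admin credentials, and the appName and query body parameters. The build_store_request and run_store_query helpers are hypothetical names introduced for this example, and the shape of the response is not assumed beyond it being JSON.

```python
import base64
import json
import urllib.request

# URL copied from the cURL command above.
STORE_API_URL = "http://localhost:9090/stores/query"


def build_store_request(app_name: str, query: str,
                        username: str = "admin",
                        password: str = "admin") -> urllib.request.Request:
    """Build a POST request carrying appName and query with basic auth."""
    body = json.dumps({"appName": app_name, "query": query}).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        STORE_API_URL,
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


def run_store_query(app_name: str, query: str):
    """Send the request and return the parsed JSON response."""
    with urllib.request.urlopen(build_store_request(app_name, query)) as response:
        return json.loads(response.read().decode("utf-8"))


# Building the request does not contact the server.
request = build_store_request("ShipmentHistoryApp", "from ShipmentDetails select *")
```

Calling run_store_query("ShipmentHistoryApp", "from ShipmentDetails select *") against a running worker node would send the request built above.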
