In the previous tutorials, you worked with events arriving in real time. Those tutorials only analyzed live events received over a specific period of time.
In this tutorial, let's understand how live data can work in conjunction with historical data, as is often required in real-world scenarios.
Let's consider the scenario for purchasing raw materials for the Sweet Factory. Each time a consignment of raw materials is delivered, the supplier sends an event in JSON format to the WSO2 SP instance of the Sweet Factory. The manager wants the latest event from the supplier recorded for report generation.
Let's get started!
- Start the editor, and log in to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application. You can name it as
The information captured from the event sent by the supplier must include the name of the raw material, the name of the supplier and the amount of material purchased. Let's add an input stream definition as follows to capture this information.
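The original definition is not reproduced here. A minimal sketch of such an input stream, assuming the stream name RawMaterialStream and the attribute types shown (both are illustrative choices, not taken from the tutorial), could look like this:

```siddhi
-- Assumed name and types: RawMaterialStream, string/string/double.
-- One attribute per piece of information the supplier sends.
define stream RawMaterialStream (name string, supplier string, amount double);
```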
The incoming event from the supplier is in JSON format. Therefore, to convert it to the Siddhi format so that it can be processed by WSO2 Stream Processor, let's add a source configuration that includes a JSON mapping as follows.
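As a sketch of what such a source configuration might look like, the following assumes that the supplier sends events over HTTP and that the input stream is named RawMaterialStream. The transport type, the receiver URL, and the stream name are all assumptions made for illustration.

```siddhi
-- Assumption: events arrive over HTTP at a placeholder URL.
-- @map(type = 'json') converts each incoming JSON message into a Siddhi event.
@source(type = 'http', receiver.url = 'http://localhost:8006/rawMaterials',
        @map(type = 'json'))
define stream RawMaterialStream (name string, supplier string, amount double);
```

With the default JSON mapping, each message is expected to wrap its attributes in an event object, for example {"event": {"name": "Flour", "supplier": "Acme", "amount": 460}}.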
Let's define an output stream as follows.
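The output stream definition is not reproduced here. A sketch that reuses the ShipmentDetailsStream name mentioned later in this tutorial, with an assumed attribute list, might be:

```siddhi
-- Attribute list is an assumption; the tutorial's own definition may differ.
define stream ShipmentDetailsStream (name string, supplier string, amount double);
```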
To forward events from the input stream to the output stream, let's add a simple query as follows.
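A pass-through query of this kind can be sketched as follows. RawMaterialStream is an assumed name for the input stream, and the selected attributes are assumed to match the output stream definition.

```siddhi
-- Forward every incoming event unchanged to the output stream.
from RawMaterialStream
select name, supplier, amount
insert into ShipmentDetailsStream;
```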
Now the Siddhi application looks as follows.
The Stream Processor Studio indicates a syntax error at this stage because the ShipmentDetailsStream has an additional attribute other than the attributes included in the select clause. Ignore this error because the Siddhi application is still in an incomplete state. This error is corrected in the next steps.
ShipmentHistoryApp in its current state can only forward all the events in the ShipmentDetailsStream. However, you need to save the incoming events in a store instead of passing them to a stream. Therefore, let's convert the output stream definition to a store definition by changing the define stream syntax to
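In Siddhi, tables are declared with the define table keywords. A sketch of the converted definition, using the ShipmentDetails table name that appears later in this tutorial and an assumed attribute list, could be:

```siddhi
-- Before (stream): define stream ShipmentDetailsStream (...);
-- After (table): events inserted here are retained for later retrieval.
define table ShipmentDetails (name string, supplier string, amount double);
```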
At this stage, the table has no store configuration, so events directed to it are not persisted. They are held in an in-memory table for later retrieval, and this information is no longer available once the server is restarted.
Compared to a stream, a table supports a set of additional annotations that enable it to leverage additional functionality offered by data stores, such as unique keys and indexes.
A unique key (also referred to as a primary key) is useful for uniquely identifying records. Fields set as unique keys are not allowed to have duplicate values within the data store. Let's set a unique key using the @PrimaryKey annotation, as shown below.
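A sketch of the annotated table follows. The name attribute is used as the key because this tutorial later refers to it as the primary key of the table; the attribute types are assumptions.

```siddhi
-- Each raw material name may appear in at most one record.
@PrimaryKey('name')
define table ShipmentDetails (name string, supplier string, amount double);
```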
The purchase records stored in the table need to be indexed by the supplier. To do this, you need to specify that the supplier attribute is an index attribute as shown below.
The @index annotation is used for specifying secondary indexes. This is useful for data retrieval scenarios. If the underlying data storage mechanism (e.g., RDBMS) supports secondary indexes, the fields specified here are indexed at the data store level itself.
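Adding the index to the table definition can be sketched like this, again with assumed attribute types:

```siddhi
@PrimaryKey('name')
-- Secondary index to speed up lookups by supplier.
@index('supplier')
define table ShipmentDetails (name string, supplier string, amount double);
```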
Let's specify a store of the RDBMS type for the ShipmentDetails table to bind RDBMS data storage mechanisms to it.
To do this, you need to use the @store annotation. Similar to the @source annotations that you have already used in these tutorials, the properties used within the annotation vary depending on the type of the store.
The RDBMS store has a set of properties that are required to be set in order to create the connection with the underlying DB instance. These include the JDBC URL of the database, the username, password and such. Let's add values to these properties so that the store definition becomes syntactically complete.
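As an illustration, a store annotation for a MySQL database might look like the sketch below. The JDBC URL, database name, credentials, and driver class are placeholder values assumed for this example; replace them with the details of your own database instance.

```siddhi
-- Placeholder connection details (assumptions, not values from the tutorial).
@store(type = 'rdbms',
       jdbc.url = 'jdbc:mysql://localhost:3306/SweetFactoryDB',
       username = 'root',
       password = 'root',
       jdbc.driver.name = 'com.mysql.jdbc.Driver')
define table ShipmentDetails (name string, supplier string, amount double);
```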
Now the updated Siddhi application is as follows:
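The full listing is not reproduced here. Putting the previous steps together, the application at this point might resemble the following sketch. The input stream name, the HTTP receiver URL, the attribute types, and the database connection details are assumptions made for illustration.

```siddhi
@App:name('ShipmentHistoryApp')

-- Assumed HTTP source with JSON mapping for supplier events.
@source(type = 'http', receiver.url = 'http://localhost:8006/rawMaterials',
        @map(type = 'json'))
define stream RawMaterialStream (name string, supplier string, amount double);

-- RDBMS-backed table keyed by material name and indexed by supplier.
@PrimaryKey('name')
@index('supplier')
@store(type = 'rdbms',
       jdbc.url = 'jdbc:mysql://localhost:3306/SweetFactoryDB',
       username = 'root',
       password = 'root',
       jdbc.driver.name = 'com.mysql.jdbc.Driver')
define table ShipmentDetails (name string, supplier string, amount double);

-- Plain insert: every incoming event is written to the table.
from RawMaterialStream
select name, supplier, amount
insert into ShipmentDetails;
```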
The factory manager needs to see the latest shipment for each raw material. If a record for a particular raw material does not already exist in the database, it must be added as a new entry. If it already exists, its previous record must be overwritten by the new incoming record.
This can be considered as an update or insert scenario. In Siddhi stores, the update or insert into directive can be used for this purpose in place of the normal insert into command. In order to carry out this operation, let's update the query as follows:
In the above query, the criteria based on which Siddhi can identify whether a record already exists in the database are not specified. In this scenario, we can specify that a record needs to be updated when the name attribute (which is the primary key of the table) matches any pre-existing record. This can be done by adding a condition to the query as shown below.
The completed query is as follows.
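A sketch of such a query, assuming the input stream is named RawMaterialStream, is shown below. The on clause compares the name column of the table with the name attribute of the incoming event.

```siddhi
from RawMaterialStream
select name, supplier, amount
-- Update the matching record if one exists; otherwise insert a new one.
update or insert into ShipmentDetails
    on ShipmentDetails.name == name;
```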
To store some data in the ShipmentDetails table, issue the following cURL commands:
Once the events are sent, the event table is updated to be similar to the example shown below.
| Name | Supplier | Amount |
|---|---|---|
| Flour | Acme | 460 |
| Sugar | Indigo6 | 272 |
| Honey | | 9 |
| Food Coloring | Wadjet Food Products | 30 |
| Chocolate Chip | Larkspur Landing | 34 |
The details in the store can be retrieved through the Store API as well as by deploying the Siddhi application in a worker node. To get the above details, issue the following cURL command.
In order to issue the following command, the server must be started in the worker node.
The body of this request contains two parameters:
- appName: This must be the same as the name of the Siddhi application deployed. In this scenario, it is
- query: Here, a valid Siddhi select query that includes the table name must be specified. In this scenario, the following query, which includes the ShipmentDetails table created in this scenario, is specified.
from ShipmentDetails select *
This retrieves all the data in the ShipmentDetails store. Further, Siddhi filters such as having can be applied in order to filter data.
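For instance, a filtered store query could be sketched as follows. The threshold of 100 is an arbitrary value chosen for illustration.

```siddhi
-- Return only the records whose amount exceeds the (arbitrary) threshold.
from ShipmentDetails
select name, supplier, amount
having amount > 100
```

This query would be passed as the value of the query parameter in the request body, in place of the select-all query shown earlier.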