In this tutorial, let’s understand how to capture changes from datastores as events and process them.
Let’s consider that there are two branches of the Sweet Factory named A and B. Sweet Bots insert a record into the database in the relevant branch after each batch of sweets is produced. The manager in the head office wants a name-wise total for all the categories of sweet produced in both the branches until now.
To understand the requirement, assume that the following quantities were reported from the two brances after the completion of a production run.
When the total production per batch for each sweet category is calculated, the manager in the head office expects to receive the following result.
Before you begin:
- Try out the Integrating Datastores tutorial.
Set up the databases and the tables required for this tutorial as follows:
It is assumed that you have already installed MySQL for the Integrating Datastores tutorial. If youy have skipped it, install MySQL as follows:Click here for MySQL Installation Instructions
Download and install MySQL Server.
Download the MySQL JDBC driver.
Unzip the downloaded MySQL driver zipped archive, and copy the MySQL JDBC driver JAR (
mysql-connector-java-x.x.xx-bin.jar) into the
- Enter the following command in a terminal/command window, where
usernameis the username you want to use to access the databases.
mysql -u username -p
- When prompted, specify the password you are using to access the databases with the username you specified.
Create two databases named
productionBby issuing the following commands.
In each database, create a table named
SweetProductionby issuing the following commands.
Enable binary logging for MySQL.
- Download and install the latest version of CDC Siddhi extension from the Siddhi Extensions page. For detailed instructions, see Downloading and Installing Siddhi Extensions.
To understand the CDC source type, see Siddhi-io-cdc.
Let's get started.
Start the editor profile and sign in to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application. You can name it as
Each captured event needs to include an item and a quantity. This information is reported for both the branches. Therefore, let's define an input stream for each branch as follows.
To receive the change events from the database table, you need an event source of the
cdctype. Let’s add it above each stream definition as shown below.
Here, branch A receives change events from the
production Adatabase, and branch B receives them from the
The events from both input streams need to be available in one stream before the output can be processed. Therefore, let's define another stream for this purpose as follows.
The expected output is the total production per sweet category. Therefore, let's define an output stream with the same attributes as the input streams.
The output needs to be presented as logs in the console. Therefore, let's add a sink of the
Let’s add a query as follows to take events from input stream A and send it to the
To take events from input stream B and send it to the
SweetProductionStreamstream, let's add another query as follows.
Finally, let's add a query to process the events in the
SweetProductionStreamstream and then insert the the item and total quantity into the output stream.
sum()Siddhi function is applied to the quantity attribute of the
SweetProductionStreamstream to derive the total production reported by all the events from both the branches.
The Siddhi application is now ready to be saved. It looks as follows.
Generating the output
To generate an output from the
TotalSweetProductionApp Siddhi application, follow the procedure below:
Insert two records from branch A as follows:
The following logs are displayed in the output console.
Insert another two records from branch B as follows:
The following is logged in the output console.