In many real-life scenarios that involve integrated enterprises, data needs to be loaded and moved from one location to another so that it can be processed and used later. This process is commonly known as Extract, Transform, and Load (ETL). In this tutorial, you learn how ETL is carried out in real time and periodically.
Let's consider that the Sweet Factory has shops distributed all over the world. In addition, there are travelling sales people who operate from mobile sweet trucks. Each sales person reports transactions via a system that generates JMS messages. This data needs to be saved in a database so that it can be used later for sales analysis and financial analysis. Later, the data needs to be moved to other databases depending on the nature of the analysis carried out.
Before you begin:
In this scenario, you need to enrich the information sent by sales people by updating the events they generate based on the records in a MySQL database named TransactionDataDB. You need to download and install MySQL, and create this database before you carry out the tutorial steps.
Download and install MySQL Server.
Download the MySQL JDBC driver.
Unzip the downloaded MySQL driver archive, and copy the MySQL JDBC driver JAR (mysql-connector-java-x.x.xx-bin.jar) into the appropriate library directory of your WSO2 Stream Processor installation.
- Enter the following command in a terminal/command window, where username is the username you want to use to access the databases.
mysql -u username -p
- When prompted, specify the password you are using to access the databases with the username you specified.
Add the following configurations for the three databases under the Data Sources Configuration section of the deployment configuration file.
You need to change the values of the username and password parameters to the username and password that you are using to access the MySQL database.
- To create the three databases named TransactionDataDB, UserDataDB, and TriggerStateDB, issue the following commands from the terminal.
- To create the TransactionDataDB database:
```
mysql> create database TransactionDataDB;
mysql> use TransactionDataDB;
mysql> source <SP_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
mysql> grant all on TransactionDataDB.* TO username@localhost identified by "password";
```
- To create the UserDataDB database:
```
mysql> create database UserDataDB;
mysql> use UserDataDB;
mysql> source <SP_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
mysql> grant all on UserDataDB.* TO username@localhost identified by "password";
```
- To create the TriggerStateDB database:
```
mysql> create database TriggerStateDB;
mysql> use TriggerStateDB;
mysql> source <SP_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
mysql> grant all on TriggerStateDB.* TO username@localhost identified by "password";
```
This section covers two interrelated scenarios that are simple examples of performing real-time and periodic ETL in real-world business situations.
Scenario 1: Extract data from JMS, perform a stateless transformation, and load to a database.
Once the head office of the Sweet Factory receives events with information about the sales transactions, that information needs to be cleaned and enriched via a connection with a data store. Then the enriched version of the data needs to be saved in another RDBMS store.
Let's get started!
- First, let's enter the required Siddhi queries to extract and clean data.
The sales transactions reported as JMS messages include the user ID (i.e., the ID of the salesman), the transaction amount and the location. Let's begin by adding the stream definition to capture this information.
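For example, a stream definition similar to the following can capture this information. The attribute names and types shown here are illustrative; transactionAmount and transactionLocation match the attribute names used later in this tutorial, while userId is an assumed name for the sales person's ID.

```
define stream TransactionsStream (userId long, transactionAmount double, transactionLocation string);
```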
- Before enriching the data for further analysis, it needs to be cleaned. This involves checking for null values and replacing them with default values. To do this, let's follow the steps below:
The information to be extracted is taken from the TransactionsStream input stream that you previously defined. To specify this, let's add a from statement as follows.
Let's select the data you need to extract and clean. This includes the user ID, the transaction amount, and the location. For travelling sales people, specific locations are not registered in the system. As a result, no value is specified in the location attribute of the events generated for some sales actions. To clean the data before enriching it, you need to replace the null value for the location attribute of such events with unknown. This can be achieved via the ifThenElse() function of Siddhi by including it in the select clause as shown below.
Now you can direct the extracted and cleaned information to an inferred output stream via an insert into clause.
Now you have completed the query for extracting and cleaning the information you need. You will be adding more queries to this Siddhi application in order to enrich and load data as you proceed with this tutorial. Therefore, to make it easy for other users to understand this Siddhi application, let's name this query CleaningData. The completed query looks as follows.
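Assembled from the pieces above, the query can look similar to the following sketch (attribute names follow the illustrative stream definition shown earlier, and a null location is replaced with unknown):

```
@info(name = 'CleaningData')
from TransactionsStream
select userId,
       transactionAmount,
       ifThenElse(transactionLocation is null, 'unknown', transactionLocation) as transactionLocation
insert into CleanedTransactionsStream;
```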
The Siddhi application currently looks as follows:
- The information you have extracted and cleaned needs to be enriched. To do this, let's add the required queries as follows.
To enrich the data from the CleanedTransactionsStream stream, events need to be joined with records that are already saved in a database table. To do this, the table needs to be defined within the Siddhi application. Therefore, let's add the table definition with a reference to the database table as follows.
To use the information in this table, make a reference to the related data source via the @store annotation as follows.
To make it easy for other users to understand the purpose of this table, you can also add a comment for the table as shown below.
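Putting the table definition, the store reference, and the comment together, one possible sketch is shown below. The datasource name UserDataDB and the firstName/lastName columns are assumptions based on the databases created earlier and the concatenation performed later in this step.

```
-- Registered sales people; used to enrich the incoming transactions.
@store(type = 'rdbms', datasource = 'UserDataDB')
define table userTable (userId long, firstName string, lastName string);
```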
You have already directed the extracted and cleaned information to the CleanedTransactionsStream stream. Therefore, let's specify that stream as the input stream for this query.
The user IDs of the registered sales people are already saved in a table known as the userTable. To enrich the data, the user IDs of the events in the CleanedTransactionsStream stream need to be joined with the user IDs stored in the table. For this, let's add a join query as follows.
Now, let's include a concatenation to derive a username from the first names and the last names of the salespeople as follows.
The enriched data can be directed to another inferred output stream named EnrichedTransactionsStream.
Let's complete the query you created for enriching data by giving it a name that indicates its purpose.
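Combining the steps above, the enrichment query can look similar to the following sketch. The query name EnrichData is only an example, the join condition assumes that both the stream and the table identify a sales person by userId, and str:concat() from the Siddhi string extension is one way to perform the concatenation.

```
@info(name = 'EnrichData')
from CleanedTransactionsStream as c join userTable as u
    on c.userId == u.userId
select c.userId,
       str:concat(u.firstName, ' ', u.lastName) as userName,
       c.transactionAmount,
       c.transactionLocation
insert into EnrichedTransactionsStream;
```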
Now the partially completed Siddhi application looks as follows:
Now you have added the required configurations to extract data from JMS and enrich it. To save the enriched data in another database table, let's follow the steps given below.
Each enriched record stored should include the user ID, transaction amount and the location. Let's add the definition of the table to store the enriched data as shown below.
To insert the enriched data into the table you created, let's add another Siddhi query as follows. This query takes the enriched events from the EnrichedTransactionsStream inferred stream (which receives the enriched data), and inserts them into the TransactionTable table you defined.
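As a sketch, the table definition and the query that loads the enriched events can look as follows. The datasource refers to the TransactionDataDB database created earlier; the userName column and the query name LoadEnrichedData are illustrative.

```
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (userId long, userName string, transactionAmount double, transactionLocation string);

@info(name = 'LoadEnrichedData')
from EnrichedTransactionsStream
select *
insert into TransactionTable;
```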
The transactions are reported via JMS in JSON format. Therefore, let's add an event source with JMS as the transport and JSON as the format. You can add this above the stream definition as shown below.
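A source annotation similar to the following can be added above the TransactionsStream definition. The broker-specific values (an ActiveMQ factory class, provider URL, and destination name) are placeholders; replace them with the details of your JMS provider.

```
@source(type = 'jms',
        factory.initial = 'org.apache.activemq.jndi.ActiveMQInitialContextFactory',
        provider.url = 'tcp://localhost:61616',
        destination = 'transactions',
        connection.factory.type = 'queue',
        @map(type = 'json'))
define stream TransactionsStream (userId long, transactionAmount double, transactionLocation string);
```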
Let's name this Siddhi application as JMS-to-DB-ETL. You can optionally add a description too. The complete Siddhi Application looks as follows:
Scenario 2: Extract data from a database, perform a stateful transformation, and load data to Kafka
Let's consider a scenario where the head office of the Sweet Factory needs to analyze sales by location every five minutes, based on the latest transactions. To do this, you need to poll the database with the sales transactions every five minutes. Before the aggregates are calculated, you also need to ensure that the database contains the latest transactions. You can perform this periodic ETL via WSO2 Stream Processor as shown below.
Let's get started!
- First, you need to ensure that the information extracted for processing is based on the latest transactions. To do that, let's collect information relating to the latest transactions as follows.
You need a database table that, at any given time, contains information about which transactions were last processed. Therefore, let's start by adding the definition of this table to the Siddhi application as follows.
Here, you are creating the new TriggerStateTable table in the same TransactionDataDB database that you created before starting this tutorial and in which you stored data in the previous scenario. The key attribute in the table schema is the primary key, and values for this attribute are the row IDs in the table.
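A minimal sketch of such a table definition is given below; the long type of the lastProcessedID column is an assumption.

```
@store(type = 'rdbms', datasource = 'TransactionDataDB')
@primaryKey('key')
define table TriggerStateTable (key string, lastProcessedID long);
```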
The TriggerStateTable table you created needs to be polled every five minutes for the last processed ID. To do this, you can compare the last processed ID in the table with the last processed ID in a stream in which a new event is triggered every five minutes. To make this possible, let's define a trigger that fires an event into an inferred stream every five minutes, as shown below.
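In Siddhi, such a trigger can be defined as follows; TriggerStream then becomes available as a stream that emits an event every five minutes.

```
define trigger TriggerStream at every 5 min;
```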
- To derive the latest transaction by comparing the last processed ID in the TriggerStateTable table and the TriggerStream inferred stream, let's create a Siddhi query as follows:
The TriggerStateTable table has information about the last processed ID (i.e., the value of the lastProcessedID attribute). To keep polling this table, let's join it with the TriggerStream stream in which events are entered every 5 minutes. The join can be performed as follows.
The events extracted based on the last processed ID need to be inserted into another stream so that they can be used for further processing. Therefore, let's add the insert into clause with an inferred stream as follows.
Now you need to specify the condition based on which the last processed ID is selected from the joined TriggerStream stream and the TriggerStateTable table.
As mentioned before, the purpose of the TriggerStream stream is to poll the TransactionTable table. Each new event in this stream is a request to poll the table, and once it is polled, the last processed ID derived is selected for further processing. If no value exists in the table for the lastProcessedID attribute (e.g., if the table is new), 0 must be used as the last processed ID by default. To do this, let's add a select clause with an ifThenElse() condition as shown below.
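Putting these steps together, the query can look similar to the following sketch. The left outer join keeps the trigger event even when the table is still empty, and the 'lastProcessedID' key value used in the join condition is an assumption about how the state row is identified.

```
@info(name = 'CollectLastProcessedID')
from TriggerStream as s left outer join TriggerStateTable as t
    on t.key == 'lastProcessedID'
select ifThenElse(t.lastProcessedID is null, 0L, t.lastProcessedID) as lastProcessedID
insert into DataRetrievalStream;
```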
The partially completed Siddhi application looks as follows.
Note that the query is named CollectLastProcessedID. It is recommended to name each query after its purpose when there are multiple queries in the Siddhi application.
- To extract the data that needs to be processed, let's further update the Siddhi application as follows.
In the previous scenario, you stored the data to be processed in a table named TransactionTable. To access that data in this scenario, you need to include the same table definition in the Siddhi application that you are creating for this scenario, as follows.
- The data to be extracted for processing needs to be older than the last processed ID. To extract information based on this condition, let's add a Siddhi query named
Transactions should be extracted from the TransactionTable table only if they are newer than the last processed ID carried by each event in the DataRetrievalStream stream. Therefore, let's join this stream and the table to perform this comparison and extract the required events.
The data extracted needs to be directed to an output stream. Therefore, let's add an inferred output stream named ExtractedDataStream.
Now let's select the information to be inserted into the ExtractedDataStream stream for the extracted events.
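A sketch of this extraction query is given below. It assumes that TransactionTable rows carry a unique, increasing transactionId column that can be compared with the last processed ID; that column is not shown in the earlier table definition, so adjust the condition to match your actual schema. The query name ExtractData is also illustrative.

```
@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
    on t.transactionId > s.lastProcessedID
select t.transactionId, t.userId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;
```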
The partially completed Siddhi application now looks as follows.
When the above query is executed, the latest transactions are extracted and directed to the ExtractedDataStream stream. To indicate that these transactions have already been considered for processing and therefore should not be selected again, you need to update the TriggerStateTable table. To do this, enter a query named UpdateLastProcessedID as described below.
The information with which the TriggerStateTable table is updated is taken from the ExtractedDataStream stream, in which the latest transactions are extracted every 5 minutes. To indicate this, add a from clause to the UpdateLastProcessedID query as follows.
When selecting records to be updated or inserted into the table, the unique identity by which the records are identified should be the last processed ID. Therefore, let's add a select clause with the lastProcessedID attribute specified as the key.
The event with the last processed ID may already exist in the TriggerStateTable table. In such a scenario, the existing record should be updated with the latest details taken from the stream. If that event does not already exist in the table, it should be inserted as a new record. To indicate this, let's add an update or insert clause as follows.
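The resulting query can look similar to the following sketch. As in the previous step, the transactionId attribute and the 'lastProcessedID' key value are assumptions.

```
@info(name = 'UpdateLastProcessedID')
from ExtractedDataStream
select 'lastProcessedID' as key, transactionId as lastProcessedID
update or insert into TriggerStateTable
    on TriggerStateTable.key == key;
```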
The partially created Siddhi application now looks as follows:
The enriched data now needs to be transformed via a stateful aggregation. You need to analyze the sales for each location by calculating the total sales and the average sales per location every five minutes. Let's create a Siddhi query named StatefulAggregation to carry out this analysis.
The information to be analyzed is taken from the ExtractedDataStream stream, and the results are inserted into an output stream named TransformedDataStream. Therefore, let's add the from and insert into clauses accordingly.
As mentioned, the calculations need to be done every five minutes in a tumbling manner. Therefore, let's add a time batch window as follows.
The attributes to be selected from the ExtractedDataStream stream are transactionLocation and transactionAmount. The values for the transactionLocation attribute need to be taken without any further processing. However, you need to derive the total transaction amount and the average transaction amount from the values of the transactionAmount attribute. This can be achieved via the sum() and avg() Siddhi functions. Therefore, let's add the select clause as follows.
The analysis is carried out to evaluate the sales by location. Therefore, let's add a group by clause to group the calculations by location when presenting the results.
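Combining the window, the select clause, and the group by clause, the aggregation query can look as follows; the output attribute names are illustrative.

```
@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
       sum(transactionAmount) as totalTransactionAmount,
       avg(transactionAmount) as avgTransactionAmount
group by transactionLocation
insert into TransformedDataStream;
```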
The partially created Siddhi application now looks as follows.
The information processed via the StatefulAggregation query and directed to the TransformedDataStream stream needs to be published via Kafka in binary format. To specify this, let's follow the steps below.
The events published are taken from the TransformedDataStream stream. Therefore, the sink configuration needs to be connected to this stream. To do this, first let's add a definition for the TransformedDataStream stream, which is currently an inferred stream.
Now let's connect a sink definition where the transport type is Kafka and the map type is binary as shown below.
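For example, the sink can be attached to the stream definition as shown in the sketch below. The topic name and the bootstrap server address are placeholders, and the attribute names follow the aggregation sketch above.

```
@sink(type = 'kafka',
      topic = 'sales-analysis',
      bootstrap.servers = 'localhost:9092',
      is.binary.message = 'true',
      @map(type = 'binary'))
define stream TransformedDataStream (transactionLocation string, totalTransactionAmount double, avgTransactionAmount double);
```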
Before saving the Siddhi application, add a name and a description to the Siddhi application as follows.
Save the Siddhi application. The completed version looks as follows.