In many real-life scenarios involving integrated enterprises, data needs to be loaded and moved from one location to another to be processed for future reference. This process is commonly known as Extract, Transform, and Load (ETL). In this tutorial, you will learn how ETL is carried out in real time and periodically.
Let's consider that the Sweet Factory has shops distributed all over the world. In addition, there are travelling sales people who operate from mobile sweet trucks. Each sales person records transactions via a system that generates JMS messages. This data needs to be saved in a database so that it can be used later for sales analysis and financial analysis. Later, it needs to be moved to other databases depending on the nature of the analysis carried out.
This section covers two scenarios that are simple examples for performing real-time and periodic ETL in real world business scenarios.
Scenario 1: Extract data from JMS, perform a stateless transformation, and load to a database.
Once the head office of the Sweet Factory receives events with information about the sales transactions, that information needs to be cleaned and enriched via a connection with a data store. Then the enriched version of the data needs to be saved in another RDBMS store.
Let's get started!
The sales transactions reported as JMS messages include the user ID (i.e., the ID of the sales person), the transaction amount, and the location. Let's begin by adding the stream definition to capture this information.
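Such a stream definition might look like the following sketch. The stream name `TransactionsStream` is referenced later in this tutorial; the attribute names and types are assumptions for illustration.

```sql
define stream TransactionsStream (userId long, transactionAmount double, transactionLocation string);
```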
- Before enriching the data for further analysis, it needs to be cleaned. This involves checking for null values and replacing them with default values. To do this, let's follow the steps below:
First, let's select the data you need to extract and clean. This includes the user ID, the transaction amount, and the location. For travelling sales people, specific locations are not registered in the system. As a result, no value is specified in the location attribute of the events generated for some sales actions. To clean the data before enriching it, you need to replace the `null` value for the `location` attribute of such events with `unknown`. This can be achieved via the `ifThenElse()` function of Siddhi by including it in the `select` clause as shown below.
The information specified in the `select` clause given above is taken from the `TransactionsStream` input stream that you previously defined. To specify this, let's add a `from` statement as follows.
Now you can add an output stream to be inferred so that the information extracted and cleaned can be directed to it.
Now you have completed the query for extracting and cleaning the information you need. You will be adding more queries to this Siddhi application in order to enrich and load data as you proceed with this tutorial. Therefore, to make it easy for other users to understand this Siddhi application, let's name this query `CleaningData`. The completed query looks as follows.
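Assembled from the steps above, the completed query might look like this sketch (attribute names are assumptions):

```sql
@info(name = 'CleaningData')
from TransactionsStream
select userId,
       transactionAmount,
       ifThenElse(transactionLocation is null, 'unknown', transactionLocation) as transactionLocation
insert into CleanedTransactionsStream;
```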
- The information you have extracted and cleaned needs to be enriched. To do this, let's create another query named `EnrichData`.
You have already directed the extracted and cleaned information to the `CleanedTransactionsStream` stream. Therefore, let's specify that stream as the input stream for this query.
The user IDs of the registered sales people are already saved in a table known as `UserTable`. To enrich the data, user IDs of the events in the `CleanedTransactionsStream` stream need to be joined with the user IDs stored in the table. For this, let's add a join query as follows.
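A sketch of such a join (the `on` condition assumes both the stream and the table carry a `userId` attribute):

```sql
from CleanedTransactionsStream as t join UserTable as u
    on t.userId == u.userId
```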
Now, let's include a concatenation to derive a user name ID from the first name and the last name of the sales people as follows.
The enriched data can be directed to another inferred output stream as follows.
Let's complete the query you created for enriching data by naming it `EnrichData`.
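Putting the join, the concatenation, and the output stream together, the `EnrichData` query might look like this sketch. The output stream name `EnrichedTransactionsStream` and the `firstName`/`lastName` attributes are assumptions for illustration; `str:concat()` is the Siddhi string-extension concatenation function.

```sql
@info(name = 'EnrichData')
from CleanedTransactionsStream as t join UserTable as u
    on t.userId == u.userId
select t.userId,
       str:concat(u.firstName, ' ', u.lastName) as userName,
       t.transactionAmount,
       t.transactionLocation
insert into EnrichedTransactionsStream;
```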
In the `EnrichData` Siddhi query you created, you included a join between the `CleanedTransactionsStream` stream and a table named `UserTable`. For this query to be executed, the definitions for both the stream and the table need to be included in the same Siddhi application. You have already added the stream definition. Now let's add the table definition above the Siddhi queries as follows.
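A minimal `UserTable` definition backed by an RDBMS store might look like this sketch. The datasource name and the attributes are assumptions; point the datasource at your own RDBMS configuration.

```sql
@store(type = 'rdbms', datasource = 'SWEET_FACTORY_DB')
define table UserTable (userId long, firstName string, lastName string);
```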
More table definitions are added as you proceed with the tutorial. Therefore, it is useful to specify the purpose of each table definition via a comment as shown below.
Let's create another table to store the enriched data as shown below.
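A sketch of such a table definition, with a comment stating its purpose. The table name, datasource, and attributes are assumptions for illustration.

```sql
-- Stores the enriched transactions for later sales and financial analysis.
@store(type = 'rdbms', datasource = 'SWEET_FACTORY_DB')
define table EnrichedTransactionsTable (userId long, userName string, transactionAmount double, transactionLocation string);
```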
To insert the enriched data to the table you created, let's add another Siddhi Query as follows:
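Assuming the enriched events arrive on an `EnrichedTransactionsStream` and the table is named `EnrichedTransactionsTable` (both names are assumptions), the query might be:

```sql
@info(name = 'LoadData')
from EnrichedTransactionsStream
insert into EnrichedTransactionsTable;
```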
The transactions are generated via JMS in JSON format. Therefore, let's add an event source with `JMS` as the transport and `JSON` as the format. You can add this above the stream definition as shown below.
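Because the Siddhi application consumes these JMS messages, this is expressed as a `@source` annotation attached to the input stream definition. Using ActiveMQ as an example JMS provider (the destination name and connection settings are assumptions for illustration):

```sql
@source(type = 'jms', destination = 'transactions',
        factory.initial = 'org.apache.activemq.jndi.ActiveMQInitialContextFactory',
        provider.url = 'tcp://localhost:61616',
        @map(type = 'json'))
define stream TransactionsStream (userId long, transactionAmount double, transactionLocation string);
```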
Let's name this Siddhi application `JMS-to-DB-ETL`. You can optionally add a description too. The complete Siddhi application looks as follows:
Scenario 2: Extract data from a database, perform a stateful transformation, and load data to Kafka
Let's consider a scenario where the head office of the Sweet Factory needs to analyze sales by location based on the latest transactions every five minutes. To do this, you need to poll the database with the sales transactions every five minutes. Before the aggregates are calculated, you also need to ensure that the database contains the latest transactions. You can perform periodic ETL via WSO2 Stream Processor to achieve this as shown below.
Let's get started!
The information you need to transform and load to Kafka needs to be extracted from a data store. Therefore, let's start by adding the definition of this data table to the Siddhi application as follows.
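A sketch of the source table definition. The table name `TransactionTable` follows the references later in this scenario; the datasource and attributes are assumptions.

```sql
-- Contains the incoming sales transactions to be polled periodically.
@store(type = 'rdbms', datasource = 'SWEET_FACTORY_DB')
define table TransactionTable (transactionId long, transactionAmount double, transactionLocation string);
```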
Once the information is extracted and transformed, the transformed data needs to be directed to an output stream. This output stream can be defined as follows.
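A possible definition for this output stream, with attribute names assumed from the aggregation performed later in this scenario:

```sql
define stream TransformedDataStream (transactionLocation string, totalTransactionAmount double, avgTransactionAmount double);
```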
The database table needs to be polled every five minutes for the last processed ID. To do this, you can compare the last processed ID with a stream in which a new event is triggered every five minutes. Therefore, let's add a trigger as follows:
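In Siddhi, such a periodic trigger can be declared in one line (the name `TriggerStream` matches the references below):

```sql
define trigger TriggerStream at every 5 min;
```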
- To collect the last processed transaction ID, you need to add a Siddhi query as follows.
To extract the last processed ID, the events triggered in the `TriggerStream` stream as well as the events stored in the `TriggerStateTable` table need to be considered. Therefore, let's add a Siddhi join to join this stream and the table as follows.
The data collected needs to be inserted into another stream so that it can be used for further processing. Therefore, let's add the `insert into` clause with an inferred stream as follows.
Now you need to specify the condition based on which the last processed ID is selected from the joined `TriggerStream` stream and the `TriggerStateTable` table, and inserted into the `DataRetrievalStream`. Let's add it as follows.
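Put together, the query might look like this sketch. A left outer join is used here so that a trigger event still produces output when the state table is empty, with `0L` as the assumed starting ID; the attribute names are also assumptions.

```sql
@info(name = 'CollectLastProcessedId')
from TriggerStream as s left outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0L, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;
```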
Let's define a data store to save the transaction data as follows.
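The joins in this scenario also reference a `TriggerStateTable` that keeps the last processed ID between polls; a minimal definition might look like this sketch (the datasource and attribute are assumptions):

```sql
-- Keeps track of the last processed transaction ID between polls.
@store(type = 'rdbms', datasource = 'SWEET_FACTORY_DB')
define table TriggerStateTable (lastProcessedId long);
```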
- Now you have created a query to collect the last processed IDs from a triggered stream and a table. The data is taken from two locations via a join. Therefore, you need to process the data collected from both to extract the last processed ID. To do this, let's create a Siddhi query as follows.
The last processed transaction ID should be taken from the `TransactionTable` only if the last processed ID in that table is later than the last processed ID of an event in the `DataRetrievalStream` stream. Therefore, let's join this stream and table so that the last processed ID is selected by comparing events in both.
The data extracted needs to be directed to an output stream. Therefore, let's add an output stream named `ExtractedDataStream` that can be inferred.
Now let's select the information to be inserted into the `ExtractedDataStream` stream from the event that is extracted as the last processed event.
The completed query looks as follows.
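The assembled query might look like this sketch (attribute names are assumptions):

```sql
@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
    on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;
```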
Note that the query is named `ExtractData`. It is recommended to name each query stating its purpose when there are multiple queries in the Siddhi application.
- You need to update the `TriggerStateTable` table with the last processed ID identified after the join between the table and the stream. To do this, let's create a Siddhi query as follows.
The information with which the `TriggerStateTable` table is updated is taken from the `ExtractedDataStream` stream, to which the last processed transaction is sent every five minutes. Therefore, let's add this stream as the input stream.
The event with the last processed ID may already exist in the `TriggerStateTable` table. In such a scenario, the existing record should be updated with the latest details taken from the stream. If that event does not already exist in the table, it should be inserted as a new record. To indicate this, let's add an `update or insert` clause as follows.
When selecting records to be updated/inserted into the table, the unique identity by which the records are identified should be the last processed ID. Therefore, let's add a select clause with the `lastProcessedId` attribute specified as the key.
The completed query looks as follows.
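A sketch of the completed update query (attribute names are assumptions):

```sql
@info(name = 'UpdateTriggerState')
from ExtractedDataStream
select transactionId as lastProcessedId
update or insert into TriggerStateTable
    on TriggerStateTable.lastProcessedId == lastProcessedId;
```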
- Now let's perform a stateful aggregation. You need to analyze the sales for each location by calculating the total sales and the average sales per location every five minutes. Let's create a Siddhi query to carry out this analysis.
The information to be analyzed is taken from the `ExtractedDataStream` stream, and the results are inserted into an output stream named `TransformedDataStream`. Therefore, let's add the `from` and `insert into` clauses accordingly.
As mentioned, the calculations need to be done every five minutes in a tumbling manner. Therefore, let's add a time batch window as follows.
The attributes in the extracted events include `transactionLocation` and `transactionAmount`. The values for the `transactionLocation` attribute need to be taken without any further processing. However, you need to derive the total transaction amount and the average transaction amount from the values of the `transactionAmount` attribute. This can be achieved via the `sum()` and `avg()` Siddhi functions. Therefore, let's add the select clause as follows.
The analysis is carried out to evaluate the sales by location. Therefore, let's add a group by clause to group the calculations by the location when presenting the results.
Now let's name the query and complete it as follows.
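Combining the window, select, and group-by clauses described above, the completed query might look like this sketch (attribute names are assumptions):

```sql
@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
       sum(transactionAmount) as totalTransactionAmount,
       avg(transactionAmount) as avgTransactionAmount
group by transactionLocation
insert into TransformedDataStream;
```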
The processed information needs to be published via Kafka. Therefore, let's add a Kafka sink as shown below.
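A sketch of such a Kafka sink annotation, to be placed above the `TransformedDataStream` definition it publishes from. The bootstrap server address and topic name are placeholders for illustration.

```sql
@sink(type = 'kafka', bootstrap.servers = 'localhost:9092',
      topic = 'sales_analysis', is.binary.message = 'false',
      @map(type = 'json'))
```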
This sink publishes data from the `TransformedDataStream` stream that is added as the output stream to direct the data processed by the `StatefulAggregation` Siddhi query. Therefore, let's connect the sink to that stream by adding the stream definition below it as shown below.
Now let's name the Siddhi application and complete it. The completed version looks as follows.