This documentation is for WSO2 Stream Processor 4.2.0.

Introduction

In many real-life scenarios that involve integrated enterprises, data needs to be loaded and moved from one location to another to be processed for future reference. This process is commonly known as Extract, Transform, and Load (ETL). In this tutorial, you can learn how ETL is carried out in real time and periodically.

Let's consider that the Sweet Factory has shops distributed all over the world. In addition, there are travelling sales people who operate from mobile sweet trucks. Each sales person reports transactions via a system that generates JMS messages. This data needs to be saved in a database so that it can be used later for sales analysis and financial analysis. Later, the data needs to be moved to other databases depending on the nature of the analysis carried out.

Tutorial steps

This section covers two scenarios that are simple examples of performing real-time and periodic ETL in real-world business settings.

Scenario 1: Extract data from JMS, perform a stateless transformation, and load to a database.

Once the head office of the Sweet Factory receives events with information about the sales transactions, that information needs to be cleaned and enriched via a connection with a data store. Then the enriched version of the data needs to be saved in another RDBMS store.

Let's get started!

  1. The sales transactions reported as JMS messages include the user ID (i.e., the ID of the salesman), the transaction amount and the location. Let's begin by adding the stream definition to capture this information.

    define stream TrasanctionStream (userId long, transactionAmount double, location string);
  2. Before enriching the data for further analysis, it needs to be cleaned. This involves checking for null values, and replacing them with default values. To do this, let's follow the steps below:
    1. First, let's select the data you need to extract and clean. This includes the user ID, transaction amount and the location. For travelling sales people, specific locations are not registered in the system. As a result, no value is specified in the location attribute of the events generated for some sales actions. To clean the data before enriching it, you need to replace the null value for the location attribute of such events with UNKNOWN. This can be achieved via the ifThenElse() function of Siddhi by including it in the select clause as shown below.

      select userId,
      transactionAmount,
      ifThenElse(location is null, "UNKNOWN", location) as location

    2. The information specified in the select clause given above is taken from the TrasanctionStream input stream that you previously defined. To specify this, let's add a from statement as follows.

      from TrasanctionStream
      select userId,
      transactionAmount,
      ifThenElse(location is null, "UNKNOWN", location) as location

    3. Now you can add an output stream to be inferred so that the information extracted and cleaned can be directed to it.

      from TrasanctionStream
      select userId,
      transactionAmount,
      ifThenElse(location is null, "UNKNOWN", location) as location
      insert into CleanedTrasanctionStream;

    4. Now you have completed the query for extracting and cleaning the information you need. You will be adding more queries to this Siddhi application in order to enrich and load data as you proceed with this tutorial. Therefore, to make it easy for other users to understand this Siddhi application, let's name this query as CleaningData. The completed query looks as follows.

      @info(name = 'CleaningData')
      from TrasanctionStream
      select userId,
      transactionAmount,
      ifThenElse(location is null, "UNKNOWN", location) as location
      insert into CleanedTrasanctionStream;

  3. The information you have extracted and cleaned needs to be enriched. To do this, let's create another query named EnrichData as follows.
    1. You have already directed the extracted and cleaned information to the CleanedTrasanctionStream stream. Therefore, let's specify that stream as the input stream for this query.

      from CleanedTrasanctionStream

    2. The user IDs of the registered sales people are already saved in a table named UserTable. To enrich the data, user IDs of the events in the CleanedTrasanctionStream stream need to be joined with the user IDs stored in the table. For this, let's add a join query as follows.

      from CleanedTrasanctionStream as c join UserTable as u
      on c.userId == u.userId

    3. Now, let's include a concatenation to derive a user name from the first name and the last name of each sales person as follows.

      select c.userId,
      str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location

    4. The enriched data can be directed to another inferred output stream named EnrichedTrasanctionStream.

      from CleanedTrasanctionStream as c join UserTable as u
      on c.userId == u.userId
      select c.userId,
      str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location
      insert into EnrichedTrasanctionStream;

    5. Let's complete the query you created for enriching data by naming it as EnrichData.

      @info(name = 'EnrichData')
      from CleanedTrasanctionStream as c join UserTable as u
      on c.userId == u.userId
      select c.userId,
      str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location
      insert into EnrichedTrasanctionStream;

  4. In the EnrichData Siddhi query you created, you included a join between the CleanedTrasanctionStream stream and a table named UserTable. For this query to be executed, the definitions for both the stream and the table need to be included in the same Siddhi application. You have already added the stream definition. Now let's add the table definition above the Siddhi queries as follows.

    More table definitions are added as you proceed with the tutorial. Therefore, it is useful to specify the purpose of each table definition via a comment as shown below.

    define stream TrasanctionStream (userId long, 
    transactionAmount double, location string);

    -- Table used to enrich data
    @store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE')
    define table UserTable (userId long, firstName string,
    lastName string);

              
    @info(name = 'CleaningData')
    from TrasanctionStream
    select userId,
    transactionAmount,
    ifThenElse(location is null, "UNKNOWN", location) as location
    insert into CleanedTrasanctionStream;
              
    @info(name = 'EnrichData')
    from CleanedTrasanctionStream as c join UserTable as u
    on c.userId == u.userId
    select c.userId,
    str:concat( u.firstName, " ", u.lastName) as userName,
    transactionAmount,
    location
    insert into EnrichedTrasanctionStream;
  5. Let's create another table to store the enriched data as shown below.

    -- Final table to load the data
    @store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCE')
    define table TransactionTable (userId long, userName string,
    transactionAmount double, location string);
  6. To insert the enriched data to the table you created, let's add another Siddhi Query as follows:

    @info(name = 'LoadData')
    from EnrichedTrasanctionStream
    insert into TransactionTable;
  7. The transactions are received via JMS in JSON format. Therefore, let's add an event source with JMS as the transport and JSON as the format. You can add this above the stream definition as shown below.

    @source(type = 'jms' , destination = 'transactions',
    factory.initial = '...', provider.url = '...',
    @map(type = 'json',
    @attributes('$.userId', '$.transactionAmount', '$.location')))

    define stream TrasanctionStream (userId long, transactionAmount double, location string);
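
To make the effect of the JSON mapping and the CleaningData query concrete, the following is a minimal sketch in plain Python. It does not use any Siddhi or JMS API; it only mimics what happens to a single message. The function names `map_json_to_event` and `clean_event` and the second sample message are hypothetical, and treating a missing `location` key like a JSON null is an assumption of the sketch, not a statement about how the Siddhi JSON mapper behaves.

```python
import json

UNKNOWN_LOCATION = "UNKNOWN"


def map_json_to_event(payload: str) -> dict:
    """Pick the three stream attributes out of a JSON message body."""
    message = json.loads(payload)
    return {
        "userId": int(message["userId"]),
        "transactionAmount": float(message["transactionAmount"]),
        # Assumption: a missing key and an explicit JSON null both become None.
        "location": message.get("location"),
    }


def clean_event(event: dict) -> dict:
    """Mirror ifThenElse(location is null, "UNKNOWN", location)."""
    cleaned = dict(event)
    if cleaned["location"] is None:
        cleaned["location"] = UNKNOWN_LOCATION
    return cleaned


if __name__ == "__main__":
    shop_sale = '{"userId": 2345, "transactionAmount": 456.0, "location": "CA, USA"}'
    # Hypothetical message from a mobile sweet truck with no registered location.
    truck_sale = '{"userId": 2346, "transactionAmount": 12.5, "location": null}'
    for payload in (shop_sale, truck_sale):
        print(clean_event(map_json_to_event(payload)))
```

The cleaning step is stateless: each event is handled on its own, which is why no window or table is needed for it.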

Let's name this Siddhi application as JMS-to-DB-ETL. You can optionally add a description too. The complete Siddhi Application looks as follows:

@App:name("JMS-to-DB-ETL")
@App:description("Extract data from JMS, perform stateless transformation, and load to database")
 
-- Sample input message: {"userId":2345,
-- "transactionAmount":456.0,
-- "location": "CA, USA"}

@source(type = 'jms' , destination = 'transactions', 
   factory.initial = '...', provider.url = '...',
   @map(type = 'json',
      @attributes('$.userId', '$.transactionAmount', '$.location')))
 
define stream TrasanctionStream (userId long, 
                        transactionAmount double, location string);
 
-- Table used to enrich data
@store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE')
define table UserTable (userId long, firstName string, 
                        lastName string);
 
-- Final table to load the data
@store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCE')
define table TransactionTable (userId long, userName string, 
                        transactionAmount double, location string);
 
@info(name = 'CleaningData')
from TrasanctionStream
select userId, 
       transactionAmount, 
       ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTrasanctionStream;
 
@info(name = 'EnrichData')
from CleanedTrasanctionStream as c join UserTable as u
   on c.userId == u.userId
select c.userId, 
       str:concat( u.firstName, " ", u.lastName) as userName, 
       transactionAmount, 
       location
insert into EnrichedTrasanctionStream;
 
@info(name = 'LoadData')
from EnrichedTrasanctionStream
insert into TransactionTable;
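
As a rough illustration of what the EnrichData and LoadData queries do to one cleaned event, here is a small sketch that uses Python's built-in `sqlite3` module in place of the RDBMS store. It is not how WSO2 Stream Processor talks to the database. The helper names `build_store` and `enrich_and_load`, the sample user, and the choice to store all four enriched attributes are assumptions made for the sketch.

```python
import sqlite3


def build_store() -> sqlite3.Connection:
    """Create an in-memory stand-in for the two RDBMS-backed tables."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE UserTable (userId INTEGER, firstName TEXT, lastName TEXT)"
    )
    conn.execute(
        "CREATE TABLE TransactionTable "
        "(userId INTEGER, userName TEXT, transactionAmount REAL, location TEXT)"
    )
    # Hypothetical sales person, used only for this illustration.
    conn.execute("INSERT INTO UserTable VALUES (2345, 'Jane', 'Doe')")
    return conn


def enrich_and_load(conn: sqlite3.Connection, event: tuple):
    """Join one cleaned event with UserTable and store the enriched row.

    Returns the enriched row, or None when no user matches.
    """
    user_id, amount, location = event
    row = conn.execute(
        "SELECT firstName || ' ' || lastName FROM UserTable WHERE userId = ?",
        (user_id,),
    ).fetchone()
    if row is None:
        # An inner join emits nothing for an event without a matching user.
        return None
    enriched = (user_id, row[0], amount, location)
    conn.execute("INSERT INTO TransactionTable VALUES (?, ?, ?, ?)", enriched)
    return enriched


if __name__ == "__main__":
    store = build_store()
    print(enrich_and_load(store, (2345, 456.0, "CA, USA")))
    print(enrich_and_load(store, (9999, 10.0, "UNKNOWN")))
```

One point the sketch makes visible: because the join is an inner join, a transaction whose user ID is not in the user table never reaches the final table.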


Scenario 2: Extract data from a database, perform a stateful transformation, and load data to Kafka

Let's consider a scenario where the head office of the Sweet Factory needs to analyze sales by location based on the latest transactions every five minutes. To do this, you need to poll the database with the sales transactions every five minutes. Before the aggregates are calculated, you also need to ensure that only the latest transactions are taken from the database. You can perform periodic ETL via WSO2 Stream Processor to achieve this as shown below.

Let's get started!

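  1. The information you need to transform and load to Kafka needs to be extracted from a datastore. To extract only the records that were added since the previous poll, the ID of the last processed transaction needs to be tracked. Therefore, let's start by adding the definition of an in-memory table that holds this ID to the Siddhi application as follows.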

    -- In-memory Table to keep last processed transaction ID
    @primaryKey('key')
    define table TriggerStateTable (key string, lastProcessedId long);
  2. Once the information is extracted and transformed, the transformed data needs to be directed to an output stream. This output stream can be defined as follows.

    define stream TranformedDataStream(transactionLocation string, 
    totalTransactionAmount double, avgTransactionAmount double,
    transactionId long);
  3. The database table needs to be polled every five minutes for the last processed ID. To do this, you can compare the last processed ID with a stream in which a new event is triggered every five minutes. Therefore, let's add a trigger as follows:

    define trigger TriggerStream at every 5 min;

    define stream TranformedDataStream(transactionLocation string,
    totalTransactionAmount double, avgTransactionAmount double,
    transactionId long);
     

    -- In-memory Table to keep last processed transaction ID
    @primaryKey('key')
    define table TriggerStateTable (key string, lastProcessedId long);
  4. To collect the last processed transaction ID, you need to add a Siddhi query as follows.
    1. To extract the last processed ID, the events triggered in the TriggerStream stream as well as the events stored in the TriggerStateTable table need to be considered. Therefore, let's add a Siddhi join to join this stream and the table as follows.

      from TriggerStream as s right outer join TriggerStateTable as t
    2. The data collected needs to be inserted into another stream so that it can be used for further processing. Therefore, let's add the insert into clause with an inferred stream as follows.

      from TriggerStream as s right outer join TriggerStateTable as t
      insert into DataRetrievalStream;

    3. Now you need to specify the condition based on which the last processed ID is selected from the joined TriggerStream stream and the TriggerStateTable table, and inserted into the DataRetrievalStream stream. If no ID has been stored yet, the value defaults to 0. Let's add it as follows.

      from TriggerStream as s right outer join TriggerStateTable as t
      select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
      as lastProcessedId
      insert into DataRetrievalStream;

  5. Let's define a data store to save the transaction data as follows.

    -- Store table
              
    @store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCE')
    define table TransactionTable(transactionId long, userId long,
    transactionAmount double, transactionLocation string);
  6. Now you have created a query to collect the last processed IDs from a triggered stream and a table. The data is taken from two locations via a join. Therefore, you need to process the data collected from both to extract the last processed ID. To do this, let's create a Siddhi query as follows.
    1. Records should be taken from the TransactionTable table only if their transaction ID is greater than the last processed ID of the event in the DataRetrievalStream stream. Therefore, let's join this stream and table so that only the records that have not yet been processed are selected.

      from DataRetrievalStream as s join TransactionTable as t
      on s.lastProcessedId < t.transactionId
    2. The data extracted needs to be directed to an output stream. Therefore, let's add an output stream named ExtractedDataStream that can be inferred.

      from DataRetrievalStream as s join TransactionTable as t
      on s.lastProcessedId < t.transactionId

      insert into ExtractedDataStream;

    3. Now let's select the information to be inserted into the ExtractedDataStream stream from the event that is extracted as the last processed event.

      from DataRetrievalStream as s join TransactionTable as t
      on s.lastProcessedId < t.transactionId
      select t.transactionId, t.transactionAmount, t.transactionLocation
      insert into ExtractedDataStream;

    4. The completed query looks as follows.

      Note that the query is named ExtractData. It is recommended to name each query stating the purpose when there are multiple queries in the Siddhi application.

      @info(name = 'ExtractData')
      from DataRetrievalStream as s join TransactionTable as t
      on s.lastProcessedId < t.transactionId
      select t.transactionId, t.transactionAmount, t.transactionLocation
      insert into ExtractedDataStream;


  7. You need to update the TriggerStateTable table with the last processed ID identified after the join between the table and the stream. To do this, let's create a Siddhi query as follows.
    1. The information with which the TriggerStateTable table is updated is taken from the ExtractedDataStream stream to which the last processed transaction is being sent every five minutes. Therefore, let's add this stream as the input stream.

      from ExtractedDataStream
    2. The event with the last processed ID may already exist in the TriggerStateTable table. In such a scenario, the existing record should be updated with the latest details taken from the stream. If that event does not already exist in the table, it should be inserted as a new record. To indicate this, let's add an update or insert clause as follows.

      from ExtractedDataStream

      update or insert into TriggerStateTable
      on TriggerStateTable.key == "lastProcessedId";

       

    3. The TriggerStateTable table holds a single record that is identified by the constant key lastProcessedId. Therefore, let's add a select clause that sets this constant as the key and the transaction ID of the event as the value of the lastProcessedId attribute.

      from ExtractedDataStream
      select "lastProcessedId" as key, transactionId as lastProcessedId
      update or insert into TriggerStateTable
      on TriggerStateTable.key == "lastProcessedId";

    4. The completed query looks as follows. 

      @info(name='UpdateLastProcessedId')
      from ExtractedDataStream
      select "lastProcessedId" as key, transactionId as lastProcessedId
      update or insert into TriggerStateTable
      on TriggerStateTable.key == "lastProcessedId";

  8. Now let's perform a stateful aggregation. You need to analyze the sales for each location by calculating the total sales and the average sales per location every five minutes. Let's create a Siddhi query to carry out this analysis.
    1. The information to be analyzed is taken from the ExtractedDataStream stream, and the results are inserted into an output stream named TranformedDataStream. Therefore, let's add the from and insert into clauses accordingly.

      from ExtractedDataStream
      insert into TranformedDataStream;

       

    2. As mentioned, the calculations need to be done every five minutes in a tumbling manner. Therefore, let's add a time batch window as follows.

      from ExtractedDataStream#window.timeBatch(5 min)

      insert into TranformedDataStream;


    3. The attributes in the ExtractedDataStream stream are transactionId, transactionLocation, and transactionAmount. The values for the transactionLocation attribute need to be taken without any further processing. However, you need to derive the total transaction amount and the average transaction amount from the values for the transactionAmount attribute. This can be achieved via the sum() and avg() Siddhi functions. Therefore, let's add the select clause as follows.

      from ExtractedDataStream#window.timeBatch(5 min)
      select transactionLocation,
      sum(transactionAmount) as totalTransactionAmount,
      avg(transactionAmount) as avgTransactionAmount,
      transactionId
      insert into TranformedDataStream;

    4. The analysis is carried out to evaluate the sales by location. Therefore, let's add a group by clause to group the calculations by the location when presenting the results.

      from ExtractedDataStream#window.timeBatch(5 min)
      select transactionLocation,
      sum(transactionAmount) as totalTransactionAmount,
      avg(transactionAmount) as avgTransactionAmount,
      transactionId
      group by transactionLocation
      insert into TranformedDataStream;

    5. Now let's name the query and complete it as follows.

      @info(name = 'StatefulAggregation')
      from ExtractedDataStream#window.timeBatch(5 min)
      select transactionLocation,
      sum(transactionAmount) as totalTransactionAmount,
      avg(transactionAmount) as avgTransactionAmount,
      transactionId
      group by transactionLocation
      insert into TranformedDataStream;
  9. The processed information needs to be published via Kafka. Therefore, let's add a Kafka sink as shown below.

    -- Sink to load the data
    @sink(type='kafka', bootstrap.servers='...',
    topic='...',is.binary.message='...',
    @map(type='xml'))

    This sink publishes data from the TranformedDataStream stream, which is the output stream of the StatefulAggregation Siddhi query. Therefore, let's connect the sink to that stream by adding the stream definition below it as follows.

    -- Sink to load the data
    @sink(type='kafka', bootstrap.servers='...',
    topic='...',is.binary.message='...',
    @map(type='xml'))
    define stream TranformedDataStream(transactionLocation string,
    totalTransactionAmount double, avgTransactionAmount double,
    transactionId long);
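
The polling logic built up in the steps above (collect the last processed ID, extract newer records, update the stored ID) can be mimicked in plain Python as follows. This is only a sketch of the idea, not Siddhi code. The function name `poll_once`, the use of a list and a dictionary in place of the two tables, and the sample rows in the usage section are assumptions made for illustration.

```python
from typing import Dict, List, Tuple

# transactionId, userId, transactionAmount, transactionLocation
Transaction = Tuple[int, int, float, str]

STATE_KEY = "lastProcessedId"


def poll_once(
    transaction_table: List[Transaction], state: Dict[str, int]
) -> List[Tuple[int, float, str]]:
    """Run one five-minute poll against the stand-in tables."""
    # CollectLastProcessedId: default to 0 when nothing has been processed yet.
    last_processed_id = state.get(STATE_KEY, 0)

    # ExtractData: keep rows whose transactionId is greater than the stored ID.
    extracted = [
        (tx_id, amount, location)
        for tx_id, _user_id, amount, location in transaction_table
        if last_processed_id < tx_id
    ]

    # UpdateLastProcessedId: every extracted row upserts the single state record,
    # so the value written last is the one that remains.
    for tx_id, _amount, _location in extracted:
        state[STATE_KEY] = tx_id
    return extracted


if __name__ == "__main__":
    table = [(1, 10, 5.0, "NY"), (2, 11, 7.5, "CA")]  # hypothetical rows
    state: Dict[str, int] = {}
    print(poll_once(table, state))  # first poll returns both rows
    print(poll_once(table, state))  # second poll finds nothing new
```

Because the sketch keeps whichever ID was written last, it assumes that rows are returned in ascending order of transaction ID.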

Now let's name the Siddhi application and complete it. The completed version looks as follows.

@App:name('DB-to-Kafka-ETL')
@App:description('Extract data from database, perform stateful transformation, and load data to kafka')

define trigger TriggerStream at every 5 min;
-- Sink to load the data
@sink(type='kafka', bootstrap.servers='...', 
   topic='...',is.binary.message='...', 
   @map(type='xml'))
define stream TranformedDataStream(transactionLocation string, 
      totalTransactionAmount double, avgTransactionAmount double, 
      transactionId long);

-- Store table
@store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCE')
define table TransactionTable(transactionId long, userId long, 
              transactionAmount double, transactionLocation string);

-- In-memory Table to keep last processed transaction ID
@primaryKey('key')
define table TriggerStateTable (key string, lastProcessedId long);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId ) 
             as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
   on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

@info(name='UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
   on TriggerStateTable.key == "lastProcessedId";

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation, 
       sum(transactionAmount) as totalTransactionAmount, 
       avg(transactionAmount) as avgTransactionAmount, 
       transactionId
group by transactionLocation
insert into TranformedDataStream;
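
To show what the StatefulAggregation query computes for one five-minute batch, the following plain-Python sketch groups a batch by location and derives the total and the average. It is an illustration under stated assumptions, not Siddhi code: the function name `aggregate_batch` and the sample batch are hypothetical, and reporting the transaction ID of the last event seen for each location is an assumption of the sketch about how the non-aggregated transactionId attribute is handled.

```python
from typing import Dict, List, Tuple

# transactionId, transactionAmount, transactionLocation
ExtractedEvent = Tuple[int, float, str]


def aggregate_batch(
    batch: List[ExtractedEvent],
) -> List[Tuple[str, float, float, int]]:
    """Return one row per location for the events of a single batch.

    Each row is (transactionLocation, totalTransactionAmount,
    avgTransactionAmount, transactionId of the last event seen).
    """
    # location -> (running total, event count, last transactionId)
    groups: Dict[str, Tuple[float, int, int]] = {}
    for tx_id, amount, location in batch:
        total, count, _last_id = groups.get(location, (0.0, 0, tx_id))
        groups[location] = (total + amount, count + 1, tx_id)
    return [
        (location, total, total / count, last_id)
        for location, (total, count, last_id) in groups.items()
    ]


if __name__ == "__main__":
    # Hypothetical events that arrived within one five-minute window.
    window = [(1, 10.0, "NY"), (2, 20.0, "CA"), (3, 30.0, "NY")]
    for row in aggregate_batch(window):
        print(row)
```

The aggregation is stateful because the totals depend on every event in the batch, in contrast to the per-event cleaning in Scenario 1. In a tumbling window the state is discarded once the batch is emitted, which the sketch reflects by starting each call with an empty dictionary.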