This documentation is for WSO2 Stream Processor 4.2.0.

Introduction

In the previous tutorial, we looked at how static machine learning and prediction can be done via PMML. In this tutorial, let's see how real-time online machine learning support is offered in Stream Processor through various mechanisms.

After each sweet is produced in the Sweet Factory, its density and volume are measured and sent to WSO2 SP. Based on this data, WSO2 SP needs to predict which of five possible sweet types was produced.

This tutorial covers the following concepts:

  • Training a Hoeffding Classifier model on real-time data to get predictions.
  • The other supported algorithm types: AMRulesRegressor, ClusTree, K-Means, and PerceptronClassifier.
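
As background, a Hoeffding tree uses the Hoeffding bound to decide when it has seen enough events to split a node. The Python sketch below computes that bound. It illustrates the general algorithm only, not the internals of the streamingml extension; the function name and the delta value are assumptions chosen for illustration.

```python
import math

def hoeffding_bound(value_range: float, delta: float, n: int) -> float:
    """Return epsilon such that, with probability 1 - delta, the true mean of a
    variable with the given range lies within epsilon of the mean observed
    over n independent samples."""
    if n <= 0:
        raise ValueError("n must be positive")
    if not 0 < delta < 1:
        raise ValueError("delta must be in (0, 1)")
    return math.sqrt(value_range ** 2 * math.log(1.0 / delta) / (2.0 * n))

# Information gain over five classes ranges from 0 to log2(5).
gain_range = math.log2(5)

# The bound tightens as more events are observed (delta is an assumed value).
for n in (100, 1_000, 10_000):
    print(n, round(hoeffding_bound(gain_range, 1e-7, n), 4))
```

The bound shrinks as more events arrive, which is why a tree trained on a stream can commit to a split after a finite number of events.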

Before you begin:

The siddhi-gpl-execution-streamingml extension must be added to the <SP_HOME>/lib directory.

Tutorial steps

Let's get started!

  1. The input data captured for the training phase must include the name of the sweet, its density and volume. Let's define an input stream as shown below to capture this information.

    define stream ProductionTrainingStream (density double, volume double, sweetType string );
  2. You need another input stream through which the volume and the density of the unknown sweet must be sent during the predicting phase. Let's define it as follows.

    define stream SweetProductionStream (density double, volume double);
  3. Finally, let's define an output stream to present the output of the predictions.

    define stream PredictionStream (density double, volume double, prediction string, confidenceLevel double);
  4. Now, let's write a simple query to select values from the trainer stream and write them to a temporary stream.

    from ProductionTrainingStream
    select *
    insert into TemporaryStream;

    Here, the TemporaryStream stream is an in-memory stream with no definition and no sources/sinks. Therefore, it can be used for routing events that are not needed elsewhere.

  5. Now, let's update the query so that it trains the Hoeffding tree. To do this, append the #streamingml extension to the input stream and choose one of its Hoeffding tree functions: updateHoeffdingTree trains the tree, and hoeffdingTreeClassifier produces output based on the learnt model. Because this query is for training, use updateHoeffdingTree.

    from ProductionTrainingStream#streamingml:updateHoeffdingTree()
    select *
    insert into TemporaryStream;


  6. The updateHoeffdingTree function requires the following two parameters.
    • The name of the model that is built or updated.
    • The number of classes in the predicted output. For example, if the training data can contain five types of sweets, this parameter must be 5 to indicate that the predicted attribute has five possible values.

    In addition, you can specify the attributes in the input stream that are needed for training. For example, if you use the attributes X, Y, and Z to train the model so that it can predict the value of the K attribute, you must specify X, Y, Z, and K as parameters of the function, with K last. The last attribute listed is treated as the attribute to be predicted.

    Taking the above into account, let's update the training query as follows.

    from ProductionTrainingStream#streamingml:updateHoeffdingTree( 'SweetTypeModel', 5, density, volume, sweetType )
    select *
    insert into TemporaryStream;

    Now that the function call is complete, the events sent to the TemporaryStream stream contain the accuracy evaluation of the model. This is not needed in the end result, so you can omit it.


  7. Now let's consider the other end of the prediction process where the model makes predictions based on the density and volume. Let's write a simple query to select values from the prediction stream and direct them to the output stream.

    from SweetProductionStream
    select density, volume, prediction, confidenceLevel
    insert into PredictionStream;
  8. Let's also add the #streamingml extension to the SweetProductionStream stream so that arriving events are processed against the model (as you previously did for the training stream in step 5).

    from SweetProductionStream#streamingml
    select density, volume, prediction, confidenceLevel
    insert into PredictionStream;

  9. This time, we need to use the classifier to perform a prediction instead of training the tree. Therefore, let's add the hoeffdingTreeClassifier function instead of the updateHoeffdingTree function you previously used.

    from SweetProductionStream#streamingml:hoeffdingTreeClassifier()
    select density, volume, prediction, confidenceLevel
    insert into PredictionStream;

  10. Let's specify the following parameters for the model.

    • The name of the model to which you want to refer. You can refer to the SweetTypeModel model that you previously trained.
    • The stream attributes to be used for querying the model for predictions. In this scenario, you need to query by the density and the volume of the unknown sweet.

    from SweetProductionStream#streamingml:hoeffdingTreeClassifier('SweetTypeModel', density, volume)
    select density, volume, prediction, confidenceLevel
    insert into PredictionStream;

    Here, the confidenceLevel attribute is a standard output of the classifier. It indicates the extent to which the generated prediction can be considered accurate.
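
The steps above pair a training query with a prediction query that share one named model. The Python sketch below mirrors that data flow only. It swaps the Hoeffding tree for a much simpler stand-in (running per-class means with nearest-mean lookup), does not compute a confidence level, and uses function names and sample values that are hypothetical.

```python
from collections import defaultdict

# Hypothetical in-memory registry: model name -> label -> [count, sum_density, sum_volume].
# A real Hoeffding tree is far more involved; running per-class means are
# used here only to show how two queries share one named model.
_models = defaultdict(dict)

def update_model(model_name, density, volume, sweet_type):
    """Training side: fold one labelled event into the named model."""
    stats = _models[model_name].setdefault(sweet_type, [0, 0.0, 0.0])
    stats[0] += 1
    stats[1] += density
    stats[2] += volume

def classify(model_name, density, volume):
    """Prediction side: return the label whose running mean is closest."""
    model = _models.get(model_name)
    if not model:
        raise KeyError(f"model {model_name!r} has not been trained yet")

    def squared_distance(stats):
        count, sum_density, sum_volume = stats
        return (density - sum_density / count) ** 2 + (volume - sum_volume / count) ** 2

    return min(model, key=lambda label: squared_distance(model[label]))

# Hypothetical sample events; the sweet names and measurements are made up.
update_model("SweetTypeModel", 1.2, 30.0, "toffee")
update_model("SweetTypeModel", 1.4, 32.0, "toffee")
update_model("SweetTypeModel", 0.6, 80.0, "marshmallow")
print(classify("SweetTypeModel", 1.3, 31.0))
```

As in the Siddhi queries, the only link between the two sides is the model name, so the prediction side can only answer once the training side has seen at least one event for that model.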

The final Siddhi application (with source and sink configurations added) looks as follows.

@App:name('SugerSyrupPredictionApp')

@source(type = 'http', @map(type = 'json'))
define stream ProductionTrainingStream (density double, volume double, sweetType string );

@source(type = 'http', @map(type = 'json'))
define stream SweetProductionStream (density double, volume double);

@sink(type='log', prefix='Predicted sweet type:')
define stream PredictionStream (density double, volume double, prediction string, confidenceLevel double);

@info(name = 'training-query')
from ProductionTrainingStream#streamingml:updateHoeffdingTree('SweetTypeModel', 5, density, volume, sweetType)
select *
insert into TemporaryStream;

@info(name = 'prediction-query')
from SweetProductionStream#streamingml:hoeffdingTreeClassifier('SweetTypeModel', density, volume)
select density, volume, prediction, confidenceLevel
insert into PredictionStream;
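
To try the application, you can send JSON events to the two HTTP sources. The Python sketch below builds the payloads, assuming the default JSON mapping, which wraps the stream attributes in an "event" object. The helper names and sample values are hypothetical. The receiver URL depends on your deployment and is not given in this tutorial, so the actual POST call is left commented out.

```python
import json
import urllib.request

def build_event(**attributes):
    """Wrap stream attributes in the {"event": {...}} envelope that the
    default Siddhi JSON mapping is assumed to expect."""
    return json.dumps({"event": attributes})

def post_event(url, payload):
    """POST one JSON event to an HTTP source and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status

# Hypothetical sample values; the tutorial does not list real measurements.
training_event = build_event(density=1.2, volume=30.0, sweetType="toffee")
prediction_event = build_event(density=1.3, volume=31.0)
print(training_event)
print(prediction_event)

# post_event needs the receiver URL of your deployment, which this tutorial
# does not specify, so the call is left commented out.
# post_event("http://<host>:<port>/<path>", training_event)
```

Send several training events for each sweet type before sending prediction events, so that the model has something to learn from.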