WSO2 Data Analytics Server is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.


Introduction

This sample demonstrates how to send notifications through events published from WSO2 DAS using Apache Spark. The notifications alert you when records of an existing table in the Spark environment satisfy one or more defined conditions. The sample creates a table with a few product names and quantities in the Data Access Layer (DAL), and sends notifications when the quantity of a product falls below a defined value.

Prerequisites

Set up the general prerequisites required for WSO2 DAS.

Building the sample

Follow the steps below to build the sample.

Creating the receiving event stream and the table in the DAL

Follow the steps below to create a table named PRODUCTS together with the receiving event stream in the Data Access Layer (DAL).

  1. Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Streams.
  3. Click Add Event Stream.
  4. Enter the values as shown below to create an event stream named PRODUCTS_STREAM with two attributes: product name and quantity. For more information on creating event streams, see Understanding Event Streams and Event Tables.
    create the receiving event stream
  5. Click Next (Persist Event).
  6. In the next screen, enter the values as shown below to persist the created event stream. For more information on persisting event streams, see Persisting Event Streams.
    persisting the created event stream
  7. Click Save Event Stream.

Sending events to the receiving event stream

Follow the steps below to simulate the sending of events to the created receiving event stream.

  1. Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Tools, and then click Event Simulator.
  3. Upload a CSV file that contains the events to be sent (e.g. footwear.csv), and click Configure as shown below.
    simulating the sending of events to the receiving event stream
  4. Enter the details as shown below, and click Configure. 
    entering the event mapping configurations
  5. Click Play in the next screen as shown below.
    play events to simulate the sending of events
  6. Click Main, and then click Data Explorer. Select the name of the table to view it as shown below.
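
The contents of footwear.csv are not given in this sample. As a rough sketch, the following Python script generates a file that could be used, assuming that the simulator expects one event per line with the values in the order of the stream attributes (product name, then quantity) and no header row. The product names and quantities are made up for illustration, and the helper name write_events_csv is hypothetical.

```python
import csv
from pathlib import Path

# Hypothetical sample data: (product name, quantity).
# These values are NOT from the original sample; adjust them as needed.
SAMPLE_PRODUCTS = [
    ("sandals", 3),
    ("sneakers", 40),
    ("boots", 2),
    ("loafers", 75),
]


def write_events_csv(path, rows=SAMPLE_PRODUCTS):
    """Write one event per line: product name, then quantity (no header row)."""
    with open(path, "w", newline="") as handle:
        csv.writer(handle).writerows(rows)
    return Path(path)


if __name__ == "__main__":
    # Creates footwear.csv in the current directory for upload to the simulator.
    write_events_csv("footwear.csv")
```

If the simulator in your DAS version expects a different delimiter or a header row, adjust the writer accordingly.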

Creating the corresponding table in Apache Spark

Follow the steps below to create a virtual table in the Apache Spark environment within WSO2 DAS that maps to the PRODUCTS_STREAM table in the DAL.

  1. Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Console.
  3. Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.

    CREATE TEMPORARY TABLE PRODUCTS_MASTER 
    USING CarbonAnalytics 
    OPTIONS (tableName "PRODUCTS_STREAM", 
             schema "product-name STRING,quantity INT");

Creating the event stream to publish events from Spark

To publish events from Spark, you must first define an event stream with the attributes of the events that Spark will publish.

  1. Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Streams.
  3. Click Add Event Stream.
  4. Enter the values as shown below to create an event stream named PRODUCT_ALERTS_STREAM with two attributes: product name and quantity. For more information on creating event streams, see Event Streams.
  5. Click Add Event Stream.

Creating the event receiver for the event stream

Once you define the event stream, you need to create an event receiver of the WSO2Event type to receive events from Spark. For more information, see Configuring Event Receivers.

  1. Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Receivers.
  3. Click Add Event Receiver.
  4. Enter the values as shown below to create an event receiver of the WSO2Event type for the PRODUCT_ALERTS_STREAM stream defined above. For more information on creating event streams, see Event Streams.
  5. Click Add Event Receiver.

Configuring a publisher

You can attach an event publisher such as email or JMS to the PRODUCT_ALERTS_STREAM stream to deliver the events to a preferred location. Follow the steps below to configure a logger publisher that publishes output events from WSO2 DAS and writes the notifications to the logs shown in the terminal. For more information on configuring event publishers, see Creating Alerts.

  1. Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Publishers.
  3. Enter the details as shown below to configure the publisher. 
    configuring the event publisher
  4. Click Add Event Publisher. 

Executing the sample

Follow the steps below to execute the sample.

Creating the Spark table which maps to the created event stream

Follow the steps below to create a virtual table named PRODUCT_ALERTS in the Apache Spark environment within WSO2 DAS that maps to the created event stream PRODUCT_ALERTS_STREAM.

  1. Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Console.
  3. Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.

    CREATE TEMPORARY TABLE PRODUCT_ALERTS 
    USING org.wso2.carbon.analytics.spark.event.EventStreamProvider 
    OPTIONS (streamName "PRODUCT_ALERTS_STREAM", version "1.0.0",
             payload "product-name STRING, quantity INT"
    );

    Note

    The EventStreamProvider writes event data to a central analytics table. A scheduled task on the receiver nodes picks up this data and dispatches the events to the streams. The scheduled task polls the source analytics table at 10-second intervals to check whether any event data is pending. If required, you can explicitly disable this scheduled task on a node by setting the Java system property "disableSparkEventingTask" to "true" (e.g. ./wso2server.sh|bat -DdisableSparkEventingTask=true).
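
The write, poll, and dispatch flow described in the note can be pictured with a small Python model. This is only a conceptual sketch and not the WSO2 implementation: the EventStore class, the poll_once and run functions, and the enabled flag (standing in for the disableSparkEventingTask property) are all hypothetical names. Only the 10-second interval comes from the note.

```python
import time
from collections import deque

POLL_INTERVAL_SECONDS = 10  # polling interval stated in the note


class EventStore:
    """Stand-in for the central analytics table that holds pending event data."""

    def __init__(self):
        self._pending = deque()

    def write(self, stream_id, payload):
        """Record event data, as the provider does when a query inserts rows."""
        self._pending.append((stream_id, payload))

    def drain(self):
        """Return all pending events and clear them from the store."""
        drained = list(self._pending)
        self._pending.clear()
        return drained


def poll_once(store, dispatch, enabled=True):
    """One run of the scheduled task: dispatch pending events, return the count."""
    if not enabled:  # models a node where the task has been disabled
        return 0
    events = store.drain()
    for stream_id, payload in events:
        dispatch(stream_id, payload)
    return len(events)


def run(store, dispatch, cycles, sleep=time.sleep):
    """Poll repeatedly, waiting POLL_INTERVAL_SECONDS between runs."""
    for _ in range(cycles):
        poll_once(store, dispatch)
        sleep(POLL_INTERVAL_SECONDS)
```

One consequence of this model is that an event can wait up to one polling interval between being written and being dispatched, and that a node with the task disabled never dispatches events itself.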

Publishing events to the created event stream

Follow the steps below to publish output events to the created event stream.

  1. Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Console.
  3. Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.

    INSERT OVERWRITE TABLE PRODUCT_ALERTS
    SELECT * FROM PRODUCTS_MASTER
    WHERE quantity < 5;

When this query executes, the output of the select query is inserted into the PRODUCT_ALERTS table. The select query reads all the products in the PRODUCTS_MASTER Spark table that have a quantity of less than 5. During the query execution, the individual rows returned by the select query are published to the PRODUCT_ALERTS_STREAM stream as events.
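
The select-and-publish behavior of the query can be modeled outside DAS with a short Python sketch. It only illustrates the semantics described here and is not how DAS executes the query. The function names, the sample rows, and the publish callback are hypothetical; the threshold of 5 is taken from the query.

```python
THRESHOLD = 5  # taken from the quantity condition in the query


def select_low_stock(rows, threshold=THRESHOLD):
    """Mimic the SELECT: keep rows whose quantity is below the threshold."""
    return [row for row in rows if row["quantity"] < threshold]


def insert_into_alerts(rows, publish, threshold=THRESHOLD):
    """Mimic the INSERT: publish each selected row as one event."""
    selected = select_low_stock(rows, threshold)
    for row in selected:
        publish(row)
    return len(selected)


if __name__ == "__main__":
    # Hypothetical rows standing in for the PRODUCTS_MASTER table.
    products = [
        {"product-name": "sandals", "quantity": 3},
        {"product-name": "sneakers", "quantity": 40},
        {"product-name": "boots", "quantity": 2},
    ]
    # print stands in for the logger publisher attached to the stream.
    insert_into_alerts(products, print)
```

Note that a row whose quantity is exactly 5 is not published, because the condition is a strict less-than comparison.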

  1. You can view the published events as text in the logs of the DAS server in the CLI, as shown below.
    output logs of the logger publisher