The latest version for DAS is WSO2 Data Analytics Server 3.1.0. View documentation for the latest release.
WSO2 Data Analytics Server is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.

All docs This doc
||
Skip to end of metadata
Go to start of metadata

Apache Spark UDAFs (User Defined Aggregate Functions) allow you to implement customized aggregate operations on Spark rows. Custom UDAFs can be written and added to DAS if the required functionality does not already exist in Spark.

In addition to the definition of custom Spark UDAFs, WSO2 DAS also provides an abstraction layer for generic Spark UDAF functionality that be used to introduce custom UDAFs to the DAS/Spark runtime without changes to the server configuration.

The following query is an example of a custom UDAF named geometricMean.

SELECT geometricMean(price) as meanPrice FROM products;

  • Apache Spark 1.6.2, and by extension WSO2 DAS 3.1.0, do not support primitive data type returns. Therefore, all the methods in a POJO class should return the wrapper class of the corresponding primitive data type.
  • Apache Spark 1.6.2 also does not differentiate between algebraic and non-algebraic UDAFs. Therefore, all the methods in the parent class need to be implemented for all the UDAFs. e.g., Mathematical operations such as median do not support merge operations, but they are still required.
  •  Method overloading for UDAFs is not supported. Different UDAFs should have different method names for the expected behaviour. e.g., a UDAF with the same functionality for String and Numeric types need to be defined as two separate UDAF implementations, where each handles a different type.

There are two patterns that can be followed to create a custom UDAF. Click on the relevant tab based on the pattern you want to follow.

This pattern involves creating and packaging the UDAF artifact, deploying it to the WSO2 DAS environment and then updating the Spark UDF configuration file so that WSO2 DAS recognizes the deployed UDAF at runtime.

Follow the steps below to create a custom UDAF.

Step 1: Create the POJO class

The following is an example of a UDAF POJO for the geometricMean custom UDAF class.

package com.whitesnake.analytics.udaf;

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * This UDAF could be used to calculate the geometric mean of the given set of numbers.
  */
class GeometricMeanUDAF extends UserDefinedAggregateFunction {

  override def inputSchema: org.apache.spark.sql.types.StructType =
	StructType(StructField("value", DoubleType) :: Nil)

  override def bufferSchema: StructType = StructType(
	StructField("count", LongType) :: StructField("product", DoubleType) :: Nil
  )

  override def dataType: DataType = DoubleType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
	buffer(0) = 0L
	buffer(1) = 1.0
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
	buffer(0) = buffer.getAs[Long](0) + 1
	buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
	buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
	buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }

  override def evaluate(buffer: Row): Any = {
	math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
  }

}

 

This class implements the functionality of the Spark org.apache.spark.sql.expressions.UserDefinedAggregateFunction abstract class, and therefore, the functionality of the following should be implemented.

  • inputSchema: The schema of the input rows.

  • bufferSchema: The schema of intermediate results.

  • dataType: The datatype of the final result.

  • Deterministic: This denotes whether the same inputs always produce the same results.

  • initialize(): This is called once per node for a given group.

  • update(): This is called once per input record.

  • merge(): This is called to compute partial results and combine them together.

  • evaluate(): This is called to compute the final result.

For more information on each of the above parameters, see the official Spark Javadoc on UDAFs.

Step 2: Package the class in a jar and deploy it to DAS

The custom UDAF class you created should be packaged as a jar and copied to the <DAS_HOME/repository/components/lib directory of each WSO2 DAS node in the cluster.

Step 3: Update the Spark UDF configuration file

Add the newly created custom UDAF to the <DAS_HOME>/repository/conf/analytics/spark/spark-udf-config.xml file as shown in the example below.

<udf-configuration>
	<custom-udaf-classes>
  		<custom-udaf>
      		<alias>geometricMean</alias>
      		<class-name>com.whitesnake.analytics.udaf.GeometricMeanUDAF</class-name>
    	</custom-udaf>
	</custom-udaf-classes>
</udf-configuration>

This configuration is required for Apache Spark to identify and use the newly defined custom UDAF.

The alias parameter denotes the name of the UDAF that should used within Spark analytics scripts, and the class-name parameter points to the fully-qualified name of the implementation class.

This pattern allows you to deploy UDAFs to a WSO2 DAS instance without any configuration changes.

Step 1: Create the POJO class

The following is an example of a UDAF POJO for the geometricMean custom UDAF class.

package com.whitesnake.analytics.udaf.carbon;

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.types._
import org.wso2.carbon.analytics.spark.core.udf.CarbonUDAF

/**
  * This UDAF could be used to calculate the geometric mean of the given set of numbers.
  *
  * The implementation extends the CarbonUDAF abstract class, and so could be deployed
  * to DAS through OSGi.
  */
class CarbonGeometricMeanUDAF extends CarbonUDAF {

  override def getAlias: String = "geometricMean"

  override def inputSchema: StructType =
	StructType(StructField("value", DoubleType) :: Nil)

  override def bufferSchema: StructType = StructType(
	StructField("count", LongType) :: StructField("product", DoubleType) :: Nil
  )

  override def dataType: DataType = DoubleType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
	buffer(0) = 0L
	buffer(1) = 1.0
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
	buffer(0) = buffer.getAs[Long](0) + 1
	buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
	buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
	buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }

  override def evaluate(buffer: Row): Any = {
	math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
  }

}

This class implements the functionality of the Spark org.apache.spark.sql.expressions.UserDefinedAggregateFunction abstract class, and therefore, the functionality of the following should be implemented.

  • inputSchema: The schema of the input rows.

  • bufferSchema: The schema of intermediate results.

  • dataType: The datatype of the final result.

  • Deterministic: This denotes whether the same inputs always produce the same results.

  • initialize(): This is called once per node for a given group.

  • update(): This is called once per input record.

  • merge(): This is called to compute partial results and combine them together.

  • evaluate(): This is called to compute the final result.
  • getAlias(): This denotes what the alias of the custom UDAF should be, at runtime. The return value of this method should be the same as the one used to call up the custom UDAF in Spark scripts.

For more information on each of the above parameters, see the official Spark Javadoc on UDAFs.


Step 2: Package the class as an OSGi bundle and deploy to DAS

Instead of deploying the UDAF class as a jar, you should package it as an OSGi bundle and copy it the <DAS_HOME/repository/components/dropins directory of each WSO2 DAS node in the cluster.

  • No labels