Apache Spark UDAFs (User Defined Aggregate Functions) allow you to implement custom aggregate operations on Spark rows. If the required functionality does not already exist in Spark, you can write a custom UDAF and add it to DAS.
In addition to supporting custom Spark UDAFs as-is, WSO2 DAS provides an abstraction layer for generic Spark UDAF functionality that can be used to introduce custom UDAFs to the DAS/Spark runtime without any changes to the server configuration.
The following query is an example of a custom UDAF named geometricMean.

SELECT geometricMean(price) as meanPrice FROM products;
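For reference, the geometric mean of n values x1, x2, ..., xn is the n-th root of their product, (x1 * x2 * ... * xn)^(1/n). The evaluate() methods in the implementations below compute exactly this from a running count and a running product.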
- Apache Spark 1.6.2, and by extension WSO2 DAS 3.1.0, do not support primitive return types. Therefore, all the methods in a UDAF class should return the wrapper class of the corresponding primitive data type.
- Apache Spark 1.6.2 also does not differentiate between algebraic and non-algebraic UDAFs. Therefore, all the methods of the parent class need to be implemented for every UDAF. e.g., aggregations such as median cannot be meaningfully merged, but the merge() method must still be implemented.
- Method overloading for UDAFs is not supported, so each variant of a UDAF needs a distinct name. e.g., a UDAF with the same functionality for String and Numeric types needs to be defined as two separate UDAF implementations, each handling a different type and registered under its own alias (see the sketch after this list).
- For a list of return types supported by Apache Spark, see Spark SQL and DataFrames and Datasets Guide - Data Types.
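As an illustration of the overloading restriction, the following is a minimal sketch (the class names and aliases here are hypothetical) that exposes the same aggregation logic, counting non-null values, over two input types, with one implementation class, and therefore one alias, per type:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Shared logic: count the non-null values in a column. The input type is the
// only thing that varies between the two concrete implementations below.
abstract class CountValuesUDAF(valueType: DataType) extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(StructField("value", valueType) :: Nil)

  override def bufferSchema: StructType = StructType(StructField("count", LongType) :: Nil)

  override def dataType: DataType = LongType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + 1
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  }

  // Returns the count, which Spark receives as a boxed Long (see the note on
  // primitive return types above).
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

// Registered under a hypothetical alias such as countStringValues.
class CountStringValuesUDAF extends CountValuesUDAF(StringType)

// Registered separately under a hypothetical alias such as countNumericValues.
class CountNumericValuesUDAF extends CountValuesUDAF(DoubleType)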
There are two patterns that can be followed to create a custom UDAF. Follow the relevant section below based on the pattern you want to use.

Pattern 1: Deploying the UDAF via the Spark UDF configuration file

This pattern involves creating and packaging the UDAF artifact, deploying it to the WSO2 DAS environment, and then updating the Spark UDF configuration file so that WSO2 DAS recognizes the deployed UDAF at runtime.
Follow the steps below to create a custom UDAF.
Step 1: Create the UDAF class
The following is an example implementation of the geometricMean custom UDAF.
package com.whitesnake.analytics.udaf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * This UDAF could be used to calculate the geometric mean of the given set of numbers.
 */
class GeometricMeanUDAF extends UserDefinedAggregateFunction {

  override def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", DoubleType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("count", LongType) ::
    StructField("product", DoubleType) :: Nil
  )

  override def dataType: DataType = DoubleType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Long](0) + 1
    buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
    buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }

  override def evaluate(buffer: Row): Any = {
    math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
  }
}
This class extends the Spark org.apache.spark.sql.expressions.UserDefinedAggregateFunction abstract class, and therefore, all of the following should be implemented.
- inputSchema: The schema of the input rows.
- bufferSchema: The schema of the intermediate results (the aggregation buffer).
- dataType: The data type of the final result.
- deterministic: Denotes whether the same inputs always produce the same result.
- initialize(): Called once per node for a given group, to initialize the aggregation buffer.
- update(): Called once per input record, to update the aggregation buffer.
- merge(): Called to combine two intermediate aggregation buffers (partial results) into one.
- evaluate(): Called to compute the final result from the buffer.
For more information on each of the above methods, see the official Spark Javadoc on UDAFs.
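Before packaging the class, it is worth sanity-checking it in a local Spark 1.6 shell. The following is a minimal sketch, assuming a running spark-shell (which provides sc) with the GeometricMeanUDAF class on its classpath; the table and column names are illustrative:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
sqlContext.udf.register("geometricMean", new GeometricMeanUDAF)

// A toy products table: the geometric mean of 2, 8 and 4 is (2 * 8 * 4)^(1/3) = 4.0
val products = sqlContext.createDataFrame(Seq(
  ("apple", 2.0), ("pear", 8.0), ("plum", 4.0)
)).toDF("name", "price")
products.registerTempTable("products")

sqlContext.sql("SELECT geometricMean(price) AS meanPrice FROM products").show()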
Step 2: Package the class in a jar and deploy it to DAS
The custom UDAF class you created should be packaged as a jar and copied to the <DAS_HOME>/repository/components/lib directory of each WSO2 DAS node in the cluster.
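Any build tool can produce the jar. As one hedged example, a minimal sbt build definition along the following lines (the project name and versions are assumptions to be matched against your environment) lets you build it with sbt package; Spark is marked as provided because the DAS runtime already supplies it:

// build.sbt - a minimal sketch; match the Scala version to the Spark
// distribution bundled with your DAS installation.
name := "custom-spark-udafs"

version := "1.0.0"

scalaVersion := "2.10.6" // Spark 1.6.x is built against Scala 2.10 by default

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.2" % "provided"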
Step 3: Update the Spark UDF configuration file
Add the newly created custom UDAF to the <DAS_HOME>/repository/conf/analytics/spark/spark-udf-config.xml file as shown in the example below.
<udf-configuration>
    <custom-udaf-classes>
        <custom-udaf>
            <alias>geometricMean</alias>
            <class-name>com.whitesnake.analytics.udaf.GeometricMeanUDAF</class-name>
        </custom-udaf>
    </custom-udaf-classes>
</udf-configuration>
This configuration is required for Apache Spark to identify and use the newly defined custom UDAF. The alias parameter denotes the name under which the UDAF is invoked within Spark analytics scripts, and the class-name parameter points to the fully qualified name of the implementation class.
Pattern 2: Deploying the UDAF as an OSGi bundle

This pattern allows you to deploy UDAFs to a WSO2 DAS instance without any configuration changes.
Step 1: Create the UDAF class
The following is an example implementation of the geometricMean custom UDAF using the Carbon abstraction layer.
package com.whitesnake.analytics.udaf.carbon

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.types._

import org.wso2.carbon.analytics.spark.core.udf.CarbonUDAF

/**
 * This UDAF could be used to calculate the geometric mean of the given set of numbers.
 *
 * The implementation extends the CarbonUDAF abstract class, and so could be deployed
 * to DAS through OSGi.
 */
class CarbonGeometricMeanUDAF extends CarbonUDAF {

  override def getAlias: String = "geometricMean"

  override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("count", LongType) ::
    StructField("product", DoubleType) :: Nil
  )

  override def dataType: DataType = DoubleType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Long](0) + 1
    buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
    buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }

  override def evaluate(buffer: Row): Any = {
    math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
  }
}
This class extends the org.wso2.carbon.analytics.spark.core.udf.CarbonUDAF abstract class, which in turn builds on Spark's org.apache.spark.sql.expressions.UserDefinedAggregateFunction, and therefore, all of the following should be implemented.
- inputSchema: The schema of the input rows.
- bufferSchema: The schema of the intermediate results (the aggregation buffer).
- dataType: The data type of the final result.
- deterministic: Denotes whether the same inputs always produce the same result.
- initialize(): Called once per node for a given group, to initialize the aggregation buffer.
- update(): Called once per input record, to update the aggregation buffer.
- merge(): Called to combine two intermediate aggregation buffers (partial results) into one.
- evaluate(): Called to compute the final result from the buffer.
- getAlias(): Denotes the alias under which the custom UDAF is registered at runtime. The return value of this method should be the same as the name used to invoke the custom UDAF in Spark scripts.
For more information on each of the above methods, see the official Spark Javadoc on UDAFs.
Step 2: Package the class as an OSGi bundle and deploy it to DAS
Instead of deploying the UDAF class as a plain jar, you should package it as an OSGi bundle and copy it to the <DAS_HOME>/repository/components/dropins directory of each WSO2 DAS node in the cluster.
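As a hedged example of the packaging step, the sbt-osgi plugin can produce such a bundle via sbt osgiBundle (this plugin choice is an assumption; any OSGi-capable build tool, such as the Maven bundle plugin, works equally well):

// project/plugins.sbt (plugin version is an assumption):
//   addSbtPlugin("com.typesafe.sbt" % "sbt-osgi" % "0.9.5")

// build.sbt additions - a minimal sketch; the symbolic name and exported
// package are illustrative and should match your own package structure.
enablePlugins(SbtOsgi)

osgiSettings

OsgiKeys.bundleSymbolicName := "com.whitesnake.analytics.udaf.carbon"

OsgiKeys.exportPackage := Seq("com.whitesnake.analytics.udaf.carbon")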