Description
org.apache.spark.sql.expressions.Aggregator currently requires defining a zero value for an aggregator. This limitation makes it difficult to implement APIs such as reduce: in reduce (or reduceByKey), the user specifies a single associative and commutative reduce function, and no zero value is defined.
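To illustrate the limitation, here is a minimal sketch of one possible workaround under a zero-based API. It assumes a simplified standalone stand-in class, `ZeroAggregator`, which only mirrors the zero/reduce/merge/finish shape and is not Spark's actual class. The helper name `reduceWithZero` is hypothetical. Because `T` has no zero, the buffer has to be widened to `Option[T]`, and every method must handle the "nothing seen yet" case.

```scala
// Simplified, standalone stand-in for the zero-based shape
// (zero / reduce / merge / finish). NOT Spark's actual class.
abstract class ZeroAggregator[-IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// Hypothetical helper: wrap a user reduce function f in a zero-based
// aggregator. T has no zero, so the buffer is Option[T] and None means
// "no input seen yet".
def reduceWithZero[T](f: (T, T) => T): ZeroAggregator[T, Option[T], T] =
  new ZeroAggregator[T, Option[T], T] {
    def zero: Option[T] = None
    // The first element seeds the buffer; later elements are combined with f.
    def reduce(b: Option[T], a: T): Option[T] = Some(b.fold(a)(x => f(x, a)))
    // Every combination of present/absent partial results must be handled.
    def merge(b1: Option[T], b2: Option[T]): Option[T] = (b1, b2) match {
      case (Some(x), Some(y)) => Some(f(x, y))
      case (x, None)          => x
      case (None, y)          => y
    }
    // reduce over empty input has no result, so fail explicitly.
    def finish(reduction: Option[T]): T =
      reduction.getOrElse(throw new UnsupportedOperationException("empty input"))
  }

// Usage: max over Ints, simulating two partitions by hand.
val maxWithZero = reduceWithZero[Int]((x, y) => math.max(x, y))
val part1 = Seq(3, -7).foldLeft(maxWithZero.zero)((b, a) => maxWithZero.reduce(b, a))
val part2 = Seq(-1, 12, 5).foldLeft(maxWithZero.zero)((b, a) => maxWithZero.reduce(b, a))
println(maxWithZero.finish(maxWithZero.merge(part1, part2))) // 12
```

The `Option` wrapping is pure overhead from the user's point of view: the reduce function itself never needed a zero.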
A small tweak to the API would be to replace zero with init, which takes an input element and builds the initial buffer from it, similar to the following:
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  def init(a: IN): BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}
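The following is a minimal local sketch of how an engine might drive the proposed init-based API over partitioned data. It assumes a standalone copy of the proposed class, renamed `InitAggregator` so it is not confused with Spark's existing class, and a hypothetical driver `runAggregator` that stands in for the real execution engine. Since init needs an input, empty partitions contribute no buffer, and a fully empty input yields `None`.

```scala
// Standalone copy of the proposed init-based API, named InitAggregator
// here so it is not mistaken for org.apache.spark.sql.expressions.Aggregator.
abstract class InitAggregator[-IN, BUF, OUT] extends Serializable {
  def init(a: IN): BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// Hypothetical driver simulating the aggregation lifecycle locally:
// - init turns the first element of each partition into a buffer,
// - reduce folds the remaining elements of that partition into it,
// - merge combines the per-partition buffers,
// - finish converts the final buffer into the output.
// Empty partitions are skipped; if all are empty, the result is None.
def runAggregator[IN, BUF, OUT](
    agg: InitAggregator[IN, BUF, OUT],
    partitions: Seq[Seq[IN]]): Option[OUT] = {
  val partials: Seq[BUF] = partitions.filter(_.nonEmpty).map { p =>
    p.tail.foldLeft(agg.init(p.head))((b, a) => agg.reduce(b, a))
  }
  partials
    .reduceOption[BUF]((b1, b2) => agg.merge(b1, b2))
    .map(buf => agg.finish(buf))
}

// Usage: max over Ints needs no zero value under this API.
val maxInit: InitAggregator[Int, Int, Int] = new InitAggregator[Int, Int, Int] {
  def init(a: Int): Int = a
  def reduce(b: Int, a: Int): Int = math.max(b, a)
  def merge(b1: Int, b2: Int): Int = math.max(b1, b2)
  def finish(reduction: Int): Int = reduction
}
val partitions: Seq[Seq[Int]] = Seq(Seq(3, -7), Seq.empty[Int], Seq(-1, 12, 5))
val result = runAggregator(maxInit, partitions)
println(result) // Some(12)
```

Returning `Option[OUT]` from the driver is a design choice of this sketch: it makes the empty-input case explicit instead of requiring the aggregator to invent a zero.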
With this API, reduce can then be implemented as:
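// f: the user-supplied associative and commutative reduce function, (T, T) => T
// (reduceAggregator is an illustrative name, not an existing Spark method)
def reduceAggregator[T](f: (T, T) => T): Aggregator[T, T, T] =
  new Aggregator[T, T, T] {
    // The first input becomes the buffer unchanged (identity).
    override def init(a: T): T = a
    override def reduce(b: T, a: T): T = f(b, a)
    // Partial results are combined with the same f, hence the associativity/commutativity requirement.
    override def merge(b1: T, b2: T): T = f(b1, b2)
    // The buffer already is the result (identity).
    override def finish(reduction: T): T = reduction
  }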