spark-dev mailing list archives

From deenar <deenar.toras...@thinkreactive.co.uk>
Subject Re: Spark 1.6.1
Date Fri, 29 Jan 2016 20:16:01 GMT
Hi Michael

The Dataset aggregators do not appear to support complex Spark SQL types. I
wasn't sure whether I was doing something wrong, whether this is a bug, or
whether the feature just isn't implemented yet. Having this in would be great.
See below (reposting from the Spark user list).

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html

I have been converting my UDAFs to Dataset Aggregators (Datasets are cool,
BTW). I have an ArraySum aggregator that does an element-wise sum of arrays.
I have got the simple version working, but the generic version fails with the
following error; I am not sure what I am doing wrong.

scala> import sqlContext.implicits._

scala> def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] =
     |   new GenericArraySumAggregator(f).toColumn

<console>:34: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing sqlContext.implicits._  Support for serializing other
types will be added in future releases.
       def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] =
         new GenericArraySumAggregator(f).toColumn
                                               ^

object ArraySumAggregator extends Aggregator[Seq[Float], Seq[Float], Seq[Float]]
    with Serializable {
  // The initial value.
  def zero: Seq[Float] = Nil
  def reduce(currentSum: Seq[Float], currentRow: Seq[Float]) =
    sumArray(currentSum, currentRow)
  def merge(sum: Seq[Float], row: Seq[Float]) = sumArray(sum, row)
  // Return the final result.
  def finish(b: Seq[Float]) = b
  def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] = {
    (a, b) match {
      case (Nil, Nil) => Nil
      case (Nil, row) => row
      case (sum, Nil) => sum
      case (sum, row) => (sum, row).zipped.map { case (x, y) => x + y }
    }
  }
}
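The element-wise sum logic can be checked on its own, outside Spark. A minimal
sketch (a hypothetical standalone copy of sumArray, no Spark dependency) that
folds a collection of rows the way the aggregator's reduce phase would:

```scala
// Standalone copy of the element-wise sum used by ArraySumAggregator.
// Note: zip truncates to the shorter sequence, so mismatched non-empty
// lengths are silently clipped rather than padded with zeros.
def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] = (a, b) match {
  case (Nil, rhs) => rhs   // covers (Nil, Nil) too: the result is Nil
  case (lhs, Nil) => lhs
  case (lhs, rhs) => lhs.zip(rhs).map { case (x, y) => x + y }
}

// Folding rows from the zero value mimics reduce/merge.
val rows  = Seq(Seq(1f, 2f, 3f), Seq(4f, 5f, 6f))
val total = rows.foldLeft(Seq.empty[Float])(sumArray)  // Seq(5.0, 7.0, 9.0)
```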
class GenericArraySumAggregator[I, N : Numeric](f: I => N)
    extends Aggregator[Seq[I], Seq[N], Seq[N]] with Serializable {
  val numeric = implicitly[Numeric[N]]
  override def zero: Seq[N] = Nil
  override def reduce(b: Seq[N], a: Seq[I]): Seq[N] = sumArray(b, a.map(x => f(x)))
  override def merge(b1: Seq[N], b2: Seq[N]): Seq[N] = sumArray(b1, b2)
  override def finish(reduction: Seq[N]): Seq[N] = reduction
  def sumArray(a: Seq[N], b: Seq[N]): Seq[N] = {
    (a, b) match {
      case (Nil, Nil) => Nil
      case (Nil, row) => row
      case (sum, Nil) => sum
      case (sum, row) => (sum, row).zipped.map { case (x, y) => numeric.plus(x, y) }
    }
  }
}
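The generic version's runtime logic is fine for any Numeric once the encoder
question is set aside; the compile error above is about deriving encoders for
Seq[N] with N abstract, not about the arithmetic. A sketch (hypothetical
sumArrayG helper, no Spark) showing the same Numeric-based sum working for
both Int and Double:

```scala
// Generic element-wise sum over any Numeric, mirroring
// GenericArraySumAggregator.sumArray without the Spark machinery.
def sumArrayG[N](a: Seq[N], b: Seq[N])(implicit num: Numeric[N]): Seq[N] =
  (a, b) match {
    case (Nil, rhs) => rhs
    case (lhs, Nil) => lhs
    case (lhs, rhs) => lhs.zip(rhs).map { case (x, y) => num.plus(x, y) }
  }

val ints    = sumArrayG(Seq(1, 2), Seq(10, 20))       // Seq(11, 22)
val doubles = sumArrayG(Seq(0.5, 1.5), Seq(1.0, 1.0)) // Seq(1.5, 2.5)
```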



Regards
Deenar



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-1-tp16009p16155.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org

