spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...
Date Sat, 11 Nov 2017 17:17:55 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150388516
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +77,130 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    -  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
    +  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
         (b1._1 + b2._1, b1._2 + b2._2)
    -  }
    +  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
     
       override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]()
       override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
    +
       def toColumnJava: TypedColumn[IN, java.lang.Double] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, Double, Double] {
    +
    +  override def zero: Double = Double.MaxValue
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.MaxValue == reduction) {
    +      Double.NegativeInfinity
    +    }
    +    else {
    --- End diff --
    
    nit: please put `else` on the same line as the closing brace of the `if` block:
    ```
    if {
    } else {
    }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message