spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhuai <...@git.apache.org>
Subject [GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Date Tue, 15 Nov 2016 23:54:07 GMT
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15703#discussion_r88140713
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
           funcWrapper.createFunction[AbstractGenericUDAFResolver]()
         }
     
    +  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
       @transient
    -  private lazy val inspectors = children.map(toInspector).toArray
    +  private lazy val inputInspectors = children.map(toInspector).toArray
     
    +  // Spark SQL data types of input parameters.
       @transient
    -  private lazy val functionAndInspector = {
    -    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
    -    val f = resolver.getEvaluator(parameterInfo)
    -    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
    +
    +  private def newEvaluator(): GenericUDAFEvaluator = {
    +    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
    +    resolver.getEvaluator(parameterInfo)
       }
     
    +  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
       @transient
    -  private lazy val function = functionAndInspector._1
    +  private lazy val partial1ModeEvaluator = newEvaluator()
     
    +  // Hive `ObjectInspector` used to inspect partial aggregation results.
       @transient
    -  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
    +  private val partialResultInspector = partial1ModeEvaluator.init(
    +    GenericUDAFEvaluator.Mode.PARTIAL1,
    +    inputInspectors
    +  )
     
    +  // The UDAF evaluator used to merge partial aggregation results.
       @transient
    -  private lazy val returnInspector = functionAndInspector._2
    +  private lazy val partial2ModeEvaluator = {
    +    val evaluator = newEvaluator()
    +    evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector))
    +    evaluator
    +  }
     
    +  // Spark SQL data type of partial aggregation results
       @transient
    -  private lazy val unwrapper = unwrapperFor(returnInspector)
    +  private lazy val partialResultDataType = inspectorToDataType(partialResultInspector)
     
    +  // The UDAF evaluator used to compute the final result from a partial aggregation result objects.
       @transient
    -  private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _
    -
    -  override def eval(input: InternalRow): Any = unwrapper(function.evaluate(buffer))
    +  private lazy val finalModeEvaluator = newEvaluator()
     
    +  // Hive `ObjectInspector` used to inspect the final aggregation result object.
       @transient
    -  private lazy val inputProjection = new InterpretedProjection(children)
    +  private val returnInspector = finalModeEvaluator.init(
    +    GenericUDAFEvaluator.Mode.FINAL,
    +    Array(partialResultInspector)
    +  )
     
    +  // Wrapper functions used to wrap Spark SQL input arguments into Hive specific format.
       @transient
    -  private lazy val cached = new Array[AnyRef](children.length)
    +  private lazy val inputWrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
     
    +  // Unwrapper function used to unwrap final aggregation result objects returned by Hive UDAFs into
    +  // Spark SQL specific format.
       @transient
    -  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
    -
    -  // Hive UDAF has its own buffer, so we don't need to occupy a slot in the aggregation
    -  // buffer for it.
    -  override def aggBufferSchema: StructType = StructType(Nil)
    -
    -  override def update(_buffer: InternalRow, input: InternalRow): Unit = {
    -    val inputs = inputProjection(input)
    -    function.iterate(buffer, wrap(inputs, wrappers, cached, inputDataTypes))
    -  }
    -
    -  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
    -    throw new UnsupportedOperationException(
    -      "Hive UDAF doesn't support partial aggregate")
    -  }
    +  private lazy val resultUnwrapper = unwrapperFor(returnInspector)
     
    -  override def initialize(_buffer: InternalRow): Unit = {
    -    buffer = function.getNewAggregationBuffer
    -  }
    -
    -  override val aggBufferAttributes: Seq[AttributeReference] = Nil
    +  @transient
    +  private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)
     
    -  // Note: although this simply copies aggBufferAttributes, this common code can not be placed
    -  // in the superclass because that will lead to initialization ordering issues.
    -  override val inputAggBufferAttributes: Seq[AttributeReference] = Nil
    +  @transient
    +  private lazy val aggBufferSerDe: AggregationBufferSerDe = new AggregationBufferSerDe
     
       // We rely on Hive to check the input data types, so use `AnyDataType` here to bypass our
       // catalyst type checking framework.
       override def inputTypes: Seq[AbstractDataType] = children.map(_ => AnyDataType)
     
       override def nullable: Boolean = true
     
    -  override def supportsPartial: Boolean = false
    +  override def supportsPartial: Boolean = true
    --- End diff --
    
    Is there any Hive UDAF that does not support partial aggregation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message