spark-reviews mailing list archives

From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #14753: [SPARK-17187][SQL] Supports using arbitrary Java ...
Date Tue, 23 Aug 2016 02:05:33 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14753#discussion_r75792451
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---
    @@ -389,3 +389,175 @@ abstract class DeclarativeAggregate
         def right: AttributeReference = inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
       }
     }
    +
    +/**
    + * Aggregation function which allows **arbitrary** user-defined java object to be used as internal
    + * aggregation buffer object.
    + *
    + * {{{
    + *                aggregation buffer for normal aggregation function `avg`
    + *                    |
    + *                    v
    + *                  +--------------+---------------+-----------------------------------+
    + *                  |  sum1 (Long) | count1 (Long) | generic user-defined java objects |
    + *                  +--------------+---------------+-----------------------------------+
    + *                                                     ^
    + *                                                     |
    + *                    Aggregation buffer object for `TypedImperativeAggregate` aggregation function
    + * }}}
    + *
    + * Work flow (Partial mode aggregate at Mapper side, and Final mode aggregate at Reducer side):
    + *
    + * Stage 1: Partial aggregate at Mapper side:
    + *
    + *  1. The framework calls `createAggregationBuffer(): T` to create an empty internal aggregation
    + *     buffer object.
    + *  2. Upon each input row, the framework calls
    + *     `update(buffer: T, input: InternalRow): Unit` to update the aggregation buffer object T.
    + *  3. After processing all rows of current group (group by key), the framework will serialize
    + *     aggregation buffer object T to SparkSQL internally supported underlying storage format, and
    + *     persist the serializable format to disk if needed.
    + *  4. The framework moves on to next group, until all groups have been processed.
    + *
    + * Shuffling exchange data to Reducer tasks...
    + *
    + * Stage 2: Final mode aggregate at Reducer side:
    + *
    + *  1. The framework calls `createAggregationBuffer(): T` to create an empty internal aggregation
    + *     buffer object (type T) for merging.
    + *  2. For each aggregation output of Stage 1, The framework de-serializes the storage
    + *     format and generates one input aggregation object (type T).
    + *  3. For each input aggregation object, the framework calls `merge(buffer: T, input: T): Unit`
    + *     to merge the input aggregation object into aggregation buffer object.
    + *  4. After processing all input aggregation objects of current group (group by key), the framework
    + *     calls method `eval(buffer: T)` to generate the final output for this group.
    + *  5. The framework moves on to next group, until all groups have been processed.
    + */
    +abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
    +
    +  /**
    +   * Creates an empty aggregation buffer object. This is called before processing each key group
    +   * (group by key).
    +   *
    +   * @return an aggregation buffer object
    +   */
    +  def createAggregationBuffer(): T
    +
    +  /**
    +   * In-place updates the aggregation buffer object with an input row. buffer = buffer + input.
    +   * This is typically called when doing Partial or Complete mode aggregation.
    +   *
    +   * @param buffer The aggregation buffer object.
    +   * @param input an input row
    +   */
    +  def update(buffer: T, input: InternalRow): Unit
    --- End diff --
    
    This assumes the buffer object type T supports in-place updates, which is not always true,
    e.g. for `percentile_approx`. How about `def update(buffer: T, input: InternalRow): T`?
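
    Below is a minimal, self-contained sketch of that suggested signature. It does not use the
    real Spark classes from this PR; `Row`, `TypedAggregate`, and `NaivePercentile` are hypothetical
    stand-ins for `InternalRow` and the interface above, shown only to illustrate the documented
    create/update/merge/eval workflow when `update` returns the buffer instead of mutating it:

    ```scala
    object TypedAggregateSketch {

      // Stand-in for Spark's InternalRow; only carries what the example needs.
      final case class Row(value: Double)

      // Contract mirroring the documented workflow, with `update` returning the
      // (possibly new) buffer instead of mutating it in place.
      trait TypedAggregate[T, OUT] {
        def createAggregationBuffer(): T
        def update(buffer: T, input: Row): T
        def merge(buffer: T, input: T): T
        def eval(buffer: T): OUT
      }

      // Immutable buffer: a Vector of observed values, standing in for a
      // percentile_approx-style sketch that cannot be updated in place.
      final class NaivePercentile(p: Double) extends TypedAggregate[Vector[Double], Double] {
        override def createAggregationBuffer(): Vector[Double] = Vector.empty

        // No mutation: every update produces a new buffer value.
        override def update(buffer: Vector[Double], input: Row): Vector[Double] =
          buffer :+ input.value

        override def merge(buffer: Vector[Double], input: Vector[Double]): Vector[Double] =
          buffer ++ input

        override def eval(buffer: Vector[Double]): Double = {
          val sorted = buffer.sorted
          sorted(((sorted.size - 1) * p).toInt)
        }
      }

      def main(args: Array[String]): Unit = {
        val agg = new NaivePercentile(0.5)

        // Stage 1 (mapper side): fold the rows of each partition into a partial buffer.
        val partial1 = Seq(Row(1.0), Row(9.0)).foldLeft(agg.createAggregationBuffer())(agg.update)
        val partial2 = Seq(Row(5.0)).foldLeft(agg.createAggregationBuffer())(agg.update)

        // Stage 2 (reducer side): merge the partial buffers, then evaluate the result.
        val merged = agg.merge(agg.merge(agg.createAggregationBuffer(), partial1), partial2)
        println(agg.eval(merged)) // 5.0
      }
    }
    ```

    With the returning signature, a mutable implementation can still update in place and simply
    return the same buffer instance, so aggregates like `avg` lose nothing, while buffers that must
    be rebuilt on every update (the `percentile_approx` case above) also become expressible.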



