spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #14562: [SPARK-16973][SQL] remove the buffer offsets in I...
Date Tue, 09 Aug 2016 13:20:54 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14562#discussion_r74055718
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
---
    @@ -219,111 +219,118 @@ sealed abstract class AggregateFunction extends Expression with
ImplicitCastInpu
     }
     
     /**
    - * API for aggregation functions that are expressed in terms of imperative initialize(),
update(),
    - * and merge() functions which operate on Row-based aggregation buffers.
    - *
    - * Within these functions, code should access fields of the mutable aggregation buffer
by adding the
    - * bufferSchema-relative field number to `mutableAggBufferOffset` then using this new
field number
    - * to access the buffer Row. This is necessary because this aggregation function's buffer
is
    - * embedded inside of a larger shared aggregation buffer when an aggregation operator
evaluates
    - * multiple aggregate functions at the same time.
    - *
    - * We need to perform similar field number arithmetic when merging multiple intermediate
    - * aggregate buffers together in `merge()` (in this case, use `inputAggBufferOffset`
when accessing
    - * the input buffer).
    - *
    - * Correct ImperativeAggregate evaluation depends on the correctness of `mutableAggBufferOffset`
and
    - * `inputAggBufferOffset`, but not on the correctness of the attribute ids in `aggBufferAttributes`
    - * and `inputAggBufferAttributes`.
    + * API for aggregation functions that are expressed in terms of imperative doInitialize(),
    + * doUpdate(), doMerge() and doComplete() functions which operate on Row-based aggregation
buffers.
      */
     abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback {
     
       /**
    -   * The offset of this function's first buffer value in the underlying shared mutable
aggregation
    -   * buffer.
    +   * The aggregation operator keeps a large shared mutable buffer row for all aggregate
functions,
    +   * each aggregate function should only access a slice of this shared buffer.
    +   */
    +  private var mutableBufferRow: SlicedMutableRow = _
    +
    +  /**
    +   * During partial aggregation, the input buffer row to be merged is shared among all
aggregate
    +   * functions, each aggregate function should only access a slice of this input buffer.
    +   */
    +  private var inputBufferRow: SlicedInternalRow = _
    +
    +  /**
    +   * Set the offset of this function's start buffer value in the underlying shared mutable
    +   * aggregation buffer.
        *
        * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share
the same
    -   * aggregation buffer. In this shared buffer, the position of the first buffer value
of `avg(x)`
    -   * will be 0 and the position of the first buffer value of `avg(y)` will be 2:
    +   * aggregation buffer. In this shared buffer, the position of the start buffer value
of `avg(x)`
    +   * will be 0 and the position of the start buffer value of `avg(y)` will be 2:
        * {{{
    -   *          avg(x) mutableAggBufferOffset = 0
    +   *          avg(x) mutable buffer offset is 0
        *                  |
        *                  v
        *                  +--------+--------+--------+--------+
        *                  |  sum1  | count1 |  sum2  | count2 |
        *                  +--------+--------+--------+--------+
        *                                    ^
        *                                    |
    -   *                     avg(y) mutableAggBufferOffset = 2
    +   *                     avg(y) mutable buffer offset is 2
        * }}}
        */
    -  protected val mutableAggBufferOffset: Int
    -
    -  /**
    -   * Returns a copy of this ImperativeAggregate with an updated mutableAggBufferOffset.
    -   * This new copy's attributes may have different ids than the original.
    -   */
    -  def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate
    +  final def setMutableBufferOffset(offset: Int): Unit = {
    +    assert(mutableBufferRow == null)
    +    mutableBufferRow = SlicedMutableRow(offset, aggBufferAttributes.length)
    +  }
     
       /**
    -   * The offset of this function's start buffer value in the underlying shared input
aggregation
    +   * Set the offset of this function's start buffer value in the underlying shared input
aggregation
        * buffer. An input aggregation buffer is used when we merge two aggregation buffers
together in
    -   * the `update()` function and is immutable (we merge an input aggregation buffer and
a mutable
    +   * the `merge()` function and is immutable (we merge an input aggregation buffer and
a mutable
        * aggregation buffer and then store the new buffer values to the mutable aggregation
buffer).
        *
        * An input aggregation buffer may contain extra fields, such as grouping keys, at
its start, so
    -   * mutableAggBufferOffset and inputAggBufferOffset are often different.
    +   * mutable buffer offset and input buffer offset are often different.
        *
        * For example, say we have a grouping expression, `key`, and two aggregate functions,
    -   * `avg(x)` and `avg(y)`. In the shared input aggregation buffer, the position of the
first
    -   * buffer value of `avg(x)` will be 1 and the position of the first buffer value of
`avg(y)`
    +   * `avg(x)` and `avg(y)`. In the shared input aggregation buffer, the position of the
start
    +   * buffer value of `avg(x)` will be 1 and the position of the start buffer value of
`avg(y)`
        * will be 3 (position 0 is used for the value of `key`):
        * {{{
    -   *          avg(x) inputAggBufferOffset = 1
    +   *          avg(x) input buffer offset is 1
        *                   |
        *                   v
        *          +--------+--------+--------+--------+--------+
        *          |  key   |  sum1  | count1 |  sum2  | count2 |
        *          +--------+--------+--------+--------+--------+
        *                                     ^
        *                                     |
    -   *                       avg(y) inputAggBufferOffset = 3
    +   *                       avg(y) input buffer offset is 3
        * }}}
        */
    -  protected val inputAggBufferOffset: Int
    +  final def setInputBufferOffset(offset: Int): Unit = {
    +    assert(inputBufferRow == null)
    --- End diff --
    
    This is based on my understanding. I'll figure it out tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message