flink-issues mailing list archives

From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3474] support partial aggregate
Date Tue, 01 Mar 2016 13:19:05 GMT
Github user fhueske commented on a diff in the pull request:

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
    @@ -17,26 +17,77 @@
     package org.apache.flink.api.table.runtime.aggregate
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which are expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions are executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * Associative decomposable aggregate functions support partial aggregation. To optimize
    + * the performance, a Combine phase is added between Map phase and GroupReduce phase:
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, and aggOffsetInRow is used as the
    + * starting field index in Row, so different aggregate functions can share the same Row as
    + * intermediate aggregate value/aggregate buffer, since their aggregate values are stored in
    + * distinct fields of Row without conflict. The intermediate aggregate value is required to be
    + * a sequence of JVM primitives, and Flink uses intermediateDataType() to get its data types
    + * on the SQL side.
    + *
    + * @tparam T Aggregated value type.
     trait Aggregate[T] extends Serializable {
    +  protected var aggOffsetInRow: Int = _
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
    --- End diff --
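The phase split described in the doc comment can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not the code from this pull request: `Row` is a small stand-in for `org.apache.flink.api.table.Row`, and `setAggOffsetInRow`, the `prepare`/`merge`/`evaluate` signatures, `LongSumAggregate`, `LongAvgAggregate` and `PartialAggregation` are hypothetical names chosen for the example. Only `aggOffsetInRow` and `initiate(intermediate: Row)` come from the diff. SUM owns field 0 and AVG owns fields 1 and 2 of the same Row.

```scala
// Minimal stand-in for org.apache.flink.api.table.Row (hypothetical, for illustration only).
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def productElement(i: Int): Any = fields(i)
  def setField(i: Int, value: Any): Unit = fields(i) = value
}

// Sketch of the interface from the doc comment; signatures other than
// initiate(intermediate: Row) are assumptions.
trait Aggregate[T] extends Serializable {
  protected var aggOffsetInRow: Int = 0
  def setAggOffsetInRow(offset: Int): Unit = { aggOffsetInRow = offset }
  def initiate(intermediate: Row): Unit            // reset the fields this aggregate owns
  def prepare(value: Any, intermediate: Row): Unit // Map phase
  def merge(intermediate: Row, buffer: Row): Unit  // Combine and GroupReduce phases
  def evaluate(buffer: Row): T                     // final value after GroupReduce
}

// SUM: the intermediate value is one long at aggOffsetInRow.
class LongSumAggregate extends Aggregate[Long] {
  override def initiate(intermediate: Row): Unit = intermediate.setField(aggOffsetInRow, 0L)
  override def prepare(value: Any, intermediate: Row): Unit =
    intermediate.setField(aggOffsetInRow, value.asInstanceOf[Long])
  override def merge(intermediate: Row, buffer: Row): Unit = {
    val acc = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    buffer.setField(aggOffsetInRow, acc + intermediate.productElement(aggOffsetInRow).asInstanceOf[Long])
  }
  override def evaluate(buffer: Row): Long = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
}

// AVG: the intermediate value is two longs (sum, count) starting at aggOffsetInRow.
class LongAvgAggregate extends Aggregate[Double] {
  override def initiate(intermediate: Row): Unit = {
    intermediate.setField(aggOffsetInRow, 0L)
    intermediate.setField(aggOffsetInRow + 1, 0L)
  }
  override def prepare(value: Any, intermediate: Row): Unit = {
    intermediate.setField(aggOffsetInRow, value.asInstanceOf[Long])
    intermediate.setField(aggOffsetInRow + 1, 1L)
  }
  override def merge(intermediate: Row, buffer: Row): Unit =
    for (i <- aggOffsetInRow to aggOffsetInRow + 1) {
      val acc = buffer.productElement(i).asInstanceOf[Long]
      buffer.setField(i, acc + intermediate.productElement(i).asInstanceOf[Long])
    }
  override def evaluate(buffer: Row): Double = {
    val sum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    val count = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    if (count == 0L) 0.0 else sum.toDouble / count
  }
}

// Two aggregates sharing one Row: SUM in field 0, AVG in fields 1 and 2.
object PartialAggregation {
  val sum = new LongSumAggregate
  sum.setAggOffsetInRow(0)
  val avg = new LongAvgAggregate
  avg.setAggOffsetInRow(1)
  private val aggs: Seq[Aggregate[_]] = Seq(sum, avg)
  private val arity = 3

  // Map phase: one input value becomes one intermediate Row.
  def mapRecord(value: Long): Row = {
    val row = new Row(arity)
    aggs.foreach(_.prepare(value, row))
    row
  }

  // Combine phase: merge a sub-group into a fresh buffer with the same layout.
  def combine(partials: Iterable[Row]): Row = {
    val buffer = new Row(arity)
    aggs.foreach(_.initiate(buffer))
    for (row <- partials; agg <- aggs) agg.merge(row, buffer)
    buffer
  }

  // GroupReduce phase: merge all partial rows of the group, then evaluate.
  def reduce(partials: Iterable[Row]): (Long, Double) = {
    val buffer = combine(partials)
    (sum.evaluate(buffer), avg.evaluate(buffer))
  }
}
```

Because a combined Row has the same layout as a mapped Row, `reduce()` returns the same result whether or not the Map output was first combined in sub-groups; that is what lets the Combine phase be inserted for associative decomposable functions without changing the GroupReduce step.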
    I did not find a usage of the `initiate()` method. However, I think we can use it to initialize the buffer, which would make the `reduce()` and `combine()` methods safe with respect to object reuse.
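One way to read this suggestion is sketched below, under assumptions: instead of accumulating into one of the input rows (which the runtime may hand out as the same instance for every element when object reuse is enabled), the reducer owns a dedicated buffer Row and resets it with `initiate()` at the start of every group. `Row` is a stand-in for Flink's class, and `LongSumAggregate` and `ReusingGroupReducer` are hypothetical; only `initiate(intermediate: Row)` comes from the diff.

```scala
// Minimal stand-in for org.apache.flink.api.table.Row (hypothetical, for illustration only).
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def productElement(i: Int): Any = fields(i)
  def setField(i: Int, value: Any): Unit = fields(i) = value
}

// Cut-down interface; only initiate(intermediate: Row) is taken from the diff.
trait Aggregate[T] extends Serializable {
  protected var aggOffsetInRow: Int = 0
  def setAggOffsetInRow(offset: Int): Unit = { aggOffsetInRow = offset }
  def initiate(intermediate: Row): Unit
  def prepare(value: Any, intermediate: Row): Unit
  def merge(intermediate: Row, buffer: Row): Unit
  def evaluate(buffer: Row): T
}

class LongSumAggregate extends Aggregate[Long] {
  override def initiate(intermediate: Row): Unit = intermediate.setField(aggOffsetInRow, 0L)
  override def prepare(value: Any, intermediate: Row): Unit =
    intermediate.setField(aggOffsetInRow, value.asInstanceOf[Long])
  override def merge(intermediate: Row, buffer: Row): Unit = {
    val acc = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    buffer.setField(aggOffsetInRow, acc + intermediate.productElement(aggOffsetInRow).asInstanceOf[Long])
  }
  override def evaluate(buffer: Row): Long = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
}

// Hypothetical reducer: the buffer is allocated once and never aliases an input record.
class ReusingGroupReducer(aggs: Seq[Aggregate[_]], arity: Int) {
  private val buffer = new Row(arity)

  def reduce(group: Iterable[Row]): Row = {
    // Reset the reused buffer; without this, a group would start from the
    // previous group's accumulated values.
    aggs.foreach(_.initiate(buffer))
    // Only read from each input row and never keep a reference to it, since the
    // same Row instance may be returned for every element of the group.
    for (record <- group; agg <- aggs) agg.merge(record, buffer)
    buffer // callers must consume or copy this before the next reduce() call
  }
}
```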
