flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
Date Mon, 24 Apr 2017 20:49:06 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15981891#comment-15981891 ]

ASF GitHub Bot commented on FLINK-6091:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113050568
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
    @@ -48,15 +49,15 @@ class ProcTimeBoundedRowsOver(
         genAggregations: GeneratedAggregationsFunction,
         precedingOffset: Long,
         aggregatesTypeInfo: RowTypeInfo,
    -    inputType: TypeInformation[Row])
    -  extends ProcessFunction[Row, Row]
    +    inputType: TypeInformation[CRow])
    +  extends ProcessFunction[CRow, CRow]
         with Compiler[GeneratedAggregations] {
     
       Preconditions.checkArgument(precedingOffset > 0)
     
       private var accumulatorState: ValueState[Row] = _
    -  private var rowMapState: MapState[Long, JList[Row]] = _
    -  private var output: Row = _
    +  private var rowMapState: MapState[Long, JList[CRow]] = _
    --- End diff --
    
    Same as for the other over window. Let's use `Row` for the state and extract the `Row` from the `CRow`.
    `output` should be a `CRow`.
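
A minimal sketch of the suggestion above, not the actual operator. It assumes simplified stand-ins for `Row` and `CRow` (plain case classes, not Flink's classes), replaces `MapState[Long, JList[Row]]` with an in-memory sorted map, and uses a hypothetical row count in place of the generated aggregations. The point it illustrates: state holds plain `Row`s, the `Row` is extracted from the incoming `CRow`, and only the output is a `CRow`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are not Flink's classes.
final case class Row(fields: Vector[Any])
// change = true marks an accumulate message, change = false a retraction.
final case class CRow(row: Row, change: Boolean)

// Simplified stand-in for a bounded ROWS over-window operator.
class BoundedRowsOverSketch(precedingOffset: Long) {
  require(precedingOffset > 0)

  // Stand-in for MapState[Long, JList[Row]]: the state stores Row, not CRow.
  private val rowMapState = mutable.TreeMap.empty[Long, Vector[Row]]
  private var bufferedRows = 0L

  def processElement(input: CRow, timestamp: Long): CRow = {
    // Extract the Row from the CRow before touching state.
    val inputRow: Row = input.row
    rowMapState.update(
      timestamp,
      rowMapState.getOrElse(timestamp, Vector.empty) :+ inputRow)
    bufferedRows += 1

    // Evict the oldest row once more than precedingOffset rows are buffered.
    if (bufferedRows > precedingOffset) {
      val (oldestTs, oldestRows) = rowMapState.head
      if (oldestRows.size == 1) rowMapState.remove(oldestTs)
      else rowMapState.update(oldestTs, oldestRows.tail)
      bufferedRows -= 1
    }

    // The output is a CRow: the result Row plus the input's change flag.
    CRow(Row(inputRow.fields :+ bufferedRows), input.change)
  }

  // Exposes the buffered rows in timestamp order, for inspection.
  def bufferedState: Seq[Row] = rowMapState.values.flatten.toSeq
}
```

Keeping the change flag out of the state means each buffered element carries only its data, and the flag is attached once, when the result is emitted.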


> Implement and turn on the retraction for aggregates
> ---------------------------------------------------
>
>                 Key: FLINK-6091
>                 URL: https://issues.apache.org/jira/browse/FLINK-6091
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for different aggregates.
>
> 1. Add a delete/add property to Row.
> 2. Implement functions for generating retract messages for unbounded groupBy.
> 3. Implement functions for handling retract messages for different aggregates.
> 4. Handle retraction messages in CommonCorrelate and CommonCalc (retain the Delete property).
>
> Note: Currently, only unbounded groupBy generates retractions, and it works only in unbounded, processing-time mode. Hence, retraction is supported only for unbounded, processing-time aggregations so far. We can add more retraction support later.
>
> Supported now: unbounded groupBy, unbounded and processing-time over window.
> Unsupported now: group window, event-time or bounded over window.
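
The generate-and-consume steps in the description can be sketched as follows. This is an illustration under assumptions, not the code in the pull request: `Row` and `CRow` are simplified stand-in case classes, and `GroupByCountSketch` and `SumSketch` are hypothetical names. The upstream count retracts its previous result before emitting the updated one, and the downstream sum subtracts retracted values.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are not Flink's classes.
final case class Row(fields: Vector[Any])
// change = true is an add (accumulate) message, change = false a delete (retract).
final case class CRow(row: Row, change: Boolean)

// Unbounded groupBy count that generates retract messages.
class GroupByCountSketch {
  private val counts = mutable.Map.empty[String, Long]

  // Emits a retraction of the previous result (if any), then the new result.
  def process(key: String): List[CRow] = {
    val previous = counts.get(key)
    val updated = previous.getOrElse(0L) + 1L
    counts.update(key, updated)
    val retraction =
      previous.map(c => CRow(Row(Vector(key, c)), change = false)).toList
    retraction :+ CRow(Row(Vector(key, updated)), change = true)
  }
}

// Downstream aggregate that handles retract messages: a sum over the counts.
class SumSketch {
  private var sum = 0L

  // Adds the value on an add message and subtracts it on a retraction.
  def consume(msg: CRow): Long = {
    val value = msg.row.fields(1).asInstanceOf[Long]
    if (msg.change) sum += value else sum -= value
    sum
  }
}
```

Without the retraction, the downstream sum would count both the old and the new result for the same key; with it, the old contribution is removed before the new one is added.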



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
