flink-dev mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-6858) Unbounded event time Over Window emits incorrect timestamps
Date Tue, 06 Jun 2017 17:00:21 GMT
Fabian Hueske created FLINK-6858:
------------------------------------

             Summary: Unbounded event time Over Window emits incorrect timestamps
                 Key: FLINK-6858
                 URL: https://issues.apache.org/jira/browse/FLINK-6858
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.3.0
            Reporter: Fabian Hueske
            Priority: Critical


The unbounded event time OVER windows emit records with incorrect timestamps.

OVER aggregates "enrich" each input row with aggregates computed over neighboring rows, i.e.,
they produce one output row for each input row. The (event-time) timestamp of each input row
should be preserved and not modified.

All OVER window aggregates are implemented using the {{ProcessFunction}} interface. The interface
has two methods {{processElement()}} and {{onTimer()}} that can produce output records. Records
emitted by {{processElement()}} are emitted with the timestamp of the record that was given
as an argument to the method. Records emitted by {{onTimer()}} are emitted with the timestamp
of the timer that triggered the call of the method.

The implementation of the unbounded event-time OVER window registers a new timer for
{{currentWatermark + 1}} whenever {{processElement()}} is called. When the timer fires,
{{onTimer()}} processes all rows that were received since the previous {{onTimer()}} call
and whose timestamps are smaller than the current watermark. However, this means that all
emitted rows carry the timer's timestamp, {{currentWatermark + 1}}, instead of their own
timestamps, which is not what we want.
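
The effect can be reproduced with a small toy model. This is a sketch in plain Python, not Flink code: the class {{UnboundedOverToy}}, its method names, the running-sum aggregate, and the use of {{<=}} when comparing against the watermark are all assumptions made for illustration. Only the timer strategy (one timer at {{currentWatermark + 1}}, output stamped with the timer's time) is taken from the description above.

```python
from collections import namedtuple

# One output record: the row's own event time, the timestamp it was
# emitted with, and the running aggregate (a sum, for illustration).
Emitted = namedtuple("Emitted", ["row_time", "emitted_time", "running_sum"])


class UnboundedOverToy:
    """Toy model of the single-timer strategy. Hypothetical, not Flink API."""

    def __init__(self):
        self.watermark = 0
        self.buffer = []     # buffered (row_time, value) pairs
        self.timers = set()  # registered event-time timers
        self.total = 0       # unbounded running sum
        self.output = []

    def process_element(self, row_time, value):
        # Buffer the row and register a timer for currentWatermark + 1.
        self.buffer.append((row_time, value))
        self.timers.add(self.watermark + 1)

    def advance_watermark(self, new_watermark):
        self.watermark = new_watermark
        # Fire every timer that the new watermark has passed.
        for t in sorted(x for x in self.timers if x <= new_watermark):
            self.timers.remove(t)
            self.on_timer(t)

    def on_timer(self, timer_time):
        # Emit all buffered rows covered by the watermark, in time order.
        ready = sorted(r for r in self.buffer if r[0] <= self.watermark)
        self.buffer = [r for r in self.buffer if r[0] > self.watermark]
        for row_time, value in ready:
            self.total += value
            # Output is stamped with the timer's time, not the row's time.
            self.output.append(Emitted(row_time, timer_time, self.total))


if __name__ == "__main__":
    op = UnboundedOverToy()
    op.advance_watermark(10)
    for row_time, value in [(14, 1), (12, 2), (13, 3)]:
        op.process_element(row_time, value)
    op.advance_watermark(15)
    for record in op.output:
        print(record)
```

In this run the rows with event times 12, 13, and 14 are all emitted with timestamp 11, i.e., the watermark at registration time plus one.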

The bounded event-time OVER window operators follow a different strategy and register a timer
for the timestamp of each row that was processed by {{processElement()}} and emit the corresponding
rows when {{onTimer()}} is called. Hence, they emit the rows with correct timestamps.
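
A toy model of the per-row timer strategy, applied here to an unbounded running sum, shows why the timestamps come out right. This is a sketch in plain Python, not Flink code: the class {{PerRowTimerOverToy}}, its method names, and the running-sum aggregate are assumptions made for illustration, and late rows are not handled.

```python
class PerRowTimerOverToy:
    """Toy model of the per-row timer strategy. Hypothetical, not Flink API."""

    def __init__(self):
        self.watermark = 0
        self.rows_by_time = {}  # row_time -> list of buffered values
        self.timers = set()     # registered event-time timers
        self.total = 0          # unbounded running sum
        self.output = []        # (emitted_time, running_sum) pairs

    def process_element(self, row_time, value):
        # Buffer the row and register a timer for the row's own timestamp.
        self.rows_by_time.setdefault(row_time, []).append(value)
        self.timers.add(row_time)

    def advance_watermark(self, new_watermark):
        self.watermark = new_watermark
        # Fire timers in timestamp order once the watermark has passed them.
        for t in sorted(x for x in self.timers if x <= new_watermark):
            self.timers.remove(t)
            self.on_timer(t)

    def on_timer(self, timer_time):
        # Only rows whose timestamp equals the timer's time are emitted,
        # so the timer's timestamp is also the row's timestamp.
        for value in self.rows_by_time.pop(timer_time, []):
            self.total += value
            self.output.append((timer_time, self.total))


if __name__ == "__main__":
    op = PerRowTimerOverToy()
    op.advance_watermark(10)
    for row_time, value in [(14, 1), (12, 2), (13, 3)]:
        op.process_element(row_time, value)
    op.advance_watermark(15)
    print(op.output)  # [(12, 2), (13, 5), (14, 6)]
```

Because each timer fires for exactly one timestamp, every row leaves the operator with the event time it arrived with. The cost is one registered timer per distinct row timestamp instead of one per watermark interval.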

I think we should change the implementation of the unbounded event-time OVER aggregates to
follow a strategy similar to that of the bounded event-time OVER aggregates.

What do you think [~Yuhong_kyo] [~sunjincheng121]?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
