flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data
Date Thu, 30 Nov 2017 15:33:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272808#comment-16272808
] 

ASF GitHub Bot commented on FLINK-8158:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5094
  
    Yes, the records emitted in your test are late, but only because the records that you feed
into the join are already late. You set the watermarks on both inputs to `6000` and subsequently
insert late data.
    
    If you change the test to insert records such that they are not late:
    
    ```
    testHarness.processElement1(new StreamRecord[CRow](
      CRow(Row.of(1000L: JLong, "k1"), true), 1000))
    
    testHarness.processWatermark1(new Watermark(5999))
    testHarness.processWatermark2(new Watermark(5999))
    
    testHarness.processElement1(new StreamRecord[CRow](
      CRow(Row.of(6000L: JLong, "k1"), true), 6000))
    testHarness.processElement2(new StreamRecord[CRow](
      CRow(Row.of(6000L: JLong, "k1"), true), 6000))
    ```
    
    the result records are not late either.
    
    If you change the watermarks to `6000`, the 2nd and 3rd records are received as late data
(because their timestamps are equal to the watermark time), and therefore the output is late
as well.
    
    I think the current logic is correct.
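
The lateness rule that the comment relies on can be sketched in a few lines. This is a minimal illustration, not code from the pull request: `LatenessCheck` and `isLate` are hypothetical names, and the sketch assumes the convention that a watermark `W` declares that no further records with a timestamp less than or equal to `W` will arrive.

```scala
object LatenessCheck {
  // Assumption: a watermark W means "no more records with timestamp <= W".
  // A record whose timestamp equals the current watermark is therefore late.
  def isLate(recordTimestamp: Long, currentWatermark: Long): Boolean =
    recordTimestamp <= currentWatermark

  def main(args: Array[String]): Unit = {
    // Watermark 5999: a record with timestamp 6000 is still on time.
    println(isLate(6000L, 5999L))
    // Watermark 6000: the same record is late because 6000 <= 6000.
    println(isLate(6000L, 6000L))
  }
}
```

Under this rule, moving the watermark from `5999` to `6000` is enough to turn the records with timestamp `6000` into late input, which matches the behaviour described in the comment.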


> Rowtime window inner join emits late data
> -----------------------------------------
>
>                 Key: FLINK-8158
>                 URL: https://issues.apache.org/jira/browse/FLINK-8158
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>         Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late data is emitted.
Currently, this is achieved by holding back watermarks. However, the window border is not handled
correctly. For the SQL below:
> {quote}
>     val sqlQuery =
>       """
>         SELECT t2.key, t2.id, t1.id
>         FROM T1 as t1 join T2 as t2 ON
>           t1.key = t2.key AND
>           t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
>             t2.rt + INTERVAL '1' SECOND
>         """.stripMargin
>     val data1 = new mutable.MutableList[(String, String, Long)]
>     // for boundary test
>     data1.+=(("A", "LEFT1", 6000L))
>     val data2 = new mutable.MutableList[(String, String, Long)]
>     data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> Join will output a watermark with timestamp 1000, but if the left input then receives another
record ("A", "LEFT1", 1000L), the join will output a record with timestamp 1000, which equals the
previous watermark.
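
The arithmetic behind the reported boundary case can be sketched as follows. This is an illustration under stated assumptions, not the operator's actual code: `HeldBackWatermark`, `outputWatermark`, and `inJoinWindow` are hypothetical names, and the sketch assumes the join holds back the watermark by the larger of the two window bounds (5 seconds here), which is consistent with the output watermark of 1000 stated in the description for an input watermark of 6000.

```scala
object HeldBackWatermark {
  // Window bounds taken from the query:
  //   t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND
  val lowerBoundMs: Long = -5000L
  val upperBoundMs: Long = 1000L

  // Assumption: the emitted watermark trails the input watermark by the
  // larger of the two window sizes.
  def outputWatermark(inputWatermark: Long): Long =
    inputWatermark - math.max(-lowerBoundMs, upperBoundMs)

  // True if a left record at leftRt falls into the join window of a right record at rightRt.
  def inJoinWindow(leftRt: Long, rightRt: Long): Boolean =
    leftRt >= rightRt + lowerBoundMs && leftRt <= rightRt + upperBoundMs

  def main(args: Array[String]): Unit = {
    val emitted = outputWatermark(6000L)
    // The left record at 1000 sits exactly on the lower window border of the right record at 6000.
    val joins = inJoinWindow(1000L, 6000L)
    println(s"output watermark = $emitted, left record at 1000 joins = $joins")
  }
}
```

The left record at 1000 sits exactly on the window border, so it still joins, and the result timestamp of 1000 given in the description coincides with the watermark that was already emitted.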



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
