flink-issues mailing list archives

From "Hequn Cheng (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-8158) Rowtime window inner join emits late data
Date Mon, 27 Nov 2017 11:56:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hequn Cheng updated FLINK-8158:
-------------------------------
    Description: 
When executing the join, the join operator needs to make sure that no late data is emitted.
Currently, this is achieved by holding back watermarks. However, the window boundary is not
handled correctly. For the SQL below:
{quote}
    val sqlQuery =
      """
        SELECT t2.key, t2.id, t1.id
        FROM T1 as t1 join T2 as t2 ON
          t1.key = t2.key AND
          t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
            t2.rt + INTERVAL '1' SECOND
        """.stripMargin

    val data1 = new mutable.MutableList[(String, String, Long)]
    // for boundary test
    data1.+=(("A", "LEFT1", 6000L))

    val data2 = new mutable.MutableList[(String, String, Long)]
    data2.+=(("A", "RIGHT1", 6000L))
{quote}

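The join emits a watermark with timestamp 1000. If the left input then receives another record
("A", "LEFT1", 1000L), the join emits a result with timestamp 1000, which equals the previously
emitted watermark. A record whose timestamp is not greater than an already emitted watermark
is late.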
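
The boundary arithmetic can be sketched as follows. This is a minimal, hypothetical model and not the actual join operator: the object BoundarySketch and all of its members are made up for illustration. It assumes an input watermark of 6000 and assumes that the operator holds the watermark back by the larger side of the window (5000 ms), which is consistent with the emitted watermark of 1000 described above.

```scala
// Hypothetical sketch of the boundary arithmetic; not Flink's join operator.
object BoundarySketch {
  // Bounds taken from the query, in milliseconds:
  // t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND
  val lowerBoundMs: Long = -5000L
  val upperBoundMs: Long = 1000L

  // BETWEEN is inclusive on both ends, so the boundary values themselves match.
  def joins(leftRt: Long, rightRt: Long): Boolean =
    leftRt >= rightRt + lowerBoundMs && leftRt <= rightRt + upperBoundMs

  // Assumption: the watermark is held back by the larger window side.
  val watermarkDelayMs: Long = math.max(-lowerBoundMs, upperBoundMs)

  def heldBackWatermark(inputWatermark: Long): Long =
    inputWatermark - watermarkDelayMs

  // A record is late if its timestamp is not greater than an emitted watermark.
  def isLate(recordTs: Long, emittedWatermark: Long): Boolean =
    recordTs <= emittedWatermark

  def main(args: Array[String]): Unit = {
    val wm = heldBackWatermark(6000L) // assumed input watermark of 6000
    println(s"emitted watermark: $wm")
    println(s"LEFT1@1000 joins RIGHT1@6000: ${joins(1000L, 6000L)}")
    println(s"result with timestamp 1000 is late: ${isLate(1000L, wm)}")
  }
}
```

Under these assumptions the left record at 1000 sits exactly on the inclusive lower boundary of the window around RIGHT1, so it still joins, while the held-back watermark has already reached 1000.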

  was:
When executing the join, the join operator needs to make sure that no late data is emitted.
Currently, this is achieved by holding back watermarks. However, the window boundary is not
handled correctly. For the SQL below:
{quote}
    val sqlQuery =
      """
        |SELECT t2.key, t2.id, t1.id
        |FROM T1 as t1 join T2 as t2 ON
        |  t1.key = t2.key AND
        |  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
        |    t2.rt + INTERVAL '1' SECOND
        |""".stripMargin

    val data1 = new mutable.MutableList[(String, String, Long)]
    // for boundary test
    data1.+=(("A", "LEFT1", 6000L))

    val data2 = new mutable.MutableList[(String, String, Long)]
    data2.+=(("A", "RIGHT1", 6000L))
{quote}

The join emits a watermark with timestamp 1000. If the left input then receives another record
("A", "LEFT1", 1000L), the join emits a result with timestamp 1000, which equals the previously
emitted watermark.


> Rowtime window inner join emits late data
> -----------------------------------------
>
>                 Key: FLINK-8158
>                 URL: https://issues.apache.org/jira/browse/FLINK-8158
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
