flink-issues mailing list archives

From "sunjincheng (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-6074) Fix processFunction with watermark not work well in tableAPI
Date Fri, 17 Mar 2017 01:06:41 GMT

     [ https://issues.apache.org/jira/browse/FLINK-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sunjincheng updated FLINK-6074:
-------------------------------
    Component/s: Table API & SQL
                 DataStream API

> Fix processFunction with watermark not work well in tableAPI
> ------------------------------------------------------------
>
>                 Key: FLINK-6074
>                 URL: https://issues.apache.org/jira/browse/FLINK-6074
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> I did a simple test and found that the same `AssignerWithPunctuatedWatermarks` and `ProcessFunction` observe different watermarks depending on whether they are used through the SQL API or the `DataStream` API, as follows:
> ProcessFunction:
> {code}
> class CheckWaterMark extends ProcessFunction[(Long, Int, String), (Long, Int, String)] {
>     override def processElement(value: (Long, Int, String),
>         ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#Context,
>         out: Collector[(Long, Int, String)]): Unit = {
>         println("WaterMark=" + ctx.timerService.currentWatermark())
>     }
>     override def onTimer(
>         timestamp: Long,
>         ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#OnTimerContext,
>         out: Collector[(Long, Int, String)]): Unit = ???
> }
> {code}
> AssignerWithPunctuatedWatermarks:
> {code}
> class TimestampWithLatenessWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>     override def checkAndGetNextWatermark(
>       lastElement: (Long, Int, String),
>       extractedTimestamp: Long)
>     : Watermark = {
>       new Watermark(extractedTimestamp)
>     }
>     override def extractTimestamp(
>       element: (Long, Int, String),
>       previousElementTimestamp: Long): Long = {
>       element._1
>     }
>   }
> {code}
> TestDATA:
> {code}
> val data = List(
>     (1L, 1, "Hello"),
>     (2L, 2, "Hello"),
>     (3L, 3, "Hello"),
>     (4L, 4, "Hello"),
>     (5L, 5, "Hello"),
>     (6L, 6, "Hello"),
>     (7L, 7, "Hello World"),
>     (8L, 8, "Hello World"),
>     (20L, 20, "Hello World"))
> {code}
> DataStreamAPI:
> {code}
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val src = env.fromCollection(data).assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
>   src.keyBy(1).process(new TriggeringFlatMapFunction())
> Print:
> WaterMark=-9223372036854775808
> WaterMark=3
> WaterMark=5
> WaterMark=7
> WaterMark=1
> WaterMark=2
> WaterMark=4
> WaterMark=8
> WaterMark=6
> {code}
> SqlAPI:
> {code}
>     val src = env.fromCollection(data).assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
>     val tab = src.toTable(tEnv).as('a, 'b, 'c)
>     tEnv.registerTable("T1", tab)
>     val sqlQuery = "SELECT " +
>       "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
>       "from T1"
>     val result = tEnv.sql(sqlQuery).toDataStream[Row]
> Print:
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=2
> WaterMark=6
> {code}
> I suspect there is a problem in the translation from SQL to the DataStream API. If I am using anything incorrectly, corrections are welcome.
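
For comparison, the DataStreamAPI output above is consistent with each element observing the watermark produced by the element before it, since a punctuated assigner forwards the element first and emits its watermark afterwards. The following is a minimal sketch of that ordering in plain Scala, not Flink code. `PunctuatedWatermarkSketch` and `watermarksSeen` are hypothetical names introduced for illustration, and the model assumes a single in-order input.

```scala
object PunctuatedWatermarkSketch {
  // Simplified model (an assumption, not Flink internals): the watermark
  // derived from an element only becomes visible to the elements after it.
  // Returns, for each timestamp, the watermark its processElement would see.
  def watermarksSeen(timestamps: Seq[Long]): Seq[Long] =
    timestamps
      .scanLeft(Long.MinValue)((wm, ts) => math.max(wm, ts)) // running watermark
      .init // drop the watermark emitted after the last element

  def main(args: Array[String]): Unit = {
    // Timestamps taken from the test data in the report.
    val timestamps = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)
    watermarksSeen(timestamps).foreach(w => println("WaterMark=" + w))
  }
}
```

Run on the test timestamps, this yields `Long.MinValue` followed by 1 through 8, the same set of values as the DataStreamAPI print (whose order presumably differs because of parallel subtasks). It does not explain the SqlAPI output.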



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
