flink-issues mailing list archives

From "sunjincheng (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6074) Fix processFunction with watermark not work well in tableAPI
Date Sun, 19 Mar 2017 04:52:42 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931569#comment-15931569 ]

sunjincheng commented on FLINK-6074:
------------------------------------

When I used more test data, I found that the cause of the different behavior described above is the default parallelism of 4 (`DEFAULT_PARALLELISM`) set in `StreamingMultipleProgramsTestBase`. For example:
Test data:
{code}
val data = List(
    (1L, 1, "Hello World"),
    (2L, 2, "Hello World"),
    (3L, 3, "Hello World"),
    (4L, 4, "Hello World"),
    (5L, 5, "Hello World"),
    (6L, 6, "Hello World"),
    (7L, 7, "Hello World"),
    (8L, 8, "Hello World"),
    (9L, 8, "Hello World"),
    (10L, 8, "Hello World"),
    (11L, 8, "Hello World"),
    (12L, 8, "Hello World"),
    (13L, 8, "Hello World"),
    (14L, 8, "Hello World"),
    (15L, 8, "Hello World"),
    (16L, 8, "Hello World"),
    (17L, 8, "Hello World"),
    (18L, 8, "Hello World"),
    (19L, 8, "Hello World"),
    (20L, 8, "Hello World"),
    (21L, 8, "Hello World"),
    (22L, 8, "Hello World"),
    (23L, 8, "Hello World"),
    (24L, 8, "Hello World"),
    (25L, 8, "Hello World"),
    (26L, 8, "Hello World"),
    (27L, 8, "Hello World"),
    (28L, 20, "Hello World"))
{code}
Result:
{code}
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=4
WaterMark=8
WaterMark=12
WaterMark=16
WaterMark=20
WaterMark=24
{code}
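One plausible reading of this result, sketched under assumptions: with parallelism 4, the keyed operator receives watermarks over several input channels and advances its own watermark only to the minimum of the latest watermark seen on each channel, so it reports `Long.MinValue` (-9223372036854775808) until every channel has delivered a watermark. The model below is hypothetical plain Scala, not Flink's implementation; the `WatermarkCombiner` class and the round-robin assignment of elements to channels are illustrative assumptions, and it does not try to reproduce the exact interleaving printed above.

{code}
// Hypothetical model, not Flink internals: an operator with several input
// channels uses the minimum of the per-channel watermarks as its own watermark.
final class WatermarkCombiner(numChannels: Int) {
  // Long.MinValue means "no watermark received on this channel yet".
  private val perChannel: Array[Long] = Array.fill(numChannels)(Long.MinValue)

  /** Records a watermark from `channel` and returns the combined watermark. */
  def onWatermark(channel: Int, watermark: Long): Long = {
    // A channel's watermark never moves backwards.
    perChannel(channel) = math.max(perChannel(channel), watermark)
    current
  }

  /** The operator-level watermark: the slowest channel decides. */
  def current: Long = perChannel.min
}

object WatermarkCombinerDemo {
  /** Sends each timestamp to channel (index % parallelism), treats it as a
    * punctuated watermark on that channel, and returns the combined watermark
    * after each step (assumed round-robin distribution). */
  def simulate(timestamps: Seq[Long], parallelism: Int): Seq[Long] = {
    val combiner = new WatermarkCombiner(parallelism)
    timestamps.zipWithIndex.map { case (ts, i) =>
      combiner.onWatermark(i % parallelism, ts)
    }
  }

  def main(args: Array[String]): Unit = {
    val timestamps: Seq[Long] = 1L to 8L
    // One channel: the watermark advances with every element.
    println(simulate(timestamps, 1).mkString(", "))
    // Four channels: stays at Long.MinValue until all four have reported.
    println(simulate(timestamps, 4).mkString(", "))
  }
}
{code}

With the timestamps 1 to 8, the single-channel run yields 1 through 8, while the four-channel run yields three `Long.MinValue` entries followed by 1, 2, 3, 4, 5: the combined watermark lags behind the slowest channel.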

When I set:
{code}
env.setParallelism(1)
{code}

Result:
{code}
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
WaterMark=-9223372036854775808
WaterMark=1
WaterMark=2
WaterMark=3
WaterMark=4
WaterMark=5
WaterMark=6
WaterMark=7
WaterMark=8
WaterMark=9
WaterMark=10
WaterMark=11
WaterMark=12
WaterMark=13
WaterMark=14
WaterMark=15
WaterMark=16
WaterMark=17
WaterMark=18
WaterMark=19
WaterMark=20
WaterMark=21
WaterMark=22
WaterMark=23
WaterMark=24
WaterMark=25
WaterMark=26
WaterMark=27
{code}
With parallelism 1, the SQL API and the DataStream API behave the same.
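The parallelism-1 output can be checked against an equally simple single-channel model. This is again a hypothetical sketch rather than Flink code, and it assumes that the punctuated assigner forwards each element before emitting the watermark derived from it, so the `ProcessFunction` sees the watermark of the previous element. The `SingleChannelModel` object is an illustrative name.

{code}
// Hypothetical single-channel model (not Flink code). Assumes each element is
// forwarded before the punctuated watermark derived from it, so the downstream
// ProcessFunction observes the watermark of the previous element.
object SingleChannelModel {
  /** Value of currentWatermark() as seen while processing each element. */
  def observedWatermarks(timestamps: Seq[Long]): Seq[Long] =
    timestamps
      .scanLeft(Long.MinValue)((wm, ts) => math.max(wm, ts)) // watermark after each element
      .init                                                  // shift by one: seen before each element

  def main(args: Array[String]): Unit = {
    val timestamps: Seq[Long] = 1L to 28L // timestamps of the 28-element test data
    observedWatermarks(timestamps).foreach(wm => println("WaterMark=" + wm))
  }
}
{code}

Run on the timestamps 1 to 28, it prints `WaterMark=-9223372036854775808` followed by `WaterMark=1` through `WaterMark=27`, the same `WaterMark=` sequence as the parallelism-1 result above.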

> Fix processFunction with watermark not work well in tableAPI
> ------------------------------------------------------------
>
>                 Key: FLINK-6074
>                 URL: https://issues.apache.org/jira/browse/FLINK-6074
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> I did a simple test and found that, with the same `AssignerWithPunctuatedWatermarks` and `ProcessFunction`, the SQL API and the `DataStream` API observe different watermarks, as follows:
> ProcessFunction:
> {code}
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
>
> // Prints the current watermark each time an element is processed.
> class CheckWaterMark extends ProcessFunction[(Long, Int, String), (Long, Int, String)] {
>     override def processElement(value: (Long, Int, String),
>         ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#Context,
>         out: Collector[(Long, Int, String)]): Unit = {
>         println("WaterMark=" + ctx.timerService.currentWatermark())
>     }
>     // No timers are registered, so there is nothing to do here.
>     override def onTimer(
>         timestamp: Long,
>         ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#OnTimerContext,
>         out: Collector[(Long, Int, String)]): Unit = {}
> }
> {code}
> AssignerWithPunctuatedWatermarks:
> {code}
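> import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
> import org.apache.flink.streaming.api.watermark.Watermark
>
> // Uses the first tuple field as the event timestamp and returns a watermark
> // equal to that timestamp for every element.
> class TimestampWithLatenessWatermark
>     extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>   override def checkAndGetNextWatermark(
>       lastElement: (Long, Int, String),
>       extractedTimestamp: Long): Watermark = {
>     new Watermark(extractedTimestamp)
>   }
>   override def extractTimestamp(
>       element: (Long, Int, String),
>       previousElementTimestamp: Long): Long = {
>     element._1
>   }
> }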
> {code}
> Test data:
> {code}
> val data = List(
>     (1L, 1, "Hello"),
>     (2L, 2, "Hello"),
>     (3L, 3, "Hello"),
>     (4L, 4, "Hello"),
>     (5L, 5, "Hello"),
>     (6L, 6, "Hello"),
>     (7L, 7, "Hello World"),
>     (8L, 8, "Hello World"),
>     (20L, 20, "Hello World"))
> {code}
> DataStream API:
> {code}
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val src = env.fromCollection(data).assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
>   src.keyBy(1).process(new CheckWaterMark())
> {code}
> Output:
> {code}
> WaterMark=-9223372036854775808
> WaterMark=3
> WaterMark=5
> WaterMark=7
> WaterMark=1
> WaterMark=2
> WaterMark=4
> WaterMark=8
> WaterMark=6
> {code}
> SQL API:
> {code}
> val src = env.fromCollection(data).assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
>     val tab = src.toTable(tEnv).as('a, 'b, 'c)
>     tEnv.registerTable("T1", tab)
>     val sqlQuery = "SELECT " +
>       "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
>       "FROM T1"
>     val result = tEnv.sql(sqlQuery).toDataStream[Row]
> {code}
> Output:
> {code}
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=2
> WaterMark=6
> {code}
> I suspect there is a problem in the translation from SQL to the DataStream API. Corrections are welcome if my usage is incorrect.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
