flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Yuan,Youjun" <yuanyou...@baidu.com>
Subject 答复: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题
Date Mon, 12 Aug 2019 05:45:18 GMT
并不是没条消息会触发watermark,而是有一定时间间隔的,默认是200ms触发一次watermark。
当你的数据来的比较集中的时候,经常会发生最新的消息的时间戳已经过了window
end,但是window还没fire的情况。


-----邮件原件-----
发件人: Ever <439674061@qq.com> 
发送时间: Sunday, July 14, 2019 5:00 PM
收件人: user-zh <user-zh@flink.apache.org>
主题: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

第四条数据来的时间戳是: 03:17:55,  水印时间这时候应该是03:17:50,  不管是大窗口的关闭时间(第一条数据(03:15:48)的大窗口关闭时间:03:16:50)还是小的滑动窗口关闭时间,
都已经过了, 都应该关闭了啊




------------------ 原始邮件 ------------------
发件人: "Hequn Cheng"<chenghequn@gmail.com>;
发送时间: 2019年7月14日(星期天) 中午11:55
收件人: "user-zh"<user-zh@flink.apache.org>;

主题: Re: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题



Hi,

应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time

On Fri, Jul 12, 2019 at 4:05 PM Ever <439674061@qq.com> wrote:

> 有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。
> 数据每隔10秒会过来一批。
> 代码如下图:
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
>
> env.setParallelism(parallel)
>
>
> env.addSource(source)
>       .map(json => {
>           new InvokingInfoWrapper(xxx)
>         })
>       .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seco
> nds(5))
> {
>         override def extractTimestamp(invoking: InvokingInfoWrapper): 
> Long = {
>           invoking.timestamp
>         }
>       })
>       .keyBy(invokingInfo => {
>         s"${invokingInfo.caller}_${invokingInfo.callee}"
>       })
>       .timeWindow(Time.seconds(60), Time.seconds(10))
>       .reduce(innerReducer).map(invokingInfo => { // ##2map
>       //some mapping code
>       invokingInfo
>       })
>       .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-s
> ink")
>
> ```
>
>
>
>
> 由于是在预发布环境上线, 流量不大,我观察到一个现场如下:
> 1. 第一条数据的时间戳为03:15:48
> 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口)
> 3. 第三条数据的时间戳为03:16:06,   触发reduce操作(同样5次)
> 4. 第四条数据的时间戳为03:17:55,
>  这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤,
然而并没有。
> 5. 第五条数据的时间戳为03:18:01,  这时候触发了跟第四条数据的reduce操作。
>
>
> 感觉前三条数据给吞了。
>
>
> 为什么呢?
Mime
View raw message