flink-user-zh mailing list archives

From Yun Tang <myas...@live.com>
Subject Re: Confusion about the watermark alignment logic in the Flink source code
Date Mon, 11 May 2020 13:32:37 GMT
Hi

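Precisely because the subtask takes the minimum across all input channels, if one upstream never receives any real data, the watermark it sends downstream stays at Long.MIN_VALUE, and windows can never fire. The community works around this problem with idle sources [1].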

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources
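The effect described above can be illustrated with a small simulation. This is a hypothetical sketch, not Flink code: `IdleChannelDemo` and `minOverActive` are invented names, and "idle" is modeled simply as excluding a channel from the minimum, which approximates what the idle-source mechanism achieves for downstream operators.

```java
/**
 * Hypothetical sketch (not Flink code): one channel that never sees data keeps
 * the combined watermark at Long.MIN_VALUE; excluding idle channels from the
 * minimum lets event time advance again.
 */
public class IdleChannelDemo {

    /** Minimum watermark over the channels that are not marked idle. */
    static long minOverActive(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // With no active channel there is nothing to aggregate.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Channel 2 has never seen a record, so its watermark is still Long.MIN_VALUE.
        long[] watermarks = {5_000L, 7_000L, Long.MIN_VALUE};

        boolean[] noneIdle = {false, false, false};
        System.out.println(minOverActive(watermarks, noneIdle)); // Long.MIN_VALUE: windows never fire

        boolean[] channel2Idle = {false, false, true};
        System.out.println(minOverActive(watermarks, channel2Idle)); // 5000: event time advances
    }
}
```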

Best regards,
Yun Tang
________________________________
From: Benchao Li <libenchao@gmail.com>
Sent: Monday, May 11, 2020 16:12
To: user-zh <user-zh@flink.apache.org>
Subject: Re: Confusion about the watermark alignment logic in the Flink source code

Hi,
I think your understanding is correct. The watermark of the current subtask is simply the minimum across all of its input channels.
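A minimal sketch of this rule, assuming a subtask with a fixed number of channels and ignoring stream status and alignment entirely (`MinWatermarkDemo` and `onWatermark` are hypothetical names). Each arriving watermark is handled immediately and on its own; no channel blocks waiting for another, and a new watermark is emitted only when the minimum actually rises.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not Flink code): per-channel latest watermark, with the
 * combined watermark recomputed as the minimum on every arrival.
 */
public class MinWatermarkDemo {
    private final long[] channelWatermarks;
    private long lastOutput = Long.MIN_VALUE;

    MinWatermarkDemo(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the newly emitted watermark, or null if nothing is emitted. */
    Long onWatermark(int channel, long timestamp) {
        if (timestamp <= channelWatermarks[channel]) {
            return null; // not newer than this channel's last watermark: ignored
        }
        channelWatermarks[channel] = timestamp;
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastOutput) {
            lastOutput = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        MinWatermarkDemo subtask = new MinWatermarkDemo(2);
        System.out.println(subtask.onWatermark(0, 100)); // null: channel 1 still at Long.MIN_VALUE
        System.out.println(subtask.onWatermark(1, 80));  // 80
        System.out.println(subtask.onWatermark(1, 150)); // 100
        System.out.println(subtask.onWatermark(0, 90));  // null: older than channel 0's 100
    }
}
```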

On Monday, May 11, 2020 at 3:17 PM, 1193216154 <1193216154@qq.com> wrote:

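> Hi all, while recently reading a source-code walkthrough of how watermarks are propagated, I got confused about the watermark alignment logic. The code is as follows: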
>
> public void inputWatermark(Watermark watermark, int channelIndex) {
>     // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
>     if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
>         long watermarkMillis = watermark.getTimestamp();
>
>         // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
>         if (watermarkMillis > channelStatuses[channelIndex].watermark) {
>             channelStatuses[channelIndex].watermark = watermarkMillis;
>
>             // previously unaligned input channels are now aligned if its watermark has caught up
>             if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
>                 channelStatuses[channelIndex].isWatermarkAligned = true;
>             }
>
>             // now, attempt to find a new min watermark across all aligned channels
>             findAndOutputNewMinWatermarkAcrossAlignedChannels();
>         }
>     }
> }
>
> private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
>     long newMinWatermark = Long.MAX_VALUE;
>     boolean hasAlignedChannels = false;
>
>     // determine new overall watermark by considering only watermark-aligned channels across all channels
>     for (InputChannelStatus channelStatus : channelStatuses) {
>         if (channelStatus.isWatermarkAligned) {
>             hasAlignedChannels = true;
>             newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>         }
>     }
>
>     // we acknowledge and output the new overall watermark if it really is aggregated
>     // from some remaining aligned channel, and is also larger than the last output watermark
>     if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>         lastOutputWatermark = newMinWatermark;
>         outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
>     }
> }
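>
> This code does not seem to contain any logic in which multiple channelIndex values wait for one another's watermarks to arrive. Is it really just taking the minimum of the watermarks that arrive on different channelIndex values at different times?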
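For experimentation, here is a simplified, self-contained rewrite of the quoted valve logic. The class `AlignedValveSketch`, the `emitted` list, and the helpers `markIdle`/`markActive` are hypothetical; the idle/active handling is an assumption that approximates, but does not reproduce, Flink's own stream-status code (for example, it omits the flush performed when every channel turns idle). It shows the role of `isWatermarkAligned`: a channel that returns from idle with a lagging watermark is left out of the minimum until it catches up, so the output watermark never moves backwards.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified sketch of the quoted valve logic (not Flink code). */
public class AlignedValveSketch {
    /** Per-channel state, loosely mirroring InputChannelStatus in the quoted code. */
    static final class Channel {
        long watermark = Long.MIN_VALUE;
        boolean active = true;
        boolean aligned = true;
    }

    private final Channel[] channels;
    long lastOutputWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>(); // stands in for outputHandler.handleWatermark

    AlignedValveSketch(int numChannels) {
        channels = new Channel[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channels[i] = new Channel();
        }
    }

    void inputWatermark(long timestamp, int idx) {
        Channel c = channels[idx];
        if (!c.active || timestamp <= c.watermark) {
            return; // idle channel, or not newer than this channel's last watermark
        }
        c.watermark = timestamp;
        // an unaligned channel rejoins once it has caught up with the last output
        if (!c.aligned && timestamp >= lastOutputWatermark) {
            c.aligned = true;
        }
        findAndOutputNewMin();
    }

    /** Assumed simplification: an idle channel stops holding back the minimum. */
    void markIdle(int idx) {
        channels[idx].active = false;
        channels[idx].aligned = false;
        findAndOutputNewMin();
    }

    /** Assumed simplification: a returning channel is aligned only if it is not behind. */
    void markActive(int idx) {
        Channel c = channels[idx];
        c.active = true;
        c.aligned = c.watermark >= lastOutputWatermark;
    }

    private void findAndOutputNewMin() {
        long min = Long.MAX_VALUE;
        boolean hasAligned = false;
        for (Channel c : channels) {
            if (c.aligned) {
                hasAligned = true;
                min = Math.min(min, c.watermark);
            }
        }
        if (hasAligned && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            emitted.add(min);
        }
    }

    public static void main(String[] args) {
        AlignedValveSketch valve = new AlignedValveSketch(2);
        valve.inputWatermark(100, 0); // channel 1 still at Long.MIN_VALUE: nothing emitted
        valve.markIdle(1);            // channel 1 excluded: emits 100
        valve.inputWatermark(200, 0); // emits 200
        valve.markActive(1);          // back, but unaligned (Long.MIN_VALUE < 200)
        valve.inputWatermark(150, 1); // still behind 200: stays unaligned, no regression
        valve.inputWatermark(250, 1); // caught up: aligned again, min is still 200
        valve.inputWatermark(300, 0); // min(300, 250) = 250: emits 250
        System.out.println(valve.emitted); // [100, 200, 250]
    }
}
```

Read this way, the only "waiting" is implicit: a channel that has not advanced keeps the minimum low, and nothing is emitted until the minimum over the aligned channels rises.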



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn