flink-user-zh mailing list archives

From "1193216154" <1193216...@qq.com>
Subject Confusion about the watermark alignment logic in the Flink source code
Date Mon, 11 May 2020 07:16:45 GMT
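Hi everyone, I have recently been reading through the source code for watermark propagation, and I have some doubts about the watermark alignment logic. The code is as follows: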

public void inputWatermark(Watermark watermark, int channelIndex) {
	// ignore the input watermark if its input channel, or all input channels are idle
	// (i.e. overall the valve is idle).
	if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
		long watermarkMillis = watermark.getTimestamp();

		// if the input watermark's value is less than the last received watermark
		// for its input channel, ignore it also.
		if (watermarkMillis > channelStatuses[channelIndex].watermark) {
			channelStatuses[channelIndex].watermark = watermarkMillis;

			// previously unaligned input channels are now aligned if its watermark has caught up
			if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
				channelStatuses[channelIndex].isWatermarkAligned = true;
			}

			// now, attempt to find a new min watermark across all aligned channels
			findAndOutputNewMinWatermarkAcrossAlignedChannels();
		}
	}
}



private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
	long newMinWatermark = Long.MAX_VALUE;
	boolean hasAlignedChannels = false;

	// determine new overall watermark by considering only watermark-aligned channels
	// across all channels
	for (InputChannelStatus channelStatus : channelStatuses) {
		if (channelStatus.isWatermarkAligned) {
			hasAlignedChannels = true;
			newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
		}
	}

	// we acknowledge and output the new overall watermark if it really is aggregated
	// from some remaining aligned channel, and is also larger than the last output watermark
	if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
		lastOutputWatermark = newMinWatermark;
		outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
	}
}



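This code does not seem to contain any logic that makes the different channelIndex inputs wait for each other's watermarks to arrive. Does it really just take the minimum over the watermarks that arrive on different channelIndex inputs at different times?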
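
One way to check this reading is a small standalone sketch of the min-across-channels step. It is not the Flink class: MiniValve and its emitted list are hypothetical names, stream status and the isWatermarkAligned flag are left out, and it assumes every channel starts at Long.MIN_VALUE, as does the last output watermark. Under those assumptions there is no explicit waiting, but the minimum cannot rise above Long.MIN_VALUE until every channel has reported at least one watermark, so the slowest channel holds the output back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the valve: only the per-channel
// watermark and the min computation are kept.
class MiniValve {
    private final long[] channelWatermarks;
    private long lastOutputWatermark = Long.MIN_VALUE; // assumed initial value
    final List<Long> emitted = new ArrayList<>();      // stands in for outputHandler

    MiniValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Assumption: every channel starts at Long.MIN_VALUE.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    void inputWatermark(long watermarkMillis, int channelIndex) {
        // Ignore watermarks that do not advance this channel.
        if (watermarkMillis > channelWatermarks[channelIndex]) {
            channelWatermarks[channelIndex] = watermarkMillis;

            // Minimum over all channels; a channel that has not reported yet
            // still contributes Long.MIN_VALUE and pins the minimum there.
            long newMin = Long.MAX_VALUE;
            for (long w : channelWatermarks) {
                newMin = Math.min(newMin, w);
            }

            // Emit only when the overall minimum actually advances.
            if (newMin > lastOutputWatermark) {
                lastOutputWatermark = newMin;
                emitted.add(newMin);
            }
        }
    }

    public static void main(String[] args) {
        MiniValve valve = new MiniValve(2);
        valve.inputWatermark(10, 0); // channel 1 has not reported: nothing emitted
        valve.inputWatermark(20, 0); // still nothing emitted
        valve.inputWatermark(5, 1);  // min(20, 5) = 5 is emitted
        valve.inputWatermark(30, 1); // min(20, 30) = 20 is emitted
        System.out.println(valve.emitted); // prints [5, 20]
    }
}
```

In this sketch the first two watermarks on channel 0 produce no output at all, which is the "waiting" behavior, even though the code only ever takes a minimum.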