flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "1193216154" <1193216...@qq.com>
Subject 对flink源码中watermark对齐逻辑的疑惑
Date Mon, 11 May 2020 07:16:45 GMT

public void inputWatermark(Watermark watermark, int channelIndex) {
		// ignore the input watermark if its input channel, or all input channels are idle (i.e.
overall the valve is idle).
		if (lastOutputStreamStatus.isActive() &amp;&amp; channelStatuses[channelIndex].streamStatus.isActive())
			long watermarkMillis = watermark.getTimestamp();

			// if the input watermark's value is less than the last received watermark for its input
channel, ignore it also.
			if (watermarkMillis &gt; channelStatuses[channelIndex].watermark) {
				channelStatuses[channelIndex].watermark = watermarkMillis;

				// previously unaligned input channels are now aligned if its watermark has caught up
				if (!channelStatuses[channelIndex].isWatermarkAligned &amp;&amp; watermarkMillis
&gt;= lastOutputWatermark) {
					channelStatuses[channelIndex].isWatermarkAligned = true;

				// now, attempt to find a new min watermark across all aligned channels

private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
		long newMinWatermark = Long.MAX_VALUE;
		boolean hasAlignedChannels = false;

		// determine new overall watermark by considering only watermark-aligned channels across
all channels
		for (InputChannelStatus channelStatus : channelStatuses) {
			if (channelStatus.isWatermarkAligned) {
				hasAlignedChannels = true;
				newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);

		// we acknowledge and output the new overall watermark if it really is aggregated
		// from some remaining aligned channel, and is also larger than the last output watermark
		if (hasAlignedChannels &amp;&amp; newMinWatermark &gt; lastOutputWatermark)
			lastOutputWatermark = newMinWatermark;
			outputHandler.handleWatermark(new Watermark(lastOutputWatermark));

这段代码中好像并没有多个&nbsp; channelIndex 相互等待watermark到来的逻辑。难道仅仅是说不同时间不同
channelIndex 到来的watermark做一个取最小值的逻辑吗?
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message