flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [3/4] flink git commit: [FLINK-7728] [DataStream] Make StatusWatermarkValve unit tests more fine-grained
Date Mon, 02 Oct 2017 14:54:55 GMT
[FLINK-7728] [DataStream] Make StatusWatermarkValve unit tests more fine-grained

Previously, the unit tests in StatusWatermarkValveTest were too
cluttered and tested too many behaviours in a single test. This made
it hard to have a good overview of which test cases were covered.

This commit is a rework of the previous tests, making them more
fine-grained so that the scope of each test is small enough. All
previously tested behaviours are still covered.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c528139c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c528139c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c528139c

Branch: refs/heads/release-1.3
Commit: c528139c3c2b33ba7bd11df154ba204408713b02
Parents: 2875260
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Fri Sep 29 11:16:22 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Oct 2 16:52:22 2017 +0200

----------------------------------------------------------------------
 .../streamstatus/StatusWatermarkValve.java      |  26 +-
 .../streamstatus/StatusWatermarkValveTest.java  | 366 ++++++++++---------
 2 files changed, 214 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c528139c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
index 3dceb0a..5f1828f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.streaming.runtime.streamstatus;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@code StatusWatermarkValve} embodies the logic of how {@link Watermark} and {@link StreamStatus} are propagated to
@@ -40,6 +42,7 @@ public class StatusWatermarkValve {
 	 */
 	public interface ValveOutputHandler {
 		void handleWatermark(Watermark watermark);
+
 		void handleStreamStatus(StreamStatus streamStatus);
 	}
 
@@ -212,10 +215,11 @@ public class StatusWatermarkValve {
 	 *   caught up to the last output watermark from the valve yet.
 	 * </ul>
 	 */
-	private static class InputChannelStatus {
-		private long watermark;
-		private StreamStatus streamStatus;
-		private boolean isWatermarkAligned;
+	@VisibleForTesting
+	protected static class InputChannelStatus {
+		protected long watermark;
+		protected StreamStatus streamStatus;
+		protected boolean isWatermarkAligned;
 
 		/**
 		 * Utility to check if at least one channel in a given array of input channels is active.
@@ -230,4 +234,12 @@ public class StatusWatermarkValve {
 		}
 	}
 
+	@VisibleForTesting
+	protected InputChannelStatus getInputChannelStatus(int channelIndex) {
+		Preconditions.checkArgument(
+			channelIndex >= 0 && channelIndex < channelStatuses.length,
+			"Invalid channel index. Number of input channels: " + channelStatuses.length);
+
+		return channelStatuses[channelIndex];
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c528139c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index 70dc565..fbf0622 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -19,14 +19,16 @@
 package org.apache.flink.streaming.runtime.streamstatus;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
-
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+
 import org.junit.Test;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link StatusWatermarkValve}. While tests in {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest}
@@ -43,276 +45,262 @@ import static org.junit.Assert.assertEquals;
 public class StatusWatermarkValveTest {
 
 	/**
-	 * Tests that all input channels of a valve start as ACTIVE stream status.
+	 * Tests that watermarks correctly advance with increasing watermarks for a single input valve.
 	 */
 	@Test
-	public void testAllInputChannelsStartAsActive() {
+	public void testSingleInputIncreasingWatermarks() {
 		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
-		StatusWatermarkValve valve = new StatusWatermarkValve(4, valveOutput);
-
-		// ------------------------------------------------------------------------
-		//  Ensure that the valve will output an IDLE stream status as soon as
-		//  all input channels become IDLE; this also implicitly ensures that
-		//  all input channels start as ACTIVE.
-		// ------------------------------------------------------------------------
+		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
 
-		valve.inputStreamStatus(StreamStatus.IDLE, 3);
-		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		valve.inputWatermark(new Watermark(0), 0);
+		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+		valve.inputWatermark(new Watermark(25), 0);
+		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
-	 * Tests that valves work as expected when they handle only 1 input channel.
-	 * Tested behaviours are explained as inline comments.
+	 * Tests that watermarks do not advance with decreasing watermark inputs for a single input valve.
 	 */
 	@Test
-	public void testOneInputValve() {
+	public void testSingleInputDecreasingWatermarksYieldsNoOutput() {
 		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
 		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
 
-		// start off with an ACTIVE status; since the valve should initially start as ACTIVE,
-		// no state change is toggled, therefore no stream status should be emitted
-		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
-		// input some monotonously increasing watermarks while ACTIVE;
-		// the exact same watermarks should be emitted right after the inputs
-		valve.inputWatermark(new Watermark(0), 0);
-		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(25), 0);
 		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
 
-		// decreasing watermarks should not result in any output
 		valve.inputWatermark(new Watermark(18), 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(42), 0);
 		assertEquals(new Watermark(42), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
+
+	/**
+	 * Tests that stream status toggling works correctly, as well as that non-toggling status
+	 * inputs do not yield output for a single input valve.
+	 */
+	@Test
+	public void testSingleInputStreamStatusToggling() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
+
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		// this also implicitly verifies that input channels start as ACTIVE
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// toggling ACTIVE to IDLE should result in an IDLE stream status output
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
 		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
 
-		// watermark inputs should be ignored while all input channels (only 1 in this case) are
IDLE
-		valve.inputWatermark(new Watermark(52), 0);
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(60), 0);
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
+	/**
+	 * Tests that the watermark of an input channel remains intact while in the IDLE status.
+	 */
+	@Test
+	public void testSingleInputWatermarksIntactDuringIdleness() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
 
-		// no status change toggle while IDLE should result in stream status outputs
-		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		valve.inputWatermark(new Watermark(25), 0);
+		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+
+		valve.inputWatermark(new Watermark(50), 0);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+		assertEquals(25, valve.getInputChannelStatus(0).watermark);
 
-		// toggling IDLE to ACTIVE should result in an ACTIVE stream status output
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
 		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
-
-
-		// the valve should remember the last watermark input channels received while they were
ACTIVE (which was 42);
-		// decreasing watermarks should therefore still be ignored, even after a status toggle
-		valve.inputWatermark(new Watermark(40), 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// monotonously increasing watermarks after resuming to be ACTIVE should be output normally
-		valve.inputWatermark(new Watermark(68), 0);
-		assertEquals(new Watermark(68), valveOutput.popLastSeenOutput());
-
-		valve.inputWatermark(new Watermark(72), 0);
-		assertEquals(new Watermark(72), valveOutput.popLastSeenOutput());
+		valve.inputWatermark(new Watermark(50), 0);
+		assertEquals(new Watermark(50), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
-	 * Tests that valves work as expected when they handle multiple input channels (tested with
3).
-	 * Tested behaviours are explained as inline comments.
+	 * Tests that the valve yields a watermark only when all inputs have received a watermark.
 	 */
 	@Test
-	public void testMultipleInputValve() {
+	public void testMultipleInputYieldsWatermarkOnlyWhenAllChannelsReceivesWatermarks() {
 		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
 		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
-		// ------------------------------------------------------------------------
-		//  Ensure that watermarks are output only when all
-		//  channels have been input some watermark.
-		// ------------------------------------------------------------------------
-
 		valve.inputWatermark(new Watermark(0), 0);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(0), 1);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
+		// now, all channels have watermarks
 		valve.inputWatermark(new Watermark(0), 2);
 		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		// ------------------------------------------------------------------------
-		//  Ensure that watermarks are output as soon as the overall min
-		//  watermark across all channels have advanced.
-		// ------------------------------------------------------------------------
+	/**
+	 * Tests that new min watermark is emitted from the valve as soon as the overall
+	 * new min watermark across inputs advances.
+	 */
+	@Test
+	public void testMultipleInputIncreasingWatermarks() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
-		valve.inputWatermark(new Watermark(12), 0);
-		assertEquals(null, valveOutput.popLastSeenOutput());
+		valve.inputWatermark(new Watermark(0), 0);
+		valve.inputWatermark(new Watermark(0), 1);
+		valve.inputWatermark(new Watermark(0), 2);
+		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 
+		valve.inputWatermark(new Watermark(12), 0);
 		valve.inputWatermark(new Watermark(8), 2);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(10), 2);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(15), 1);
 		// lowest watermark across all channels is now channel 2, with watermark @ 10
 		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
-
-		// ------------------------------------------------------------------------
-		//  Ensure that decreasing watermarks are ignored
-		// ------------------------------------------------------------------------
-
-		valve.inputWatermark(new Watermark(6), 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// ------------------------------------------------------------------------
-		//  Ensure that when some input channel becomes idle, that channel will
-		//  no longer be accounted for when advancing the watermark.
-		// ------------------------------------------------------------------------
-
-		// marking channel 2 as IDLE shouldn't result in overall status toggle for the valve,
-		// because there are still other active channels (0 and 1), so there should not be any
-		// stream status outputs;
-		// also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0),
-		// so the valve should output that
-		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		valve.inputWatermark(new Watermark(17), 2);
+		// lowest watermark across all channels is now channel 0, with watermark @ 12
 		assertEquals(new Watermark(12), valveOutput.popLastSeenOutput());
-
-		// from now on, since channel 2 is IDLE, the valve should use watermarks only from
-		// channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark
(10)
-		valve.inputWatermark(new Watermark(17), 0);
-		assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());
-
-		valve.inputWatermark(new Watermark(25), 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(20), 1);
-		assertEquals(new Watermark(20), valveOutput.popLastSeenOutput());
-
-		// ------------------------------------------------------------------------
-		//  Ensure that after some channel resumes to be ACTIVE, it needs to
-		//  "catch up" with the current overall min watermark before it can be
-		//  accounted for again when finding the min watermark across channels.
-		//  Also tests that before the resumed channel catches up, the overall
-		//  min watermark can still advance with watermarks of other channels.
-		// ------------------------------------------------------------------------
-
-		// resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve,
-		// because the valve wasn't overall IDLE, so there should not be any stream status outputs;
-		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+		valve.inputWatermark(new Watermark(20), 0);
+		// lowest watermark across all channels is now channel 1, with watermark @ 15
+		assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		// although watermarks for channel 2 will now be accepted, it still
-		// hasn't caught up with the overall min watermark (20)
-		valve.inputWatermark(new Watermark(18), 2);
-		assertEquals(null, valveOutput.popLastSeenOutput());
+	/**
+	 * Tests that for a multiple input valve, decreasing watermarks will yield no output.
+	 */
+	@Test
+	public void testMultipleInputDecreasingWatermarksYieldsNoOutput() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
-		// since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks
-		valve.inputWatermark(new Watermark(22), 1);
-		assertEquals(new Watermark(22), valveOutput.popLastSeenOutput());
+		valve.inputWatermark(new Watermark(25), 0);
+		valve.inputWatermark(new Watermark(10), 1);
+		valve.inputWatermark(new Watermark(17), 2);
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(28), 0);
+		valve.inputWatermark(new Watermark(12), 0);
+		valve.inputWatermark(new Watermark(8), 1);
+		valve.inputWatermark(new Watermark(15), 2);
 		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		valve.inputWatermark(new Watermark(33), 1);
-		assertEquals(new Watermark(28), valveOutput.popLastSeenOutput());
+	/**
+	 * Tests that stream status toggling works correctly, as well as that non-toggling status
+	 * inputs do not yield output for a multiple input valve.
+	 */
+	@Test
+	public void testMultipleInputStreamStatusToggling() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(2, valveOutput);
 
-		// now, channel 2 has caught up with the overall min watermark
-		valve.inputWatermark(new Watermark(30), 2);
+		// this also implicitly verifies that all input channels start as active
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(31), 0);
-		// this acknowledges that channel 2's watermark is being accounted for again
-		assertEquals(new Watermark(30), valveOutput.popLastSeenOutput());
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(34), 2);
-		assertEquals(new Watermark(31), valveOutput.popLastSeenOutput());
-
-		// ------------------------------------------------------------------------
-		//  Ensure that once all channels are IDLE, the valve should also
-		//  determine itself to be IDLE and output a IDLE stream status
-		// ------------------------------------------------------------------------
-
+		// now, all channels are IDLE
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		// this is because once channel 0 becomes IDLE,
-		// the new min watermark will be 33 (channel 1)
-		assertEquals(new Watermark(33), valveOutput.popLastSeenOutput());
-
-		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertEquals(null, valveOutput.popLastSeenOutput());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
 
-		// now let all channels become idle; we should only see the idle marker be emitted, and
nothing else
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
 		valve.inputStreamStatus(StreamStatus.IDLE, 1);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// ------------------------------------------------------------------------
-		//  Ensure that as channels gradually become ACTIVE again, the above behaviours
-		//  still hold. Also ensure that as soon as one of the input channels
-		//  become ACTIVE, the valve is ACTIVE again and outputs an ACTIVE stream status.
-		// ------------------------------------------------------------------------
+		// as soon as at least one input becomes active again, the ACTIVE marker should be forwarded
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
 
-		// let channel 0 resume to be ACTIVE
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
+		// already back to ACTIVE, should yield no output
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		// channel 0 is the only ACTIVE channel now, and is the only channel
-		// accounted for when advancing min watermark
-		valve.inputWatermark(new Watermark(36), 0);
-		assertEquals(new Watermark(36), valveOutput.popLastSeenOutput());
+	/**
+	 * Tests that for multiple inputs, when some inputs are idle, the min watermark
+	 * is correctly computed and advanced from the remaining active inputs.
+	 */
+	@Test
+	public void testMultipleInputWatermarkAdvancingWithPartiallyIdleChannels() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
-		// new also let channel 1 become ACTIVE
-		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+		valve.inputWatermark(new Watermark(15), 0);
+		valve.inputWatermark(new Watermark(10), 1);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// channel 1 is still behind overall min watermark
-		valve.inputWatermark(new Watermark(35), 1);
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		// min watermark should be computed from remaining ACTIVE channels
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// since channel 1 is still behind, channel 0 remains to be the only
-		// channel used to advance min watermark
-		valve.inputWatermark(new Watermark(37), 0);
-		assertEquals(new Watermark(37), valveOutput.popLastSeenOutput());
-
-		// temporarily let channel 0 (the only active and aligned input) become idle;
-		// this should not result in any watermark or stream status output,
-		// because channel 1 is still active (therefore no stream status toggle) and
-		// at the same time not aligned (therefore should not produce any new min watermarks)
-		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		valve.inputWatermark(new Watermark(18), 1);
+		// now, min watermark should be 15 from channel #0
+		assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		valve.inputWatermark(new Watermark(20), 0);
+		// now, min watermark should be 18 from channel #1
+		assertEquals(new Watermark(18), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		// now, let channel 1 catch up with the overall min watermark
-		valve.inputWatermark(new Watermark(38), 1);
-		assertEquals(null, valveOutput.popLastSeenOutput());
+	/**
+	 * Tests that as input channels individually and gradually become idle, watermarks
+	 * are output as soon as the remaining active channels can yield a new min watermark.
+	 */
+	@Test
+	public void testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
+
+		valve.inputWatermark(new Watermark(25), 0);
+		valve.inputWatermark(new Watermark(10), 1);
+		valve.inputWatermark(new Watermark(17), 2);
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		// only channels 0 & 2 are ACTIVE; 17 is the overall min watermark now
+		assertEquals(new Watermark(17), valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(40), 0);
-		// this acknowledges that channel 1's watermark is being accounted for again
-		assertEquals(new Watermark(38), valveOutput.popLastSeenOutput());
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		// only channel 0 is ACTIVE; 25 is the overall min watermark now
+		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
 	 * Tests that when all inputs become idle, the max watermark across all channels
 	 * is correctly "flushed" from the valve, as well as the stream status IDLE marker.
+	 *
+	 * <p>This test along with {@link #testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle}
+	 * should completely verify that the eventual watermark advancement result when all inputs become idle
+	 * is independent of the order that the inputs become idle.
 	 */
 	@Test
-	public void testAllInputsBecomeIdleFlushMaxWatermarkAndStreamStatus() {
+	public void testMultipleInputFlushMaxWatermarkAndStreamStatusOnceAllInputsBecomeIdle() {
 		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
 		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
@@ -325,11 +313,7 @@ public class StatusWatermarkValveTest {
 		// -------------------------------------------------------------------------------------------
 
 		valve.inputWatermark(new Watermark(10), 0);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(5), 1);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(3), 2);
 		assertEquals(new Watermark(3), valveOutput.popLastSeenOutput());
 
@@ -349,6 +333,46 @@ public class StatusWatermarkValveTest {
 		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
+	/**
+	 * Tests that when idle channels become active again, they need to "catch up" with
+	 * the latest watermark before they are considered for min watermark computation again.
+	 */
+	@Test
+	public void testMultipleInputWatermarkRealignmentAfterResumeActive() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
+
+		valve.inputWatermark(new Watermark(10), 0);
+		valve.inputWatermark(new Watermark(7), 1);
+		valve.inputWatermark(new Watermark(3), 2);
+		assertEquals(new Watermark(3), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertEquals(new Watermark(7), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		// let channel 2 become active again; since the min watermark has now advanced to 7,
+		// channel 2 should have been marked as non-aligned.
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+		assertFalse(valve.getInputChannelStatus(2).isWatermarkAligned);
+
+		// during the realignment process, watermarks should still be accepted by channel 2 (but shouldn't yield new watermarks)
+		valve.inputWatermark(new Watermark(5), 2);
+		assertEquals(5, valve.getInputChannelStatus(2).watermark);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		// let channel 2 catch up with the min watermark; now should be realigned
+		valve.inputWatermark(new Watermark(9), 2);
+		assertTrue(valve.getInputChannelStatus(2).isWatermarkAligned);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		// check that realigned inputs are now taken into account for watermark advancement
+		valve.inputWatermark(new Watermark(12), 1);
+		assertEquals(new Watermark(9), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
+
 	private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
 		private BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();
 


Mime
View raw message