flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources
Date Fri, 10 Feb 2017 14:50:03 GMT
Repository: flink
Updated Branches:
  refs/heads/master 02410324b -> 66305135b


http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
new file mode 100644
index 0000000..564901f
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StatusWatermarkValve}. While tests in {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest}
+ * and {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest} may also implicitly test {@link StatusWatermarkValve}
+ * and that valves are correctly used in the tasks' input processors, the unit tests here additionally make sure that
+ * the watermarks and stream statuses to forward are generated from the valve at the exact correct times and in a
+ * deterministic manner. The unit tests here also test more complex stream status / watermark input cases.
+ *
+ * <p>
+ * The tests are performed by a series of watermark and stream status inputs to the valve. On every input method call,
+ * the output is checked to contain only the expected watermark or stream status, and nothing else. This ensures that
+ * no redundant outputs are generated by the output logic of {@link StatusWatermarkValve}. The behaviours that a series of
+ * input calls to the valve is trying to test are explained as inline comments within the tests.
+ */
+public class StatusWatermarkValveTest {
+
+	/**
+	 * Tests that all input channels of a valve start as ACTIVE stream status.
+	 */
+	@Test
+	public void testAllInputChannelsStartAsActive() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(4, valveOutput);
+
+		// ------------------------------------------------------------------------
+		//  Ensure that the valve will output an IDLE stream status as soon as
+		//  all input channels become IDLE; this also implicitly ensures that
+		//  all input channels start as ACTIVE.
+		// ------------------------------------------------------------------------
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 3);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+	}
+
+	/**
+	 * Tests that valves work as expected when they handle only 1 input channel.
+	 * Tested behaviours are explained as inline comments.
+	 */
+	@Test
+	public void testOneInputValve() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
+
+		// start off with an ACTIVE status; since the valve should initially start as ACTIVE,
+		// no state change is toggled, therefore no stream status should be emitted
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// input some monotonically increasing watermarks while ACTIVE;
+		// the exact same watermarks should be emitted right after the inputs
+		valve.inputWatermark(new Watermark(0), 0);
+		assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(25), 0);
+		assertEquals(new Watermark(25), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// decreasing watermarks should not result in any output
+		valve.inputWatermark(new Watermark(18), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(42), 0);
+		assertEquals(new Watermark(42), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// toggling ACTIVE to IDLE should result in an IDLE stream status output
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// watermark inputs should be ignored while all input channels (only 1 in this case) are IDLE
+		valve.inputWatermark(new Watermark(52), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(60), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// no status change toggle while IDLE should not result in any stream status outputs
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// toggling IDLE to ACTIVE should result in an ACTIVE stream status output
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// the valve should remember the last watermark input channels received while they were ACTIVE (which was 42);
+		// decreasing watermarks should therefore still be ignored, even after a status toggle
+		valve.inputWatermark(new Watermark(40), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// monotonically increasing watermarks after resuming to be ACTIVE should be output normally
+		valve.inputWatermark(new Watermark(68), 0);
+		assertEquals(new Watermark(68), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(72), 0);
+		assertEquals(new Watermark(72), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+	}
+
+	/**
+	 * Tests that valves work as expected when they handle multiple input channels (tested with 3).
+	 * Tested behaviours are explained as inline comments.
+	 */
+	@Test
+	public void testMultipleInputValve() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
+
+		// ------------------------------------------------------------------------
+		//  Ensure that watermarks are output only when all
+		//  channels have been input some watermark.
+		// ------------------------------------------------------------------------
+
+		valve.inputWatermark(new Watermark(0), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(0), 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(0), 2);
+		assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that watermarks are output as soon as the overall min
+		//  watermark across all channels has advanced.
+		// ------------------------------------------------------------------------
+
+		valve.inputWatermark(new Watermark(12), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(8), 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(10), 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(15), 1);
+		// lowest watermark across all channels is now channel 2, with watermark @ 10
+		assertEquals(new Watermark(10), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that decreasing watermarks are ignored
+		// ------------------------------------------------------------------------
+
+		valve.inputWatermark(new Watermark(6), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that when some input channel becomes idle, that channel will
+		//  no longer be accounted for when advancing the watermark.
+		// ------------------------------------------------------------------------
+
+		// marking channel 2 as IDLE shouldn't result in overall status toggle for the valve,
+		// because there are still other active channels (0 and 1), so there should not be any
+		// stream status outputs;
+		// also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0),
+		// so the valve should output that
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertEquals(new Watermark(12), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// from now on, since channel 2 is IDLE, the valve should use watermarks only from
+		// channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark (10)
+		valve.inputWatermark(new Watermark(17), 0);
+		assertEquals(new Watermark(15), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(25), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(20), 1);
+		assertEquals(new Watermark(20), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that after some channel resumes to be ACTIVE, it needs to
+		//  "catch up" with the current overall min watermark before it can be
+		//  accounted for again when finding the min watermark across channels.
+		//  Also tests that before the resumed channel catches up, the overall
+		//  min watermark can still advance with watermarks of other channels.
+		// ------------------------------------------------------------------------
+
+		// resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve,
+		// because the valve wasn't overall IDLE, so there should not be any stream status outputs;
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// although watermarks for channel 2 will now be accepted, it still
+		// hasn't caught up with the overall min watermark (20)
+		valve.inputWatermark(new Watermark(18), 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks
+		valve.inputWatermark(new Watermark(22), 1);
+		assertEquals(new Watermark(22), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(28), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(33), 1);
+		assertEquals(new Watermark(28), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// now, channel 2 has caught up with the overall min watermark
+		valve.inputWatermark(new Watermark(30), 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(31), 0);
+		// this acknowledges that channel 2's watermark is being accounted for again
+		assertEquals(new Watermark(30), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(34), 2);
+		assertEquals(new Watermark(31), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that once all channels are IDLE, the valve should also
+		//  determine itself to be IDLE and output an IDLE stream status
+		// ------------------------------------------------------------------------
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		// this is because once channel 0 becomes IDLE,
+		// the new min watermark will be 33 (channel 1)
+		assertEquals(new Watermark(33), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that as channels gradually become ACTIVE again, the above behaviours
+		//  still hold. Also ensure that as soon as one of the input channels
+		//  becomes ACTIVE, the valve is ACTIVE again and outputs an ACTIVE stream status.
+		// ------------------------------------------------------------------------
+
+		// let channel 0 resume to be ACTIVE
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// channel 0 is the only ACTIVE channel now, and is the only channel
+		// accounted for when advancing min watermark
+		valve.inputWatermark(new Watermark(36), 0);
+		assertEquals(new Watermark(36), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// now also let channel 1 become ACTIVE
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// channel 1 is still behind overall min watermark
+		valve.inputWatermark(new Watermark(35), 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// since channel 1 is still behind, channel 0 remains to be the only
+		// channel used to advance min watermark
+		valve.inputWatermark(new Watermark(37), 0);
+		assertEquals(new Watermark(37), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// now, channel 1 has caught up with the overall min watermark
+		valve.inputWatermark(new Watermark(38), 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(40), 0);
+		// this acknowledges that channel 1's watermark is being accounted for again
+		assertEquals(new Watermark(38), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+	}
+
+	private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
+		private BlockingQueue<Watermark> outputWatermarks = new LinkedBlockingQueue<>();
+		private BlockingQueue<StreamStatus> outputStreamStatuses = new LinkedBlockingQueue<>();
+
+		@Override
+		public void handleWatermark(Watermark watermark) {
+			outputWatermarks.add(watermark);
+		}
+
+		@Override
+		public void handleStreamStatus(StreamStatus streamStatus) {
+			outputStreamStatuses.add(streamStatus);
+		}
+
+		public Watermark popLastOutputWatermark() {
+			return outputWatermarks.poll();
+		}
+
+		public StreamStatus popLastOutputStreamStatus() {
+			return outputStreamStatuses.poll();
+		}
+
+		public boolean hasNoOutputWatermarks() {
+			return outputWatermarks.size() == 0;
+		}
+
+		public boolean hasNoOutputStreamStatuses() {
+			return outputStreamStatuses.size() == 0;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
new file mode 100644
index 0000000..247dc8b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class StreamStatusTest {
+
+	@Test (expected = IllegalArgumentException.class)
+	public void testIllegalCreationThrowsException() {
+		new StreamStatus(32);
+	}
+
+	@Test
+	public void testEquals() {
+		StreamStatus idleStatus = new StreamStatus(StreamStatus.IDLE_STATUS);
+		StreamStatus activeStatus = new StreamStatus(StreamStatus.ACTIVE_STATUS);
+
+		assertEquals(StreamStatus.IDLE, idleStatus);
+		assertTrue(idleStatus.isIdle());
+		assertFalse(idleStatus.isActive());
+
+		assertEquals(StreamStatus.ACTIVE, activeStatus);
+		assertTrue(activeStatus.isActive());
+		assertFalse(activeStatus.isIdle());
+	}
+
+	@Test
+	public void testTypeCasting() {
+		StreamStatus status = StreamStatus.ACTIVE;
+
+		assertTrue(status.isStreamStatus());
+		assertFalse(status.isRecord());
+		assertFalse(status.isWatermark());
+		assertFalse(status.isLatencyMarker());
+
+		try {
+			status.asWatermark();
+			fail("should throw an exception");
+		} catch (Exception e) {
+			// expected
+		}
+
+		try {
+			status.asRecord();
+			fail("should throw an exception");
+		} catch (Exception e) {
+			// expected
+		}
+
+		try {
+			status.asLatencyMarker();
+			fail("should throw an exception");
+		} catch (Exception e) {
+			// expected
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index be93f6a..4b08c83 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -38,15 +39,18 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -59,14 +63,7 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -98,6 +95,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 	public void testOpenCloseAndTimestamps() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
@@ -126,15 +124,20 @@ public class OneInputStreamTaskTest extends TestLogger {
 	}
 
 	/**
-	 * This test verifies that watermarks are correctly forwarded. This also checks whether
+	 * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether
 	 * watermarks are forwarded only when we have received watermarks from all inputs. The
-	 * forwarded watermark must be the minimum of the watermarks of all inputs.
+	 * forwarded watermark must be the minimum of the watermarks of all active inputs.
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testWatermarkForwarding() throws Exception {
+	public void testWatermarkAndStreamStatusForwarding() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		final OneInputStreamTaskTestHarness<String, String> testHarness =
+			new OneInputStreamTaskTestHarness<String, String>(
+				mapTask, 2, 2,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
@@ -180,8 +183,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		expectedOutput.add(new Watermark(initialTime + 2));
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
-
-		// advance watermark from one of the inputs, now we should get a now one since the
+		// advance watermark from one of the inputs, now we should get a new one since the
 		// minimum increases
 		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
 		testHarness.waitForInputProcessing();
@@ -196,6 +198,31 @@ public class OneInputStreamTaskTest extends TestLogger {
 		expectedOutput.add(new Watermark(initialTime + 4));
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
+		// test whether idle input channels are acknowledged correctly when forwarding watermarks
+		testHarness.processElement(StreamStatus.IDLE, 0, 1);
+		testHarness.processElement(StreamStatus.IDLE, 1, 0);
+		testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
+		testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first
+		testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged,
+		                                                     // watermark (initial + 6) should be forwarded
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 5));
+		expectedOutput.add(new Watermark(initialTime + 6));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// make all input channels idle and check that the operator's idle status is forwarded
+		testHarness.processElement(StreamStatus.IDLE, 0, 0);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(StreamStatus.IDLE);
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// make some input channels active again and check that the operator's active status is forwarded only once
+		testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
+		testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(StreamStatus.ACTIVE);
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
 		testHarness.endInput();
 
 		testHarness.waitForTaskCompletion();
@@ -205,12 +232,170 @@ public class OneInputStreamTaskTest extends TestLogger {
 	}
 
 	/**
+	 * This test verifies that watermarks are not forwarded when the task is idle.
+	 * It also verifies that when the task is idle, watermarks generated in the middle of chains are also blocked and
+	 * never forwarded.
+	 *
+	 * The tested chain will be: (HEAD: normal operator) --> (watermark generating operator) --> (normal operator).
+	 * The operators will throw an exception and fail the test if any of them is forwarded a watermark while
+	 * the task is idle.
+	 */
+	@Test
+	public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception {
+		final OneInputStreamTask<String, String> testTask = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness =
+			new OneInputStreamTaskTestHarness<String, String>(
+				testTask, 1, 1,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		// ------------------ setup the chain ------------------
+
+		TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator();
+		StreamConfig headOperatorConfig = testHarness.getStreamConfig();
+
+		WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator();
+		StreamConfig watermarkOperatorConfig = new StreamConfig(new Configuration());
+
+		TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator();
+		StreamConfig tailOperatorConfig = new StreamConfig(new Configuration());
+
+		headOperatorConfig.setStreamOperator(headOperator);
+		headOperatorConfig.setChainStart();
+		headOperatorConfig.setChainIndex(0);
+		headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
+			new StreamNode(null, 0, null, null, null, null, null),
+			new StreamNode(null, 1, null, null, null, null, null),
+			0,
+			Collections.<String>emptyList(),
+			null
+		)));
+
+		watermarkOperatorConfig.setStreamOperator(watermarkOperator);
+		watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
+		watermarkOperatorConfig.setChainIndex(1);
+		watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
+			new StreamNode(null, 1, null, null, null, null, null),
+			new StreamNode(null, 2, null, null, null, null, null),
+			0,
+			Collections.<String>emptyList(),
+			null
+		)));
+
+		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>(); // the single non-chained output edge (node 2 -> node 3)
+		outEdgesInOrder.add(new StreamEdge(
+			new StreamNode(null, 2, null, null, null, null, null),
+			new StreamNode(null, 3, null, null, null, null, null),
+			0,
+			Collections.<String>emptyList(),
+			new BroadcastPartitioner<Object>()));
+
+		tailOperatorConfig.setStreamOperator(tailOperator);
+		tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
+		tailOperatorConfig.setBufferTimeout(0);
+		tailOperatorConfig.setChainIndex(2);
+		tailOperatorConfig.setChainEnd();
+		tailOperatorConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
+		tailOperatorConfig.setNumberOfOutputs(1);
+		tailOperatorConfig.setOutEdgesInOrder(outEdgesInOrder);
+		tailOperatorConfig.setNonChainedOutputs(outEdgesInOrder);
+		tailOperatorConfig.setTypeSerializerOut(StringSerializer.INSTANCE);
+
+		Map<Integer, StreamConfig> chainedConfigs = new HashMap<>(2); // keys match the target node ids (1 and 2) of the chained edges above
+		chainedConfigs.put(1, watermarkOperatorConfig);
+		chainedConfigs.put(2, tailOperatorConfig);
+		headOperatorConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
+		headOperatorConfig.setOutEdgesInOrder(outEdgesInOrder);
+
+		// -----------------------------------------------------
+
+		// --------------------- begin test ---------------------
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		// the task starts as active, so all generated watermarks should be forwarded
+		testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+		testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+
+		// this watermark will be forwarded since the task is currently active,
+		// but should not be in the final output because it should be blocked by the watermark generator in the chain
+		testHarness.processElement(new Watermark(15));
+
+		testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+		testHarness.waitForInputProcessing();
+
+		expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+		expectedOutput.add(new StreamRecord<>("10"));
+		expectedOutput.add(new Watermark(10)); // generated by the watermark operator from record "10"
+		expectedOutput.add(new StreamRecord<>("20"));
+		expectedOutput.add(new Watermark(20));
+		expectedOutput.add(new StreamRecord<>("30"));
+		expectedOutput.add(new Watermark(30));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// now, toggle the task to be idle, and let the watermark generator produce some watermarks
+		testHarness.processElement(StreamStatus.IDLE);
+
+		// after this, the operators will throw an exception if they are forwarded watermarks anywhere in the chain
+		testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+
+		// NOTE: normally, tasks will not have records to process while idle;
+		// we're doing this here only to mimic watermark generating in operators
+		testHarness.processElement(new StreamRecord<>("40"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("50"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("60"), 0, 0);
+		testHarness.processElement(new Watermark(65)); // the test will fail if any of the operators were forwarded this
+		testHarness.waitForInputProcessing();
+
+		// the 40 - 60 watermarks should not be forwarded, only the stream status toggle element and records
+		expectedOutput.add(StreamStatus.IDLE);
+		expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+		expectedOutput.add(new StreamRecord<>("40"));
+		expectedOutput.add(new StreamRecord<>("50"));
+		expectedOutput.add(new StreamRecord<>("60"));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// re-toggle the task to be active and see if new watermarks are correctly forwarded again
+		testHarness.processElement(StreamStatus.ACTIVE);
+		testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+		testHarness.processElement(new StreamRecord<>("70"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("80"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("90"), 0, 0);
+		testHarness.waitForInputProcessing();
+
+		expectedOutput.add(StreamStatus.ACTIVE);
+		expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+		expectedOutput.add(new StreamRecord<>("70"));
+		expectedOutput.add(new Watermark(70));
+		expectedOutput.add(new StreamRecord<>("80"));
+		expectedOutput.add(new Watermark(80));
+		expectedOutput.add(new StreamRecord<>("90"));
+		expectedOutput.add(new Watermark(90));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+		assertEquals(12, resultElements.size()); // 3 marker records + 9 numeric records; presumably only records are counted
+	}
+
+	/**
 	 * This test verifies that checkpoint barriers are correctly forwarded.
 	 */
 	@Test
 	public void testCheckpointBarriers() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
@@ -269,6 +454,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 	public void testOvertakingCheckpointBarriers() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
@@ -339,6 +525,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 		final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 		final OneInputStreamTask<String, String> streamTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(streamTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
+
 		IdentityKeySelector<String> keySelector = new IdentityKeySelector<>();
 		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -658,5 +846,80 @@ public class OneInputStreamTaskTest extends TestLogger {
 			return value;
 		}
 	}
+
+	/**
+	 * A {@link TriggerableFailOnWatermarkTestOperator} that emits a watermark for each record whose numeric value exceeds the last emitted watermark.
+	 */
+	private static class WatermarkGeneratingTestOperator extends TriggerableFailOnWatermarkTestOperator {
+
+		private static final long serialVersionUID = -5064871833244157221L;
+
+		private long lastWatermark; // timestamp of the most recently emitted watermark
+
+		@Override
+		protected void handleElement(StreamRecord<String> element) {
+			long timestamp = Long.valueOf(element.getValue()); // the record's string value is parsed as the timestamp
+			if (timestamp > lastWatermark) { // only emit watermarks that advance beyond the last one
+				output.emitWatermark(new Watermark(timestamp));
+				lastWatermark = timestamp;
+			}
+		}
+
+		@Override
+		protected void handleWatermark(Watermark mark) {
+			if (mark.equals(Watermark.MAX_WATERMARK)) { // incoming watermarks are dropped, except for MAX_WATERMARK
+				output.emitWatermark(mark);
+				lastWatermark = Long.MAX_VALUE;
+			}
+		}
+	}
+
+	/**
+	 * An operator whose expectation of being forwarded watermarks can be switched on and off
+	 * by letting it process special trigger marker records.
+	 *
+	 * If it receives a watermark when it's not expecting one, it'll throw an exception and fail.
+	 */
+	private static class TriggerableFailOnWatermarkTestOperator
+			extends AbstractStreamOperator<String>
+			implements OneInputStreamOperator<String, String> {
+
+		private static final long serialVersionUID = 2048954179291813243L;
+
+		public final static String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS";
+		public final static String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS";
+
+		protected boolean expectForwardedWatermarks; // stays false until an EXPECT_WATERMARKS marker record is processed
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			output.collect(element); // every record, marker records included, is forwarded downstream
+
+			if (element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
+				this.expectForwardedWatermarks = true;
+			} else if (element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) {
+				this.expectForwardedWatermarks = false;
+			} else {
+				handleElement(element); // hook for non-marker records
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			if (!expectForwardedWatermarks) {
+				throw new Exception("Received a " + mark + ", but this operator should not be forwarded watermarks.");
+			} else {
+				handleWatermark(mark);
+			}
+		}
+
+		protected void handleElement(StreamRecord<String> element) {
+			// no-op by default; subclasses may override to react to non-marker records
+		}
+
+		protected void handleWatermark(Watermark mark) {
+			output.emitWatermark(mark); // default: forward the watermark unchanged
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index dd1fe58..0773699 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -57,6 +57,7 @@ public class SourceStreamTaskTest {
 	public void testOpenClose() throws Exception {
 		final SourceStreamTask<String, SourceFunction<String>, StreamSource<String, SourceFunction<String>>> sourceTask = new SourceStreamTask<>();
 		final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamSource<String, ?> sourceOperator = new StreamSource<>(new OpenCloseTestSource());
@@ -99,6 +100,7 @@ public class SourceStreamTaskTest {
 			final SourceStreamTask<Tuple2<Long, Integer>, SourceFunction<Tuple2<Long, Integer>>,
 				StreamSource<Tuple2<Long, Integer>, SourceFunction<Tuple2<Long, Integer>>>> sourceTask = new SourceStreamTask<>();
 			final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
+			testHarness.setupOutputForSingletonOperatorChain();
 
 			StreamConfig streamConfig = testHarness.getStreamConfig();
 			StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index ebe5285..c2d4aaa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -50,6 +50,7 @@ public class StreamTaskCancellationBarrierTest {
 	public void testEmitCancellationBarrierWhenNotReady() throws Exception {
 		StreamTask<String, ?> task = new InitBlockingTask();
 		StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		// start the test - this cannot succeed across the 'init()' method
 		testHarness.invoke();
@@ -80,6 +81,7 @@ public class StreamTaskCancellationBarrierTest {
 				task,
 				1, 2,
 				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
@@ -124,6 +126,7 @@ public class StreamTaskCancellationBarrierTest {
 		TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
 				task,
 				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 8dc6afa..e58bc5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -106,9 +106,6 @@ public class StreamTaskTestHarness<OUT> {
 		this.executionConfig = new ExecutionConfig();
 
 		streamConfig = new StreamConfig(taskConfig);
-		streamConfig.setChainStart();
-		streamConfig.setBufferTimeout(0);
-		streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
 		outputSerializer = outputType.createSerializer(executionConfig);
 		outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer);
@@ -129,11 +126,25 @@ public class StreamTaskTestHarness<OUT> {
 	@SuppressWarnings("unchecked")
 	private void initializeOutput() {
 		outputList = new ConcurrentLinkedQueue<Object>();
-
 		mockEnv.addOutput(outputList, outputStreamRecordSerializer);
+	}
 
+	/**
+	 * Users of the test harness can call this utility method to set up the stream config
+	 * if there will only be a single operator to be tested. The method will set up the
+	 * outgoing network connection for the operator.
+	 *
+	 * For more advanced test cases such as testing chains of multiple operators with the harness,
+	 * please manually configure the stream config.
+	 */
+	public void setupOutputForSingletonOperatorChain() {
+		streamConfig.setChainStart();
+		streamConfig.setBufferTimeout(0);
+		streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
 		streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
 		streamConfig.setNumberOfOutputs(1);
+		streamConfig.setTypeSerializerOut(outputSerializer);
+		streamConfig.setVertexID(0);
 
 		StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
 			private static final long serialVersionUID = 1L;
@@ -142,13 +153,10 @@ public class StreamTaskTestHarness<OUT> {
 		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
 		StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
 		StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-
 		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
+
 		streamConfig.setOutEdgesInOrder(outEdgesInOrder);
 		streamConfig.setNonChainedOutputs(outEdgesInOrder);
-		streamConfig.setTypeSerializerOut(outputSerializer);
-		streamConfig.setVertexID(0);
-
 	}
 
 	public StreamMockEnvironment createEnvironment() {
@@ -330,9 +338,6 @@ public class StreamTaskTestHarness<OUT> {
 					allEmpty = false;
 				}
 			}
-			try {
-				Thread.sleep(10);
-			} catch (InterruptedException ignored) {}
 
 			if (allEmpty) {
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 3cd9c9a..c0a1638 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 
 import org.junit.Assert;
@@ -58,6 +59,7 @@ public class TwoInputStreamTaskTest {
 	public void testOpenCloseAndTimestamps() throws Exception {
 		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
 		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
@@ -89,15 +91,21 @@ public class TwoInputStreamTaskTest {
 	}
 
 	/**
-	 * This test verifies that watermarks are correctly forwarded. This also checks whether
+	 * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether
 	 * watermarks are forwarded only when we have received watermarks from all inputs. The
-	 * forwarded watermark must be the minimum of the watermarks of all inputs.
+	 * forwarded watermark must be the minimum of the watermarks of all active inputs.
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testWatermarkForwarding() throws Exception {
+	public void testWatermarkAndStreamStatusForwarding() throws Exception {
 		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
-		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
+			new TwoInputStreamTaskTestHarness<String, Integer, String>(
+				coMapTask, 2, 2, new int[] {1, 2},
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
@@ -147,7 +155,7 @@ public class TwoInputStreamTaskTest {
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
 
-		// advance watermark from one of the inputs, now we should get a now one since the
+		// advance watermark from one of the inputs, now we should get a new one since the
 		// minimum increases
 		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
 		testHarness.waitForInputProcessing();
@@ -162,6 +170,33 @@ public class TwoInputStreamTaskTest {
 		expectedOutput.add(new Watermark(initialTime + 4));
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
+		// test whether idle input channels are acknowledged correctly when forwarding watermarks
+		testHarness.processElement(StreamStatus.IDLE, 0, 1);
+		testHarness.processElement(StreamStatus.IDLE, 1, 0);
+		testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
+		testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first
+		testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged,
+		                                                     // watermark (initial + 6) is still NOT forwarded (see note below)
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 5));
+		// We don't expect to see Watermark(6) here because the idle status of one
+		// input doesn't propagate to the other input. That is, if input one is at WM 6 and input
+		// two was at WM 5 before going to IDLE, then the output watermark will not jump to WM 6.
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// make all input channels idle and check that the operator's idle status is forwarded
+		testHarness.processElement(StreamStatus.IDLE, 0, 0);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(StreamStatus.IDLE);
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// make some input channels active again and check that the operator's active status is forwarded only once
+		testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
+		testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(StreamStatus.ACTIVE);
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
 		testHarness.endInput();
 
 		testHarness.waitForTaskCompletion();
@@ -178,6 +213,7 @@ public class TwoInputStreamTaskTest {
 	public void testCheckpointBarriers() throws Exception {
 		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
 		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
@@ -258,6 +294,7 @@ public class TwoInputStreamTaskTest {
 	public void testOvertakingCheckpointBarriers() throws Exception {
 		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
 		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());


Mime
View raw message