flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [03/11] flink git commit: [FLINK-4877] Refactor OperatorTestHarness to always use TestProcessingTimeService
Date Fri, 21 Oct 2016 17:14:18 GMT
[FLINK-4877] Refactor OperatorTestHarness to always use TestProcessingTimeService

Before, the test harness allowed handing in a custom ProcessingTimeService, but
in practice this was always a TestProcessingTimeService.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30554758
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30554758
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30554758

Branch: refs/heads/master
Commit: 30554758897842ad851dc9b6e1758d452f7d702f
Parents: e112a63
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Sep 28 16:43:40 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Oct 21 19:03:04 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java |  18 +-
 .../fs/bucketing/BucketingSinkTest.java         |  69 ++-
 ...stampsAndPeriodicWatermarksOperatorTest.java |   8 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 355 ++++++--------
 ...AlignedProcessingTimeWindowOperatorTest.java | 475 ++++++-------------
 .../operators/windowing/CollectingOutput.java   |  86 ----
 .../operators/windowing/NoOpTimerService.java   |  52 --
 .../operators/windowing/WindowOperatorTest.java | 106 +----
 .../KeyedOneInputStreamOperatorTestHarness.java |  20 +-
 .../util/OneInputStreamOperatorTestHarness.java |  50 +-
 .../streaming/util/WindowingTestHarness.java    |  10 +-
 11 files changed, 384 insertions(+), 865 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 971d5f8..56d8efc 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -127,20 +126,21 @@ public class ContinuousFileMonitoringTest {
 		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
 		reader.setOutputType(typeInfo, executionConfig);
 
-		final TestProcessingTimeService timeServiceProvider = new TestProcessingTimeService();
 		final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
-			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig);
+
 		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
 		tester.open();
 
 		Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
 
 		// test that watermarks are correctly emitted
 
-		timeServiceProvider.setCurrentTime(201);
-		timeServiceProvider.setCurrentTime(301);
-		timeServiceProvider.setCurrentTime(401);
-		timeServiceProvider.setCurrentTime(501);
+		tester.setProcessingTime(201);
+		tester.setProcessingTime(301);
+		tester.setProcessingTime(401);
+		tester.setProcessingTime(501);
 
 		int i = 0;
 		for(Object line: tester.getOutput()) {
@@ -170,8 +170,8 @@ public class ContinuousFileMonitoringTest {
 		for(FileInputSplit split: splits) {
 
 			// set the next "current processing time".
-			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
-			timeServiceProvider.setCurrentTime(nextTimestamp);
+			long nextTimestamp = tester.getProcessingTime() + watermarkInterval;
+			tester.setProcessingTime(nextTimestamp);
 
 			// send the next split to be read and wait until it is fully read.
 			tester.processElement(new StreamRecord<>(split));

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 0c0111c..f4b3cd7 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -35,8 +35,6 @@ import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -70,7 +68,7 @@ public class BucketingSinkTest {
 	private static org.apache.hadoop.fs.FileSystem dfs;
 	private static String hdfsURI;
 
-	private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TestTimeServiceProvider clock) {
+	private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir) throws Exception {
 		BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath())
 			.setBucketer(new Bucketer<String>() {
 				private static final long serialVersionUID = 1L;
@@ -87,12 +85,12 @@ public class BucketingSinkTest {
 			.setInactiveBucketThreshold(5*60*1000L)
 			.setPendingSuffix(".pending");
 
-		return createTestSink(sink, clock);
+		return createTestSink(sink);
 	}
 
-	private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(BucketingSink<T> sink,
-																			TestTimeServiceProvider clock) {
-		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig(), clock);
+	private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
+			BucketingSink<T> sink) throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig());
 	}
 
 	@BeforeClass
@@ -121,10 +119,7 @@ public class BucketingSinkTest {
 	public void testCheckpointWithoutNotify() throws Exception {
 		File dataDir = tempFolder.newFolder();
 
-		TestTimeServiceProvider clock = new TestTimeServiceProvider();
-		clock.setCurrentTime(0L);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock);
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir);
 
 		testHarness.setup();
 		testHarness.open();
@@ -133,13 +128,13 @@ public class BucketingSinkTest {
 		testHarness.processElement(new StreamRecord<>("Hello"));
 		testHarness.processElement(new StreamRecord<>("Hello"));
 
-		clock.setCurrentTime(10000L);
+		testHarness.setProcessingTime(10000L);
 
 		// snapshot but don't call notify to simulate a notify that never
 		// arrives, the sink should move pending files in restore() in that case
 		StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0);
 
-		testHarness = createTestSink(dataDir, clock);
+		testHarness = createTestSink(dataDir);
 		testHarness.setup();
 		testHarness.restore(snapshot1);
 		testHarness.open();
@@ -175,16 +170,15 @@ public class BucketingSinkTest {
 
 		final int numElements = 20;
 
-		TestTimeServiceProvider clock = new TestTimeServiceProvider();
-		clock.setCurrentTime(0L);
-
 		BucketingSink<String> sink = new BucketingSink<String>(outPath)
 			.setBucketer(new BasePathBucketer<String>())
 			.setPartPrefix("part")
 			.setPendingPrefix("")
 			.setPendingSuffix("");
 
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, clock);
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink);
+
+		testHarness.setProcessingTime(0L);
 
 		testHarness.setup();
 		testHarness.open();
@@ -217,9 +211,6 @@ public class BucketingSinkTest {
 
 		final int numElements = 20;
 
-		TestTimeServiceProvider clock = new TestTimeServiceProvider();
-		clock.setCurrentTime(0L);
-
 		BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
 			.setWriter(new SequenceFileWriter<IntWritable, Text>())
 			.setBucketer(new BasePathBucketer<Tuple2<IntWritable, Text>>())
@@ -230,7 +221,9 @@ public class BucketingSinkTest {
 		sink.setInputType(TypeInformation.of(new TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, Object> testHarness =
-			createTestSink(sink, clock);
+			createTestSink(sink);
+
+		testHarness.setProcessingTime(0L);
 
 		testHarness.setup();
 		testHarness.open();
@@ -271,9 +264,6 @@ public class BucketingSinkTest {
 
 		final int numElements = 20;
 
-		TestTimeServiceProvider clock = new TestTimeServiceProvider();
-		clock.setCurrentTime(0L);
-
 		Map<String, String> properties = new HashMap<>();
 		Schema keySchema = Schema.create(Schema.Type.INT);
 		Schema valueSchema = Schema.create(Schema.Type.STRING);
@@ -290,7 +280,9 @@ public class BucketingSinkTest {
 			.setPendingSuffix("");
 
 		OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness =
-			createTestSink(sink, clock);
+			createTestSink(sink);
+
+		testHarness.setProcessingTime(0L);
 
 		testHarness.setup();
 		testHarness.open();
@@ -325,8 +317,8 @@ public class BucketingSinkTest {
 
 	/**
 	 * This uses {@link DateTimeBucketer} to
-	 * produce rolling files. A custom {@link TimeServiceProvider} is set
-	 * to simulate the advancing of time alongside the processing of elements.
+	 * produce rolling files. We use {@link OneInputStreamOperatorTestHarness} to manually
+	 * advance processing time.
 	 */
 	@Test
 	public void testDateTimeRollingStringWriter() throws Exception {
@@ -334,16 +326,15 @@ public class BucketingSinkTest {
 
 		final String outPath = hdfsURI + "/rolling-out";
 
-		TestTimeServiceProvider clock = new TestTimeServiceProvider();
-		clock.setCurrentTime(0L);
-
 		BucketingSink<String> sink = new BucketingSink<String>(outPath)
 			.setBucketer(new DateTimeBucketer<String>("ss"))
 			.setPartPrefix("part")
 			.setPendingPrefix("")
 			.setPendingSuffix("");
 
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, clock);
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink);
+
+		testHarness.setProcessingTime(0L);
 
 		testHarness.setup();
 		testHarness.open();
@@ -351,7 +342,7 @@ public class BucketingSinkTest {
 		for (int i = 0; i < numElements; i++) {
 			// Every 5 elements, increase the clock time. We should end up with 5 elements per bucket.
 			if (i % 5 == 0) {
-				clock.setCurrentTime(i * 1000L);
+				testHarness.setProcessingTime(i * 1000L);
 			}
 			testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i)));
 		}
@@ -427,10 +418,9 @@ public class BucketingSinkTest {
 		final int numIds = 4;
 		final int numElements = 20;
 
-		TestTimeServiceProvider clock = new TestTimeServiceProvider();
-		clock.setCurrentTime(0L);
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir);
 
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock);
+		testHarness.setProcessingTime(0L);
 
 		testHarness.setup();
 		testHarness.open();
@@ -465,10 +455,9 @@ public class BucketingSinkTest {
 		final int step2NumIds = 2;
 		final int numElementsPerStep = 20;
 
-		TestTimeServiceProvider clock = new TestTimeServiceProvider();
-		clock.setCurrentTime(0L);
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir);
 
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock);
+		testHarness.setProcessingTime(0L);
 
 		testHarness.setup();
 		testHarness.open();
@@ -477,13 +466,13 @@ public class BucketingSinkTest {
 			testHarness.processElement(new StreamRecord<>(Integer.toString(i % step1NumIds)));
 		}
 
-		clock.setCurrentTime(2*60*1000L);
+		testHarness.setProcessingTime(2*60*1000L);
 
 		for (int i = 0; i < numElementsPerStep; i++) {
 			testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds)));
 		}
 
-		clock.setCurrentTime(6*60*1000L);
+		testHarness.setProcessingTime(6*60*1000L);
 
 		for (int i = 0; i < numElementsPerStep; i++) {
 			testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds)));

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index af99d0d..febfcde 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -43,10 +43,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 		final ExecutionConfig config = new ExecutionConfig();
 		config.setAutoWatermarkInterval(50);
 
-		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-		
 		OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-				new OneInputStreamOperatorTestHarness<Long, Long>(operator, config, processingTimeService);
+				new OneInputStreamOperatorTestHarness<Long, Long>(operator, config);
 
 		long currentTime = 0;
 
@@ -77,7 +75,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 					assertTrue(lastWatermark < nextElementValue);
 				} else {
 					currentTime = currentTime + 10;
-					processingTimeService.setCurrentTime(currentTime);
+					testHarness.setProcessingTime(currentTime);
 				}
 			}
 			
@@ -109,7 +107,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 					assertTrue(lastWatermark < nextElementValue);
 				} else {
 					currentTime = currentTime + 10;
-					processingTimeService.setCurrentTime(currentTime);
+					testHarness.setProcessingTime(currentTime);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 128c88b..720258e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -36,15 +36,10 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
@@ -59,7 +54,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -68,7 +62,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
+@SuppressWarnings({"serial"})
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
@@ -183,45 +177,57 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testWindowTriggerTimeAlignment() throws Exception {
+
 		try {
-			@SuppressWarnings("unchecked")
-			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final ProcessingTimeService timerService = new NoOpTimerService();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			AccumulatingProcessingTimeWindowOperator<String, String, String> op =
+					new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
+							StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
 
-			AccumulatingProcessingTimeWindowOperator<String, String, String> op;
+			KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+
+			testHarness.open();
 
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
+			testHarness.close();
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
+
+			testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+
+			testHarness.open();
+
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
+			testHarness.close();
+
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
+
+			testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+
+			testHarness.open();
+
 			assertTrue(op.getNextSlideTime() % 500 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
+			testHarness.close();
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 100 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
-			op.dispose();
+
+			testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+
+			testHarness.open();
+
+			assertEquals(0, op.getNextSlideTime() % 100);
+			assertEquals(0, op.getNextEvaluationTime() % 1100);
+			testHarness.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -231,16 +237,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testTumblingWindow() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(error), lock);
-
 		try {
 			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -249,31 +247,23 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 							windowSize, windowSize);
 
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			final int numElements = 1000;
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
 
-			for (int i = 0; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
+			testHarness.open();
 
-			// get and verify the result
-			out.waitForNElements(numElements, 60_000);
+			final int numElements = 1000;
 
-			timerService.quiesceAndAwaitPending();
+			long currentTime = 0;
 
-			synchronized (lock) {
-				op.close();
+			for (int i = 0; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+				currentTime = currentTime + 10;
+				testHarness.setProcessingTime(currentTime);
 			}
 
-			shutdownTimerServiceAndWait(timerService);
-			op.dispose();
 
-			List<Integer> result = out.getElements();
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 			assertEquals(numElements, result.size());
 
 			Collections.sort(result);
@@ -281,102 +271,70 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				assertEquals(i, result.get(i).intValue());
 			}
 
-			if (error.get() != null) {
-				throw new Exception(error.get());
-			}
+			testHarness.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 
 	@Test
 	public void testSlidingWindow() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(error), lock);
+		// tumbling window that triggers every 20 milliseconds
+		AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+						validatingIdentityFunction, identitySelector,
+						IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
 
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
-			
-			// tumbling window that triggers every 20 milliseconds
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
 
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
+		testHarness.open();
 
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
+		final int numElements = 1000;
 
-			timerService.quiesceAndAwaitPending();
+		long currentTime = 0;
 
-			synchronized (lock) {
-				op.close();
-			}
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>(i));
+			currentTime = currentTime + 10;
+			testHarness.setProcessingTime(currentTime);
+		}
 
-			shutdownTimerServiceAndWait(timerService);
-			op.dispose();
+		// get and verify the result
+		List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 
-			// get and verify the result
-			List<Integer> result = out.getElements();
+		// if we kept this running, each element would be in the result three times (for each slide).
+		// we are closing the window before the final panes are through three times, so we may have less
+		// elements.
+		if (result.size() < numElements || result.size() > 3 * numElements) {
+			fail("Wrong number of results: " + result.size());
+		}
 
-			// if we kept this running, each element would be in the result three times (for each slide).
-			// we are closing the window before the final panes are through three times, so we may have less
-			// elements.
-			if (result.size() < numElements || result.size() > 3 * numElements) {
-				fail("Wrong number of results: " + result.size());
-			}
+		Collections.sort(result);
+		int lastNum = -1;
+		int lastCount = -1;
 
-			Collections.sort(result);
-			int lastNum = -1;
-			int lastCount = -1;
-			
-			for (int num : result) {
-				if (num == lastNum) {
-					lastCount++;
-					assertTrue(lastCount <= 3);
-				}
-				else {
-					lastNum = num;
-					lastCount = 1;
-				}
+		for (int num : result) {
+			if (num == lastNum) {
+				lastCount++;
+				assertTrue(lastCount <= 3);
 			}
-
-			if (error.get() != null) {
-				throw new Exception(error.get());
+			else {
+				lastNum = num;
+				lastCount = 1;
 			}
-		} finally {
-			timerService.shutdownService();
 		}
+
+		testHarness.close();
 	}
 
 	@Test
 	public void testTumblingWindowSingleElements() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -384,66 +342,46 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							validatingIdentityFunction, identitySelector,
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
 
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
 
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
-			}
-			out.waitForNElements(2, 60000);
+			testHarness.open();
 
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(3));
-				op.processElement(new StreamRecord<Integer>(4));
-				op.processElement(new StreamRecord<Integer>(5));
-			}
-			out.waitForNElements(5, 60000);
+			testHarness.setProcessingTime(0);
 
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(6));
-			}
-			out.waitForNElements(6, 60000);
-			
-			List<Integer> result = out.getElements();
-			assertEquals(6, result.size());
+			testHarness.processElement(new StreamRecord<>(1));
+			testHarness.processElement(new StreamRecord<>(2));
 
-			Collections.sort(result);
-			assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+			testHarness.setProcessingTime(50);
 
-			timerService.quiesceAndAwaitPending();
+			testHarness.processElement(new StreamRecord<>(3));
+			testHarness.processElement(new StreamRecord<>(4));
+			testHarness.processElement(new StreamRecord<>(5));
 
-			synchronized (lock) {
-				op.close();
-			}
+			testHarness.setProcessingTime(100);
 
-			shutdownTimerServiceAndWait(timerService);
-			op.dispose();
+			testHarness.processElement(new StreamRecord<>(6));
 
-			if (error.get() != null) {
-				throw new Exception(error.get());
-			}
+			testHarness.setProcessingTime(200);
+
+
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+			assertEquals(6, result.size());
+
+			Collections.sort(result);
+			assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+
+			testHarness.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 	
 	@Test
 	public void testSlidingWindowSingleElements() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-		
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-			new ReferenceSettingExceptionHandler(error), lock);
-
 		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -451,44 +389,33 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							validatingIdentityFunction, identitySelector,
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
 
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
 
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
-			}
+			testHarness.setProcessingTime(0);
+
+			testHarness.open();
+
+			testHarness.processElement(new StreamRecord<>(1));
+			testHarness.processElement(new StreamRecord<>(2));
+
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 
-			// each element should end up in the output three times
-			// wait until the elements have arrived 6 times in the output
-			out.waitForNElements(6, 120000);
-			
-			List<Integer> result = out.getElements();
 			assertEquals(6, result.size());
 			
 			Collections.sort(result);
 			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
 
-			timerService.quiesceAndAwaitPending();
-
-			synchronized (lock) {
-				op.close();
-			}
-
-			shutdownTimerServiceAndWait(timerService);
-			op.dispose();
-
-			if (error.get() != null) {
-				throw new Exception(error.get());
-			}
+			testHarness.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 
 	@Test
@@ -503,15 +430,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 							windowSize, windowSize);
 
-			TestProcessingTimeService timerService = new TestProcessingTimeService();
-
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
 			testHarness.setup();
 			testHarness.open();
 
-			timerService.setCurrentTime(0);
+			testHarness.setProcessingTime(0);
 
 			// inject some elements
 			final int numElementsFirst = 700;
@@ -542,8 +467,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 							windowSize, windowSize);
 
-			timerService = new TestProcessingTimeService();
-			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -554,7 +478,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(new StreamRecord<>(i));
 			}
 
-			timerService.setCurrentTime(400);
+			testHarness.setProcessingTime(400);
 
 			// get and verify the result
 			List<Integer> finalResult = new ArrayList<>();
@@ -568,7 +492,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				assertEquals(i, finalResult.get(i).intValue());
 			}
 			testHarness.close();
-			op.dispose();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -583,8 +506,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
 
-			TestProcessingTimeService timerService = new TestProcessingTimeService();
-
 			// sliding window (200 msecs) every 50 msecs
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
@@ -593,9 +514,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							windowSize, windowSlide);
 
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
-			timerService.setCurrentTime(0);
+			testHarness.setProcessingTime(0);
 
 			testHarness.setup();
 			testHarness.open();
@@ -623,7 +544,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			}
 
 			testHarness.close();
-			op.dispose();
 
 			// re-create the operator and restore the state
 			op = new AccumulatingProcessingTimeWindowOperator<>(
@@ -631,8 +551,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 					windowSize, windowSlide);
 
-			timerService = new TestProcessingTimeService();
-			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -644,13 +563,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(new StreamRecord<>(i));
 			}
 
-			timerService.setCurrentTime(50);
-			timerService.setCurrentTime(100);
-			timerService.setCurrentTime(150);
-			timerService.setCurrentTime(200);
-			timerService.setCurrentTime(250);
-			timerService.setCurrentTime(300);
-			timerService.setCurrentTime(350);
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+			testHarness.setProcessingTime(200);
+			testHarness.setProcessingTime(250);
+			testHarness.setProcessingTime(300);
+			testHarness.setProcessingTime(350);
 
 			// get and verify the result
 			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
@@ -684,14 +603,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							new StatefulFunction(), identitySelector,
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
 
-			TestProcessingTimeService timerService = new TestProcessingTimeService();
-
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-					new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+					new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), identitySelector, BasicTypeInfo.INT_TYPE_INFO);
 
 			testHarness.open();
 
-			timerService.setCurrentTime(0);
+			testHarness.setProcessingTime(0);
 
 			testHarness.processElement(new StreamRecord<>(1));
 			testHarness.processElement(new StreamRecord<>(2));
@@ -703,7 +620,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			op.processElement(new StreamRecord<>(2));
 			op.processElement(new StreamRecord<>(2));
 
-			timerService.setCurrentTime(1000);
+			testHarness.setProcessingTime(1000);
 
 			List<Integer> result = extractFromStreamRecords(testHarness.getOutput());
 			assertEquals(8, result.size());
@@ -808,7 +725,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
-	private <T> List<T> extractFromStreamRecords(Iterable<Object> input) {
+	private <T> List<T> extractFromStreamRecords(Iterable<?> input) {
 		List<T> result = new ArrayList<>();
 		for (Object in : input) {
 			if (in instanceof StreamRecord) {
@@ -824,5 +741,5 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		while (!timers.isTerminated()) {
 			Thread.sleep(2);
 		}
-	} 
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index bb64a08..7ca5753 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.ValueState;
@@ -32,24 +31,13 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
 import org.junit.After;
 import org.junit.Test;
 
@@ -57,21 +45,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
-@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
+@SuppressWarnings("serial")
 public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
@@ -79,23 +64,23 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-	
-	private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector = 
+
+	private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector =
 			new KeySelector<Tuple2<Integer,Integer>, Integer>() {
 				@Override
 				public Integer getKey(Tuple2<Integer,Integer> value) {
 					return value.f0;
 				}
 	};
-	
+
 	private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction = new ReduceFunction<Tuple2<Integer, Integer>>() {
 		@Override
 		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) {
 			return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
 		}
 	};
-	
-	private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer = 
+
+	private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer =
 			new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
 					.createSerializer(new ExecutionConfig());
 
@@ -107,14 +92,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			return diff0 != 0 ? diff0 : diff1;
 		}
 	};
-	
+
 	// ------------------------------------------------------------------------
 
 	public AggregatingAlignedProcessingTimeWindowOperatorTest() {
 		ClosureCleaner.clean(fieldOneSelector, false);
 		ClosureCleaner.clean(sumFunction, false);
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	@After
@@ -131,9 +116,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		assertTrue("Not all trigger threads where properly shut down",
 				StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	@Test
 	public void testInvalidParameters() {
 		try {
@@ -141,7 +126,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			assertInvalidParameter(10000L, -1L);
 			assertInvalidParameter(-1L, 1000L);
 			assertInvalidParameter(1000L, 2000L);
-			
+
 			// actual internal slide is too low here:
 			assertInvalidParameter(1000L, 999L);
 		}
@@ -150,12 +135,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWindowSizeAndSlide() {
 		try {
 			AggregatingProcessingTimeWindowOperator<String, String> op;
-			
+
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
 			assertEquals(5000, op.getWindowSize());
@@ -193,44 +178,51 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testWindowTriggerTimeAlignment() throws Exception {
 		try {
-			@SuppressWarnings("unchecked")
-			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final ProcessingTimeService timerService = new NoOpTimerService();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
-			AggregatingProcessingTimeWindowOperator<String, String> op;
+			AggregatingProcessingTimeWindowOperator<String, String> op =
+					new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
+						StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
+
+			KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+			testHarness.open();
 
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
-			op.setup(mockTask, createTaskConfig(mockKeySelector, StringSerializer.INSTANCE, 10), mockOut);
-			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
+			testHarness.close();
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
+
+			testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+			testHarness.open();
+
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
+			testHarness.close();
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
+
+			testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+			testHarness.open();
+
 			assertTrue(op.getNextSlideTime() % 500 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
+			testHarness.close();
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
+
+			testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+			testHarness.open();
+
 			assertTrue(op.getNextSlideTime() % 100 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
-			op.dispose();
+			testHarness.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -240,85 +232,54 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testTumblingWindowUniqueElements() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final int windowSize = 50;
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
-			
+
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
 							sumFunction, fieldOneSelector,
 							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSize);
 
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
 
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
-			op.open();
+			testHarness.open();
 
 			final int numElements = 1000;
 
+			long currentTime = 0;
+
 			for (int i = 0; i < numElements; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
-				Thread.sleep(1);
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+				testHarness.processElement(next);
+				currentTime = currentTime + 10;
+				testHarness.setProcessingTime(currentTime);
 			}
 
-			out.waitForNElements(numElements, 60_000);
-
 			// get and verify the result
-			List<Tuple2<Integer, Integer>> result = out.getElements();
+			List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 			assertEquals(numElements, result.size());
 
-			timerService.quiesceAndAwaitPending();
-
-			synchronized (lock) {
-				op.close();
-			}
-
-			shutdownTimerServiceAndWait(timerService);
-			op.dispose();
-
+			testHarness.close();
 
 			Collections.sort(result, tupleComparator);
 			for (int i = 0; i < numElements; i++) {
 				assertEquals(i, result.get(i).f0.intValue());
 				assertEquals(i, result.get(i).f1.intValue());
 			}
-
-			if (error.get() != null) {
-				throw new Exception(error.get());
-			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 
 	@Test
 	public void testTumblingWindowDuplicateElements() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(error), lock);
 		try {
 			final int windowSize = 50;
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
-
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
@@ -326,43 +287,39 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSize);
 
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
-			op.open();
+			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.setProcessingTime(0);
+			testHarness.open();
 
 			final int numWindows = 10;
 
 			long previousNextTime = 0;
 			int window = 1;
-			
-			while (window <= numWindows) {
-				synchronized (lock) {
-					long nextTime = op.getNextEvaluationTime();
-					int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-
-					StreamRecord<Tuple2<Integer, Integer>> next =  new StreamRecord<>(new Tuple2<>(val, val));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-
-					if (nextTime != previousNextTime) {
-						window++;
-						previousNextTime = nextTime;
-					}
-				}
-				Thread.sleep(1);
-			}
 
-			out.waitForNElements(numWindows, 60_000);
+			long currentTime = 0;
 
-			List<Tuple2<Integer, Integer>> result = out.getElements();
+			while (window <= numWindows) {
+				long nextTime = op.getNextEvaluationTime();
+				int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
 
-			timerService.quiesceAndAwaitPending();
+				StreamRecord<Tuple2<Integer, Integer>> next =  new StreamRecord<>(new Tuple2<>(val, val));
+				testHarness.processElement(next);
 
-			synchronized (lock) {
-				op.close();
+				if (nextTime != previousNextTime) {
+					window++;
+					previousNextTime = nextTime;
+				}
+				currentTime = currentTime + 1;
+				testHarness.setProcessingTime(currentTime);
 			}
 
-			shutdownTimerServiceAndWait(timerService);
-			op.dispose();
+			testHarness.setProcessingTime(currentTime + 100);
+
+			List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+			testHarness.close();
 
 			// we have ideally one element per window. we may have more, when we emitted a value into the
 			// successive window (corner case), so we can have twice the number of elements, in the worst case.
@@ -371,33 +328,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			// deduplicate for more accurate checks
 			HashSet<Tuple2<Integer, Integer>> set = new HashSet<>(result);
 			assertTrue(set.size() == 10);
-
-			if (error.get() != null) {
-				throw new Exception(error.get());
-			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 
 	@Test
 	public void testSlidingWindow() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(error), lock);
-
 		try {
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
-
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
-
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
@@ -405,32 +345,27 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, tupleSerializer,
 							150, 50);
 
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
-			op.open();
+			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.open();
 
 			final int numElements = 1000;
 
+			long currentTime = 0;
+
 			for (int i = 0; i < numElements; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
-				Thread.sleep(1);
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+				testHarness.processElement(next);
+				currentTime = currentTime + 1;
+				testHarness.setProcessingTime(currentTime);
 			}
 
-			timerService.quiesceAndAwaitPending();
-
-			synchronized (lock) {
-				op.close();
-			}
+			// get and verify the result
+			List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 
-			shutdownTimerServiceAndWait(timerService);
-			op.dispose();
+			testHarness.close();
 
-			// get and verify the result
-			List<Tuple2<Integer, Integer>> result = out.getElements();
-			
 			// every element can occur between one and three times
 			if (result.size() < numElements || result.size() > 3 * numElements) {
 				System.out.println(result);
@@ -440,10 +375,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			Collections.sort(result, tupleComparator);
 			int lastNum = -1;
 			int lastCount = -1;
-			
+
 			for (Tuple2<Integer, Integer> val : result) {
 				assertEquals(val.f0, val.f1);
-				
+
 				if (val.f0 == lastNum) {
 					lastCount++;
 					assertTrue(lastCount <= 3);
@@ -453,58 +388,42 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					lastCount = 1;
 				}
 			}
-
-			if (error.get() != null) {
-				throw new Exception(error.get());
-			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 
 	@Test
 	public void testSlidingWindowSingleElements() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(error), lock);
-
 		try {
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
-
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
 							sumFunction, fieldOneSelector,
 							IntSerializer.INSTANCE, tupleSerializer, 150, 50);
 
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
-			op.open();
-
-			synchronized (lock) {
-				StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, 1));
-				op.setKeyContextElement1(next1);
-				op.processElement(next1);
-				
-				StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, 2));
-				op.setKeyContextElement1(next2);
-				op.processElement(next2);
-			}
+			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.open();
+
+			testHarness.setProcessingTime(0);
+
+			StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, 1));
+			testHarness.processElement(next1);
 
-			// each element should end up in the output three times
-			// wait until the elements have arrived 6 times in the output
-			out.waitForNElements(6, 120000);
-			
-			List<Tuple2<Integer, Integer>> result = out.getElements();
+			StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, 2));
+			testHarness.processElement(next2);
+
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+
+			List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 			assertEquals(6, result.size());
-			
+
 			Collections.sort(result, tupleComparator);
 			assertEquals(Arrays.asList(
 					new Tuple2<>(1, 1),
@@ -515,40 +434,18 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					new Tuple2<>(2, 2)
 			), result);
 
-			timerService.quiesceAndAwaitPending();
-
-			synchronized (lock) {
-				op.close();
-			}
-
-			shutdownTimerServiceAndWait(timerService);
-			op.dispose();
-
-			if (error.get() != null) {
-				throw new Exception(error.get());
-			}
+			testHarness.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 
 	@Test
 	public void testPropagateExceptionsFromProcessElement() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
-
 			ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
 
 			// the operator has a window time that is so long that it will not fire in this test
@@ -559,46 +456,31 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, tupleSerializer,
 							hundredYears, hundredYears);
 
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
-			op.open();
+			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+					new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.open();
 
 			for (int i = 0; i < 100; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
+				testHarness.processElement(next);
 			}
-			
+
 			try {
 				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
-				op.setKeyContextElement1(next);
-				op.processElement(next);
+				testHarness.processElement(next);
 				fail("This fail with an exception");
 			}
 			catch (Exception e) {
 				assertTrue(e.getMessage().contains("Artificial Test Exception"));
 			}
 
-			timerService.quiesceAndAwaitPending();
-			synchronized (lock) {
-				op.close();
-			}
-
-			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
-
-			if (error.get() != null) {
-				throw new Exception(error.get());
-			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 
 	@Test
@@ -606,8 +488,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			final int windowSize = 200;
 
-			TestProcessingTimeService timerService = new TestProcessingTimeService();
-
 			// tumbling window that triggers every 50 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
@@ -616,9 +496,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							windowSize, windowSize);
 
 			OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
-			timerService.setCurrentTime(0);
+			testHarness.setProcessingTime(0);
 
 			testHarness.setup();
 			testHarness.open();
@@ -626,7 +506,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			// inject some elements
 			final int numElementsFirst = 700;
 			final int numElements = 1000;
-			
+
 			for (int i = 0; i < numElementsFirst; i++) {
 				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
 				testHarness.processElement(next);
@@ -656,8 +536,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSize);
 
-			timerService = new TestProcessingTimeService();
-			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -669,7 +548,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(next);
 			}
 
-			timerService.setCurrentTime(200);
+			testHarness.setProcessingTime(200);
 
 			// get and verify the result
 			List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
@@ -699,8 +578,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
 
-			TestProcessingTimeService timerService = new TestProcessingTimeService();
-
 			// sliding window (200 msecs) every 50 msecs
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
@@ -708,10 +585,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSlide);
 
-			timerService.setCurrentTime(0);
 
 			OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+
+			testHarness.setProcessingTime(0);
 
 			testHarness.setup();
 			testHarness.open();
@@ -749,8 +627,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSlide);
 
-			timerService = new TestProcessingTimeService();
-			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -762,14 +639,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(next);
 			}
 
-			timerService.setCurrentTime(50);
-			timerService.setCurrentTime(100);
-			timerService.setCurrentTime(150);
-			timerService.setCurrentTime(200);
-			timerService.setCurrentTime(250);
-			timerService.setCurrentTime(300);
-			timerService.setCurrentTime(350);
-			timerService.setCurrentTime(400);
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+			testHarness.setProcessingTime(200);
+			testHarness.setProcessingTime(250);
+			testHarness.setProcessingTime(300);
+			testHarness.setProcessingTime(350);
+			testHarness.setProcessingTime(400);
 
 			// get and verify the result
 			List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
@@ -796,11 +673,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	public void testKeyValueStateInWindowFunctionTumbling() {
 		try {
 			final long twoSeconds = 2000;
-			
-			TestProcessingTimeService timerService = new TestProcessingTimeService();
 
 			StatefulFunction.globalCounts.clear();
-			
+
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
 							new StatefulFunction(), fieldOneSelector,
@@ -809,16 +684,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
 					op,
 					new ExecutionConfig(),
-					timerService,
 					fieldOneSelector,
 					BasicTypeInfo.INT_TYPE_INFO);
 
-			timerService.setCurrentTime(0);
+			testHarness.setProcessingTime(0);
 			testHarness.open();
 
 			// because the window interval is so large, everything should be in one window
 			// and aggregate into one value per key
-			
+
 			for (int i = 0; i < 10; i++) {
 				StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
 				testHarness.processElement(next1);
@@ -827,7 +701,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(next2);
 			}
 
-			timerService.setCurrentTime(1000);
+			testHarness.setProcessingTime(1000);
 
 			int count1 = StatefulFunction.globalCounts.get(1);
 			int count2 = StatefulFunction.globalCounts.get(2);
@@ -851,32 +725,30 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
 
-			TestProcessingTimeService timerService = new TestProcessingTimeService();
-
 			StatefulFunction.globalCounts.clear();
-			
+
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
 							new StatefulFunction(), fieldOneSelector,
 							IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide);
 
-			timerService.setCurrentTime(0);
 			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
 					op,
 					new ExecutionConfig(),
-					timerService,
 					fieldOneSelector,
 					BasicTypeInfo.INT_TYPE_INFO);
 
+			testHarness.setProcessingTime(0);
+
 			testHarness.open();
 
 			// because the window interval is so large, everything should be in one window
 			// and aggregate into one value per key
 			final int numElements = 100;
-			
+
 			// all elements are injected before the processing time is advanced below
 			for (int i = 0; i < numElements; i++) {
-				
+
 				StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
 				StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i));
 				StreamRecord<Tuple2<Integer, Integer>> next3 = new StreamRecord<>(new Tuple2<>(1, i));
@@ -888,14 +760,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(next4);
 			}
 
-			timerService.setCurrentTime(50);
-			timerService.setCurrentTime(100);
-			timerService.setCurrentTime(150);
-			timerService.setCurrentTime(200);
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+			testHarness.setProcessingTime(200);
 
 			int count1 = StatefulFunction.globalCounts.get(1);
 			int count2 = StatefulFunction.globalCounts.get(2);
-			
+
 			assertTrue(count1 >= 2 && count1 <= 2 * numElements);
 			assertEquals(count1, count2);
 
@@ -907,12 +779,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	private void assertInvalidParameter(long windowSize, long windowSlide) {
 		try {
-			new AggregatingProcessingTimeWindowOperator<String, String>(
+			new AggregatingProcessingTimeWindowOperator<>(
 					mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE,
 					windowSize, windowSlide);
@@ -927,11 +799,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	private static class FailingFunction implements ReduceFunction<Tuple2<Integer, Integer>> {
 
 		private final int failAfterElements;
-		
+
 		private int numElements;
 
 		FailingFunction(int failAfterElements) {
@@ -945,7 +817,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			if (numElements >= failAfterElements) {
 				throw new Exception("Artificial Test Exception");
 			}
-			
+
 			return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
 		}
 	}
@@ -961,7 +833,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		@Override
 		public void open(Configuration parameters) {
 			assertNotNull(getRuntimeContext());
-			
+
 			// start with one, so the final count is correct and we test that we do not
 			// initialize with 0 always by default
 			state = getRuntimeContext().getState(new ValueStateDescriptor<>("totalCount", Integer.class, 1));
@@ -971,44 +843,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
 			state.update(state.value() + 1);
 			globalCounts.put(value1.f0, state.value());
-			
+
 			return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
 		}
 	}
 
 	// ------------------------------------------------------------------------
-	
-	private static StreamTask<?, ?> createMockTask() {
-		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
-
-		StreamTask<?, ?> task = mock(StreamTask.class);
-		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
-		when(task.getName()).thenReturn("Test task name");
-		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-
-		final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
-		when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
-
-		final Environment env = new DummyEnvironment("Test task name", 1, 0);
-		when(task.getEnvironment()).thenReturn(env);
-
-		return task;
-	}
-
-	private static StreamTask<?, ?> createMockTaskWithTimer(final ProcessingTimeService timerService)
-	{
-		StreamTask<?, ?> mockTask = createMockTask();
-		when(mockTask.getProcessingTimeService()).thenReturn(timerService);
-		return mockTask;
-	}
-
-	private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer, int numberOfKeGroups) {
-		return new StreamConfig(new Configuration());
-	}
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
-	private <T> List<T> extractFromStreamRecords(Iterable<Object> input) {
+	private <T> List<T> extractFromStreamRecords(Iterable<?> input) {
 		List<T> result = new ArrayList<>();
 		for (Object in : input) {
 			if (in instanceof StreamRecord) {
@@ -1017,12 +860,4 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		}
 		return result;
 	}
-
-	private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception {
-		timers.shutdownService();
-
-		while (!timers.isTerminated()) {
-			Thread.sleep(2);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
deleted file mode 100644
index 42be131..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CollectingOutput<T> implements Output<StreamRecord<T>> {
-	
-	private final List<T> elements = new ArrayList<>();
-
-	private final int timeStampModulus;
-
-
-	public CollectingOutput() {
-		this.timeStampModulus = 0;
-	}
-	
-	public CollectingOutput(int timeStampModulus) {
-		this.timeStampModulus = timeStampModulus;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public List<T> getElements() {
-		return elements;
-	}
-	
-	public void waitForNElements(int n, long timeout) throws InterruptedException {
-		long deadline = System.currentTimeMillis() + timeout;
-		synchronized (elements) {
-			long now;
-			while (elements.size() < n && (now = System.currentTimeMillis()) < deadline) {
-				elements.wait(deadline - now);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void emitWatermark(Watermark mark) {
-		throw new UnsupportedOperationException("The output should not emit watermarks");
-	}
-
-	@Override
-	public void emitLatencyMarker(LatencyMarker latencyMarker) {
-		throw new UnsupportedOperationException("The output should not emit latency markers");
-	}
-
-	@Override
-	public void collect(StreamRecord<T> record) {
-		elements.add(record.getValue());
-		
-		if (timeStampModulus != 0 && record.getTimestamp() % timeStampModulus != 0) {
-			throw new IllegalArgumentException("Invalid timestamp");
-		}
-		synchronized (elements) {
-			elements.notifyAll();
-		}
-	}
-
-	@Override
-	public void close() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
deleted file mode 100644
index a7a71cf..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-
-import java.util.concurrent.ScheduledFuture;
-
-class NoOpTimerService extends ProcessingTimeService {
-
-	private volatile boolean terminated;
-
-	@Override
-	public long getCurrentProcessingTime() {
-		return System.currentTimeMillis();
-	}
-
-	@Override
-	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
-		return null;
-	}
-
-	@Override
-	public boolean isTerminated() {
-		return terminated;
-	}
-
-	@Override
-	public void quiesceAndAwaitPending() {}
-
-	@Override
-	public void shutdownService() {
-		terminated = true;
-	}
-}


Mime
View raw message