flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [4/9] flink git commit: [FLINK-4924] Simplify Operator Test Harness Constructors
Date Wed, 26 Oct 2016 21:36:50 GMT
[FLINK-4924] Simplify Operator Test Harness Constructors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe1654c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe1654c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe1654c6

Branch: refs/heads/master
Commit: fe1654c680cad692e19ce262c402fd9756e8602a
Parents: de03e0c
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Oct 26 12:19:25 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java |  9 ++---
 .../fs/bucketing/BucketingSinkTest.java         |  2 +-
 .../flink/streaming/api/graph/StreamConfig.java | 28 ---------------
 .../api/graph/StreamingJobGraphGenerator.java   |  5 ---
 .../api/operators/AbstractStreamOperator.java   |  2 +-
 .../StreamOperatorSnapshotRestoreTest.java      | 38 ++++++++++++--------
 ...stampsAndPeriodicWatermarksOperatorTest.java | 14 ++++----
 ...AlignedProcessingTimeWindowOperatorTest.java | 10 +++---
 ...AlignedProcessingTimeWindowOperatorTest.java | 10 +++---
 .../operators/windowing/WindowOperatorTest.java | 10 ++----
 .../windowing/WindowingTestHarnessTest.java     |  4 ---
 .../tasks/OneInputStreamTaskTestHarness.java    |  5 +--
 .../runtime/tasks/StreamMockEnvironment.java    |  7 +++-
 .../util/AbstractStreamOperatorTestHarness.java | 16 ++++-----
 .../KeyedOneInputStreamOperatorTestHarness.java | 23 +++++-------
 .../KeyedTwoInputStreamOperatorTestHarness.java | 27 +++++---------
 .../util/OneInputStreamOperatorTestHarness.java |  7 ++--
 .../util/TwoInputStreamOperatorTestHarness.java |  9 ++---
 .../streaming/util/WindowingTestHarness.java    | 11 +++---
 19 files changed, 95 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 56d8efc..198a621 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -120,14 +120,15 @@ public class ContinuousFileMonitoringTest {
 		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
 
 		final long watermarkInterval = 10;
-		ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.setAutoWatermarkInterval(watermarkInterval);
 
 		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
-		reader.setOutputType(typeInfo, executionConfig);
+		reader.setOutputType(typeInfo, new ExecutionConfig());
 
 		final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
-			new OneInputStreamOperatorTestHarness<>(reader, executionConfig);
+			new OneInputStreamOperatorTestHarness<>(reader);
+
+		tester.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval);
+
 
 		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index f4b3cd7..e4b0460 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -90,7 +90,7 @@ public class BucketingSinkTest {
 
 	private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
 			BucketingSink<T> sink) throws Exception {
-		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig());
+		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
 	}
 
 	@BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index ffe8456..2d38fb9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -76,8 +76,6 @@ public class StreamConfig implements Serializable {
 	private static final String STATE_BACKEND = "statebackend";
 	private static final String STATE_PARTITIONER = "statePartitioner";
 
-	private static final String NUMBER_OF_KEY_GROUPS = "numberOfKeyGroups";
-
 	private static final String STATE_KEY_SERIALIZER = "statekeyser";
 	
 	private static final String TIME_CHARACTERISTIC = "timechar";
@@ -450,32 +448,6 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	/**
-	 * Sets the number of key-groups to be used for the current {@link StreamOperator}.
-	 *
-	 * @param numberOfKeyGroups Number of key-groups to be used
-	 */
-	public void setNumberOfKeyGroups(int numberOfKeyGroups) {
-		try {
-			InstantiationUtil.writeObjectToConfig(numberOfKeyGroups, this.config, NUMBER_OF_KEY_GROUPS);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not serialize virtual state partitioner.", e);
-		}
-	}
-
-	/**
-	 * Gets the number of key-groups for the {@link StreamOperator}.
-	 *
-	 * @return the number of key-groups
-	 */
-	public Integer getNumberOfKeyGroups(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, NUMBER_OF_KEY_GROUPS, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate virtual state partitioner.", e);
-		}
-	}
-	
 	public void setStateKeySerializer(TypeSerializer<?> serializer) {
 		try {
 			InstantiationUtil.writeObjectToConfig(serializer, this.config, STATE_KEY_SERIALIZER);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1d99cf3..87fd7eb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -355,11 +355,6 @@ public class StreamingJobGraphGenerator {
 		config.setStatePartitioner(1, vertex.getStatePartitioner2());
 		config.setStateKeySerializer(vertex.getStateKeySerializer());
 
-		// only set the max parallelism if the vertex uses partitioned state (= KeyedStream).
-		if (vertex.getStatePartitioner1() != null) {
-			config.setNumberOfKeyGroups(vertex.getMaxParallelism());
-		}
-
 		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
 
 		if (vertexClass.equals(StreamIterationHead.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 5b66466..aa2f584 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -265,7 +265,7 @@ public abstract class AbstractStreamOperator<OUT>
 
 				this.keyedStateBackend = container.createKeyedStateBackend(
 						keySerializer,
-						container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()),
+						container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
 						subTaskKeyGroupRange);
 
 				this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index ada0b86..cc29172 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -49,6 +49,8 @@ import java.util.Collections;
 
 public class StreamOperatorSnapshotRestoreTest {
 
+	private static final int MAX_PARALLELISM = 10;
+
 	@Test
 	public void testOperatorStatesSnapshotRestore() throws Exception {
 
@@ -57,12 +59,16 @@ public class StreamOperatorSnapshotRestoreTest {
 		TestOneInputStreamOperator op = new TestOneInputStreamOperator(false);
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(op, new KeySelector<Integer, Integer>() {
-					@Override
-					public Integer getKey(Integer value) throws Exception {
-						return value;
-					}
-				}, TypeInformation.of(Integer.class));
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						op,
+						new KeySelector<Integer, Integer>() {
+							@Override
+							public Integer getKey(Integer value) throws Exception {
+								return value;
+							}
+						},
+						TypeInformation.of(Integer.class),
+						MAX_PARALLELISM);
 
 		testHarness.open();
 
@@ -87,12 +93,16 @@ public class StreamOperatorSnapshotRestoreTest {
 		//-------------------------------------------------------------------------- restore
 
 		op = new TestOneInputStreamOperator(true);
-		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(op, new KeySelector<Integer, Integer>() {
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		}, TypeInformation.of(Integer.class));
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
+				op,
+				new KeySelector<Integer, Integer>() {
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				},
+				TypeInformation.of(Integer.class),
+				MAX_PARALLELISM);
 
 		testHarness.initializeState(new OperatorStateHandles(
 				0,
@@ -159,7 +169,7 @@ public class StreamOperatorSnapshotRestoreTest {
 				++count;
 			}
 
-			Assert.assertEquals(KeyedOneInputStreamOperatorTestHarness.MAX_PARALLELISM, count);
+			Assert.assertEquals(MAX_PARALLELISM, count);
 
 			// write raw operator state that goes into snapshot
 			OperatorStateCheckpointOutputStream outOp = context.getRawOperatorStateOutput();
@@ -188,7 +198,7 @@ public class StreamOperatorSnapshotRestoreTest {
 						++count;
 					}
 				}
-				Assert.assertEquals(KeyedOneInputStreamOperatorTestHarness.MAX_PARALLELISM, count);
+				Assert.assertEquals(MAX_PARALLELISM, count);
 
 				// check restored managed operator state
 				BitSet check = new BitSet(10);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index febfcde..f84836b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -40,11 +40,10 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 		final TimestampsAndPeriodicWatermarksOperator<Long> operator = 
 				new TimestampsAndPeriodicWatermarksOperator<Long>(new LongExtractor());
 
-		final ExecutionConfig config = new ExecutionConfig();
-		config.setAutoWatermarkInterval(50);
-
 		OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-				new OneInputStreamOperatorTestHarness<Long, Long>(operator, config);
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
 
 		long currentTime = 0;
 
@@ -126,11 +125,10 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 		final TimestampsAndPeriodicWatermarksOperator<Long> operator =
 				new TimestampsAndPeriodicWatermarksOperator<Long>(assigner);
 
-		final ExecutionConfig config = new ExecutionConfig();
-		config.setAutoWatermarkInterval(50);
-
 		OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-				new OneInputStreamOperatorTestHarness<Long, Long>(operator, config);
+				new OneInputStreamOperatorTestHarness<Long, Long>(operator);
+
+		testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 720258e..34f69f8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -431,7 +431,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							windowSize, windowSize);
 
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+					new OneInputStreamOperatorTestHarness<>(op);
 
 			testHarness.setup();
 			testHarness.open();
@@ -467,7 +467,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 							windowSize, windowSize);
 
-			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+			testHarness = new OneInputStreamOperatorTestHarness<>(op);
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -514,7 +514,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							windowSize, windowSlide);
 
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+					new OneInputStreamOperatorTestHarness<>(op);
 
 			testHarness.setProcessingTime(0);
 
@@ -551,7 +551,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 					windowSize, windowSlide);
 
-			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+			testHarness = new OneInputStreamOperatorTestHarness<>(op);
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -604,7 +604,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
 
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-					new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+					new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
 
 			testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7ca5753..1875bbb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -496,7 +496,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							windowSize, windowSize);
 
 			OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+					new OneInputStreamOperatorTestHarness<>(op);
 
 			testHarness.setProcessingTime(0);
 
@@ -536,7 +536,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSize);
 
-			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+			testHarness = new OneInputStreamOperatorTestHarness<>(op);
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -587,7 +587,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 
 			OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+					new OneInputStreamOperatorTestHarness<>(op);
 
 			testHarness.setProcessingTime(0);
 
@@ -627,7 +627,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSlide);
 
-			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+			testHarness = new OneInputStreamOperatorTestHarness<>(op);
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -683,7 +683,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
 					op,
-					new ExecutionConfig(),
 					fieldOneSelector,
 					BasicTypeInfo.INT_TYPE_INFO);
 
@@ -734,7 +733,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
 					op,
-					new ExecutionConfig(),
 					fieldOneSelector,
 					BasicTypeInfo.INT_TYPE_INFO);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 9e50aaa..e5a5e21 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -932,7 +932,7 @@ public class WindowOperatorTest extends TestLogger {
 				ProcessingTimeTrigger.create(), 0);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -989,7 +989,7 @@ public class WindowOperatorTest extends TestLogger {
 				ProcessingTimeTrigger.create(), 0);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1059,7 +1059,7 @@ public class WindowOperatorTest extends TestLogger {
 				ProcessingTimeTrigger.create(), 0);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -2471,7 +2471,6 @@ public class WindowOperatorTest extends TestLogger {
 		TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET));
 
 		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-			new ExecutionConfig(),
 			windowAssigner,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			inputType,
@@ -2520,7 +2519,6 @@ public class WindowOperatorTest extends TestLogger {
 		SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET));
 
 		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-			new ExecutionConfig(),
 			windowAssigner,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			inputType,
@@ -2552,7 +2550,6 @@ public class WindowOperatorTest extends TestLogger {
 			Time.milliseconds(OFFSET));
 
 		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-			new ExecutionConfig(),
 			windowAssigner,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			inputType,
@@ -2607,7 +2604,6 @@ public class WindowOperatorTest extends TestLogger {
 			Time.milliseconds(SLIDING),Time.milliseconds(OFFSET));
 
 		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-			new ExecutionConfig(),
 			windowAssigner,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			inputType,

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
index 58a7897..8e33c92 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -45,7 +44,6 @@ public class WindowingTestHarnessTest {
 		TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
 
 		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-			new ExecutionConfig(),
 			windowAssigner,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			inputType,
@@ -92,7 +90,6 @@ public class WindowingTestHarnessTest {
 		TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
 
 		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-			new ExecutionConfig(),
 			windowAssigner,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			inputType,
@@ -145,7 +142,6 @@ public class WindowingTestHarnessTest {
 		TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
 
 		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-			new ExecutionConfig(),
 			windowAssigner,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			inputType,

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 5573a53..3cf055e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -101,11 +101,12 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 		streamConfig.setTypeSerializerIn1(inputSerializer);
 	}
 
-	public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
+	public <K> void configureForKeyedStream(
+			KeySelector<IN, K> keySelector,
+			TypeInformation<K> keyType) {
 		ClosureCleaner.clean(keySelector, false);
 		streamConfig.setStatePartitioner(0, keySelector);
 		streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig));
-		streamConfig.setNumberOfKeyGroups(10);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 36ecf59..2376a60 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -103,7 +103,12 @@ public class StreamMockEnvironment implements Environment {
 
 	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
 									long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
-		this.taskInfo = new TaskInfo("", 1, 0, 1, 0);
+		this.taskInfo = new TaskInfo(
+				"", /* task name */
+				1, /* num key groups / max parallelism */
+				0, /* index of this subtask */
+				1, /* num subtasks */
+				0 /* attempt number */);
 		this.jobConfiguration = jobConfig;
 		this.taskConfiguration = taskConfig;
 		this.inputs = new LinkedList<InputGate>();

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index a61d995..1124fa9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -63,7 +63,7 @@ import static org.mockito.Mockito.*;
  */
 public class AbstractStreamOperatorTestHarness<OUT> {
 
-	public static final int MAX_PARALLELISM = 10;
+	protected final static int DEFAULT_MAX_PARALLELISM = 1;
 
 	final protected StreamOperator<OUT> operator;
 
@@ -92,19 +92,15 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 	private volatile boolean wasFailedExternally = false;
 
-	public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator) throws Exception {
-		this(operator, new ExecutionConfig());
-	}
-
 	public AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
-			ExecutionConfig executionConfig) throws Exception {
+			int maxParallelism) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
 		Configuration underlyingConfig = new Configuration();
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
-		this.executionConfig = executionConfig;
+		this.executionConfig = new ExecutionConfig();
 		this.closableRegistry = new ClosableRegistry();
 		this.checkpointLock = new Object();
 
@@ -115,7 +111,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 				1024,
 				underlyingConfig,
 				executionConfig,
-				MAX_PARALLELISM,
+				maxParallelism,
 				1, 0);
 
 		mockTask = mock(StreamTask.class);
@@ -192,6 +188,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		return this.mockTask.getEnvironment();
 	}
 
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
 	/**
 	 * Get all the output from the task. This contains StreamRecords and Events interleaved.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 99527e7..25563a3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -65,29 +64,23 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	public KeyedOneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			final KeySelector<IN, K> keySelector,
-			TypeInformation<K> keyType) throws Exception {
-		super(operator);
+			TypeInformation<K> keyType,
+			int maxParallelism) throws Exception {
+		super(operator, maxParallelism);
 
 		ClosureCleaner.clean(keySelector, false);
 		config.setStatePartitioner(0, keySelector);
 		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-		config.setNumberOfKeyGroups(MAX_PARALLELISM);
 
 		setupMockTaskCreateKeyedBackend();
 	}
 
-	public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
-			ExecutionConfig executionConfig,
-			KeySelector<IN, K> keySelector,
-			TypeInformation<K> keyType) throws Exception {
-		super(operator, executionConfig);
-
-		ClosureCleaner.clean(keySelector, false);
-		config.setStatePartitioner(0, keySelector);
-		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-		config.setNumberOfKeyGroups(MAX_PARALLELISM);
 
-		setupMockTaskCreateKeyedBackend();
+	public KeyedOneInputStreamOperatorTestHarness(
+			OneInputStreamOperator<IN, OUT> operator,
+			final KeySelector<IN, K> keySelector,
+			TypeInformation<K> keyType) throws Exception {
+		this(operator, keySelector, keyType, DEFAULT_MAX_PARALLELISM);
 	}
 
 	private void setupMockTaskCreateKeyedBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 2e9885c..1a01ea3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -55,37 +54,27 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 
 	public KeyedTwoInputStreamOperatorTestHarness(
 			TwoInputStreamOperator<IN1, IN2, OUT> operator,
-			final KeySelector<IN1, K> keySelector1,
-			final KeySelector<IN2, K> keySelector2,
-			TypeInformation<K> keyType) throws Exception {
-		super(operator);
+			KeySelector<IN1, K> keySelector1,
+			KeySelector<IN2, K> keySelector2,
+			TypeInformation<K> keyType,
+			int maxParallelism) throws Exception {
+		super(operator, maxParallelism);
 
 		ClosureCleaner.clean(keySelector1, false);
 		ClosureCleaner.clean(keySelector2, false);
 		config.setStatePartitioner(0, keySelector1);
 		config.setStatePartitioner(1, keySelector2);
 		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-		config.setNumberOfKeyGroups(MAX_PARALLELISM);
 
 		setupMockTaskCreateKeyedBackend();
 	}
 
 	public KeyedTwoInputStreamOperatorTestHarness(
 			TwoInputStreamOperator<IN1, IN2, OUT> operator,
-			ExecutionConfig executionConfig,
-			KeySelector<IN1, K> keySelector1,
-			KeySelector<IN2, K> keySelector2,
+			final KeySelector<IN1, K> keySelector1,
+			final KeySelector<IN2, K> keySelector2,
 			TypeInformation<K> keyType) throws Exception {
-		super(operator, executionConfig);
-
-		ClosureCleaner.clean(keySelector1, false);
-		ClosureCleaner.clean(keySelector2, false);
-		config.setStatePartitioner(0, keySelector1);
-		config.setStatePartitioner(1, keySelector2);
-		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-		config.setNumberOfKeyGroups(MAX_PARALLELISM);
-
-		setupMockTaskCreateKeyedBackend();
+		this(operator, keySelector1, keySelector2, keyType, DEFAULT_MAX_PARALLELISM);
 	}
 
 	private void setupMockTaskCreateKeyedBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index a3e095a..8be9c63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -37,13 +36,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 	private final OneInputStreamOperator<IN, OUT> oneInputOperator;
 
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
-		this(operator, new ExecutionConfig());
+		this(operator, DEFAULT_MAX_PARALLELISM);
 	}
 
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
-			ExecutionConfig executionConfig) throws Exception {
-		super(operator, executionConfig);
+			int maxParallelism) throws Exception {
+		super(operator, maxParallelism);
 
 		this.oneInputOperator = operator;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 95eea98..c6f6918 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -36,11 +35,13 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStr
 	private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
 
 	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception {
-		this(operator, new ExecutionConfig());
+		this(operator, DEFAULT_MAX_PARALLELISM);
 	}
 		
-	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, ExecutionConfig executionConfig) throws Exception {
-		super(operator, executionConfig);
+	public TwoInputStreamOperatorTestHarness(
+			TwoInputStreamOperator<IN1, IN2, OUT> operator,
+			int maxParallelism) throws Exception {
+		super(operator, maxParallelism);
 
 		this.twoInputOperator = operator;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index 9a1b512..db3a89c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -55,8 +55,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 
 	private volatile boolean isOpen = false;
 
-	public WindowingTestHarness(ExecutionConfig executionConfig,
-								WindowAssigner<? super IN, W> windowAssigner,
+	public WindowingTestHarness(WindowAssigner<? super IN, W> windowAssigner,
 								TypeInformation<K> keyType,
 								TypeInformation<IN> inputType,
 								KeySelector<IN, K> keySelector,
@@ -64,20 +63,20 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 								long allowedLateness) throws Exception {
 
 		ListStateDescriptor<IN> windowStateDesc =
-				new ListStateDescriptor<>("window-contents", inputType.createSerializer(executionConfig));
+				new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<K, IN, Iterable<IN>, IN, W> operator =
 			new WindowOperator<>(
 				windowAssigner,
-				windowAssigner.getWindowSerializer(executionConfig),
+				windowAssigner.getWindowSerializer(new ExecutionConfig()),
 				keySelector,
-				keyType.createSerializer(executionConfig),
+				keyType.createSerializer(new ExecutionConfig()),
 				windowStateDesc,
 				new InternalIterableWindowFunction<>(new PassThroughFunction()),
 				trigger,
 				allowedLateness);
 
-		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, keySelector, keyType);
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType, 1);
 	}
 
 	/**


Mime
View raw message