flink-commits mailing list archives

From aljos...@apache.org
Subject [06/27] flink git commit: [FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware
Date Wed, 31 Aug 2016 17:28:24 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 472dccb..4e7e4d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -48,6 +48,8 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.After;
 import org.junit.Ignore;
@@ -81,7 +83,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
-@Ignore
 public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
@@ -553,12 +554,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void checkpointRestoreWithPendingWindowTumbling() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final int windowSize = 200;
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
 
 			// tumbling window that triggers every 50 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -568,7 +567,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							windowSize, windowSize);
 
 			OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op);
+					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+
+			timerService.setCurrentTime(0);
 
 			testHarness.setup();
 			testHarness.open();
@@ -578,48 +579,34 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int numElements = 1000;
 			
 			for (int i = 0; i < numElementsFirst; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
-				Thread.sleep(1);
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+				testHarness.processElement(next);
 			}
 
-			// draw a snapshot and dispose the window
-			StreamStateHandle state;
-			List<Tuple2<Integer, Integer>> resultAtSnapshot;
-			synchronized (lock) {
-				int beforeSnapShot = out.getElements().size();
-				state = testHarness.snapshot(1L, System.currentTimeMillis());
-				resultAtSnapshot = new ArrayList<>(out.getElements());
-				int afterSnapShot = out.getElements().size();
-				assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
-			}
-			
+			// draw a snapshot
+			List<Tuple2<Integer, Integer>> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
+			int beforeSnapShot = resultAtSnapshot.size();
+			StreamStateHandle state = testHarness.snapshot(1L, System.currentTimeMillis());
+			int afterSnapShot = testHarness.getOutput().size();
+			assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+
 			assertTrue(resultAtSnapshot.size() <= numElementsFirst);
 
 			// inject some random elements, which should not show up in the state
 			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
-				Thread.sleep(1);
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+				testHarness.processElement(next);
 			}
 
 			op.dispose();
 
 			// re-create the operator and restore the state
-			final CollectingOutput<Tuple2<Integer, Integer>> out2 = new CollectingOutput<>(windowSize);
 			op = new AggregatingProcessingTimeWindowOperator<>(
 					sumFunction, fieldOneSelector,
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSize);
 
-			testHarness =
-					new OneInputStreamOperatorTestHarness<>(op);
+			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -627,24 +614,19 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			// inject the remaining elements
 			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
-				Thread.sleep(1);
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+				testHarness.processElement(next);
 			}
 
-			out2.waitForNElements(numElements - resultAtSnapshot.size(), 60_000);
+			timerService.setCurrentTime(200);
 
 			// get and verify the result
 			List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
-			finalResult.addAll(out2.getElements());
+			List<Tuple2<Integer, Integer>> partialFinalResult = extractFromStreamRecords(testHarness.getOutput());
+			finalResult.addAll(partialFinalResult);
 			assertEquals(numElements, finalResult.size());
 
-			synchronized (lock) {
-				op.close();
-			}
+			testHarness.close();
 			op.dispose();
 
 			Collections.sort(finalResult, tupleComparator);
@@ -657,22 +639,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdown();
-		}
 	}
 
 	@Test
 	public void checkpointRestoreWithPendingWindowSliding() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final int factor = 4;
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
 
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSlide);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
 
 			// sliding window (200 msecs) every 50 msecs
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -681,8 +657,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSlide);
 
+			timerService.setCurrentTime(0);
+
 			OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
-					new OneInputStreamOperatorTestHarness<>(op);
+					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
 			testHarness.open();
@@ -692,48 +670,34 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int numElementsFirst = 700;
 
 			for (int i = 0; i < numElementsFirst; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
-				Thread.sleep(1);
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+				testHarness.processElement(next);
 			}
 
 			// draw a snapshot
-			StreamStateHandle state;
-			List<Tuple2<Integer, Integer>> resultAtSnapshot;
-			synchronized (lock) {
-				int beforeSnapShot = out.getElements().size();
-				state = testHarness.snapshot(1L, System.currentTimeMillis());
-				resultAtSnapshot = new ArrayList<>(out.getElements());
-				int afterSnapShot = out.getElements().size();
-				assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
-			}
+			List<Tuple2<Integer, Integer>> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
+			int beforeSnapShot = resultAtSnapshot.size();
+			StreamStateHandle state = testHarness.snapshot(1L, System.currentTimeMillis());
+			int afterSnapShot = testHarness.getOutput().size();
+			assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
 
 			assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);
 
 			// inject the remaining elements - these should not influence the snapshot
 			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
-				Thread.sleep(1);
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+				testHarness.processElement(next);
 			}
 
 			op.dispose();
 
 			// re-create the operator and restore the state
-			final CollectingOutput<Tuple2<Integer, Integer>> out2 = new CollectingOutput<>(windowSlide);
 			op = new AggregatingProcessingTimeWindowOperator<>(
 					sumFunction, fieldOneSelector,
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSlide);
 
-			testHarness =
-					new OneInputStreamOperatorTestHarness<>(op);
+			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -741,32 +705,27 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			// inject again the remaining elements
 			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement1(next);
-					op.processElement(next);
-				}
-				Thread.sleep(1);
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+				testHarness.processElement(next);
 			}
 
-			// for a deterministic result, we need to wait until all pending triggers
-			// have fired and emitted their results
-			long deadline = System.currentTimeMillis() + 120000;
-			do {
-				Thread.sleep(20);
-			}
-			while (resultAtSnapshot.size() + out2.getElements().size() < factor * numElements
-					&& System.currentTimeMillis() < deadline);
+			timerService.setCurrentTime(50);
+			timerService.setCurrentTime(100);
+			timerService.setCurrentTime(150);
+			timerService.setCurrentTime(200);
+			timerService.setCurrentTime(250);
+			timerService.setCurrentTime(300);
+			timerService.setCurrentTime(350);
+			timerService.setCurrentTime(400);
 
-			synchronized (lock) {
-				op.close();
-			}
+			testHarness.close();
 			op.dispose();
 
 			// get and verify the result
 			List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
-			finalResult.addAll(out2.getElements());
-			assertEquals(factor * numElements, finalResult.size());
+			List<Tuple2<Integer, Integer>> partialFinalResult = extractFromStreamRecords(testHarness.getOutput());
+			finalResult.addAll(partialFinalResult);
+			assertEquals(numElements * factor, finalResult.size());
 
 			Collections.sort(finalResult, tupleComparator);
 			for (int i = 0; i < factor * numElements; i++) {
@@ -778,20 +737,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdown();
-		}
 	}
 
 	@Test
 	public void testKeyValueStateInWindowFunctionTumbling() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final long twoSeconds = 2000;
 			
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
 
 			StatefulFunction.globalCounts.clear();
 			
@@ -800,53 +753,52 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							new StatefulFunction(), fieldOneSelector,
 							IntSerializer.INSTANCE, tupleSerializer, twoSeconds, twoSeconds);
 
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, new HashKeyGroupAssigner<Object>(10)), out);
-			op.open();
+			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
+					op,
+					new ExecutionConfig(),
+					timerService,
+					fieldOneSelector,
+					BasicTypeInfo.INT_TYPE_INFO);
+
+			timerService.setCurrentTime(0);
+			testHarness.open();
 
 			// because the window interval is so large, everything should be in one window
 			// and aggregate into one value per key
 			
-			synchronized (lock) {
-				for (int i = 0; i < 10; i++) {
-					StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
-					op.setKeyContextElement1(next1);
-					op.processElement(next1);
-	
-					StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i));
-					op.setKeyContextElement1(next2);
-					op.processElement(next2);
-				}
-			}
+			for (int i = 0; i < 10; i++) {
+				StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
+				testHarness.processElement(next1);
 
-			while (StatefulFunction.globalCounts.get(1) < 10 ||
-					StatefulFunction.globalCounts.get(2) < 10)
-			{
-				Thread.sleep(50);
+				StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i));
+				testHarness.processElement(next2);
 			}
-			
-			op.close();
+
+			timerService.setCurrentTime(1000);
+
+			int count1 = StatefulFunction.globalCounts.get(1);
+			int count2 = StatefulFunction.globalCounts.get(2);
+
+			assertTrue(count1 >= 2 && count1 <= 2 * 10);
+			assertEquals(count1, count2);
+
+			testHarness.close();
 			op.dispose();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdown();
-		}
 	}
 
 	@Test
 	public void testKeyValueStateInWindowFunctionSliding() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final int factor = 2;
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
-			
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
 
 			StatefulFunction.globalCounts.clear();
 			
@@ -855,8 +807,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							new StatefulFunction(), fieldOneSelector,
 							IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide);
 
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, new HashKeyGroupAssigner<Object>(10)), out);
-			op.open();
+			timerService.setCurrentTime(0);
+			KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
+					op,
+					new ExecutionConfig(),
+					timerService,
+					fieldOneSelector,
+					BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.open();
 
 			// because the window interval is so large, everything should be in one window
 			// and aggregate into one value per key
@@ -870,25 +829,19 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				StreamRecord<Tuple2<Integer, Integer>> next3 = new StreamRecord<>(new Tuple2<>(1, i));
 				StreamRecord<Tuple2<Integer, Integer>> next4 = new StreamRecord<>(new Tuple2<>(2, i));
 
-				// because we do not release the lock between elements, they end up in the same windows
-				synchronized (lock) {
-					op.setKeyContextElement1(next1);
-					op.processElement(next1);
-					op.setKeyContextElement1(next2);
-					op.processElement(next2);
-					op.setKeyContextElement1(next3);
-					op.processElement(next3);
-					op.setKeyContextElement1(next4);
-					op.processElement(next4);
-				}
-
-				Thread.sleep(1);
-			}
-			
-			synchronized (lock) {
-				op.close();
+				testHarness.processElement(next1);
+				testHarness.processElement(next2);
+				testHarness.processElement(next3);
+				testHarness.processElement(next4);
 			}
 
+			timerService.setCurrentTime(50);
+			timerService.setCurrentTime(100);
+			timerService.setCurrentTime(150);
+			timerService.setCurrentTime(200);
+
+			testHarness.close();
+
 			int count1 = StatefulFunction.globalCounts.get(1);
 			int count2 = StatefulFunction.globalCounts.get(2);
 			
@@ -901,9 +854,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdown();
-		}
 	}
 	
 	// ------------------------------------------------------------------------
@@ -991,21 +941,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final Environment env = new DummyEnvironment("Test task name", 1, 0);
 		when(task.getEnvironment()).thenReturn(env);
 
-		try {
-			doAnswer(new Answer<AbstractStateBackend>() {
-				@Override
-				public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final String operatorIdentifier = (String) invocationOnMock.getArguments()[0];
-					final TypeSerializer<?> keySerializer = (TypeSerializer<?>) invocationOnMock.getArguments()[1];
-					MemoryStateBackend backend = MemoryStateBackend.create();
-					backend.initializeForJob(env, operatorIdentifier, keySerializer);
-					return backend;
-				}
-			}).when(task).createStateBackend(any(String.class), any(TypeSerializer.class));
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-
 		return task;
 	}
 
@@ -1040,9 +975,17 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer, KeyGroupAssigner<?> keyGroupAssigner) {
 		StreamConfig cfg = new StreamConfig(new Configuration());
-		cfg.setStatePartitioner(0, partitioner);
-		cfg.setStateKeySerializer(keySerializer);
-		cfg.setKeyGroupAssigner(keyGroupAssigner);
 		return cfg;
 	}
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	private <T> List<T> extractFromStreamRecords(Iterable<Object> input) {
+		List<T> result = new ArrayList<>();
+		for (Object in : input) {
+			if (in instanceof StreamRecord) {
+				result.add((T) ((StreamRecord) in).getValue());
+			}
+		}
+		return result;
+	}
 }
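
The refactoring above replaces the real ScheduledExecutorService timer and the lock/Thread.sleep(1) synchronization with a TestTimeServiceProvider whose processing-time clock is advanced explicitly via setCurrentTime(), so the aligned window triggers fire deterministically inside the test harness. A condensed sketch of the new test structure (not part of the diff, and assuming the fixtures sumFunction, fieldOneSelector and tupleSerializer defined elsewhere in this test class):

    TestTimeServiceProvider timerService = new TestTimeServiceProvider();

    // tumbling window of 200 ms, built exactly as in the test above
    AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
            new AggregatingProcessingTimeWindowOperator<>(
                    sumFunction, fieldOneSelector,
                    IntSerializer.INSTANCE, tupleSerializer,
                    200, 200);

    OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> harness =
            new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);

    timerService.setCurrentTime(0);   // pin the test clock before opening the operator
    harness.setup();
    harness.open();

    // the harness sets the key context and forwards the element to the operator
    harness.processElement(new StreamRecord<>(new Tuple2<>(1, 1)));

    // advancing the test clock past the window end fires the window trigger
    timerService.setCurrentTime(200);

    // emitted StreamRecords accumulate in the harness output queue
    assertTrue(harness.getOutput().size() > 0);

    harness.close();
    op.dispose();
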

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index dc71440..681a334 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
@@ -85,9 +86,7 @@ public class EvictingWindowOperatorTest {
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -157,9 +156,7 @@ public class EvictingWindowOperatorTest {
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -227,9 +224,7 @@ public class EvictingWindowOperatorTest {
 		operator.setInputType(inputType, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		long initialTime = 0L;
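
Throughout these window tests the former two-step setup (construct a OneInputStreamOperatorTestHarness, then call configureForKeyedStream) is collapsed into the new KeyedOneInputStreamOperatorTestHarness, which wires the key selector, key serializer and key-group assigner into the StreamConfig and lets the operator obtain a KeyedStateBackend. A minimal sketch of the replacement (not part of the diff, assuming a keyed window operator named operator as in the surrounding tests):

    // old style, removed by this commit:
    //   testHarness = new OneInputStreamOperatorTestHarness<>(operator);
    //   testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    operator,                          // the keyed operator under test
                    new TupleKeySelector(),            // extracts the String key from the tuple
                    BasicTypeInfo.STRING_TYPE_INFO);   // key type, used to build the key serializer

    testHarness.open();
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0L));
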
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index cda6e1e..67a6f55 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -64,6 +64,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.WindowingTestHarness;
@@ -182,9 +183,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(inputType, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.setup();
 		testHarness.open();
@@ -220,9 +219,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(inputType, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -323,9 +320,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -359,9 +354,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -398,9 +391,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -474,9 +465,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -552,9 +541,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 		
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -659,9 +646,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 		
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -721,10 +706,8 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-		
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
@@ -814,10 +797,8 @@ public class WindowOperatorTest extends TestLogger {
 				"Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-		
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
@@ -837,17 +818,39 @@ public class WindowOperatorTest extends TestLogger {
 
 		// do a snapshot, close and restore again
 		StreamStateHandle snapshot = testHarness.snapshot(0L, 0L);
+
 		testHarness.close();
+
+		ConcurrentLinkedQueue<Object> outputBeforeClose = testHarness.getOutput();
+
+		stateDesc = new ReducingStateDescriptor<>("window-contents",
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
+
+		operator = new WindowOperator<>(
+				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
+				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+				0);
+
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
+				"Tuple2<String, Integer>"), new ExecutionConfig());
+
 		testHarness.setup();
 		testHarness.restore(snapshot);
 		testHarness.open();
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 
-
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10999));
 
@@ -858,7 +861,10 @@ public class WindowOperatorTest extends TestLogger {
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		System.out.println("BEFORE GOT: " + outputBeforeClose);
+		System.out.println("GOT: " + testHarness.getOutput());
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());
 
 		testHarness.close();
 	}
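
Note the restore path in the hunk above: after the snapshot the harness is closed, a fresh operator and a fresh KeyedOneInputStreamOperatorTestHarness are created, and the output collected before the close is concatenated with the post-restore output for the assertion. A condensed sketch of that flow (not part of the diff, assuming the operator re-construction shown above):

    StreamStateHandle snapshot = testHarness.snapshot(0L, 0L);
    testHarness.close();

    // keep what was emitted so far; the restored harness starts with an empty output queue
    ConcurrentLinkedQueue<Object> outputBeforeClose = testHarness.getOutput();

    // a new operator instance and a new keyed harness take over the snapshotted state
    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
            operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
    testHarness.setup();
    testHarness.restore(snapshot);
    testHarness.open();

    // assertions then compare expectedOutput against both halves, e.g. via
    // Iterables.concat(outputBeforeClose, testHarness.getOutput())
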
@@ -887,9 +893,7 @@ public class WindowOperatorTest extends TestLogger {
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -922,9 +926,8 @@ public class WindowOperatorTest extends TestLogger {
 				0);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness =
-				new OneInputStreamOperatorTestHarness<>(otherOperator);
+				new KeyedOneInputStreamOperatorTestHarness<>(otherOperator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
-		otherTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 		otherOperator.setInputType(inputType, new ExecutionConfig());
 
 		otherTestHarness.setup();
@@ -959,9 +962,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1021,9 +1022,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1096,9 +1095,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1163,9 +1160,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -1226,9 +1221,7 @@ public class WindowOperatorTest extends TestLogger {
 					LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -1295,9 +1288,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -1358,9 +1349,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -1438,9 +1427,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		testHarness.open();
 		
@@ -1532,9 +1519,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		testHarness.open();
 		
@@ -1620,9 +1605,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		testHarness.open();
 
@@ -1708,9 +1691,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		testHarness.open();
 		
@@ -1805,9 +1786,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		testHarness.open();
 		
@@ -1894,9 +1873,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		testHarness.open();
 
@@ -1981,9 +1958,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -2038,9 +2013,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -2087,9 +2060,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -2144,9 +2115,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -2192,9 +2161,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
@@ -2241,9 +2208,7 @@ public class WindowOperatorTest extends TestLogger {
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		testHarness.open();
 
@@ -2294,9 +2259,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 1d07bdd..f8b4063 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -129,7 +129,7 @@ public class InterruptSensitiveRestoreTest {
 				new ExecutionAttemptID(),
 				new SerializedValue<>(new ExecutionConfig()),
 				"test task name",
-				0, 1, 0,
+				1, 0, 1, 0,
 				new Configuration(),
 				taskConfig,
 				SourceStreamTask.class.getName(),
@@ -170,7 +170,7 @@ public class InterruptSensitiveRestoreTest {
 	private static class InterruptLockingStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
 
 		@Override
-		public FSDataInputStream openInputStream() throws Exception {
+		public FSDataInputStream openInputStream() throws IOException {
 			ensureNotClosed();
 			FSDataInputStream is = new FSDataInputStream() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index f757943..7ef0080 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -359,14 +360,16 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		streamTask.triggerCheckpoint(checkpointId, checkpointTimestamp);
 
-		testHarness.endInput();
-		testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
-
 		// since no state was set, there shouldn't be restore calls
 		assertEquals(0, TestingStreamOperator.numberRestoreCalls);
 
+		env.getCheckpointLatch().await();
+
 		assertEquals(checkpointId, env.getCheckpointId());
 
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
+
 		final OneInputStreamTask<String, String> restoredTask = new OneInputStreamTask<String, String>();
 		restoredTask.setInitialState(env.getState(), env.getKeyGroupStates());
 
@@ -459,9 +462,11 @@ public class OneInputStreamTaskTest extends TestLogger {
 	}
 
 	private static class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment {
-		private long checkpointId;
-		private ChainedStateHandle<StreamStateHandle> state;
-		private List<KeyGroupsStateHandle> keyGroupStates;
+		private volatile long checkpointId;
+		private volatile ChainedStateHandle<StreamStateHandle> state;
+		private volatile List<KeyGroupsStateHandle> keyGroupStates;
+
+		private final OneShotLatch checkpointLatch = new OneShotLatch();
 
 		public long getCheckpointId() {
 			return checkpointId;
@@ -494,6 +499,11 @@ public class OneInputStreamTaskTest extends TestLogger {
 			this.checkpointId = checkpointId;
 			this.state = state;
 			this.keyGroupStates = keyGroupStates;
+			checkpointLatch.trigger();
+		}
+
+		public OneShotLatch getCheckpointLatch() {
+			return checkpointLatch;
 		}
 	}
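
The change above removes a race in the checkpoint test: the acknowledged checkpoint data is now published through volatile fields and a OneShotLatch, and the test thread calls env.getCheckpointLatch().await() before reading it instead of relying on task completion. The general shape of that hand-off, as a small self-contained sketch (names here are illustrative, not from the commit):

    import org.apache.flink.core.testutils.OneShotLatch;

    final class CheckpointAck {
        private final OneShotLatch latch = new OneShotLatch();
        private volatile long checkpointId;

        // called on the task thread when the checkpoint is acknowledged
        void acknowledge(long id) {
            this.checkpointId = id;
            latch.trigger();
        }

        // called on the test thread; blocks until acknowledge() has run
        long awaitCheckpointId() throws InterruptedException {
            latch.await();
            return checkpointId;
        }
    }
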
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 5e82569..0901b32 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -103,7 +103,7 @@ public class StreamMockEnvironment implements Environment {
 
 	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
 									long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
-		this.taskInfo = new TaskInfo("", 0, 1, 0);
+		this.taskInfo = new TaskInfo("", 1, 0, 1, 0);
 		this.jobConfiguration = jobConfig;
 		this.taskConfiguration = taskConfig;
 		this.inputs = new LinkedList<InputGate>();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 3d9d50f..408b5b1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -237,7 +237,7 @@ public class StreamTaskTest {
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
 				new SerializedValue<>(new ExecutionConfig()),
-				"Test Task", 0, 1, 0,
+				"Test Task", 1, 0, 1, 0,
 				new Configuration(),
 				taskConfig.getConfiguration(),
 				invokable.getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
new file mode 100644
index 0000000..5594193
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get
+ * a {@link KeyedStateBackend}.
+ *
+ */
+public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
+		extends OneInputStreamOperatorTestHarness<IN, OUT> {
+
+	// in case the operator creates one we store it here so that we
+	// can snapshot its state
+	private KeyedStateBackend<?> keyedStateBackend = null;
+
+	// when we restore we keep the state here so that we can call restore
+	// when the operator requests the keyed state backend
+	private KeyGroupsStateHandle restoredKeyedState = null;
+
+	public KeyedOneInputStreamOperatorTestHarness(
+			OneInputStreamOperator<IN, OUT> operator,
+			final KeySelector<IN, K> keySelector,
+			TypeInformation<K> keyType) {
+		super(operator);
+
+		ClosureCleaner.clean(keySelector, false);
+		config.setStatePartitioner(0, keySelector);
+		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+		config.setKeyGroupAssigner(new HashKeyGroupAssigner<K>(MAX_PARALLELISM));
+
+		setupMockTaskCreateKeyedBackend();
+	}
+
+	public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
+			ExecutionConfig executionConfig,
+			KeySelector<IN, K> keySelector,
+			TypeInformation<K> keyType) {
+		super(operator, executionConfig);
+
+		ClosureCleaner.clean(keySelector, false);
+		config.setStatePartitioner(0, keySelector);
+		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+		config.setKeyGroupAssigner(new HashKeyGroupAssigner<K>(MAX_PARALLELISM));
+
+		setupMockTaskCreateKeyedBackend();
+	}
+
+	public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
+			ExecutionConfig executionConfig,
+			TimeServiceProvider testTimeProvider,
+			KeySelector<IN, K> keySelector,
+			TypeInformation<K> keyType) {
+		super(operator, executionConfig, testTimeProvider);
+
+		ClosureCleaner.clean(keySelector, false);
+		config.setStatePartitioner(0, keySelector);
+		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+		config.setKeyGroupAssigner(new HashKeyGroupAssigner<K>(MAX_PARALLELISM));
+
+		setupMockTaskCreateKeyedBackend();
+	}
+
+	private void setupMockTaskCreateKeyedBackend() {
+
+		try {
+			doAnswer(new Answer<KeyedStateBackend>() {
+				@Override
+				public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+
+					final TypeSerializer keySerializer = (TypeSerializer) invocationOnMock.getArguments()[0];
+					final KeyGroupAssigner keyGroupAssigner = (KeyGroupAssigner) invocationOnMock.getArguments()[1];
+					final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2];
+
+					if (restoredKeyedState == null) {
+						keyedStateBackend = stateBackend.createKeyedStateBackend(
+								mockTask.getEnvironment(),
+								new JobID(),
+								"test_op",
+								keySerializer,
+								keyGroupAssigner,
+								keyGroupRange,
+								mockTask.getEnvironment().getTaskKvStateRegistry());
+						return keyedStateBackend;
+					} else {
+						keyedStateBackend = stateBackend.restoreKeyedStateBackend(
+								mockTask.getEnvironment(),
+								new JobID(),
+								"test_op",
+								keySerializer,
+								keyGroupAssigner,
+								keyGroupRange,
+								Collections.singletonList(restoredKeyedState),
+								mockTask.getEnvironment().getTaskKvStateRegistry());
+						restoredKeyedState = null;
+						return keyedStateBackend;
+					}
+				}
+			}).when(mockTask).createKeyedStateBackend(any(TypeSerializer.class), any(KeyGroupAssigner.class), any(KeyGroupRange.class));
+		} catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+
+	/**
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(org.apache.flink.core.fs.FSDataOutputStream, long, long)} and additionally snapshots the keyed state backend, if the operator created one.
+	 */
+	@Override
+	public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
+		// simply use an in-memory handle
+		MemoryStateBackend backend = new MemoryStateBackend();
+
+		CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op");
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream =
+				streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+		operator.snapshotState(outStream, checkpointId, timestamp);
+
+		if (keyedStateBackend != null) {
+			RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
+					timestamp,
+					streamFactory);
+			if (!keyedSnapshotRunnable.isDone()) {
+				Thread runner = new Thread(keyedSnapshotRunnable);
+				runner.start();
+			}
+			outStream.write(1);
+			ObjectOutputStream oos = new ObjectOutputStream(outStream);
+			oos.writeObject(keyedSnapshotRunnable.get());
+			oos.flush();
+		} else {
+			outStream.write(0);
+		}
+		return outStream.closeAndGetHandle();
+	}
+
+	/**
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} and restores the keyed state handle, if one was written by {@link #snapshot(long, long)}.
+	 */
+	@Override
+	public void restore(StreamStateHandle snapshot) throws Exception {
+		FSDataInputStream inStream = snapshot.openInputStream();
+		operator.restoreState(inStream);
+
+		byte keyedStatePresent = (byte) inStream.read();
+		if (keyedStatePresent == 1) {
+			ObjectInputStream ois = new ObjectInputStream(inStream);
+			this.restoredKeyedState = (KeyGroupsStateHandle) ois.readObject();
+		}
+	}
+
+	/**
+	 * Calls close and dispose on the operator via the superclass.
+	 */
+	@Override
+	public void close() throws Exception {
+		super.close();
+	}
+}

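As a usage illustration (not part of the patch above), here is a minimal sketch of how a test could drive the new KeyedOneInputStreamOperatorTestHarness through a snapshot/restore round trip. The identity StreamMap operator, the element values, and the checkpoint id/timestamp are placeholders, and the setup/restore/open ordering follows the pattern used by the harness-based tests in this change.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Test;

public class KeyedHarnessUsageSketch {

	@Test
	public void snapshotAndRestoreRoundTrip() throws Exception {
		// stand-in operator: an identity map; a real test would use an operator with keyed state
		StreamMap<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> operator =
				new StreamMap<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(
						new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
							private static final long serialVersionUID = 1L;

							@Override
							public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
								return value;
							}
						});

		// key by the first tuple field; the keyed harness needs a key selector and key type
		KeySelector<Tuple2<Integer, Integer>, Integer> keySelector =
				new KeySelector<Tuple2<Integer, Integer>, Integer>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Integer getKey(Tuple2<Integer, Integer> value) {
						return value.f0;
					}
				};

		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> harness =
				new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.INT_TYPE_INFO);

		harness.setup();
		harness.open();
		harness.processElement(new StreamRecord<>(new Tuple2<>(1, 42), 0L));

		// writes the operator's stream state and, if a keyed backend was created, its key-grouped state
		StreamStateHandle snapshot = harness.snapshot(0L, 0L);
		harness.close();

		// a fresh harness restores between setup() and open()
		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> restored =
				new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.INT_TYPE_INFO);
		restored.setup();
		restored.restore(snapshot);
		restored.open();
		restored.close();
	}
}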
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index bc255ff..65ed43d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -78,9 +78,7 @@ public class MockContext<IN, OUT> {
 				KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) throws Exception {
 
 		OneInputStreamOperatorTestHarness<IN, OUT> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(keySelector, keyType);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
 
 		testHarness.setup();
 		testHarness.open();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6cb46d6..78e05b7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -18,22 +18,25 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -44,6 +47,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 
@@ -63,6 +67,8 @@ import static org.mockito.Mockito.when;
  */
 public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
+	protected static final int MAX_PARALLELISM = 10;
+
 	final OneInputStreamOperator<IN, OUT> operator;
 
 	final ConcurrentLinkedQueue<Object> outputList;
@@ -78,7 +84,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	StreamTask<?, ?> mockTask;
 
 	// use this as default for tests
-	private AbstractStateBackend stateBackend = new MemoryStateBackend();
+	AbstractStateBackend stateBackend = new MemoryStateBackend();
 
 	/**
 	 * Whether setup() was called on the operator. This is reset when calling close().
@@ -108,7 +114,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		this.executionConfig = executionConfig;
 		this.checkpointLock = new Object();
 
-		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig);
+		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
 		mockTask = mock(StreamTask.class);
 		timeServiceProvider = testTimeProvider;
 
@@ -120,21 +126,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
 
-		try {
-			doAnswer(new Answer<AbstractStateBackend>() {
-				@Override
-				public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final String operatorIdentifier = (String) invocationOnMock.getArguments()[0];
-					final TypeSerializer<?> keySerializer = (TypeSerializer<?>) invocationOnMock.getArguments()[1];
-					OneInputStreamOperatorTestHarness.this.stateBackend.disposeAllStateForCurrentJob();
-					OneInputStreamOperatorTestHarness.this.stateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
-					return OneInputStreamOperatorTestHarness.this.stateBackend;
-				}
-			}).when(mockTask).createStateBackend(any(String.class), any(TypeSerializer.class));
-		} catch (Exception e) {
-			throw new RuntimeException(e.getMessage(), e);
-		}
-		
 		doAnswer(new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -153,6 +144,20 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 				return timeServiceProvider.getCurrentProcessingTime();
 			}
 		}).when(mockTask).getCurrentProcessingTime();
+
+		try {
+			doAnswer(new Answer<CheckpointStreamFactory>() {
+				@Override
+				public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
+
+					final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0];
+					return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
+				}
+			}).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
+		} catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+
 	}
 
 	public void setStateBackend(AbstractStateBackend stateBackend) {
@@ -167,13 +172,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		return this.mockTask.getEnvironment();
 	}
 
-	public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
-		ClosureCleaner.clean(keySelector, false);
-		config.setStatePartitioner(0, keySelector);
-		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-		config.setKeyGroupAssigner(new HashKeyGroupAssigner<K>(10));
-	}
-
 	/**
 	 * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
 	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
@@ -205,13 +203,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(org.apache.flink.core.fs.FSDataOutputStream, long, long)} ()}
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(FSDataOutputStream, long, long)}
 	 */
 	public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
-		// simply use an in-memory handle
-		MemoryStateBackend backend = new MemoryStateBackend();
-		AbstractStateBackend.CheckpointStateOutputStream outStream =
-				backend.createCheckpointStateOutputStream(checkpointId, timestamp);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
+				new JobID(),
+				"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
 		operator.snapshotState(outStream, checkpointId, timestamp);
 		return outStream.closeAndGetHandle();
 	}

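The snapshot path above now goes through a CheckpointStreamFactory obtained from the state backend rather than through createCheckpointStateOutputStream on the backend itself. A minimal sketch of that round trip, assuming a MemoryStateBackend and placeholder checkpoint id/timestamp values:

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

public class CheckpointStreamFactorySketch {

	public static void main(String[] args) throws Exception {
		MemoryStateBackend backend = new MemoryStateBackend();

		// per-operator factory, scoped by job id and an operator identifier
		CheckpointStreamFactory factory = backend.createStreamFactory(new JobID(), "test_op");

		// open a stream for a (hypothetical) checkpoint 1 triggered at timestamp 42
		CheckpointStreamFactory.CheckpointStateOutputStream out =
				factory.createCheckpointStateOutputStream(1L, 42L);
		out.write(new byte[] {1, 2, 3});

		// closing hands back a handle that can later be re-opened for restore
		StreamStateHandle handle = out.closeAndGetHandle();

		FSDataInputStream in = handle.openInputStream();
		System.out.println("first byte restored: " + in.read());
		in.close();
	}
}

The handle returned by closeAndGetHandle() is what the harness hands back from snapshot() and later re-opens in restore().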
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index bc4074f..58e8c6b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.util;
 
+import com.google.common.collect.Iterables;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Assert;
@@ -76,8 +77,8 @@ public class TestHarnessUtil {
 	/**
 	 * Compare the two queues containing operator/task output by converting them to an array first.
 	 */
-	public static void assertOutputEqualsSorted(String message, Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) {
-		assertEquals(expected.size(), actual.size());
+	public static void assertOutputEqualsSorted(String message, Iterable<Object> expected, Iterable<Object> actual, Comparator<Object> comparator) {
+		assertEquals(Iterables.size(expected), Iterables.size(actual));
 
 		// first, compare only watermarks, their position should be deterministic
 		Iterator<Object> exIt = expected.iterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index 779436a..af1f3fa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -85,8 +85,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 		operator.setInputType(inputType, executionConfig);
 
 		timeServiceProvider = new TestTimeServiceProvider();
-		testHarness = new OneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider);
-		testHarness.configureForKeyedStream(keySelector, keyType);
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 97c8339..9f8ab90 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -73,7 +73,6 @@ import static org.junit.Assert.*;
 @RunWith(Parameterized.class)
 public class EventTimeWindowCheckpointingITCase extends TestLogger {
 
-
 	private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
 	private static final int PARALLELISM = 4;
 
@@ -118,20 +117,11 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 				this.stateBackend = new FsStateBackend("file://" + backups);
 				break;
 			}
-			case ROCKSDB: {
-				String rocksDb = tempFolder.newFolder().getAbsolutePath();
-				String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
-				rdb.setDbStoragePath(rocksDb);
-				this.stateBackend = rdb;
-				break;
-			}
 			case ROCKSDB_FULLY_ASYNC: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
 				String rocksDbBackups = tempFolder.newFolder().toURI().toString();
 				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
 				rdb.setDbStoragePath(rocksDb);
-				rdb.enableFullyAsyncSnapshots();
 				this.stateBackend = rdb;
 				break;
 			}
@@ -774,14 +764,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		return Arrays.asList(new Object[][] {
 				{StateBackendEnum.MEM},
 				{StateBackendEnum.FILE},
-				{StateBackendEnum.ROCKSDB},
 				{StateBackendEnum.ROCKSDB_FULLY_ASYNC}
 			}
 		);
 	}
 
 	private enum StateBackendEnum {
-		MEM, FILE, ROCKSDB, ROCKSDB_FULLY_ASYNC
+		MEM, FILE, ROCKSDB_FULLY_ASYNC
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 3bc3cf5..8b56d3d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -310,7 +310,7 @@ public class ClassLoaderITCase extends TestLogger {
 			// Success :-)
 			LOG.info("Disposed savepoint at " + savepointPath);
 		} else if (disposeResponse instanceof DisposeSavepointFailure) {
-			throw new IllegalStateException("Failed to dispose savepoint");
+			throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
 		} else {
 			throw new IllegalStateException("Unexpected response to DisposeSavepoint");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index c6a4c7f..8de4797 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.test.classloading.jar;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -56,14 +58,23 @@ public class CustomKvStateProgram {
 		env.setStateBackend(new FsStateBackend(checkpointPath));
 
 		DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());
-		source.keyBy(new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = -9044152404048903826L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return ThreadLocalRandom.current().nextInt(parallelism);
-			}
-		}).flatMap(new ReducingStateFlatMap()).writeAsText(outputPath);
+		source
+				.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+						return new Tuple2<>(ThreadLocalRandom.current().nextInt(parallelism), value);
+					}
+				})
+				.keyBy(new KeySelector<Tuple2<Integer,Integer>, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+						return value.f0;
+					}
+				}).flatMap(new ReducingStateFlatMap()).writeAsText(outputPath);
 
 		env.execute();
 	}
@@ -88,10 +99,10 @@ public class CustomKvStateProgram {
 		}
 	}
 
-	private static class ReducingStateFlatMap extends RichFlatMapFunction<Integer, Integer> {
+	private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer> {
 
 		private static final long serialVersionUID = -5939722892793950253L;
-		private ReducingState<Integer> kvState;
+		private transient ReducingState<Integer> kvState;
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -106,11 +117,13 @@ public class CustomKvStateProgram {
 
 
 		@Override
-		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
-			kvState.add(value);
+		public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+			kvState.add(value.f1);
 		}
 
 		private static class ReduceSum implements ReduceFunction<Integer> {
+			private static final long serialVersionUID = 1L;
+
 			@Override
 			public Integer reduce(Integer value1, Integer value2) throws Exception {
 				return value1 + value2;

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
index b3e2137..77a9b2e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.state;
 
-import org.apache.flink.runtime.state.KvStateSnapshot;
-
 import org.apache.flink.runtime.state.StateObject;
 import org.junit.Test;
 
@@ -51,18 +49,6 @@ public class StateHandleSerializationTest {
 			for (Class<?> clazz : stateHandleImplementations) {
 				validataSerialVersionUID(clazz);
 			}
-
-			// check all key/value snapshots
-
-			@SuppressWarnings("unchecked")
-			Set<Class<?>> kvStateSnapshotImplementations = (Set<Class<?>>) (Set<?>)
-					reflections.getSubTypesOf(KvStateSnapshot.class);
-
-			System.out.println(kvStateSnapshotImplementations);
-			
-			for (Class<?> clazz : kvStateSnapshotImplementations) {
-				validataSerialVersionUID(clazz);
-			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 41455cf..d4dd475 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -18,26 +18,28 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.List;
+
 import static org.junit.Assert.fail;
 
 public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
@@ -74,7 +76,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 				}
 			})
 			.print();
-		
+
 		try {
 			see.execute();
 			fail();
@@ -89,49 +91,39 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 
 
 	public static class FailingStateBackend extends AbstractStateBackend {
-		
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception {
+		public CheckpointStreamFactory createStreamFactory(JobID jobId,
+				String operatorIdentifier) throws IOException {
 			throw new SuccessException();
 		}
 
 		@Override
-		public void disposeAllStateForCurrentJob() throws Exception {}
-
-		@Override
-		public void close() throws Exception {}
-
-		@Override
-		protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
-			return null;
-		}
-
-		@Override
-		protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
-			return null;
-		}
-
-		@Override
-		protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
-			return null;
-		}
-
-		@Override
-		protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-			return null;
+		public <K> KeyedStateBackend<K> createKeyedStateBackend(Environment env,
+				JobID jobID,
+				String operatorIdentifier,
+				TypeSerializer<K> keySerializer,
+				KeyGroupAssigner<K> keyGroupAssigner,
+				KeyGroupRange keyGroupRange,
+				TaskKvStateRegistry kvStateRegistry) throws Exception {
+			throw new SuccessException();
 		}
 
 		@Override
-		public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID,
-			long timestamp) throws Exception {
-			return null;
+		public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env,
+				JobID jobID,
+				String operatorIdentifier,
+				TypeSerializer<K> keySerializer,
+				KeyGroupAssigner<K> keyGroupAssigner,
+				KeyGroupRange keyGroupRange,
+				List<KeyGroupsStateHandle> restoredState,
+				TaskKvStateRegistry kvStateRegistry) throws Exception {
+			throw new SuccessException();
 		}
 	}
 
-	static final class SuccessException extends Exception {
+	static final class SuccessException extends IOException {
 		private static final long serialVersionUID = -9218191172606739598L;
 	}
 


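The FailingStateBackend above also shows the full surface a custom backend has to provide after this refactoring: a checkpoint stream factory plus create/restore of a keyed state backend. As a minimal sketch (not part of this change), a hypothetical backend could simply delegate all three to MemoryStateBackend:

import java.io.IOException;
import java.util.List;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

public class DelegatingStateBackend extends AbstractStateBackend {

	private static final long serialVersionUID = 1L;

	private final MemoryStateBackend delegate = new MemoryStateBackend();

	@Override
	public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
		// non-keyed operator state goes through the checkpoint stream factory
		return delegate.createStreamFactory(jobId, operatorIdentifier);
	}

	@Override
	public <K> KeyedStateBackend<K> createKeyedStateBackend(
			Environment env,
			JobID jobID,
			String operatorIdentifier,
			TypeSerializer<K> keySerializer,
			KeyGroupAssigner<K> keyGroupAssigner,
			KeyGroupRange keyGroupRange,
			TaskKvStateRegistry kvStateRegistry) throws Exception {
		// fresh keyed state, scoped to the key groups assigned to this subtask
		return delegate.createKeyedStateBackend(
				env, jobID, operatorIdentifier, keySerializer, keyGroupAssigner, keyGroupRange, kvStateRegistry);
	}

	@Override
	public <K> KeyedStateBackend<K> restoreKeyedStateBackend(
			Environment env,
			JobID jobID,
			String operatorIdentifier,
			TypeSerializer<K> keySerializer,
			KeyGroupAssigner<K> keyGroupAssigner,
			KeyGroupRange keyGroupRange,
			List<KeyGroupsStateHandle> restoredState,
			TaskKvStateRegistry kvStateRegistry) throws Exception {
		// restore from the key-grouped handles produced by an earlier snapshot
		return delegate.restoreKeyedStateBackend(
				env, jobID, operatorIdentifier, keySerializer, keyGroupAssigner, keyGroupRange, restoredState, kvStateRegistry);
	}
}

Such a backend would be wired in the same way as in the test, i.e. via setStateBackend(...) on the StreamExecutionEnvironment.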