flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [04/11] flink git commit: [FLINK-7461] Remove Backwards compatibility with <= Flink 1.1
Date Thu, 24 Aug 2017 18:22:35 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index 9f19064..d7df479 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
@@ -84,7 +83,7 @@ public class WindowOperatorMigrationTest {
 
 	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
-		return Arrays.asList(MigrationVersion.v1_1, MigrationVersion.v1_2, MigrationVersion.v1_3);
+		return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3);
 	}
 
 	/**
@@ -753,219 +752,6 @@ public class WindowOperatorMigrationTest {
 		testHarness.close();
 	}
 
-	/**
-	 * Manually run this to write binary snapshot data.
-	 */
-	@Ignore
-	@Test
-	public void writeAggregatingAlignedProcessingTimeWindowsSnapshot() throws Exception {
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-		AggregatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>> operator =
-			new AggregatingProcessingTimeWindowOperator<>(
-				new ReduceFunction<Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = -8913160567151867987L;
-
-					@Override
-					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
-						return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
-					}
-				},
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				inputType.createSerializer(new ExecutionConfig()),
-				3000,
-				3000);
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.open();
-
-		testHarness.setProcessingTime(3);
-
-		// timestamp is ignored in processing time
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-
-		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-		OperatorSnapshotUtil.writeStateHandle(
-			snapshot,
-			"src/test/resources/win-op-migration-test-aggr-aligned-flink" + flinkGenerateSavepointVersion + "-snapshot");
-		testHarness.close();
-	}
-
-	@Test
-	public void testRestoreAggregatingAlignedProcessingTimeWindows() throws Exception {
-		final int windowSize = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-				new SumReducer(),
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-				ProcessingTimeTrigger.create(),
-				0,
-				null /* late data output tag */,
-				LegacyWindowOperatorType.FAST_AGGREGATING);
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.setup();
-
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
-			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-aggr-aligned-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
-
-		testHarness.open();
-
-		testHarness.setProcessingTime(5000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-
-		testHarness.setProcessingTime(7000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Manually run this to write binary snapshot data.
-	 */
-	@Ignore
-	@Test
-	public void writeAlignedProcessingTimeWindowsSnapshot() throws Exception {
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-		AccumulatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>> operator =
-			new AccumulatingProcessingTimeWindowOperator<>(
-					new InternalIterableWindowFunction<>(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
-
-						private static final long serialVersionUID = 6551516443265733803L;
-
-						@Override
-						public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
-							int sum = 0;
-							for (Tuple2<String, Integer> anInput : input) {
-								sum += anInput.f1;
-							}
-							out.collect(new Tuple2<>(s, sum));
-						}
-					}),
-					new TupleKeySelector(),
-					BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-					inputType.createSerializer(new ExecutionConfig()),
-					3000,
-					3000);
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.open();
-
-		testHarness.setProcessingTime(3);
-
-		// timestamp is ignored in processing time
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-
-		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-		OperatorSnapshotUtil.writeStateHandle(
-			snapshot,
-			"src/test/resources/win-op-migration-test-accum-aligned-flink" + flinkGenerateSavepointVersion + "-snapshot");
-		testHarness.close();
-	}
-
-	@Test
-	public void testRestoreAccumulatingAlignedProcessingTimeWindows() throws Exception {
-		final int windowSize = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-				new SumReducer(),
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-				ProcessingTimeTrigger.create(),
-				0,
-				null /* late data output tag */,
-				LegacyWindowOperatorType.FAST_ACCUMULATING);
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.setup();
-
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
-			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-accum-aligned-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
-
-		testHarness.open();
-
-		testHarness.setProcessingTime(5000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-
-		testHarness.setProcessingTime(7000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
-
-		testHarness.close();
-	}
-
 	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 8748ed4..821438e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -63,7 +63,6 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -336,32 +335,6 @@ public class WindowTranslationTest {
 		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
-
-	/**
-	 * Ignored because we currently don't have the fast processing-time window operator.
-	 */
-	@Test
-	@SuppressWarnings("rawtypes")
-	@Ignore
-	public void testReduceFastProcessingTime() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DataStream<Tuple2<String, Integer>> window = source
-				.keyBy(new TupleKeySelector())
-				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.reduce(new DummyReducer());
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
-		Assert.assertTrue(operator instanceof AggregatingProcessingTimeWindowOperator);
-
-		processElementAndEnsureOutput(operator, null, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
-	}
-
 	@Test
 	@SuppressWarnings("rawtypes")
 	public void testReduceWithWindowFunctionEventTime() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 14ae733..f73499c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -64,7 +64,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -75,7 +74,6 @@ import org.junit.Test;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
@@ -106,12 +104,6 @@ public class InterruptSensitiveRestoreTest {
 	private static final int OPERATOR_RAW = 1;
 	private static final int KEYED_MANAGED = 2;
 	private static final int KEYED_RAW = 3;
-	private static final int LEGACY = 4;
-
-	@Test
-	public void testRestoreWithInterruptLegacy() throws Exception {
-		testRestoreWithInterrupt(LEGACY);
-	}
 
 	@Test
 	public void testRestoreWithInterruptOperatorManaged() throws Exception {
@@ -137,18 +129,15 @@ public class InterruptSensitiveRestoreTest {
 
 		IN_RESTORE_LATCH.reset();
 		Configuration taskConfig = new Configuration();
-		StreamConfig streamConfig = new StreamConfig(taskConfig);
-		streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		StreamConfig cfg = new StreamConfig(taskConfig);
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 		switch (mode) {
 			case OPERATOR_MANAGED:
 			case OPERATOR_RAW:
 			case KEYED_MANAGED:
 			case KEYED_RAW:
-				streamConfig.setStateKeySerializer(IntSerializer.INSTANCE);
-				streamConfig.setStreamOperator(new StreamSource<>(new TestSource()));
-				break;
-			case LEGACY:
-				streamConfig.setStreamOperator(new StreamSource<>(new TestSourceLegacy()));
+				cfg.setStateKeySerializer(IntSerializer.INSTANCE);
+				cfg.setStreamOperator(new StreamSource<>(new TestSource(mode)));
 				break;
 			default:
 				throw new IllegalArgumentException();
@@ -156,7 +145,7 @@ public class InterruptSensitiveRestoreTest {
 
 		StreamStateHandle lockingHandle = new InterruptLockingStateHandle();
 
-		Task task = createTask(streamConfig, taskConfig, lockingHandle, mode);
+		Task task = createTask(cfg, taskConfig, lockingHandle, mode);
 
 		// start the task and wait until it is in "restore"
 		task.startTaskThread();
@@ -180,16 +169,15 @@ public class InterruptSensitiveRestoreTest {
 	// ------------------------------------------------------------------------
 
 	private static Task createTask(
-		StreamConfig streamConfig,
-		Configuration taskConfig,
-		StreamStateHandle state,
-		int mode) throws IOException {
+			StreamConfig streamConfig,
+			Configuration taskConfig,
+			StreamStateHandle state,
+			int mode) throws IOException {
 
 		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
 		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
-		StreamStateHandle operatorState = null;
 		Collection<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList();
 		Collection<KeyedStateHandle> keyedStateFromStream = Collections.emptyList();
 		Collection<OperatorStateHandle> operatorStateBackend = Collections.emptyList();
@@ -206,7 +194,7 @@ public class InterruptSensitiveRestoreTest {
 				Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state));
 
 		List<KeyedStateHandle> keyedStateHandles =
-				Collections.<KeyedStateHandle>singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state));
+				Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state));
 
 		switch (mode) {
 			case OPERATOR_MANAGED:
@@ -221,15 +209,11 @@ public class InterruptSensitiveRestoreTest {
 			case KEYED_RAW:
 				keyedStateFromStream = keyedStateHandles;
 				break;
-			case LEGACY:
-				operatorState = state;
-				break;
 			default:
 				throw new IllegalArgumentException();
 		}
 
 		OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
-			operatorState,
 			operatorStateBackend,
 			operatorStateStream,
 			keyedStateFromBackend,
@@ -238,14 +222,13 @@ public class InterruptSensitiveRestoreTest {
 		JobVertexID jobVertexID = new JobVertexID();
 		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
 		streamConfig.setOperatorID(operatorID);
-
 		TaskStateSnapshot stateSnapshot = new TaskStateSnapshot();
 		stateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
 		JobInformation jobInformation = new JobInformation(
 			new JobID(),
 			"test job name",
 			new SerializedValue<>(new ExecutionConfig()),
-			taskConfig,
+			new Configuration(),
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList());
 
@@ -302,11 +285,11 @@ public class InterruptSensitiveRestoreTest {
 			FSDataInputStream is = new FSDataInputStream() {
 
 				@Override
-				public void seek(long desired) throws IOException {
+				public void seek(long desired) {
 				}
 
 				@Override
-				public long getPos() throws IOException {
+				public long getPos() {
 					return 0;
 				}
 
@@ -358,33 +341,15 @@ public class InterruptSensitiveRestoreTest {
 
 	// ------------------------------------------------------------------------
 
-	private static class TestSourceLegacy implements SourceFunction<Object>, Checkpointed<Serializable> {
+	private static class TestSource implements SourceFunction<Object>, CheckpointedFunction {
 		private static final long serialVersionUID = 1L;
+		private final int testType;
 
-		@Override
-		public void run(SourceContext<Object> ctx) throws Exception {
-			fail("should never be called");
+		public TestSource(int testType) {
+			this.testType = testType;
 		}
 
 		@Override
-		public void cancel() {}
-
-		@Override
-		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			fail("should never be called");
-			return null;
-		}
-
-		@Override
-		public void restoreState(Serializable state) throws Exception {
-			fail("should never be called");
-		}
-	}
-
-	private static class TestSource implements SourceFunction<Object>, CheckpointedFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
 		public void run(SourceContext<Object> ctx) throws Exception {
 			fail("should never be called");
 		}
@@ -399,6 +364,8 @@ public class InterruptSensitiveRestoreTest {
 
 		@Override
 		public void initializeState(FunctionInitializationContext context) throws Exception {
+			// Raw keyed state is already read by the timer service, and all other state is read while
+			// initializing the context, so only the read of the raw operator state must be triggered manually.
 			((StateInitializationContext) context).getRawOperatorStateInputs().iterator().next().getStream().read();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 3190620..8d80d66 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -47,21 +45,18 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -69,7 +64,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -78,7 +72,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -93,7 +86,7 @@ import static org.junit.Assert.fail;
 public class OneInputStreamTaskTest extends TestLogger {
 
 	private static final ListStateDescriptor<Integer> TEST_DESCRIPTOR =
-			new ListStateDescriptor<>("test", new IntSerializer());
+		new ListStateDescriptor<>("test", new IntSerializer());
 
 	/**
 	 * This test verifies that open() and close() are correctly called. This test also verifies
@@ -129,8 +122,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 		assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				expectedOutput,
-				testHarness.getOutput());
+			expectedOutput,
+			testHarness.getOutput());
 	}
 
 	/**
@@ -174,8 +167,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.waitForInputProcessing();
 		expectedOutput.add(new Watermark(initialTime));
 		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				expectedOutput,
-				testHarness.getOutput());
+			expectedOutput,
+			testHarness.getOutput());
 
 		// contrary to checkpoint barriers these elements are not blocked by watermarks
 		testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
@@ -215,7 +208,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
 		testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first
 		testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged,
-		                                                     // watermark (initial + 6) should be forwarded
+		// watermark (initial + 6) should be forwarded
 		testHarness.waitForInputProcessing();
 		expectedOutput.add(new Watermark(initialTime + 5));
 		expectedOutput.add(new Watermark(initialTime + 6));
@@ -263,21 +256,16 @@ public class OneInputStreamTaskTest extends TestLogger {
 		// ------------------ setup the chain ------------------
 
 		TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator();
-		OperatorID headOperatorId = new OperatorID();
-
 		StreamConfig headOperatorConfig = testHarness.getStreamConfig();
 
 		WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator();
-		OperatorID watermarkOperatorId = new OperatorID();
-
 		StreamConfig watermarkOperatorConfig = new StreamConfig(new Configuration());
 
 		TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator();
-		OperatorID tailOperatorId = new OperatorID();
 		StreamConfig tailOperatorConfig = new StreamConfig(new Configuration());
 
 		headOperatorConfig.setStreamOperator(headOperator);
-		headOperatorConfig.setOperatorID(headOperatorId);
+		headOperatorConfig.setOperatorID(new OperatorID(42L, 42L));
 		headOperatorConfig.setChainStart();
 		headOperatorConfig.setChainIndex(0);
 		headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
@@ -290,7 +278,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		)));
 
 		watermarkOperatorConfig.setStreamOperator(watermarkOperator);
-		watermarkOperatorConfig.setOperatorID(watermarkOperatorId);
+		watermarkOperatorConfig.setOperatorID(new OperatorID(4711L, 42L));
 		watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
 		watermarkOperatorConfig.setChainIndex(1);
 		watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
@@ -312,7 +300,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 			null));
 
 		tailOperatorConfig.setStreamOperator(tailOperator);
-		tailOperatorConfig.setOperatorID(tailOperatorId);
+		tailOperatorConfig.setOperatorID(new OperatorID(123L, 123L));
 		tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
 		tailOperatorConfig.setBufferTimeout(0);
 		tailOperatorConfig.setChainIndex(2);
@@ -555,13 +543,11 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		long checkpointId = 1L;
 		long checkpointTimestamp = 1L;
-		long recoveryTimestamp = 3L;
-		long seed = 2L;
 		int numberChainedTasks = 11;
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 
-		configureChainedTestingStreamOperator(streamConfig, numberChainedTasks, seed, recoveryTimestamp);
+		configureChainedTestingStreamOperator(streamConfig, numberChainedTasks);
 
 		AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(
 			testHarness.jobConfig,
@@ -599,7 +585,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig();
 
-		configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks, seed, recoveryTimestamp);
+		configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks);
 
 		TaskStateSnapshot stateHandles = env.getCheckpointStateHandles();
 		Assert.assertEquals(numberChainedTasks, stateHandles.getSubtaskStateMappings().size());
@@ -625,16 +611,12 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 	private void configureChainedTestingStreamOperator(
 		StreamConfig streamConfig,
-		int numberChainedTasks,
-		long seed,
-		long recoveryTimestamp) {
+		int numberChainedTasks) {
 
 		Preconditions.checkArgument(numberChainedTasks >= 1, "The operator chain must at least " +
 			"contain one operator.");
 
-		Random random = new Random(seed);
-
-		TestingStreamOperator<Integer, Integer> previousOperator = new TestingStreamOperator<>(random.nextLong(), recoveryTimestamp);
+		TestingStreamOperator<Integer, Integer> previousOperator = new TestingStreamOperator<>();
 		streamConfig.setStreamOperator(previousOperator);
 		streamConfig.setOperatorID(new OperatorID(0L, 0L));
 
@@ -643,7 +625,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		List<StreamEdge> outputEdges = new ArrayList<>(numberChainedTasks - 1);
 
 		for (int chainedIndex = 1; chainedIndex < numberChainedTasks; chainedIndex++) {
-			TestingStreamOperator<Integer, Integer> chainedOperator = new TestingStreamOperator<>(random.nextLong(), recoveryTimestamp);
+			TestingStreamOperator<Integer, Integer> chainedOperator = new TestingStreamOperator<>();
 			StreamConfig chainedConfig = new StreamConfig(new Configuration());
 			chainedConfig.setStreamOperator(chainedOperator);
 			chainedConfig.setOperatorID(new OperatorID(0L, chainedIndex));
@@ -702,17 +684,17 @@ public class OneInputStreamTaskTest extends TestLogger {
 		}
 
 		AcknowledgeStreamMockEnvironment(
-				Configuration jobConfig, Configuration taskConfig,
-				ExecutionConfig executionConfig, long memorySize,
-				MockInputSplitProvider inputSplitProvider, int bufferSize) {
+			Configuration jobConfig, Configuration taskConfig,
+			ExecutionConfig executionConfig, long memorySize,
+			MockInputSplitProvider inputSplitProvider, int bufferSize) {
 			super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize);
 		}
 
 		@Override
 		public void acknowledgeCheckpoint(
-				long checkpointId,
-				CheckpointMetrics checkpointMetrics,
-				TaskStateSnapshot checkpointStateHandles) {
+			long checkpointId,
+			CheckpointMetrics checkpointMetrics,
+			TaskStateSnapshot checkpointStateHandles) {
 
 			this.checkpointId = checkpointId;
 			this.checkpointStateHandles = checkpointStateHandles;
@@ -729,19 +711,14 @@ public class OneInputStreamTaskTest extends TestLogger {
 	}
 
 	private static class TestingStreamOperator<IN, OUT>
-			extends AbstractStreamOperator<OUT>
-			implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
+		extends AbstractStreamOperator<OUT>
+		implements OneInputStreamOperator<IN, OUT> {
 
 		private static final long serialVersionUID = 774614855940397174L;
 
 		public static int numberRestoreCalls = 0;
 		public static int numberSnapshotCalls = 0;
 
-		private final long seed;
-		private final long recoveryTimestamp;
-
-		private transient Random random;
-
 		@Override
 		public void open() throws Exception {
 			super.open();
@@ -767,7 +744,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		@Override
 		public void snapshotState(StateSnapshotContext context) throws Exception {
 			ListState<Integer> partitionableState =
-					getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
+				getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
 			partitionableState.clear();
 
 			partitionableState.add(42);
@@ -778,59 +755,14 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		@Override
 		public void initializeState(StateInitializationContext context) throws Exception {
-
-		}
-
-		TestingStreamOperator(long seed, long recoveryTimestamp) {
-			this.seed = seed;
-			this.recoveryTimestamp = recoveryTimestamp;
-		}
-
-		@Override
-		public void processElement(StreamRecord<IN> element) throws Exception {
-
-		}
-
-		@Override
-		public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
-			if (random == null) {
-				random = new Random(seed);
+			if (context.isRestored()) {
+				++numberRestoreCalls;
 			}
-
-			Serializable functionState = generateFunctionState();
-			Integer operatorState = generateOperatorState();
-
-			InstantiationUtil.serializeObject(out, functionState);
-			InstantiationUtil.serializeObject(out, operatorState);
 		}
 
 		@Override
-		public void restoreState(FSDataInputStream in) throws Exception {
-			numberRestoreCalls++;
-
-			if (random == null) {
-				random = new Random(seed);
-			}
-
-			assertEquals(this.recoveryTimestamp, recoveryTimestamp);
-
-			assertNotNull(in);
-
-			ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
-			Serializable functionState = InstantiationUtil.deserializeObject(in, cl);
-			Integer operatorState = InstantiationUtil.deserializeObject(in, cl);
-
-			assertEquals(random.nextInt(), functionState);
-			assertEquals(random.nextInt(), (int) operatorState);
-		}
-
-		private Serializable generateFunctionState() {
-			return random.nextInt();
-		}
+		public void processElement(StreamRecord<IN> element) throws Exception {
 
-		private Integer generateOperatorState() {
-			return random.nextInt();
 		}
 	}
 
@@ -913,8 +845,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 	 * <p>If it receives a watermark when it's not expecting one, it'll throw an exception and fail.
 	 */
 	private static class TriggerableFailOnWatermarkTestOperator
-			extends AbstractStreamOperator<String>
-			implements OneInputStreamOperator<String, String> {
+		extends AbstractStreamOperator<String>
+		implements OneInputStreamOperator<String, String> {
 
 		private static final long serialVersionUID = 2048954179291813243L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 08c3207..a2dc6c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -81,7 +81,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -139,7 +138,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
@@ -307,9 +305,9 @@ public class StreamTaskTest extends TestLogger {
 		streamTask.setEnvironment(mockEnvironment);
 
 		// mock the operators
-		StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator1 = mock(StreamOperator.class);
+		StreamOperator<?> streamOperator2 = mock(StreamOperator.class);
+		StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
 		// mock the returned snapshots
 		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
@@ -321,15 +319,6 @@ public class StreamTaskTest extends TestLogger {
 		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
 		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenThrow(testException);
 
-		// mock the returned legacy snapshots
-		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
-		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
-		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
-
-		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
-		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
-		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
-
 		OperatorID operatorID1 = new OperatorID();
 		OperatorID operatorID2 = new OperatorID();
 		OperatorID operatorID3 = new OperatorID();
@@ -359,10 +348,6 @@ public class StreamTaskTest extends TestLogger {
 
 		verify(operatorSnapshotResult1).cancel();
 		verify(operatorSnapshotResult2).cancel();
-
-		verify(streamStateHandle1).discardState();
-		verify(streamStateHandle2).discardState();
-		verify(streamStateHandle3).discardState();
 	}
 
 	/**
@@ -384,12 +369,12 @@ public class StreamTaskTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 		streamTask.setEnvironment(mockEnvironment);
 
-		StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-
-		// mock the new state handles / futures
+		// mock the operators
+		StreamOperator<?> streamOperator1 = mock(StreamOperator.class);
+		StreamOperator<?> streamOperator2 = mock(StreamOperator.class);
+		StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
+		// mock the new state operator snapshots
 		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
 		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
 		OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class);
@@ -403,15 +388,6 @@ public class StreamTaskTest extends TestLogger {
 		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
 		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3);
 
-		// mock the legacy state snapshot
-		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
-		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
-		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
-
-		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
-		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
-		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
-
 		OperatorID operatorID1 = new OperatorID();
 		OperatorID operatorID2 = new OperatorID();
 		OperatorID operatorID3 = new OperatorID();
@@ -438,10 +414,6 @@ public class StreamTaskTest extends TestLogger {
 		verify(operatorSnapshotResult1).cancel();
 		verify(operatorSnapshotResult2).cancel();
 		verify(operatorSnapshotResult3).cancel();
-
-		verify(streamStateHandle1).discardState();
-		verify(streamStateHandle2).discardState();
-		verify(streamStateHandle3).discardState();
 	}
 
 	/**
@@ -481,7 +453,7 @@ public class StreamTaskTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 		streamTask.setEnvironment(mockEnvironment);
 
-		StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator = mock(StreamOperator.class);
 
 		KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class);
 		KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class);
@@ -581,7 +553,6 @@ public class StreamTaskTest extends TestLogger {
 
 		whenNew(OperatorSubtaskState.class).
 			withArguments(
-				any(StreamStateHandle.class),
 				anyCollectionOf(OperatorStateHandle.class),
 				anyCollectionOf(OperatorStateHandle.class),
 				anyCollectionOf(KeyedStateHandle.class),
@@ -593,11 +564,10 @@ public class StreamTaskTest extends TestLogger {
 				completeSubtask.await();
 				Object[] arguments = invocation.getArguments();
 				return new OperatorSubtaskState(
-					(StreamStateHandle) arguments[0],
+					(OperatorStateHandle) arguments[0],
 					(OperatorStateHandle) arguments[1],
-					(OperatorStateHandle) arguments[2],
-					(KeyedStateHandle) arguments[3],
-					(KeyedStateHandle) arguments[4]
+					(KeyedStateHandle) arguments[2],
+					(KeyedStateHandle) arguments[3]
 				);
 			}
 		});
@@ -606,7 +576,7 @@ public class StreamTaskTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 		streamTask.setEnvironment(mockEnvironment);
 
-		final StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		final StreamOperator<?> streamOperator = mock(StreamOperator.class);
 		final OperatorID operatorID = new OperatorID();
 		when(streamOperator.getOperatorID()).thenReturn(operatorID);
 
@@ -717,7 +687,7 @@ public class StreamTaskTest extends TestLogger {
 
 		// mock the operators
 		StreamOperator<?> statelessOperator =
-				mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+				mock(StreamOperator.class);
 
 		final OperatorID operatorID = new OperatorID();
 		when(statelessOperator.getOperatorID()).thenReturn(operatorID);

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 720346a..9156f34 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -25,11 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
-import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.migration.util.MigrationInstantiationUtil;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -41,12 +36,10 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -54,7 +47,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
@@ -72,7 +64,6 @@ import org.apache.flink.util.Preconditions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.FileInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -308,36 +299,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		setupCalled = true;
 	}
 
-	public void initializeStateFromLegacyCheckpoint(String checkpointFilename) throws Exception {
-
-		FileInputStream fin = new FileInputStream(checkpointFilename);
-		StreamTaskState state = MigrationInstantiationUtil.deserializeObject(fin, ClassLoader.getSystemClassLoader());
-		fin.close();
-
-		if (!setupCalled) {
-			setup();
-		}
-
-		StreamStateHandle stateHandle = SavepointV0Serializer.convertOperatorAndFunctionState(state);
-
-		List<KeyedStateHandle> keyGroupStatesList = new ArrayList<>();
-		if (state.getKvStates() != null) {
-			KeyGroupsStateHandle keyedStateHandle = SavepointV0Serializer.convertKeyedBackendState(
-					state.getKvStates(),
-					environment.getTaskInfo().getIndexOfThisSubtask(),
-					0);
-			keyGroupStatesList.add(keyedStateHandle);
-		}
-
-		// finally calling the initializeState() with the legacy operatorStateHandles
-		initializeState(new OperatorStateHandles(0,
-				stateHandle,
-				keyGroupStatesList,
-				Collections.<KeyedStateHandle>emptyList(),
-				Collections.<OperatorStateHandle>emptyList(),
-				Collections.<OperatorStateHandle>emptyList()));
-	}
-
 	/**
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorSubtaskState)}.
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
@@ -397,7 +358,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 					numSubtasks).get(subtaskIndex);
 
 			OperatorSubtaskState massagedOperatorStateHandles = new OperatorSubtaskState(
-				operatorStateHandles.getLegacyOperatorState(),
 				nullToEmptyCollection(localManagedOperatorState),
 				nullToEmptyCollection(localRawOperatorState),
 				nullToEmptyCollection(localManagedKeyGroupState),
@@ -473,7 +433,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 
 		return new OperatorStateHandles(
 			0,
-			null,
 			mergedManagedKeyedState,
 			mergedRawKeyedState,
 			mergedManagedOperatorState,
@@ -497,8 +456,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	 */
 	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
 
-		CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(new JobID(), "test_op");
-
 		OperatorSnapshotResult operatorStateResult = operator.snapshotState(
 			checkpointId,
 			timestamp,
@@ -510,21 +467,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
 		OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
-		// also snapshot legacy state, if any
-		StreamStateHandle legacyStateHandle = null;
-
-		if (operator instanceof StreamCheckpointedOperator) {
-
-			final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-					streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
-
-				((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
-				legacyStateHandle = outStream.closeAndGetHandle();
-		}
-
 		return new OperatorStateHandles(
 			0,
-			legacyStateHandle,
 			keyedManaged != null ? Collections.singletonList(keyedManaged) : null,
 			keyedRaw != null ? Collections.singletonList(keyedRaw) : null,
 			opManaged != null ? Collections.singletonList(opManaged) : null,
@@ -532,24 +476,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	}
 
 	/**
-	 * Calls {@link StreamCheckpointedOperator#snapshotState(FSDataOutputStream, long, long)} if
-	 * the operator implements this interface.
-	 */
-	@Deprecated
-	public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
-
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
-				new JobID(),
-				"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
-		if (operator instanceof StreamCheckpointedOperator) {
-			((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
-			return outStream.closeAndGetHandle();
-		} else {
-			throw new RuntimeException("Operator is not StreamCheckpointedOperator");
-		}
-	}
-
-	/**
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}.
 	 */
 	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
@@ -557,22 +483,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	}
 
 	/**
-	 * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
-	 * the operator implements this interface.
-	 */
-	@Deprecated
-	@SuppressWarnings("deprecation")
-	public void restore(StreamStateHandle snapshot) throws Exception {
-		if (operator instanceof StreamCheckpointedOperator) {
-			try (FSDataInputStream in = snapshot.openInputStream()) {
-				((StreamCheckpointedOperator) operator).restoreState(in);
-			}
-		} else {
-			throw new RuntimeException("Operator is not StreamCheckpointedOperator");
-		}
-	}
-
-	/**
 	 * Calls close and dispose on the operator.
 	 */
 	public void close() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 0d42d9f..c2ec63a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -23,33 +23,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.util.Migration;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.anyInt;
@@ -142,61 +132,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		}
 	}
 
-	/**
-	 *
-	 */
-	@Override
-	public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
-		// simply use an in-memory handle
-		MemoryStateBackend backend = new MemoryStateBackend();
-
-		CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op");
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-				streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
-
-		if (operator instanceof StreamCheckpointedOperator) {
-			((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
-		}
-
-		if (keyedStateBackend != null) {
-			RunnableFuture<KeyedStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
-					checkpointId,
-					timestamp,
-					streamFactory,
-					CheckpointOptions.forFullCheckpoint());
-			if (!keyedSnapshotRunnable.isDone()) {
-				Thread runner = new Thread(keyedSnapshotRunnable);
-				runner.start();
-			}
-			outStream.write(1);
-			ObjectOutputStream oos = new ObjectOutputStream(outStream);
-			oos.writeObject(keyedSnapshotRunnable.get());
-			oos.flush();
-		} else {
-			outStream.write(0);
-		}
-		return outStream.closeAndGetHandle();
-	}
-
-	/**
-	 *
-	 */
-	@Override
-	public void restore(StreamStateHandle snapshot) throws Exception {
-		try (FSDataInputStream inStream = snapshot.openInputStream()) {
-
-			if (operator instanceof StreamCheckpointedOperator) {
-				((StreamCheckpointedOperator) operator).restoreState(inStream);
-			}
-
-			byte keyedStatePresent = (byte) inStream.read();
-			if (keyedStatePresent == 1) {
-				ObjectInputStream ois = new ObjectInputStream(inStream);
-				this.restoredKeyedState = Collections.singletonList((KeyedStateHandle) ois.readObject());
-			}
-		}
-	}
-
 	private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) {
 		for (KeyedStateHandle handle : allKeyGroupsHandles) {
 			if (handle instanceof Migration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 7e32723..33f32e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
 import java.io.DataInputStream;
@@ -53,7 +52,8 @@ public class OperatorSnapshotUtil {
 
 			dos.writeInt(state.getOperatorChainIndex());
 
-			SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+			// still required for compatibility
+			SavepointV1Serializer.serializeStreamStateHandle(null, dos);
 
 			Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
 			if (rawOperatorState != null) {
@@ -108,7 +108,8 @@ public class OperatorSnapshotUtil {
 		try (DataInputStream dis = new DataInputStream(in)) {
 			int index = dis.readInt();
 
-			StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+			// still required for compatibility to consume the bytes.
+			SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
 			List<OperatorStateHandle> rawOperatorState = null;
 			int numRawOperatorStates = dis.readInt();
@@ -154,7 +155,12 @@ public class OperatorSnapshotUtil {
 				}
 			}
 
-			return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+			return new OperatorStateHandles(
+				index,
+				managedKeyedState,
+				rawKeyedState,
+				managedOperatorState,
+				rawOperatorState);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
index f723b34..1c95a04 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
@@ -29,22 +29,16 @@ public class MigrationTestUtil {
 	/**
 	 * Restore from a snapshot taken with an older Flink version.
 	 *
-	 * @param testHarness the test harness to restore the snapshot to.
-	 * @param snapshotPath the absolute path to the snapshot.
+	 * @param testHarness          the test harness to restore the snapshot to.
+	 * @param snapshotPath         the absolute path to the snapshot.
 	 * @param snapshotFlinkVersion the Flink version of the snapshot.
-	 *
 	 * @throws Exception
 	 */
 	public static void restoreFromSnapshot(
-			AbstractStreamOperatorTestHarness<?> testHarness,
-			String snapshotPath,
-			MigrationVersion snapshotFlinkVersion) throws Exception {
+		AbstractStreamOperatorTestHarness<?> testHarness,
+		String snapshotPath,
+		MigrationVersion snapshotFlinkVersion) throws Exception {
 
-		if (snapshotFlinkVersion == MigrationVersion.v1_1) {
-			// Flink 1.1 snapshots should be read using the legacy restore method
-			testHarness.initializeStateFromLegacyCheckpoint(snapshotPath);
-		} else {
-			testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath));
-		}
+		testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
index 104400f..35a56d7 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -25,15 +25,15 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingAlignedProcessingTimeWindows, SlidingEventTimeWindows, TumblingAlignedProcessingTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, WindowOperator}
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 /**
   * These tests verify that the api calls on [[WindowedStream]] that use the "time" shortcut
@@ -85,59 +85,6 @@ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
   }
 
-  /**
-    * These tests ensure that the fast aligned time windows operator is used if the
-    * conditions are right.
-    */
-  @Test
-  def testReduceAlignedTimeWindows(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-    
-    val window1 = source
-      .keyBy(0)
-      .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-      .reduce(new DummyReducer())
-
-    val transform1 = window1.javaStream.getTransformation
-        .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-    
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
-  }
-
-  /**
-    * These tests ensure that the fast aligned time windows operator is used if the
-    * conditions are right.
-    */
-  @Test
-  def testApplyAlignedTimeWindows(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val window1 = source
-      .keyBy(0)
-      .window(TumblingAlignedProcessingTimeWindows.of(Time.minutes(1)))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-                   key: Tuple,
-                   window: TimeWindow,
-                   values: Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
-
-    val transform1 = window1.javaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
-  }
-
   @Test
   def testReduceEventTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index cad6693..99fb6ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -975,7 +974,7 @@ public class RescalingITCase extends TestLogger {
 		}
 	}
 
-	private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction, CheckpointedRestoring<Integer> {
+	private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction {
 
 		private static final long serialVersionUID = -359715965103593462L;
 		private static final int NUM_PARTITIONS = 7;
@@ -1030,10 +1029,5 @@ public class RescalingITCase extends TestLogger {
 				checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 			}
 		}
-
-		@Override
-		public void restoreState(Integer state) throws Exception {
-			counterPartitions.add(state);
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index cc23545..1b7dafa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -53,7 +53,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
@@ -351,10 +351,6 @@ public class SavepointITCase extends TestLogger {
 					OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
 
 					assertNotNull(subtaskState);
-
-					errMsg = "Initial operator state mismatch.";
-					assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
-						tdd.getTaskStateHandles().getSubtaskStateByOperatorID(operatorState.getOperatorID()).getLegacyOperatorState());
 				}
 			}
 
@@ -377,17 +373,18 @@ public class SavepointITCase extends TestLogger {
 			assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
 
 			// - Verification START -------------------------------------------
-
 			// The checkpoint files
 			List<File> checkpointFiles = new ArrayList<>();
 
 			for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) {
 				for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) {
-					StreamStateHandle streamTaskState = subtaskState.getLegacyOperatorState();
+					Collection<OperatorStateHandle> streamTaskState = subtaskState.getManagedOperatorState();
 
-					if (streamTaskState != null) {
-						FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState;
-						checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
+					if (streamTaskState != null && !streamTaskState.isEmpty()) {
+						for (OperatorStateHandle osh : streamTaskState) {
+							FileStateHandle fileStateHandle = (FileStateHandle) osh.getDelegateStateHandle();
+							checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
+						}
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 21be7ba..eccc7e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +40,7 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -64,6 +66,11 @@ import static junit.framework.Assert.fail;
  */
 public class SavepointMigrationTestBase extends TestBaseUtils {
 
+	@BeforeClass
+	public static void before() {
+		SavepointSerializers.setFailWhenLegacyStateDetected(false);
+	}
+
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 


Mime
View raw message