flink-commits mailing list archives

From: srich...@apache.org
Subject: [11/11] flink git commit: [FLINK-7461] Remove Backwards compatibility with <= Flink 1.1
Date: Thu, 24 Aug 2017 18:22:42 GMT
[FLINK-7461] Remove Backwards compatibility with <= Flink 1.1
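
This commit removes the restore paths for state written by Flink 1.1 and earlier: the CheckpointedRestoring interface is dropped from RollingSink, BucketingSink, FlinkKafkaConsumerBase and FlinkKinesisConsumer, the legacy org.apache.flink.migration classes (including the SavepointV0 serializer) are deleted, and the 1.1 migration tests are removed. As an editorial illustration only (not code from this commit; the class name and state name are hypothetical), a user function now keeps restorable state through CheckpointedFunction alone:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class ExampleStatefulSink extends RichSinkFunction<String> implements CheckpointedFunction {

	private static final long serialVersionUID = 1L;

	// operator list state holding the position to resume from after a restore
	private transient ListState<Long> checkpointedOffset;
	private long currentOffset;

	@Override
	public void invoke(String value) throws Exception {
		currentOffset++;
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		checkpointedOffset.clear();
		checkpointedOffset.add(currentOffset);
	}

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		checkpointedOffset = context.getOperatorStateStore()
			.getListState(new ListStateDescriptor<>("offset", Long.class));

		if (context.isRestored()) {
			for (Long offset : checkpointedOffset.get()) {
				currentOffset = offset;
			}
		}
	}
}

Jobs that still rely on a 1.1 savepoint would presumably need to be restored with Flink 1.2 or 1.3 first and re-savepointed from there before upgrading past this commit.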


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6642768a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6642768a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6642768a

Branch: refs/heads/master
Commit: 6642768ad8f8c5d1856742a6d148f7724c20666c
Parents: 5456cf9
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Thu Aug 24 17:28:14 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Thu Aug 24 20:17:08 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |   22 +-
 .../connectors/fs/bucketing/BucketingSink.java  |   23 +-
 .../fs/bucketing/RollingSinkMigrationTest.java  |  213 ----
 .../RollingToBucketingMigrationTest.java        |  174 ---
 .../kafka/FlinkKafkaConsumerBase.java           |   20 +-
 .../FlinkKafkaConsumerBaseMigrationTest.java    |    4 +-
 .../kinesis/FlinkKinesisConsumer.java           |   24 +-
 .../FlinkKinesisConsumerMigrationTest.java      |  149 ---
 .../kinesis/FlinkKinesisConsumerTest.java       |   32 -
 .../ExactlyOnceValidatingConsumerThread.java    |   20 +-
 .../state/RocksDBKeyedStateBackend.java         |  194 +--
 .../streaming/state/RocksDBStateBackend.java    |   89 --
 .../state/RocksDBAsyncSnapshotTest.java         |   17 +-
 .../util/MigrationInstantiationUtil.java        |   96 --
 .../flink/migration/util/SerializedValue.java   |   98 --
 .../ContinuousFileProcessingMigrationTest.java  |    1 -
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  100 +-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  114 --
 .../flink/cep/nfa/compiler/NFACompiler.java     |  114 --
 .../AbstractKeyedCEPPatternOperator.java        |  325 +----
 .../cep/operator/CEPMigration11to13Test.java    |  346 ------
 .../flink/cep/operator/CEPMigrationTest.java    |    2 +-
 .../MigrationNamespaceSerializerProxy.java      |  132 ---
 .../apache/flink/migration/MigrationUtil.java   |   38 -
 .../api/common/state/ListStateDescriptor.java   |  113 --
 .../runtime/checkpoint/KeyGroupState.java       |   87 --
 .../runtime/checkpoint/SubtaskState.java        |  108 --
 .../migration/runtime/checkpoint/TaskState.java |  164 ---
 .../checkpoint/savepoint/SavepointV0.java       |  115 --
 .../savepoint/SavepointV0Serializer.java        |  425 -------
 .../runtime/state/AbstractCloseableHandle.java  |  131 --
 .../runtime/state/AbstractStateBackend.java     |   75 --
 .../runtime/state/KvStateSnapshot.java          |   32 -
 .../migration/runtime/state/StateHandle.java    |   41 -
 .../migration/runtime/state/StateObject.java    |   58 -
 .../runtime/state/StreamStateHandle.java        |   40 -
 .../filesystem/AbstractFileStateHandle.java     |  101 --
 .../filesystem/AbstractFsStateSnapshot.java     |  118 --
 .../filesystem/FileSerializableStateHandle.java |   76 --
 .../state/filesystem/FileStreamStateHandle.java |   87 --
 .../state/filesystem/FsFoldingState.java        |   44 -
 .../runtime/state/filesystem/FsListState.java   |   46 -
 .../state/filesystem/FsReducingState.java       |   44 -
 .../state/filesystem/FsStateBackend.java        |   54 -
 .../runtime/state/filesystem/FsValueState.java  |   44 -
 .../state/memory/AbstractMemStateSnapshot.java  |  138 ---
 .../AbstractMigrationRestoreStrategy.java       |  119 --
 .../state/memory/ByteStreamStateHandle.java     |   89 --
 .../runtime/state/memory/MemFoldingState.java   |   42 -
 .../runtime/state/memory/MemListState.java      |   45 -
 .../runtime/state/memory/MemReducingState.java  |   48 -
 .../runtime/state/memory/MemValueState.java     |   48 -
 .../state/memory/MigrationRestoreSnapshot.java  |   35 -
 .../state/memory/SerializedStateHandle.java     |   93 --
 .../state/MigrationKeyGroupStateHandle.java     |   46 -
 .../state/MigrationStreamStateHandle.java       |   67 --
 .../runtime/tasks/StreamTaskState.java          |   88 --
 .../runtime/tasks/StreamTaskStateList.java      |  100 --
 .../flink/runtime/checkpoint/OperatorState.java |   11 +-
 .../checkpoint/OperatorSubtaskState.java        |   70 +-
 .../checkpoint/StateAssignmentOperation.java    |   47 +-
 .../flink/runtime/checkpoint/SubtaskState.java  |   39 +-
 .../flink/runtime/checkpoint/TaskState.java     |   10 -
 .../savepoint/SavepointSerializers.java         |   18 +-
 .../checkpoint/savepoint/SavepointStore.java    |    1 +
 .../savepoint/SavepointV1Serializer.java        |   38 +-
 .../checkpoint/savepoint/SavepointV2.java       |    5 -
 .../savepoint/SavepointV2Serializer.java        |   25 +-
 .../runtime/state/VoidNamespaceSerializer.java  |    8 -
 .../state/heap/HeapKeyedStateBackend.java       |   63 +-
 .../CheckpointCoordinatorFailureTest.java       |    5 -
 .../checkpoint/CheckpointCoordinatorTest.java   |  178 +--
 .../checkpoint/CheckpointStateRestoreTest.java  |   16 +-
 .../savepoint/CheckpointTestUtils.java          |   17 -
 .../savepoint/MigrationV0ToV1Test.java          |  251 ----
 .../jobmanager/JobManagerHARecoveryTest.java    |   32 +-
 .../messages/CheckpointMessagesTest.java        |    1 -
 .../api/graph/StreamGraphHasherV1.java          |  282 -----
 .../MultiplexingStreamRecordSerializer.java     |  293 -----
 .../streamrecord/StreamRecordSerializer.java    |  208 ----
 .../streaming/api/checkpoint/Checkpointed.java  |   80 --
 .../checkpoint/CheckpointedAsynchronously.java  |   61 -
 .../api/checkpoint/CheckpointedRestoring.java   |   43 -
 .../datastream/LegacyWindowOperatorType.java    |   63 -
 .../api/datastream/WindowedStream.java          |  197 +---
 .../ContinuousFileMonitoringFunction.java       |   11 +-
 .../source/ContinuousFileReaderOperator.java    |   92 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   15 +-
 .../api/operators/AbstractStreamOperator.java   |   67 --
 .../operators/AbstractUdfStreamOperator.java    |   84 +-
 .../CheckpointedRestoringOperator.java          |   50 -
 .../operators/StreamCheckpointedOperator.java   |   43 -
 .../streaming/api/operators/StreamOperator.java |   16 -
 .../TumblingAlignedProcessingTimeWindows.java   |   68 --
 ...ractAlignedProcessingTimeWindowOperator.java |  331 ------
 ...ccumulatingProcessingTimeWindowOperator.java |   64 -
 ...AggregatingProcessingTimeWindowOperator.java |   58 -
 .../operators/windowing/WindowOperator.java     |  314 -----
 .../streamrecord/StreamElementSerializer.java   |    4 -
 .../runtime/tasks/OperatorStateHandles.java     |    9 -
 .../streaming/runtime/tasks/StreamTask.java     |   39 +-
 .../AbstractUdfStreamOperatorLifecycleTest.java |   11 +-
 .../FoldApplyProcessWindowFunctionTest.java     |  332 ------
 .../operators/FoldApplyWindowFunctionTest.java  |  152 ---
 .../StreamingJobGraphGeneratorNodeHashTest.java |    3 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 1116 ------------------
 ...AlignedProcessingTimeWindowOperatorTest.java |  863 --------------
 .../windowing/AllWindowTranslationTest.java     |   26 -
 .../windowing/TimeWindowTranslationTest.java    |  101 --
 .../windowing/WindowOperatorMigrationTest.java  |  216 +---
 .../windowing/WindowTranslationTest.java        |   27 -
 .../tasks/InterruptSensitiveRestoreTest.java    |   71 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  124 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   58 +-
 .../util/AbstractStreamOperatorTestHarness.java |   90 --
 .../KeyedOneInputStreamOperatorTestHarness.java |   65 -
 .../streaming/util/OperatorSnapshotUtil.java    |   14 +-
 .../util/migration/MigrationTestUtil.java       |   18 +-
 .../api/scala/TimeWindowTranslationTest.scala   |   59 +-
 .../test/checkpointing/RescalingITCase.java     |    8 +-
 .../test/checkpointing/SavepointITCase.java     |   17 +-
 .../utils/SavepointMigrationTestBase.java       |    7 +
 ...atefulJobSavepointFrom11MigrationITCase.java |  562 ---------
 ...atefulJobSavepointFrom12MigrationITCase.java |  106 +-
 .../jar/LegacyCheckpointedStreamingProgram.java |  143 ---
 .../AbstractOperatorRestoreTestBase.java        |    6 +
 126 files changed, 318 insertions(+), 12512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 3d3ea05..e5758e8 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.flink.util.Preconditions;
@@ -132,7 +131,7 @@ import java.util.UUID;
 @Deprecated
 public class RollingSink<T> extends RichSinkFunction<T>
 		implements InputTypeConfigurable, CheckpointedFunction,
-					CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> {
+					CheckpointListener {
 
 	private static final long serialVersionUID = 1L;
 
@@ -759,25 +758,6 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Backwards compatibility with Flink 1.1
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void restoreState(BucketState state) throws Exception {
-		LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}",
-			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
-
-		try {
-			initFileSystem();
-		} catch (IOException e) {
-			LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e);
-			throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e);
-		}
-
-		handleRestoredBucketState(state);
-	}
-
-	// --------------------------------------------------------------------------------------------
 	//  Setters for User configuration values
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 70168b5..cc924a4 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.fs.Clock;
@@ -154,8 +153,7 @@ import java.util.UUID;
  */
 public class BucketingSink<T>
 		extends RichSinkFunction<T>
-		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener,
-					CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {
+		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
 
 	private static final long serialVersionUID = 1L;
 
@@ -873,25 +871,6 @@ public class BucketingSink<T>
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Backwards compatibility with Flink 1.1
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void restoreState(RollingSink.BucketState state) throws Exception {
-		LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink an older Flink version: {}",
-			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
-
-		try {
-			initFileSystem();
-		} catch (IOException e) {
-			LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
-			throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
-		}
-
-		handleRestoredRollingSinkState(state);
-	}
-
-	// --------------------------------------------------------------------------------------------
 	//  Setters for User configuration values
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
deleted file mode 100644
index e041379..0000000
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.fs.RollingSink;
-import org.apache.flink.streaming.connectors.fs.StringWriter;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.OperatingSystem;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests the migration from 1.1 snapshots.
- */
-@Deprecated
-public class RollingSinkMigrationTest {
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	private static final String PART_PREFIX = "part";
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-	private static final String VALID_LENGTH_SUFFIX = ".valid";
-
-	@BeforeClass
-	public static void verifyOS() {
-		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
-	}
-
-	@Test
-	public void testMigration() throws Exception {
-
-		/*
-		* Code ran to get the snapshot:
-		*
-		* final File outDir = tempFolder.newFolder();
-
-		RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath())
-			.setWriter(new StringWriter<String>())
-			.setBatchSize(5)
-			.setPartPrefix(PART_PREFIX)
-			.setInProgressPrefix("")
-			.setPendingPrefix("")
-			.setValidLengthPrefix("")
-			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
-			.setPendingSuffix(PENDING_SUFFIX)
-			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
-			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
-
-		testHarness1.setup();
-		testHarness1.open();
-
-		testHarness1.processElement(new StreamRecord<>("test1", 0L));
-		testHarness1.processElement(new StreamRecord<>("test2", 0L));
-
-		checkFs(outDir, 1, 1, 0, 0);
-
-		testHarness1.processElement(new StreamRecord<>("test3", 0L));
-		testHarness1.processElement(new StreamRecord<>("test4", 0L));
-		testHarness1.processElement(new StreamRecord<>("test5", 0L));
-
-		checkFs(outDir, 1, 4, 0, 0);
-
-		StreamTaskState taskState = testHarness1.snapshot(0, 0);
-		testHarness1.snaphotToFile(taskState, "src/test/resources/rolling-sink-migration-test-flink1.1-snapshot");
-		testHarness1.close();
-		* */
-
-		final File outDir = tempFolder.newFolder();
-
-		RollingSink<String> sink = new ValidatingRollingSink<String>(outDir.getAbsolutePath())
-			.setWriter(new StringWriter<String>())
-			.setBatchSize(5)
-			.setPartPrefix(PART_PREFIX)
-			.setInProgressPrefix("")
-			.setPendingPrefix("")
-			.setValidLengthPrefix("")
-			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
-			.setPendingSuffix(PENDING_SUFFIX)
-			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
-			new StreamSink<>(sink), 10, 1, 0);
-		testHarness1.setup();
-		testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
-		testHarness1.open();
-
-		testHarness1.processElement(new StreamRecord<>("test1", 0L));
-		testHarness1.processElement(new StreamRecord<>("test2", 0L));
-
-		checkFs(outDir, 1, 1, 0, 0);
-
-		testHarness1.close();
-	}
-
-	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
-		int inProg = 0;
-		int pend = 0;
-		int compl = 0;
-		int val = 0;
-
-		for (File file: FileUtils.listFiles(outDir, null, true)) {
-			if (file.getAbsolutePath().endsWith("crc")) {
-				continue;
-			}
-			String path = file.getPath();
-			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
-				inProg++;
-			} else if (path.endsWith(PENDING_SUFFIX)) {
-				pend++;
-			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
-				val++;
-			} else if (path.contains(PART_PREFIX)) {
-				compl++;
-			}
-		}
-
-		Assert.assertEquals(inprogress, inProg);
-		Assert.assertEquals(pending, pend);
-		Assert.assertEquals(completed, compl);
-		Assert.assertEquals(valid, val);
-	}
-
-	private static String getResourceFilename(String filename) {
-		ClassLoader cl = RollingSinkMigrationTest.class.getClassLoader();
-		URL resource = cl.getResource(filename);
-		return resource.getFile();
-	}
-
-	static class ValidatingRollingSink<T> extends RollingSink<T> {
-
-		private static final long serialVersionUID = -4263974081712009141L;
-
-		ValidatingRollingSink(String basePath) {
-			super(basePath);
-		}
-
-		@Override
-		public void restoreState(BucketState state) throws Exception {
-
-			/**
-			 * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
-			 * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
-			 * 					validLength=6
-			 * pendingForNextCheckpoint=[]
-			 * pendingForPrevCheckpoints={0=[	/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
-			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
-			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
-			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
-			 * */
-
-			String current = state.currentFile;
-			long validLength = state.currentFileValidLength;
-
-			Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
-			Assert.assertEquals(6, validLength);
-
-			List<String> pendingFiles = state.pendingFiles;
-			Assert.assertTrue(pendingFiles.isEmpty());
-
-			final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
-			Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
-
-			for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
-				long checkpoint = entry.getKey();
-				List<String> files = entry.getValue();
-
-				Assert.assertEquals(0L, checkpoint);
-				Assert.assertEquals(4, files.size());
-
-				for (int i = 0; i < 4; i++) {
-					Assert.assertEquals(
-						"/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
-						files.get(i));
-				}
-			}
-			super.restoreState(state);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
deleted file mode 100644
index 8a8dbd6..0000000
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.fs.RollingSink;
-import org.apache.flink.streaming.connectors.fs.StringWriter;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.OperatingSystem;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests the migration from {@link RollingSink} to {@link BucketingSink}.
- */
-public class RollingToBucketingMigrationTest {
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	private static final String PART_PREFIX = "part";
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-	private static final String VALID_LENGTH_SUFFIX = ".valid";
-
-	@BeforeClass
-	public static void verifyOS() {
-		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
-	}
-
-	@Test
-	public void testMigration() throws Exception {
-		final File outDir = tempFolder.newFolder();
-
-		BucketingSink<String> sink = new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
-			.setWriter(new StringWriter<String>())
-			.setBatchSize(5)
-			.setPartPrefix(PART_PREFIX)
-			.setInProgressPrefix("")
-			.setPendingPrefix("")
-			.setValidLengthPrefix("")
-			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
-			.setPendingSuffix(PENDING_SUFFIX)
-			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
-			new StreamSink<>(sink), 10, 1, 0);
-		testHarness1.setup();
-		testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
-		testHarness1.open();
-
-		testHarness1.processElement(new StreamRecord<>("test1", 0L));
-		testHarness1.processElement(new StreamRecord<>("test2", 0L));
-
-		checkFs(outDir, 1, 1, 0, 0);
-
-		testHarness1.close();
-	}
-
-	private static String getResourceFilename(String filename) {
-		ClassLoader cl = RollingToBucketingMigrationTest.class.getClassLoader();
-		URL resource = cl.getResource(filename);
-		return resource.getFile();
-	}
-
-	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
-		int inProg = 0;
-		int pend = 0;
-		int compl = 0;
-		int val = 0;
-
-		for (File file: FileUtils.listFiles(outDir, null, true)) {
-			if (file.getAbsolutePath().endsWith("crc")) {
-				continue;
-			}
-			String path = file.getPath();
-			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
-				inProg++;
-			} else if (path.endsWith(PENDING_SUFFIX)) {
-				pend++;
-			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
-				val++;
-			} else if (path.contains(PART_PREFIX)) {
-				compl++;
-			}
-		}
-
-		Assert.assertEquals(inprogress, inProg);
-		Assert.assertEquals(pending, pend);
-		Assert.assertEquals(completed, compl);
-		Assert.assertEquals(valid, val);
-	}
-
-	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
-
-		private static final long serialVersionUID = -4263974081712009141L;
-
-		ValidatingBucketingSink(String basePath) {
-			super(basePath);
-		}
-
-		@Override
-		public void restoreState(RollingSink.BucketState state) throws Exception {
-
-			/**
-			 * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
-			 * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
-			 * 					validLength=6
-			 * pendingForNextCheckpoint=[]
-			 * pendingForPrevCheckpoints={0=[	/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
-			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
-			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
-			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
-			 * */
-
-			String current = state.currentFile;
-			long validLength = state.currentFileValidLength;
-
-			Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
-			Assert.assertEquals(6, validLength);
-
-			List<String> pendingFiles = state.pendingFiles;
-			Assert.assertTrue(pendingFiles.isEmpty());
-
-			final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
-			Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
-
-			for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
-				long checkpoint = entry.getKey();
-				List<String> files = entry.getValue();
-
-				Assert.assertEquals(0L, checkpoint);
-				Assert.assertEquals(4, files.size());
-
-				for (int i = 0; i < 4; i++) {
-					Assert.assertEquals(
-						"/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
-						files.get(i));
-				}
-			}
-
-			super.restoreState(state);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index f3c9e5e..3088b15 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -78,8 +77,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
 		CheckpointListener,
 		ResultTypeQueryable<T>,
-		CheckpointedFunction,
-		CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {
+		CheckpointedFunction {
 
 	private static final long serialVersionUID = -6272159445203409112L;
 
@@ -767,22 +765,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
-	public final void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
-		LOG.info("{} (taskIdx={}) restoring offsets from an older version: {}",
-			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredOffsets);
-
-		restoredFromOldState = true;
-
-		if (restoredOffsets.size() > 0 && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
-			throw new IllegalArgumentException(
-				"Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.1.x.");
-		}
-
-		restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
-		restoredState.putAll(restoredOffsets);
-	}
-
-	@Override
 	public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 		if (!running) {
 			LOG.debug("notifyCheckpointComplete() called on closed source");

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index e3f337e..84f0e38 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -92,7 +92,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 
 	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
-		return Arrays.asList(MigrationVersion.v1_1, MigrationVersion.v1_2, MigrationVersion.v1_3);
+		return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3);
 	}
 
 	public FlinkKafkaConsumerBaseMigrationTest(MigrationVersion testMigrateVersion) {
@@ -322,7 +322,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 	 */
 	@Test
 	public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
-		assumeTrue(testMigrateVersion == MigrationVersion.v1_1 || testMigrateVersion == MigrationVersion.v1_2);
+		assumeTrue(testMigrateVersion == MigrationVersion.v1_3 || testMigrateVersion == MigrationVersion.v1_2);
 
 		final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index d127f2b..5689229 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -28,13 +28,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -72,8 +70,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> implements
 		ResultTypeQueryable<T>,
-		CheckpointedFunction,
-		CheckpointedRestoring<HashMap<KinesisStreamShard, SequenceNumber>> {
+		CheckpointedFunction {
 
 	private static final long serialVersionUID = 4724006128720664870L;
 
@@ -352,7 +349,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
-						lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+						lastStateSnapshot, context.getCheckpointId(), context.getCheckpointTimestamp());
 				}
 
 				for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
@@ -362,23 +359,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		}
 	}
 
-	@Override
-	public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
-		LOG.info("Subtask {} restoring offsets from an older Flink version: {}",
-			getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore);
-
-		if (restoredState.isEmpty()) {
-			sequenceNumsToRestore = null;
-		} else {
-			sequenceNumsToRestore = new HashMap<>();
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> stateEntry : restoredState.entrySet()) {
-				sequenceNumsToRestore.put(
-						KinesisStreamShard.convertToStreamShardMetadata(stateEntry.getKey()),
-						stateEntry.getValue());
-			}
-		}
-	}
-
 	/** This method is exposed for tests that need to mock the KinesisDataFetcher in the consumer. */
 	protected KinesisDataFetcher<T> createFetcher(
 			List<String> streams,

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
deleted file mode 100644
index af84420..0000000
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-
-import com.amazonaws.services.kinesis.model.Shard;
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for checking whether {@link FlinkKinesisConsumer} can restore from snapshots that were
- * done using the Flink 1.1 {@code FlinkKinesisConsumer}.
- */
-public class FlinkKinesisConsumerMigrationTest {
-
-	@Test
-	public void testRestoreFromFlink11WithEmptyState() throws Exception {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(testConfig);
-
-		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction);
-
-		final AbstractStreamOperatorTestHarness<String> testHarness =
-			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
-
-		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		testHarness.setup();
-		// restore state from binary snapshot file using legacy method
-		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kinesis-consumer-migration-test-flink1.1-empty-snapshot"));
-		testHarness.open();
-
-		// assert that no state was restored
-		assertEquals(null, consumerFunction.getRestoredState());
-
-		consumerOperator.close();
-		consumerOperator.cancel();
-	}
-
-	@Test
-	public void testRestoreFromFlink11() throws Exception {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(testConfig);
-
-		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
-			new StreamSource<>(consumerFunction);
-
-		final AbstractStreamOperatorTestHarness<String> testHarness =
-			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
-
-		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		testHarness.setup();
-		// restore state from binary snapshot file using legacy method
-		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kinesis-consumer-migration-test-flink1.1-snapshot"));
-		testHarness.open();
-
-		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
-		final HashMap<StreamShardMetadata, SequenceNumber> expectedState = new HashMap<>();
-		expectedState.put(KinesisStreamShard.convertToStreamShardMetadata(new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
-			new SequenceNumber("987654321"));
-
-		// assert that state is correctly restored from legacy checkpoint
-		assertNotEquals(null, consumerFunction.getRestoredState());
-		assertEquals(1, consumerFunction.getRestoredState().size());
-		assertEquals(expectedState, consumerFunction.getRestoredState());
-
-		consumerOperator.close();
-		consumerOperator.cancel();
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static String getResourceFilename(String filename) {
-		ClassLoader cl = FlinkKinesisConsumerMigrationTest.class.getClassLoader();
-		URL resource = cl.getResource(filename);
-		if (resource == null) {
-			throw new NullPointerException("Missing snapshot resource.");
-		}
-		return resource.getFile();
-	}
-
-	private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {
-		private static final long serialVersionUID = 1L;
-
-		@SuppressWarnings("unchecked")
-		DummyFlinkKafkaConsumer(Properties properties) {
-			super("test", mock(KinesisDeserializationSchema.class), properties);
-		}
-
-		@Override
-		protected KinesisDataFetcher<T> createFetcher(
-				List<String> streams,
-				SourceFunction.SourceContext<T> sourceContext,
-				RuntimeContext runtimeContext,
-				Properties configProps,
-				KinesisDeserializationSchema<T> deserializationSchema) {
-			return mock(KinesisDataFetcher.class);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index a26e758..69d30cd 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -712,38 +712,6 @@ public class FlinkKinesisConsumerTest {
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws Exception {
-		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
-		HashMap<KinesisStreamShard, SequenceNumber> legacyFakeRestoredState = new HashMap<>();
-		for (Map.Entry<StreamShardHandle, SequenceNumber> kv : fakeRestoredState.entrySet()) {
-			legacyFakeRestoredState.put(new KinesisStreamShard(kv.getKey().getStreamName(), kv.getKey().getShard()), kv.getValue());
-		}
-
-		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<StreamShardHandle> shards = new ArrayList<>();
-		shards.addAll(fakeRestoredState.keySet());
-		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
-
-		// assume the given config is correct
-		PowerMockito.mockStatic(KinesisConfigUtil.class);
-		PowerMockito.doNothing().when(KinesisConfigUtil.class);
-
-		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
-			"fakeStream", new Properties(), 10, 2);
-		consumer.restoreState(legacyFakeRestoredState);
-		consumer.open(new Configuration());
-		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
-
-		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
-			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
-					restoredShard.getKey(), restoredShard.getValue()));
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
 	public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
 
 		// ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
index 75356ef..1336652 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
@@ -29,11 +29,14 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -95,7 +98,7 @@ public class ExactlyOnceValidatingConsumerThread {
 		return new Thread(exactlyOnceValidationConsumer);
 	}
 
-	private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String, String>, Checkpointed<BitSet> {
+	private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String, String>, ListCheckpointed<BitSet> {
 
 		private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingMapper.class);
 
@@ -126,13 +129,18 @@ public class ExactlyOnceValidatingConsumerThread {
 		}
 
 		@Override
-		public BitSet snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return validator;
+		public List<BitSet> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(validator);
 		}
 
 		@Override
-		public void restoreState(BitSet state) throws Exception {
-			this.validator = state;
+		public void restoreState(List<BitSet> state) throws Exception {
+			// we expect either 1 or 0 elements
+			if (state.size() == 1) {
+				validator = state.get(0);
+			} else {
+				Preconditions.checkState(state.isEmpty());
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index b7f386d..dd5b852 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -39,15 +38,10 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
-import org.apache.flink.migration.MigrationUtil;
-import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
@@ -57,7 +51,6 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
@@ -80,7 +73,6 @@ import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
 
@@ -97,11 +89,9 @@ import org.rocksdb.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -110,7 +100,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -140,9 +129,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** The name of the merge operator in RocksDB. Do not change except you know exactly what you do. */
 	public static final String MERGE_OPERATOR_NAME = "stringappendtest";
 
+	/** File suffix of sstable files. */
+	private static final String SST_FILE_SUFFIX = ".sst";
+
 	/** Bytes for the name of the column decriptor for the default column family. */
 	public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET);
 
+	/** String that identifies the operator that owns this backend. */
 	private final String operatorIdentifier;
 
 	/** The column family options from the options factory. */
@@ -206,8 +199,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** Unique ID of this backend. */
 	private UUID backendUID;
 
-	private static final String SST_FILE_SUFFIX = ".sst";
-
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
 		ClassLoader userCodeClassLoader,
@@ -311,10 +302,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return keyGroupPrefixBytes;
 	}
 
-	private boolean hasRegisteredState() {
-		return !kvStateInformation.isEmpty();
-	}
-
 	/**
 	 * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
 	 * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
@@ -359,7 +346,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				throw new IOException("RocksDB closed.");
 			}
 
-			if (!hasRegisteredState()) {
+			if (kvStateInformation.isEmpty()) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " +
 						checkpointTimestamp + " . Returning null.");
@@ -404,7 +391,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			if (db != null) {
 
-				if (!hasRegisteredState()) {
+				if (kvStateInformation.isEmpty()) {
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
 							" . Returning null.");
@@ -887,11 +874,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		void takeSnapshot() throws Exception {
 			assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
+			final long lastCompletedCheckpoint;
+
 			// use the last completed checkpoint as the comparison base.
 			synchronized (stateBackend.materializedSstFiles) {
-				baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
+				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
 			}
 
+			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
+				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
 				: stateBackend.kvStateInformation.entrySet()) {
@@ -929,7 +922,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					if (fileName.endsWith(SST_FILE_SUFFIX)) {
 						final boolean existsAlready =
-							baseSstFiles == null ? false : baseSstFiles.contains(stateHandleID);
+							baseSstFiles != null && baseSstFiles.contains(stateHandleID);
 
 						if (existsAlready) {
 							// we introduce a placeholder state handle, that is replaced with the
@@ -982,7 +975,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 
 			if (canceled) {
-				List<StateObject> statesToDiscard = new ArrayList<>();
+				Collection<StateObject> statesToDiscard =
+					new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
 
 				statesToDiscard.add(metaStateHandle);
 				statesToDiscard.addAll(miscFiles.values());
@@ -1012,9 +1006,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		try {
 			if (restoreState == null || restoreState.isEmpty()) {
 				createDB();
-			} else if (MigrationUtil.isOldSavepointKeyedState(restoreState)) {
-				LOG.info("Converting RocksDB state from old savepoint.");
-				restoreOldSavepointKeyedState(restoreState);
 			} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
 				RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
 				restoreOperation.restore(restoreState);
@@ -1035,14 +1026,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				return;
 			}
 
-			Iterator<Long> materializedCheckpointIterator = materializedSstFiles.keySet().iterator();
-			while (materializedCheckpointIterator.hasNext()) {
-				long materializedCheckpointId = materializedCheckpointIterator.next();
-
-				if (materializedCheckpointId < completedCheckpointId) {
-					materializedCheckpointIterator.remove();
-				}
-			}
+			materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
 
 			lastCompletedCheckpointId = completedCheckpointId;
 		}
@@ -1067,10 +1051,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		// we add the required descriptor for the default CF in last position.
 		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
 
-		RocksDB db;
+		RocksDB dbRef;
 
 		try {
-			db = RocksDB.open(
+			dbRef = RocksDB.open(
 				Preconditions.checkNotNull(dbOptions),
 				Preconditions.checkNotNull(path),
 				columnFamilyDescriptors,
@@ -1083,7 +1067,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
 			"Not all requested column family handles have been created");
 
-		return db;
+		return dbRef;
 	}
 
 	/**
@@ -1117,12 +1101,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * Restores all key-groups data that is referenced by the passed state handles.
 		 *
 		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
-		 * @throws IOException
-		 * @throws ClassNotFoundException
-		 * @throws RocksDBException
 		 */
 		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-			throws IOException, StateMigrationException, ClassNotFoundException, RocksDBException {
+			throws IOException, StateMigrationException, RocksDBException {
 
 			rocksDBKeyedStateBackend.createDB();
 
@@ -1142,13 +1123,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		/**
 	 * Restores one key-groups state handle.
-		 *
-		 * @throws IOException
-		 * @throws RocksDBException
-		 * @throws ClassNotFoundException
 		 */
 		private void restoreKeyGroupsInStateHandle()
-			throws IOException, StateMigrationException, RocksDBException, ClassNotFoundException {
+			throws IOException, StateMigrationException, RocksDBException {
 			try {
 				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
 				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
@@ -1251,13 +1228,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				//not empty key-group?
 				if (0L != offset) {
 					currentStateHandleInStream.seek(offset);
-					boolean keyGroupHasMoreKeys = true;
 					try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
 						DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
 						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
 						int kvStateId = compressedKgInputView.readShort();
 						ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
 						//insert all k/v pairs into DB
+						boolean keyGroupHasMoreKeys = true;
 						while (keyGroupHasMoreKeys) {
 							byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
 							byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
@@ -1557,7 +1534,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
 
 			boolean hasExtraKeys = (restoreStateHandles.size() > 1 ||
-				!restoreStateHandles.iterator().next().getKeyGroupRange().equals(stateBackend.keyGroupRange));
+				!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), stateBackend.keyGroupRange));
 
 			if (hasExtraKeys) {
 				stateBackend.createDB();
@@ -1611,16 +1588,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
 
 			Preconditions.checkState(
-				newMetaInfo.getName().equals(restoredMetaInfo.getName()),
+				Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
 				"Incompatible state names. " +
 					"Was [" + restoredMetaInfo.getName() + "], " +
 					"registered with [" + newMetaInfo.getName() + "].");
 
-			if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
-				&& !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+			if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
+				&& !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
 
 				Preconditions.checkState(
-					newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
+					newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
 					"Incompatible state types. " +
 						"Was [" + restoredMetaInfo.getStateType() + "], " +
 						"registered with [" + newMetaInfo.getStateType() + "].");
@@ -1629,7 +1606,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// check compatibility results to determine if state migration is required
 			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
 				restoredMetaInfo.getNamespaceSerializer(),
-				MigrationNamespaceSerializerProxy.class,
+				null,
 				restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
 				newMetaInfo.getNamespaceSerializer());
 
@@ -1639,12 +1616,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				restoredMetaInfo.getStateSerializerConfigSnapshot(),
 				newMetaInfo.getStateSerializer());
 
-			if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
-				stateInfo.f1 = newMetaInfo;
-				return stateInfo.f0;
-			} else {
+			if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
 				// TODO state migration currently isn't possible.
 				throw new StateMigrationException("State migration isn't supported, yet.");
+			} else {
+				stateInfo.f1 = newMetaInfo;
+				return stateInfo.f0;
 			}
 		}
 
@@ -1719,8 +1696,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
-	                                                                 MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+	protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+		TypeSerializer<N> namespaceSerializer,
+		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
 		return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);
@@ -1963,105 +1942,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return instanceBasePath;
 	}
 
-	/**
-	 * For backwards compatibility, remove again later!
-	 *
-	 * @deprecated Internal method used for backwards compatibility.
-	 */
-	@Deprecated
-	private void restoreOldSavepointKeyedState(Collection<KeyedStateHandle> restoreState) throws Exception {
-		createDB();
-
-		Preconditions.checkState(1 == restoreState.size(), "Only one element expected here.");
-
-		KeyedStateHandle keyedStateHandle = restoreState.iterator().next();
-		if (!(keyedStateHandle instanceof MigrationKeyGroupStateHandle)) {
-			throw new IllegalStateException("Unexpected state handle type, " +
-				"expected: " + MigrationKeyGroupStateHandle.class +
-				", but found: " + keyedStateHandle.getClass());
-		}
-
-		MigrationKeyGroupStateHandle keyGroupStateHandle = (MigrationKeyGroupStateHandle) keyedStateHandle;
-
-		HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates;
-		try (FSDataInputStream inputStream = keyGroupStateHandle.openInputStream()) {
-			namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
-		}
-
-		Preconditions.checkState(1 == namedStates.size(), "Only one element expected here.");
-		DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
-
-		// first get the column family mapping
-		int numColumns = inputView.readInt();
-		Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new HashMap<>(numColumns);
-		for (int i = 0; i < numColumns; i++) {
-			byte mappingByte = inputView.readByte();
-
-			ObjectInputStream ooIn =
-				new InstantiationUtil.ClassLoaderObjectInputStream(
-					new DataInputViewStream(inputView), userCodeClassLoader);
-
-			StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) ooIn.readObject();
-
-			columnFamilyMapping.put(mappingByte, stateDescriptor);
-
-			// mimic a restored kv state meta info
-			restoredKvStateMetaInfos.put(
-				stateDescriptor.getName(),
-				new RegisteredKeyedBackendStateMetaInfo<>(
-					stateDescriptor.getType(),
-					stateDescriptor.getName(),
-					MigrationNamespaceSerializerProxy.INSTANCE,
-					stateDescriptor.getSerializer()).snapshot());
-
-			// this will fill in the k/v state information
-			getColumnFamily(stateDescriptor, MigrationNamespaceSerializerProxy.INSTANCE);
-		}
-
-		// try and read until EOF
-		try {
-			// the EOFException will get us out of this...
-			while (true) {
-				byte mappingByte = inputView.readByte();
-				ColumnFamilyHandle handle = getColumnFamily(
-					columnFamilyMapping.get(mappingByte), MigrationNamespaceSerializerProxy.INSTANCE);
-
-				byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
-
-				ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);
-
-				K reconstructedKey = keySerializer.deserialize(new DataInputViewStreamWrapper(bis));
-				int len = bis.getPosition();
-
-				int keyGroup = (byte) KeyGroupRangeAssignment.assignToKeyGroup(reconstructedKey, numberOfKeyGroups);
-
-				if (keyGroupPrefixBytes == 1) {
-					// copy and override one byte (42) between key and namespace
-					System.arraycopy(keyAndNamespace, 0, keyAndNamespace, 1, len);
-					keyAndNamespace[0] = (byte) keyGroup;
-				} else {
-					byte[] largerKey = new byte[1 + keyAndNamespace.length];
-
-					// write key-group
-					largerKey[0] = (byte) ((keyGroup >> 8) & 0xFF);
-					largerKey[1] = (byte) (keyGroup & 0xFF);
-
-					// write key
-					System.arraycopy(keyAndNamespace, 0, largerKey, 2, len);
-
-					//skip one byte (42), write namespace
-					System.arraycopy(keyAndNamespace, 1 + len, largerKey, 2 + len, keyAndNamespace.length - len - 1);
-					keyAndNamespace = largerKey;
-				}
-
-				byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
-				db.put(handle, keyAndNamespace, value);
-			}
-		} catch (EOFException e) {
-			// expected
-		}
-	}
-
 	@Override
 	public boolean supportsAsynchronousSnapshots() {
 		return true;

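As an aside on the notifyCheckpointComplete() hunk above: the explicit iterator loop over materializedSstFiles is replaced by Map.keySet().removeIf(...). Below is a minimal, self-contained Java sketch of that pruning idiom; the class and variable names are illustrative only and are not taken from the Flink code base.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; not part of the commit above.
public class CheckpointPruningSketch {

	public static void main(String[] args) {
		// Hypothetical bookkeeping: checkpoint id -> SST files materialized for that checkpoint.
		Map<Long, Set<String>> materializedSstFiles = new HashMap<>();
		materializedSstFiles.put(1L, new HashSet<>(Arrays.asList("000001.sst")));
		materializedSstFiles.put(2L, new HashSet<>(Arrays.asList("000001.sst", "000002.sst")));
		materializedSstFiles.put(3L, new HashSet<>(Arrays.asList("000002.sst", "000003.sst")));

		long completedCheckpointId = 3L;

		// Same effect as the removed iterator loop: forget the bookkeeping of all
		// checkpoints older than the one that just completed.
		materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);

		System.out.println(materializedSstFiles.keySet()); // prints [3]
	}
}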
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
deleted file mode 100644
index 024d12e..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.migration.runtime.state.AbstractStateBackend;
-import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.StateHandle;
-
-import java.io.IOException;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * @deprecated Internal class used for backwards compatibility.
- */
-@Deprecated
-public class RocksDBStateBackend extends AbstractStateBackend {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB data base.
-	 */
-	public static class FinalFullyAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>> {
-		private static final long serialVersionUID = 1L;
-
-		public final StateHandle<DataInputView> stateHandle;
-		final long checkpointId;
-
-		/**
-		 * Creates a new snapshot from the given state parameters.
-		 */
-		private FinalFullyAsyncSnapshot(StateHandle<DataInputView> stateHandle, long checkpointId) {
-			this.stateHandle = requireNonNull(stateHandle);
-			this.checkpointId = checkpointId;
-		}
-
-		@Override
-		public final void discardState() throws Exception {
-			stateHandle.discardState();
-		}
-
-		@Override
-		public final long getStateSize() throws Exception {
-			return stateHandle.getStateSize();
-		}
-
-		@Override
-		public void close() throws IOException {
-			stateHandle.close();
-		}
-	}
-
-	/**
-	 * This class exists to provide a good error message if a user attempts to restore from a semi async snapshot.
-	 *
-	 * <p>see FLINK-5468
-	 */
-	@Deprecated
-	public static class FinalSemiAsyncSnapshot {
-
-		static {
-			throwExceptionOnLoadingThisClass();
-		}
-
-		private static void throwExceptionOnLoadingThisClass() {
-			throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. "
-					+ "Unfortunately, this is not supported. Please create a new savepoint for the job using fully "
-					+ "async mode in Flink 1.1 and run migration again with the new savepoint.");
-		}
-	}
-}

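The FinalSemiAsyncSnapshot class deleted above relied on a static initializer to fail with a descriptive message the moment the class is loaded during deserialization. The following standalone sketch shows that load-time-failure technique in isolation; the class name and error message are made up for illustration, and the failure surfaces as an ExceptionInInitializerError.

// Illustrative sketch only; names and message are hypothetical.
public class LoadTimeFailureSketch {

	/** Hypothetical marker for a snapshot format that can no longer be restored. */
	static class LegacySnapshotMarker {
		static {
			failOnClassLoading();
		}

		private static void failOnClassLoading() {
			throw new RuntimeException("This legacy snapshot format is no longer supported; "
					+ "please take a new savepoint with a current version and restore from that.");
		}
	}

	public static void main(String[] args) {
		try {
			// Merely initializing the class triggers the explanatory error.
			new LegacySnapshotMarker();
		} catch (ExceptionInInitializerError e) {
			System.out.println("Class loading failed as intended: " + e.getCause().getMessage());
		}
	}
}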
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index c752e53..98208fd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -26,8 +26,6 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -50,7 +48,6 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -451,7 +448,7 @@ public class RocksDBAsyncSnapshotTest {
 
 	private static class AsyncCheckpointOperator
 		extends AbstractStreamOperator<String>
-		implements OneInputStreamOperator<String, String>, StreamCheckpointedOperator {
+		implements OneInputStreamOperator<String, String> {
 
 		@Override
 		public void open() throws Exception {
@@ -477,17 +474,5 @@ public class RocksDBAsyncSnapshotTest {
 
 			state.update(element.getValue());
 		}
-
-		@Override
-		public void snapshotState(
-				FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
-			// do nothing so that we don't block
-		}
-
-		@Override
-		public void restoreState(FSDataInputStream in) throws Exception {
-			// do nothing so that we don't block
-		}
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
deleted file mode 100644
index 69e4e6d..0000000
--- a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.util;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
-
-/**
- * Utility class to deserialize legacy classes for migration.
- */
-@PublicEvolving
-public final class MigrationInstantiationUtil {
-
-	public static class ClassLoaderObjectInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
-
-		private static final String ARRAY_PREFIX = "[L";
-		private static final String FLINK_BASE_PACKAGE = "org.apache.flink.";
-		private static final String FLINK_MIGRATION_PACKAGE = "org.apache.flink.migration.";
-
-		public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
-			super(in, classLoader);
-		}
-
-		@Override
-		protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
-			final String className = desc.getName();
-
-			// the flink package may be at position 0 (regular class) or position 2 (array)
-			final int flinkPackagePos;
-			if ((flinkPackagePos = className.indexOf(FLINK_BASE_PACKAGE)) == 0 ||
-					(flinkPackagePos == 2 && className.startsWith(ARRAY_PREFIX)))
-			{
-				final String modClassName = flinkPackagePos == 0 ?
-						FLINK_MIGRATION_PACKAGE + className.substring(FLINK_BASE_PACKAGE.length()) :
-						ARRAY_PREFIX + FLINK_MIGRATION_PACKAGE + className.substring(2 + FLINK_BASE_PACKAGE.length());
-
-				try {
-					return classLoader != null ?
-							Class.forName(modClassName, false, classLoader) :
-							Class.forName(modClassName);
-				}
-				catch (ClassNotFoundException ignored) {}
-			}
-
-			// either a non-Flink class, or not located in the migration package
-			return super.resolveClass(desc);
-		}
-	}
-
-	public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
-		return deserializeObject(new ByteArrayInputStream(bytes), cl);
-	}
-
-	@SuppressWarnings("unchecked")
-	public static <T> T deserializeObject(InputStream in, ClassLoader cl) throws IOException, ClassNotFoundException {
-		final ClassLoader old = Thread.currentThread().getContextClassLoader();
-		try (ObjectInputStream oois = new ClassLoaderObjectInputStream(in, cl)) {
-			Thread.currentThread().setContextClassLoader(cl);
-			return (T) oois.readObject();
-		} finally {
-			Thread.currentThread().setContextClassLoader(old);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private MigrationInstantiationUtil() {
-		throw new IllegalAccessError();
-	}
-
-}

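The ClassLoaderObjectInputStream deleted above resolved legacy classes by rewriting Flink class names (including arrays of them) into the org.apache.flink.migration package during Java deserialization. The sketch below isolates that name-rewriting rule as a plain string transformation, so the two cases (regular class at position 0, array class behind the "[L" prefix at position 2) are easy to follow; the sample class names in main() are examples only.

// Illustrative sketch only; demonstrates the remapping rule, not the full ObjectInputStream.
public class MigrationNameRemappingSketch {

	private static final String ARRAY_PREFIX = "[L";
	private static final String FLINK_BASE_PACKAGE = "org.apache.flink.";
	private static final String FLINK_MIGRATION_PACKAGE = "org.apache.flink.migration.";

	/** Rewrites Flink class names (and arrays of them) into the legacy migration package. */
	static String remap(String className) {
		int flinkPackagePos = className.indexOf(FLINK_BASE_PACKAGE);
		if (flinkPackagePos == 0) {
			return FLINK_MIGRATION_PACKAGE + className.substring(FLINK_BASE_PACKAGE.length());
		} else if (flinkPackagePos == 2 && className.startsWith(ARRAY_PREFIX)) {
			return ARRAY_PREFIX + FLINK_MIGRATION_PACKAGE
					+ className.substring(2 + FLINK_BASE_PACKAGE.length());
		}
		return className; // non-Flink classes are resolved unchanged
	}

	public static void main(String[] args) {
		System.out.println(remap("org.apache.flink.runtime.state.StateHandle"));
		// -> org.apache.flink.migration.runtime.state.StateHandle
		System.out.println(remap("[Lorg.apache.flink.runtime.state.StateHandle;"));
		// -> [Lorg.apache.flink.migration.runtime.state.StateHandle;
		System.out.println(remap("java.lang.String"));
		// -> java.lang.String
	}
}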
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
deleted file mode 100644
index 6fa29d3..0000000
--- a/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.util;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * This class is used to transfer (via serialization) objects whose classes are not available
- * in the system class loader. When those objects are deserialized without access to their
- * special class loader, the deserialization fails with a {@code ClassNotFoundException}.
- *
- * To work around that issue, the SerializedValue serialized data immediately into a byte array.
- * When send through RPC or another service that uses serialization, only the byte array is
- * transferred. The object is deserialized later (upon access) and requires the accessor to
- * provide the corresponding class loader.
- *
- * @param <T> The type of the value held.
- * @deprecated Only used internally when migrating from previous savepoint versions.
- */
-@Deprecated
-@PublicEvolving
-public class SerializedValue<T> implements java.io.Serializable {
-
-	private static final long serialVersionUID = -3564011643393683761L;
-
-	/** The serialized data */
-	private final byte[] serializedData;
-
-	private SerializedValue(byte[] serializedData) {
-		this.serializedData = serializedData;
-	}
-
-	public SerializedValue(T value) throws IOException {
-		this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
-	}
-
-	@SuppressWarnings("unchecked")
-	public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
-		return serializedData == null ? null : (T) MigrationInstantiationUtil.deserializeObject(serializedData, loader);
-	}
-
-	/**
-	 * Returns the serialized value or <code>null</code> if no value is set.
-	 *
-	 * @return Serialized data.
-	 */
-	public byte[] getByteArray() {
-		return serializedData;
-	}
-
-	public static <T> SerializedValue<T> fromBytes(byte[] serializedData) {
-		return new SerializedValue<T>(serializedData);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return serializedData == null ? 0 : Arrays.hashCode(serializedData);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof SerializedValue) {
-			SerializedValue<?> other = (SerializedValue<?>) obj;
-			return this.serializedData == null ? other.serializedData == null :
-					(other.serializedData != null && Arrays.equals(this.serializedData, other.serializedData));
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "SerializedValue";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 78c57ed..602ad3e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -76,7 +76,6 @@ public class ContinuousFileProcessingMigrationTest {
 	@Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
 	public static Collection<Tuple2<MigrationVersion, Long>> parameters () {
 		return Arrays.asList(
-			Tuple2.of(MigrationVersion.v1_1, 1482144479339L),
 			Tuple2.of(MigrationVersion.v1_2, 1493116191000L),
 			Tuple2.of(MigrationVersion.v1_3, 1496532000000L));
 	}

