flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
Date Thu, 25 Oct 2018 16:45:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663989#comment-16663989 ] 

ASF GitHub Bot commented on FLINK-9808:
---------------------------------------

tzulitai closed pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

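The changes across the files below all follow one recurring pattern: the old
CompatibilityUtil.resolveCompatibilityResult(...) checks are replaced by the
new TypeSerializerSnapshot#resolveSchemaCompatibility API. A minimal sketch of
that recurring check, assuming a restored serializer snapshot and a new
serializer are already in hand (variable names are illustrative, not taken
from the PR):

    // Resolve how the new serializer relates to state written by the old one.
    TypeSerializerSchemaCompatibility<V, ?> compatibility =
        restoredSerializerSnapshot.resolveSchemaCompatibility(newSerializer);

    if (compatibility.isIncompatible()) {
        // the new serializer cannot be used, even if the state bytes were migrated
        throw new StateMigrationException("The new serializer is incompatible.");
    } else if (compatibility.isCompatibleAfterMigration()) {
        // state must be read with the restored serializer and rewritten with the new one
    }
    // otherwise the new serializer is compatible as-is and simply replaces the old one
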
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 7a1675eafba..b8282fe238c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StateMigrationException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -238,7 +239,8 @@
 			} else if (compat.isCompatibleAfterMigration()) {
 				return CompatibilityResult.requiresMigration();
 			} else if (compat.isIncompatible()) {
-				throw new IllegalStateException("The new serializer is incompatible.");
+				throw new RuntimeException(
+					new StateMigrationException("The new serializer is incompatible, meaning that the new serializer can't be used even if state migration is performed."));
 			} else {
 				throw new IllegalStateException("Unidentifiable schema compatibility type. This is a bug, please file a JIRA.");
 			}
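A note on the hunk above: resolveCompatibilityResult is the legacy
compatibility bridge, and its signature presumably cannot declare the checked
StateMigrationException, hence the unchecked RuntimeException wrapper. A
caller that wants the checked exception back could unwrap the cause; a hedged
sketch, where resolveLegacyCompatibility() is a hypothetical stand-in for the
call into this bridge:

    try {
        resolveLegacyCompatibility(); // hypothetical call into the bridge above
    } catch (RuntimeException e) {
        // recover the checked StateMigrationException wrapped by the bridge
        if (e.getCause() instanceof StateMigrationException) {
            throw (StateMigrationException) e.getCause();
        }
        throw e;
    }
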
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eae5a3bccdd..ae4fbaa133b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,9 +24,9 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -228,42 +228,32 @@ public void dispose() {
 
 			final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
 
-			@SuppressWarnings("unchecked")
-			RegisteredBroadcastStateBackendMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, V>(metaInfoSnapshot);
+			// check whether new serializers are incompatible
+			TypeSerializerSnapshot<K> keySerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<K>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
 
-			// check compatibility to determine if state migration is required
-			CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getKeySerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-					broadcastStateKeySerializer);
-
-			CompatibilityResult<V> valueCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getValueSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-					broadcastStateValueSerializer);
-
-			if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
-				// new serializer is compatible; use it to replace the old serializer
-				broadcastState.setStateMetaInfo(
-						new RegisteredBroadcastStateBackendMetaInfo<>(
-								name,
-								OperatorStateHandle.Mode.BROADCAST,
-								broadcastStateKeySerializer,
-								broadcastStateValueSerializer));
-			} else {
-				// TODO state migration currently isn't possible.
-
-				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
-				// since the state has already been deserialized to objects and we can just continue with
-				// the new serializer; we're deliberately failing here for now to have equal functionality with
-				// the RocksDB backend to avoid confusion for users.
-
-				throw StateMigrationException.notSupported();
+			TypeSerializerSchemaCompatibility<K, ?> keyCompatibility =
+				keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer);
+			if (keyCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new key serializer for broadcast state must not be incompatible.");
+			}
+
+			TypeSerializerSnapshot<V> valueSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<V>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+			TypeSerializerSchemaCompatibility<V, ?> valueCompatibility =
+				valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer);
+			if (valueCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new value serializer for broadcast state must not be incompatible.");
 			}
+
+			// new serializer is compatible; use it to replace the old serializer
+			broadcastState.setStateMetaInfo(
+					new RegisteredBroadcastStateBackendMetaInfo<>(
+							name,
+							OperatorStateHandle.Mode.BROADCAST,
+							broadcastStateKeySerializer,
+							broadcastStateValueSerializer));
 		}
 
 		accessedBroadcastStatesByName.put(name, broadcastState);
@@ -606,27 +596,19 @@ public void addAll(List<S> values) {
 
 			// check compatibility to determine if state migration is required
 			TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
-			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					metaInfo.getPartitionStateSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-					newPartitionStateSerializer);
-
-			if (!stateCompatibility.isRequiresMigration()) {
-				// new serializer is compatible; use it to replace the old serializer
-				partitionableListState.setStateMetaInfo(
-					new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
-			} else {
-				// TODO state migration currently isn't possible.
-
-				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
-				// since the state has already been deserialized to objects and we can just continue with
-				// the new serializer; we're deliberately failing here for now to have equal functionality with
-				// the RocksDB backend to avoid confusion for users.
-
-				throw StateMigrationException.notSupported();
+
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+			TypeSerializerSchemaCompatibility<S, ?> stateCompatibility =
+				stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
+			if (stateCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
 			}
+
+			partitionableListState.setStateMetaInfo(
+				new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
 		}
 
 		accessedStatesByName.put(name, partitionableListState);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index 7f95ed70326..d05f31a0c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -19,14 +19,10 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StateMigrationException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -151,79 +147,42 @@ public int hashCode() {
 		return result;
 	}
 
-	/**
-	 * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
-	 * This checks that the descriptor specifies identical names and state types, as well as
-	 * serializers that are compatible for the restored k/v state bytes.
-	 */
 	@Nonnull
-	public static <N, S> RegisteredKeyValueStateBackendMetaInfo<N, S> resolveKvStateCompatibility(
-		StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
-		TypeSerializer<N> newNamespaceSerializer,
-		StateDescriptor<?, S> newStateDescriptor,
-		@Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
 
-		Preconditions.checkState(restoredStateMetaInfoSnapshot.getBackendStateType()
+	public static void checkStateMetaInfo(StateMetaInfoSnapshot stateMetaInfoSnapshot, StateDescriptor<?, ?> stateDesc) {
+		Preconditions.checkState(
+			stateMetaInfoSnapshot != null,
+			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
+				" but its corresponding restored snapshot cannot be found.");
+
+		Preconditions.checkState(stateMetaInfoSnapshot.getBackendStateType()
 				== StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
 			"Incompatible state types. " +
-				"Was [" + restoredStateMetaInfoSnapshot.getBackendStateType() + "], " +
+				"Was [" + stateMetaInfoSnapshot.getBackendStateType() + "], " +
 				"registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
 
 		Preconditions.checkState(
-			Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
+			Objects.equals(stateDesc.getName(), stateMetaInfoSnapshot.getName()),
 			"Incompatible state names. " +
-				"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
-				"registered with [" + newStateDescriptor.getName() + "].");
+				"Was [" + stateMetaInfoSnapshot.getName() + "], " +
+				"registered with [" + stateDesc.getName() + "].");
 
 		final StateDescriptor.Type restoredType =
 			StateDescriptor.Type.valueOf(
-				restoredStateMetaInfoSnapshot.getOption(
+				stateMetaInfoSnapshot.getOption(
 					StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
 
-		if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
-			&& !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
-
+		if (stateDesc.getType() != StateDescriptor.Type.UNKNOWN && restoredType != StateDescriptor.Type.UNKNOWN) {
 			Preconditions.checkState(
-				newStateDescriptor.getType() == restoredType,
+				stateDesc.getType() == restoredType,
 				"Incompatible key/value state types. " +
 					"Was [" + restoredType + "], " +
-					"registered with [" + newStateDescriptor.getType() + "].");
+					"registered with [" + stateDesc.getType() + "].");
 		}
-
-		// check compatibility results to determine if state migration is required
-		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			null,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			newNamespaceSerializer);
-
-		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
-		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.restoreTypeSerializer(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			UnloadableDummyTypeSerializer.class,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			newStateSerializer);
-
-		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
-			// TODO state migration currently isn't possible.
-			throw StateMigrationException.notSupported();
-		} else {
-			return new RegisteredKeyValueStateBackendMetaInfo<>(
-				newStateDescriptor.getType(),
-				newStateDescriptor.getName(),
-				newNamespaceSerializer,
-				newStateSerializer,
-				snapshotTransformer);
-		}
-	}
-
-	@Nonnull
-	@Override
-	public StateMetaInfoSnapshot snapshot() {
-		return computeSnapshot();
 	}
 
 	@Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 837e51fafc0..18ac7fcc454 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -28,10 +28,9 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -206,14 +205,15 @@ public HeapKeyedStateBackend(
 			StateMetaInfoSnapshot.CommonSerializerKeys serializerKey =
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
 
-			CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-				restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey),
-				null,
-				restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
-				byteOrderedElementSerializer);
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot<T> serializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<T>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+			TypeSerializerSchemaCompatibility<T, ?> compatibilityResult =
+				serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
 
-			if (compatibilityResult.isRequiresMigration()) {
-				throw new FlinkRuntimeException(StateMigrationException.notSupported());
+			if (compatibilityResult.isIncompatible()) {
+				throw new FlinkRuntimeException(new StateMigrationException("For heap backends, the new priority queue serializer must not be incompatible."));
 			} else {
 				registeredPQStates.put(
 					stateName,
@@ -257,7 +257,14 @@ public HeapKeyedStateBackend(
 		@SuppressWarnings("unchecked")
 		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());
 
-		RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo;
+		TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();
+		RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
+			stateDesc.getType(),
+			stateDesc.getName(),
+			namespaceSerializer,
+			newStateSerializer,
+			snapshotTransformer);
+
 		if (stateTable != null) {
 			@SuppressWarnings("unchecked")
 			StateMetaInfoSnapshot restoredMetaInfoSnapshot =
@@ -269,21 +276,34 @@ public HeapKeyedStateBackend(
 				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
 					" but its corresponding restored snapshot cannot be found.");
 
-			newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
-				restoredMetaInfoSnapshot,
-				namespaceSerializer,
-				stateDesc,
-				snapshotTransformer);
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot<N> namespaceSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<N>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+					StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()));
+
+			TypeSerializerSchemaCompatibility<N, ?> namespaceCompatibility =
+				namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer);
+
+			if (namespaceCompatibility.isIncompatible()) {
+				throw new StateMigrationException("For heap backends, the new namespace serializer cannot be incompatible.");
+			}
+
+			RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot, stateDesc);
+
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot<V> stateSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<V>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+					StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()));
+
+			TypeSerializerSchemaCompatibility<V, ?> stateCompatibility =
+				stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
+
+			if (stateCompatibility.isIncompatible()) {
+				throw new StateMigrationException("For heap backends, the new state serializer cannot be incompatible.");
+			}
 
 			stateTable.setMetaInfo(newMetaInfo);
 		} else {
-			newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
-				stateDesc.getType(),
-				stateDesc.getName(),
-				namespaceSerializer,
-				stateDesc.getSerializer(),
-				snapshotTransformer);
-
 			stateTable = snapshotStrategy.newStateTable(newMetaInfo);
 			registeredKVStates.put(stateDesc.getName(), stateTable);
 		}
@@ -415,16 +435,9 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws
 				if (!keySerializerRestored) {
 					// check for key serializer compatibility; this also reconfigures the
 					// key serializer to be compatible, if it is required and is possible
-					if (CompatibilityUtil.resolveCompatibilityResult(
-							serializationProxy.restoreKeySerializer(),
-							UnloadableDummyTypeSerializer.class,
-							serializationProxy.getKeySerializerConfigSnapshot(),
-							keySerializer)
-						.isRequiresMigration()) {
-
-						// TODO replace with state migration; note that key hash codes need to remain the same after migration
-						throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
-							"Aborting now since state migration is currently not available");
+					if (!serializationProxy.getKeySerializerConfigSnapshot()
+							.resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
+						throw new StateMigrationException("The new key serializer must be compatible.");
 					}
 
 					keySerializerRestored = true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java
new file mode 100644
index 00000000000..3df294da969
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+
+/**
+ * Tests for the keyed state backend and operator state backend, as created by the
+ * {@link FsStateBackend}.
+ */
+public class FileStateBackendMigrationTest extends StateBackendMigrationTestBase<FsStateBackend> {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Override
+	protected FsStateBackend getStateBackend() throws Exception {
+		File checkpointPath = tempFolder.newFolder();
+		return new FsStateBackend(checkpointPath.toURI(), false);
+	}
+
+	@Override
+	protected BackendSerializationTimeliness getStateBackendSerializationTimeliness() {
+		return BackendSerializationTimeliness.ON_CHECKPOINTS;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index beea0c229c3..d34c784a1e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -42,6 +42,11 @@ protected FsStateBackend getStateBackend() throws Exception {
 		return new FsStateBackend(checkpointPath.toURI(), useAsyncMode());
 	}
 
+	@Override
+	protected BackendSerializationTimeliness getStateBackendSerializationTimeliness() throws Exception {
+		return BackendSerializationTimeliness.ON_CHECKPOINTS;
+	}
+
 	protected boolean useAsyncMode() {
 		return false;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 215d7d36c96..d84f16f5897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -56,6 +56,11 @@ protected MemoryStateBackend getStateBackend() throws Exception {
 		return new MemoryStateBackend(useAsyncMode());
 	}
 
+	@Override
+	protected BackendSerializationTimeliness getStateBackendSerializationTimeliness() throws Exception {
+		return BackendSerializationTimeliness.ON_CHECKPOINTS;
+	}
+
 	protected boolean useAsyncMode() {
 		return false;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
new file mode 100644
index 00000000000..e4b957a4fcc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -0,0 +1,780 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link KeyedStateBackend} and {@link OperatorStateBackend} as produced
+ * by various {@link StateBackend}s.
+ */
+@SuppressWarnings("serial")
+public abstract class StateBackendMigrationTestBase<B extends AbstractStateBackend> extends TestLogger {
+
+	@Rule
+	public final ExpectedException expectedException = ExpectedException.none();
+
+	// lazily initialized stream storage
+	private CheckpointStorageLocation checkpointStorageLocation;
+
+	/**
+	 * Different "personalities" of {@link CustomStringSerializer}. Instead of creating
+	 * different classes we parameterize the serializer with this and
+	 * {@link CustomStringSerializerSnapshot} will instantiate serializers with the correct
+	 * personality.
+	 */
+	public enum SerializerVersion {
+		INITIAL,
+		RESTORE,
+		NEW
+	}
+
+	/**
+	 * The compatibility behaviour of {@link CustomStringSerializer}. This controls what
+	 * type of serializer {@link CustomStringSerializerSnapshot} will create for
+	 * the different methods that return/create serializers.
+	 */
+	public enum SerializerCompatibilityType {
+		COMPATIBLE_AS_IS,
+		REQUIRES_MIGRATION
+	}
+
+	/**
+	 * The serialization timeliness behaviour of the state backend under test.
+	 */
+	public enum BackendSerializationTimeliness {
+		ON_ACCESS,
+		ON_CHECKPOINTS
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testValueStateWithSerializerRequiringMigration() throws Exception {
+		CustomStringSerializer.resetCountingMaps();
+
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>(
+			"id",
+			new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS, SerializerVersion.INITIAL));
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============ Modifications to the state ============
+		//  For eager serialization backends:
+		//    This should result in serializer personality INITIAL having 2 serialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should not result in any serialize / deserialize calls
+
+		backend.setCurrentKey(1);
+		state.update("1");
+		backend.setCurrentKey(2);
+		state.update("2");
+		backend.setCurrentKey(1);
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// ============ Snapshot #1 ============
+		//  For eager serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should result in serializer personality INITIAL having 2 serialize calls
+		KeyedStateHandle snapshot1 = runSnapshot(
+			backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
+			sharedStateRegistry);
+		backend.dispose();
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		} else {
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// ============ Restore from snapshot #1 ============
+		//  For eager serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should result in serializer personality RESTORE having 2 deserialize calls
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.RESTORE));
+			assertEquals((Integer) 2, CustomStringSerializer.deserializeCalled.get(SerializerVersion.RESTORE));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		ValueStateDescriptor<String> newKvId = new ValueStateDescriptor<>("id",
+			new CustomStringSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION, SerializerVersion.NEW));
+
+		// ============ State registration that triggers state migration ============
+		//  For eager serialization backends:
+		//    This should result in serializer personality RESTORE having 2 deserialize calls, and NEW having 2 serialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, newKvId);
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.RESTORE));
+			assertEquals((Integer) 2, CustomStringSerializer.deserializeCalled.get(SerializerVersion.RESTORE));
+
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.RESTORE));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.RESTORE));
+
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// ============ More modifications to the state ============
+		//  For eager serialization backends:
+		//    This should result in serializer personality NEW having 2 serialize calls and 3 deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		backend.setCurrentKey(1);
+		assertEquals("1", restored1.value());
+		restored1.update("1"); // s, NEW
+		backend.setCurrentKey(2);
+		assertEquals("2", restored1.value());
+		restored1.update("3"); // s, NEW
+		assertEquals("3", restored1.value());
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals((Integer) 3, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// ============ Snapshot #2 ============
+		//  For eager serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should result in serializer personality NEW having 2 serialize calls
+		KeyedStateHandle snapshot2 = runSnapshot(
+			backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
+			sharedStateRegistry);
+		backend.dispose();
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		} else {
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// and restore once with NEW from NEW so that we see a read using the NEW serializer
+		// on the file backend
+		ValueStateDescriptor<String> newKvId2 = new ValueStateDescriptor<>(
+			"id",
+			new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS, SerializerVersion.NEW));
+
+		// ============ Restore from snapshot #2 ============
+		//  For eager serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should result in serializer personality RESTORE having 2 deserialize calls
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.RESTORE));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.RESTORE));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.RESTORE));
+			assertEquals((Integer) 2, CustomStringSerializer.deserializeCalled.get(SerializerVersion.RESTORE));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, newKvId2);
+		snapshot2.discardState();
+		snapshot1.discardState();
+
+		backend.dispose();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testValueStateWithNewSerializer() throws Exception {
+		CustomStringSerializer.resetCountingMaps();
+
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>(
+			"id",
+			new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS, SerializerVersion.INITIAL));
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============ Modifications to the state ============
+		//  For eager serialization backends:
+		//    This should result in serializer personality INITIAL having 2 serialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should not result in any serialize / deserialize calls
+
+		backend.setCurrentKey(1);
+		state.update("1");
+		backend.setCurrentKey(2);
+		state.update("2");
+		backend.setCurrentKey(1);
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// ============ Snapshot #1 ============
+		//  For eager serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should result in serializer personality INITIAL having 2 serialize calls
+		KeyedStateHandle snapshot1 = runSnapshot(
+			backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
+			sharedStateRegistry);
+		backend.dispose();
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		} else {
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// ============ Restore from snapshot #1 ============
+		//  For eager serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should result in serializer personality RESTORE having 2 deserialize calls
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.INITIAL));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.INITIAL));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.RESTORE));
+			assertEquals((Integer) 2, CustomStringSerializer.deserializeCalled.get(SerializerVersion.RESTORE));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		ValueStateDescriptor<String> newKvId = new ValueStateDescriptor<>("id",
+			new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS, SerializerVersion.NEW));
+		ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, newKvId);
+
+		// ============ More modifications to the state ============
+		//  For eager serialization backends:
+		//    This should result in serializer personality NEW having 2 serialize calls and 3 deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should not result in any serialize / deserialize calls
+
+		backend.setCurrentKey(1);
+		assertEquals("1", restored1.value());
+		restored1.update("1");
+		backend.setCurrentKey(2);
+		assertEquals("2", restored1.value());
+		restored1.update("3");
+		assertEquals("3", restored1.value());
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals((Integer) 3, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// ============ Snapshot #2 ============
+		//  For eager serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should result in serializer personality NEW having 2 serialize calls
+		KeyedStateHandle snapshot2 = runSnapshot(
+			backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
+			sharedStateRegistry);
+		snapshot1.discardState();
+		backend.dispose();
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		} else {
+			assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.get(SerializerVersion.NEW));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.NEW));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		// ============ Restore from snapshot #2 ============
+		//  For eager serialization backends:
+		//    This should not result in any serialize / deserialize calls
+		//
+		//  For lazy serialization backends:
+		//    This should result in serializer personality RESTORE having 2 deserialize calls
+		ValueStateDescriptor<String> newKvId2 = new ValueStateDescriptor<>("id",
+			new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS, SerializerVersion.NEW));
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+
+		if (getStateBackendSerializationTimeliness() == BackendSerializationTimeliness.ON_ACCESS) {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.RESTORE));
+			assertEquals(null, CustomStringSerializer.deserializeCalled.get(SerializerVersion.RESTORE));
+		} else {
+			assertEquals(null, CustomStringSerializer.serializeCalled.get(SerializerVersion.RESTORE));
+			assertEquals((Integer) 2, CustomStringSerializer.deserializeCalled.get(SerializerVersion.RESTORE));
+		}
+		CustomStringSerializer.resetCountingMaps();
+
+		backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, newKvId2);
+		snapshot2.discardState();
+		backend.dispose();
+	}
+
+	public static class CustomStringSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private static final String EMPTY = "";
+
+		private SerializerCompatibilityType compatibilityType;
+		private SerializerVersion serializerVersion;
+
+		private static final String CONFIG_PAYLOAD = "configPayload";
+
+		// for counting how often the methods were called from serializers of the different personalities
+		public static Map<SerializerVersion, Integer> serializeCalled = new HashMap<>();
+		public static Map<SerializerVersion, Integer> deserializeCalled = new HashMap<>();
+
+		static void resetCountingMaps() {
+			serializeCalled = new HashMap<>();
+			deserializeCalled = new HashMap<>();
+		}
+
+		CustomStringSerializer(
+			SerializerCompatibilityType compatibilityType,
+			SerializerVersion serializerVersion) {
+			this.compatibilityType = compatibilityType;
+			this.serializerVersion = serializerVersion;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public String createInstance() {
+			return EMPTY;
+		}
+
+		@Override
+		public String copy(String from) {
+			return from;
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return from;
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+			serializeCalled.compute(serializerVersion, (k, v) -> v == null ? 1 : v + 1);
+			StringValue.writeString(record, target);
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			deserializeCalled.compute(serializerVersion, (k, v) -> v == null ? 1 : v + 1);
+			return StringValue.readString(source);
+		}
+
+		@Override
+		public String deserialize(String record, DataInputView source) throws IOException {
+			deserializeCalled.compute(serializerVersion, (k, v) -> v == null ? 1 : v + 1);
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			StringValue.copyString(source, target);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof CustomStringSerializer;
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return this;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof CustomStringSerializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+
+		@Override
+		public TypeSerializerSnapshot<String> snapshotConfiguration() {
+			return new CustomStringSerializerSnapshot(CONFIG_PAYLOAD);
+		}
+
+		SerializerCompatibilityType getCompatibilityType() {
+			return compatibilityType;
+		}
+	}
+
+	public static class CustomStringSerializerSnapshot implements TypeSerializerSnapshot<String> {
+
+		private String configPayload;
+
+		public CustomStringSerializerSnapshot() {}
+
+		public CustomStringSerializerSnapshot(String configPayload) {
+			this.configPayload = configPayload;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeUTF(configPayload);
+		}
+
+		@Override
+		public void read(int readVersion, DataInputView in, ClassLoader classLoader) throws IOException {
+			configPayload = in.readUTF();
+		}
+
+		@Override
+		public TypeSerializer<String> restoreSerializer() {
+			return new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS, SerializerVersion.RESTORE);
+
+		}
+
+		@Override
+		public <NS extends TypeSerializer<String>> TypeSerializerSchemaCompatibility<String, NS> resolveSchemaCompatibility(NS newSerializer) {
+			if (newSerializer instanceof CustomStringSerializer) {
+				SerializerCompatibilityType compatibilityType = ((CustomStringSerializer) newSerializer).getCompatibilityType();
+
+				if (compatibilityType == SerializerCompatibilityType.COMPATIBLE_AS_IS) {
+					return TypeSerializerSchemaCompatibility.compatibleAsIs();
+				} else {
+					return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+				}
+			} else {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			}
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof CustomStringSerializerSnapshot;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+
+		@Override
+		public int getCurrentVersion() {
+			return 0;
+		}
+	}
+
+	public static class CustomVoidNamespaceSerializer extends TypeSerializer<VoidNamespace> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final CustomVoidNamespaceSerializer INSTANCE = new CustomVoidNamespaceSerializer();
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public VoidNamespace createInstance() {
+			return VoidNamespace.get();
+		}
+
+		@Override
+		public VoidNamespace copy(VoidNamespace from) {
+			return VoidNamespace.get();
+		}
+
+		@Override
+		public VoidNamespace copy(VoidNamespace from, VoidNamespace reuse) {
+			return VoidNamespace.get();
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(VoidNamespace record, DataOutputView target) throws IOException {
+			// Make progress in the stream, write one byte.
+			//
+			// We could just skip writing anything here, because of the way this is
+			// used with the state backends, but if it is ever used somewhere else
+			// (even though it is unlikely to happen), it would be a problem.
+			target.write(0);
+		}
+
+		@Override
+		public VoidNamespace deserialize(DataInputView source) throws IOException {
+			source.readByte();
+			return VoidNamespace.get();
+		}
+
+		@Override
+		public VoidNamespace deserialize(VoidNamespace reuse, DataInputView source) throws IOException {
+			source.readByte();
+			return VoidNamespace.get();
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			target.write(source.readByte());
+		}
+
+		@Override
+		public TypeSerializer<VoidNamespace> duplicate() {
+			return this;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof CustomVoidNamespaceSerializer;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof CustomVoidNamespaceSerializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+
+		@Override
+		public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
+			return new CustomVoidNamespaceSerializerSnapshot();
+		}
+	}
+
+	public static class CustomVoidNamespaceSerializerSnapshot implements TypeSerializerSnapshot<VoidNamespace> {
+
+		@Override
+		public TypeSerializer<VoidNamespace> restoreSerializer() {
+			return new CustomVoidNamespaceSerializer();
+		}
+
+		@Override
+		public <NS extends TypeSerializer<VoidNamespace>> TypeSerializerSchemaCompatibility<VoidNamespace, NS> resolveSchemaCompatibility(NS newSerializer) {
+			return TypeSerializerSchemaCompatibility.compatibleAsIs();
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {}
+
+		@Override
+		public void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof CustomVoidNamespaceSerializerSnapshot;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+
+		@Override
+		public int getCurrentVersion() {
+			return 0;
+		}
+	}
+
+	protected abstract B getStateBackend() throws Exception;
+
+	protected abstract BackendSerializationTimeliness getStateBackendSerializationTimeliness();
+
+	private CheckpointStreamFactory createStreamFactory() throws Exception {
+		if (checkpointStorageLocation == null) {
+			checkpointStorageLocation = getStateBackend()
+				.createCheckpointStorage(new JobID())
+				.initializeLocationForCheckpoint(1L);
+		}
+		return checkpointStorageLocation;
+	}
+
+	private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
+		return createKeyedBackend(keySerializer, new DummyEnvironment());
+	}
+
+	private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
+		return createKeyedBackend(
+			keySerializer,
+			10,
+			new KeyGroupRange(0, 9),
+			env);
+	}
+
+	private <K> AbstractKeyedStateBackend<K> createKeyedBackend(
+		TypeSerializer<K> keySerializer,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		Environment env) throws Exception {
+		AbstractKeyedStateBackend<K> backend = getStateBackend().createKeyedStateBackend(
+			env,
+			new JobID(),
+			"test_op",
+			keySerializer,
+			numberOfKeyGroups,
+			keyGroupRange,
+			env.getTaskKvStateRegistry());
+		backend.restore(null);
+		return backend;
+	}
+
+	private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
+		return restoreKeyedBackend(keySerializer, state, new DummyEnvironment());
+	}
+
+	private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(
+		TypeSerializer<K> keySerializer,
+		KeyedStateHandle state,
+		Environment env) throws Exception {
+		return restoreKeyedBackend(
+			keySerializer,
+			10,
+			new KeyGroupRange(0, 9),
+			Collections.singletonList(state),
+			env);
+	}
+
+	private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(
+		TypeSerializer<K> keySerializer,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		List<KeyedStateHandle> state,
+		Environment env) throws Exception {
+		AbstractKeyedStateBackend<K> backend = getStateBackend().createKeyedStateBackend(
+			env,
+			new JobID(),
+			"test_op",
+			keySerializer,
+			numberOfKeyGroups,
+			keyGroupRange,
+			env.getTaskKvStateRegistry());
+		backend.restore(new StateObjectCollection<>(state));
+		return backend;
+	}
+
+	private KeyedStateHandle runSnapshot(
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture,
+		SharedStateRegistry sharedStateRegistry) throws Exception {
+
+		if (!snapshotRunnableFuture.isDone()) {
+			snapshotRunnableFuture.run();
+		}
+
+		SnapshotResult<KeyedStateHandle> snapshotResult = snapshotRunnableFuture.get();
+		KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();
+		if (jobManagerOwnedSnapshot != null) {
+			jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry);
+		}
+		return jobManagerOwnedSnapshot;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 0712b641678..41a6ab88247 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -131,6 +131,7 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -147,11 +148,21 @@
 	@Rule
 	public final ExpectedException expectedException = ExpectedException.none();
 
+	/**
+	 * The serialization timeliness behaviour of the state backend under test.
+	 */
+	public enum BackendSerializationTimeliness {
+		ON_ACCESS,
+		ON_CHECKPOINTS
+	}
+
 	// lazily initialized stream storage
 	private CheckpointStorageLocation checkpointStorageLocation;
 
 	protected abstract B getStateBackend() throws Exception;
 
+	protected abstract BackendSerializationTimeliness getStateBackendSerializationTimeliness() throws Exception;
+
 	protected abstract boolean isSerializerPresenceRequiredOnRestore();
 
 	protected CheckpointStreamFactory createStreamFactory() throws Exception {
@@ -803,6 +814,9 @@ public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throw
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
+
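+		// This test exercises serializers during on-access (de)serialization, so it is
+		// only meaningful for backends that serialize state on every access.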
+		assumeThat(getStateBackendSerializationTimeliness(), is(BackendSerializationTimeliness.ON_ACCESS));
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		Environment env = new DummyEnvironment();
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
@@ -969,6 +983,9 @@ public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exc
 
 	@Test
 	public void testStateSerializerReconfiguration() throws Exception {
+
+		assumeThat(getStateBackendSerializationTimeliness(), is(BackendSerializationTimeliness.ON_ACCESS));
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		Environment env = new DummyEnvironment();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index f9f108a3a12..54eedb229c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -387,6 +387,8 @@ public void testSnapshotChangeRestore() throws Exception {
 	@Test(expected = StateMigrationException.class)
 	public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
 		assumeThat(this, not(instanceOf(MockTtlStateTest.class)));
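+		// Excluded: the heap backends always perform full-pass state conversion and thus
+		// no longer throw the compatibility failure expected by this test.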
+		assumeThat(this, not(instanceOf(HeapAsyncSnapshotTtlStateTest.class)));
+		assumeThat(this, not(instanceOf(HeapSyncSnapshotTtlStateTest.class)));
 
 		initTest();
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 8b8fbb23a99..2218bc0d3e0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
@@ -154,6 +155,20 @@ public void setCurrentNamespace(N namespace) {
 		return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer());
 	}
 
+	public void migrateSerializedValue(
+			DataInputDeserializer serializedOldValueInput,
+			DataOutputSerializer serializedMigratedValueOutput,
+			TypeSerializer<V> priorSerializer,
+			TypeSerializer<V> newSerializer) throws StateMigrationException {
+
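+		// Base implementation of value migration: deserialize the old bytes with the prior
+		// serializer and re-serialize with the new one. Composite states such as
+		// RocksDBListState override this to handle their element-wise formats.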
+		try {
+			V value = priorSerializer.deserialize(serializedOldValueInput);
+			newSerializer.serialize(value, serializedMigratedValueOutput);
+		} catch (Exception e) {
+			throw new StateMigrationException("Error while trying to migrate RocksDB state.", e);
+		}
+	}
+
 	byte[] getKeyBytes() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 3d66db6e6fc..77fef509c38 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -27,10 +27,9 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -95,6 +94,7 @@
 import org.rocksdb.DBOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.Snapshot;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -694,16 +694,9 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationExceptio
 
 			// check for key serializer compatibility; this also reconfigures the
 			// key serializer to be compatible, if it is required and is possible
-			if (CompatibilityUtil.resolveCompatibilityResult(
-				serializationProxy.restoreKeySerializer(),
-				UnloadableDummyTypeSerializer.class,
-				serializationProxy.getKeySerializerConfigSnapshot(),
-				rocksDBKeyedStateBackend.keySerializer)
-				.isRequiresMigration()) {
-
-				// TODO replace with state migration; note that key hash codes need to remain the same after migration
-				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
-					"Aborting now since state migration is currently not available");
+			if (!serializationProxy.getKeySerializerConfigSnapshot()
+					.resolveSchemaCompatibility(rocksDBKeyedStateBackend.keySerializer).isCompatibleAsIs()) {
+				throw new StateMigrationException("The new key serializer must be compatible.");
 			}
 
 			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
@@ -1245,16 +1238,9 @@ private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
 
 				// check for key serializer compatibility; this also reconfigures the
 				// key serializer to be compatible, if it is required and is possible
-				if (CompatibilityUtil.resolveCompatibilityResult(
-					serializationProxy.restoreKeySerializer(),
-					UnloadableDummyTypeSerializer.class,
-					serializationProxy.getKeySerializerConfigSnapshot(),
-					stateBackend.keySerializer)
-					.isRequiresMigration()) {
-
-					// TODO replace with state migration; note that key hash codes need to remain the same after migration
-					throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
-						"Aborting now since state migration is currently not available");
+				if (!serializationProxy.getKeySerializerConfigSnapshot()
+						.resolveSchemaCompatibility(stateBackend.keySerializer).isCompatibleAsIs()) {
+					throw new StateMigrationException("The new key serializer must be compatible.");
 				}
 
 				return serializationProxy.getStateMetaInfoSnapshots();
@@ -1345,42 +1331,32 @@ private void copyStateDataHandleData(
 	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
 	 * or create a new one if it does not exist.
 	 */
-	private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
-			StateDescriptor<?, S> stateDesc,
+	private <N, S extends State, SV> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> tryRegisterKvStateInformation(
+			StateDescriptor<S, SV> stateDesc,
 			TypeSerializer<N> namespaceSerializer,
-			@Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {
+			@Nullable StateSnapshotTransformer<SV> snapshotTransformer) throws Exception {
 
 		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
 			kvStateInformation.get(stateDesc.getName());
 
-		RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
-		if (stateInfo != null) {
-
-			StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());
+		TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
+		RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
+			stateDesc.getType(),
+			stateDesc.getName(),
+			namespaceSerializer,
+			stateSerializer,
+			snapshotTransformer);
 
-			Preconditions.checkState(
-				restoredMetaInfoSnapshot != null,
-				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
-					" but its corresponding restored snapshot cannot be found.");
-
-			newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
-				restoredMetaInfoSnapshot,
-				namespaceSerializer,
+		if (stateInfo != null) {
+			migrateStateIfNecessary(
+				newMetaInfo,
 				stateDesc,
-				snapshotTransformer);
+				namespaceSerializer,
+				stateInfo);
 
 			stateInfo.f1 = newMetaInfo;
 		} else {
-			String stateName = stateDesc.getName();
-
-			newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
-				stateDesc.getType(),
-				stateName,
-				namespaceSerializer,
-				stateDesc.getSerializer(),
-				snapshotTransformer);
-
-			ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
+			ColumnFamilyHandle columnFamily = createColumnFamily(stateDesc.getName());
 
 			stateInfo = Tuple2.of(columnFamily, newMetaInfo);
 			kvStateInformation.put(stateDesc.getName(), stateInfo);
@@ -1389,6 +1365,119 @@ private void copyStateDataHandleData(
 		return Tuple2.of(stateInfo.f0, newMetaInfo);
 	}
 
+	private <N, S extends State, SV> void migrateStateIfNecessary(
+			RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo,
+			StateDescriptor<S, SV> stateDesc,
+			TypeSerializer<N> namespaceSerializer,
+			Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo) throws Exception {
+
+		StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());
+
+		RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot, stateDesc);
+
+		@SuppressWarnings("unchecked")
+		TypeSerializerSnapshot<N> namespaceSerializerSnapshot = Preconditions.checkNotNull(
+			(TypeSerializerSnapshot<N>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()));
+
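+		// The namespace is serialized into the RocksDB key; keys are never rewritten
+		// during migration, so the new namespace serializer must be compatible as is.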
+		TypeSerializerSchemaCompatibility<N, ?> namespaceCompatibility =
+			namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer);
+		if (!namespaceCompatibility.isCompatibleAsIs()) {
+			throw new StateMigrationException("The new namespace serializer must be compatible.");
+		}
+
+		@SuppressWarnings("unchecked")
+		TypeSerializerSnapshot<SV> stateSerializerSnapshot = Preconditions.checkNotNull(
+			(TypeSerializerSnapshot<SV>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()));
+
+		TypeSerializer<SV> newStateSerializer = stateDesc.getSerializer();
+		TypeSerializerSchemaCompatibility<SV, ?> stateCompatibility =
+			stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
+
+		if (stateCompatibility.isCompatibleAfterMigration()) {
+			migrateStateValues(stateDesc, stateInfo, restoredMetaInfoSnapshot, newMetaInfo);
+		} else if (stateCompatibility.isIncompatible()) {
+			throw new StateMigrationException("The new state serializer is incompatible with previous state.");
+		}
+	}
+
+	/**
+	 * Migrate only the state value, that is the "value" that is stored in RocksDB. We don't migrate
+	 * the key here, which is made up of key group, key, namespace and map key
+	 * (in case of MapState).
+	 */
+	private <N, S extends State, SV> void migrateStateValues(
+		StateDescriptor<S, SV> stateDesc,
+		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo,
+		StateMetaInfoSnapshot restoredMetaInfoSnapshot,
+		RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo) throws Exception {
+
+		if (stateDesc.getType() == StateDescriptor.Type.MAP) {
+			throw new StateMigrationException("The new serializer for a MapState requires state migration in order for the job to proceed." +
+				" However, migration for MapState currently isn't supported.");
+		}
+
+		LOG.info(
+			"Performing state migration for state {} because the state serializer's schema, i.e. serialization format, has changed.",
+			stateDesc);
+
+		// we need to get an actual state instance because migration is different
+		// for different state types. For example, ListState needs to deal with
+		// individual elements
+		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+		if (stateFactory == null) {
+			String message = String.format("State %s is not supported by %s",
+				stateDesc.getClass(), this.getClass());
+			throw new FlinkRuntimeException(message);
+		}
+		State state = stateFactory.createState(
+			stateDesc,
+			Tuple2.of(stateInfo.f0, newMetaInfo),
+			RocksDBKeyedStateBackend.this);
+		if (!(state instanceof AbstractRocksDBState)) {
+			throw new FlinkRuntimeException(
+				"State should be an AbstractRocksDBState but is " + state);
+		}
+
+		@SuppressWarnings("unchecked")
+		AbstractRocksDBState<?, ?, SV, S> rocksDBState = (AbstractRocksDBState<?, ?, SV, S>) state;
+
+		Snapshot rocksDBSnapshot = db.getSnapshot();
+		try (
+			RocksIteratorWrapper iterator = getRocksIterator(db, stateInfo.f0);
+			RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions())
+		) {
+			iterator.seekToFirst();
+
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot<SV> priorValueSerializerSnapshot = (TypeSerializerSnapshot<SV>)
+				Preconditions.checkNotNull(restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+			TypeSerializer<SV> priorValueSerializer = priorValueSerializerSnapshot.restoreSerializer();
+
+			DataInputDeserializer serializedValueInput = new DataInputDeserializer();
+			DataOutputSerializer migratedSerializedValueOutput = new DataOutputSerializer(1);
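+			// Full pass over the column family: read each value with the prior serializer,
+			// re-serialize it with the new serializer, and write it back under the same key.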
+			while (iterator.isValid()) {
+				serializedValueInput.setBuffer(iterator.value());
+
+				rocksDBState.migrateSerializedValue(
+					serializedValueInput,
+					migratedSerializedValueOutput,
+					priorValueSerializer,
+					stateDesc.getSerializer());
+
+				batchWriter.put(stateInfo.f0, iterator.key(), migratedSerializedValueOutput.getCopyOfBuffer());
+
+				migratedSerializedValueOutput.clear();
+				iterator.next();
+			}
+			batchWriter.flush();
+		} finally {
+			db.releaseSnapshot(rocksDBSnapshot);
+			rocksDBSnapshot.close();
+		}
+	}
+
 	/**
 	 * Creates a column family handle for use with a k/v state.
 	 */
@@ -1585,13 +1674,14 @@ public static RocksIteratorWrapper getRocksIterator(
 			TypeSerializer<?> metaInfoTypeSerializer = restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey);
 
 			if (metaInfoTypeSerializer != byteOrderedElementSerializer) {
-				CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-					metaInfoTypeSerializer,
-					null,
-					restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
-					byteOrderedElementSerializer);
+				@SuppressWarnings("unchecked")
+				TypeSerializerSnapshot<T> serializerSnapshot = Preconditions.checkNotNull(
+					(TypeSerializerSnapshot<T>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+				TypeSerializerSchemaCompatibility<T, ?> compatibilityResult =
+					serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
 
-				if (compatibilityResult.isRequiresMigration()) {
+				if (compatibilityResult.isCompatibleAfterMigration()) {
 					throw new FlinkRuntimeException(StateMigrationException.notSupported());
 				}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index f70c6a57bad..2c8f0d1a1e8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
@@ -31,6 +32,7 @@
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
@@ -242,6 +244,36 @@ public void addAll(List<V> values) {
 		}
 	}
 
+	@Override
+	public void migrateSerializedValue(
+			DataInputDeserializer serializedOldValueInput,
+			DataOutputSerializer serializedMigratedValueOutput,
+			TypeSerializer<List<V>> priorSerializer,
+			TypeSerializer<List<V>> newSerializer) throws StateMigrationException {
+
+		Preconditions.checkArgument(priorSerializer instanceof ListSerializer);
+		Preconditions.checkArgument(newSerializer instanceof ListSerializer);
+
+		TypeSerializer<V> priorElementSerializer =
+			((ListSerializer<V>) priorSerializer).getElementSerializer();
+
+		TypeSerializer<V> newElementSerializer =
+			((ListSerializer<V>) newSerializer).getElementSerializer();
+
+		try {
+			while (serializedOldValueInput.available() > 0) {
+				V element = priorElementSerializer.deserialize(serializedOldValueInput);
+				newElementSerializer.serialize(element, serializedMigratedValueOutput);
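+				// List elements are separated by a single delimiter byte; consume the old
+				// delimiter and write a fresh one between re-serialized elements.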
+				if (serializedOldValueInput.available() > 0) {
+					serializedOldValueInput.readByte();
+					serializedMigratedValueOutput.write(DELIMITER);
+				}
+			}
+		} catch (Exception e) {
+			throw new StateMigrationException("Error while trying to migrate RocksDB list state.", e);
+		}
+	}
+
 	private static <V> byte[] getPreMergedValue(
 		List<V> values,
 		TypeSerializer<V> elementSerializer,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java
new file mode 100644
index 00000000000..ee9698076cb
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.runtime.state.StateBackendMigrationTestBase;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.rocksdb.RocksObject;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+/**
+ * Tests for state migration in the partitioned state part of {@link RocksDBStateBackend}.
+ */
+@RunWith(Parameterized.class)
+public class RocksDBStateBackendMigrationTest extends StateBackendMigrationTestBase<RocksDBStateBackend> {
+
+	private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
+	private List<RocksObject> allCreatedCloseables;
+
+	@Parameterized.Parameters(name = "Incremental checkpointing: {0}")
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(false, true);
+	}
+
+	@Parameterized.Parameter
+	public boolean enableIncrementalCheckpointing;
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	// Store the path of the RocksDB instance directory used by this test.
+	private String dbPath;
+
+	@Override
+	protected RocksDBStateBackend getStateBackend() throws IOException {
+		dbPath = tempFolder.newFolder().getAbsolutePath();
+		String checkpointPath = tempFolder.newFolder().toURI().toString();
+		RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), enableIncrementalCheckpointing);
+		backend.setDbStoragePath(dbPath);
+		return backend;
+	}
+
+	@Override
+	protected BackendSerializationTimeliness getStateBackendSerializationTimeliness() {
+		return BackendSerializationTimeliness.ON_ACCESS;
+	}
+
+	// small safety net for instance cleanups, so that no native objects are left
+	@After
+	public void cleanupRocksDB() {
+		if (keyedStateBackend != null) {
+			IOUtils.closeQuietly(keyedStateBackend);
+			keyedStateBackend.dispose();
+		}
+		if (allCreatedCloseables != null) {
+			for (RocksObject rocksCloseable : allCreatedCloseables) {
+				verify(rocksCloseable, times(1)).close();
+			}
+			allCreatedCloseables = null;
+		}
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 4916251fc1f..6a4faa036f6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -134,6 +134,11 @@ protected RocksDBStateBackend getStateBackend() throws IOException {
 		return backend;
 	}
 
+	@Override
+	protected BackendSerializationTimeliness getStateBackendSerializationTimeliness() throws Exception {
+		return BackendSerializationTimeliness.ON_ACCESS;
+	}
+
 	@Override
 	protected boolean isSerializerPresenceRequiredOnRestore() {
 		return false;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Implement state conversion procedure in state backends
> ------------------------------------------------------
>
>                 Key: FLINK-9808
>                 URL: https://issues.apache.org/jira/browse/FLINK-9808
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Aljoscha Krettek
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> With FLINK-9377 in place, serializer config snapshots serve as the single source of truth for recreating restore serializers. The next step is to utilize this when performing a full-pass state conversion (i.e., read with the old / restore serializer, write with the new serializer).
> For Flink's heap-based backends, state conversion inherently happens already: all state is deserialized with the restore serializer after restore, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state conversion needs to happen for each registered state on its first access if the newly registered serializer has a different serialization schema than the previous one.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish whether the new serializer's schema is a) compatible as is, b) compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This should occur on the first state access.
> 3. Make sure that all other backends no longer perform redundant serializer compatibility checks; those checks are unnecessary because such backends always perform full-pass state conversions.
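
To make the full-pass conversion concrete, below is a minimal sketch of the value-migration loop that the RocksDB changes in the diff implement: every stored value is read with the restore (prior) serializer and written back with the new serializer, while the composed key bytes stay untouched. This is illustrative only: the StateMigrationSketch class, the migrateAllValues method, and the Map standing in for a RocksDB column family are hypothetical stand-ins (the real code iterates with a RocksIteratorWrapper and writes through a RocksDBWriteBatchWrapper); TypeSerializer, DataInputDeserializer, and DataOutputSerializer are the actual flink-core types used in the diff.

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;
import java.util.Map;

final class StateMigrationSketch {

	private StateMigrationSketch() {}

	static <V> void migrateAllValues(
			Map<byte[], byte[]> columnFamily,        // hypothetical stand-in for a RocksDB column family
			TypeSerializer<V> priorSerializer,       // restored from the serializer config snapshot
			TypeSerializer<V> newSerializer) throws IOException {

		DataInputDeserializer in = new DataInputDeserializer();
		DataOutputSerializer out = new DataOutputSerializer(1);

		for (Map.Entry<byte[], byte[]> entry : columnFamily.entrySet()) {
			in.setBuffer(entry.getValue());

			// full-pass conversion: old bytes -> object -> new bytes
			V value = priorSerializer.deserialize(in);
			newSerializer.serialize(value, out);

			// the key bytes (key group, key, namespace) are left untouched
			entry.setValue(out.getCopyOfBuffer());
			out.clear();
		}
	}
}

Note how this mirrors AbstractRocksDBState#migrateSerializedValue in the diff above; the actual backend additionally rejects MapState (whose user map keys live in the RocksDB key and therefore cannot be migrated yet) and wraps the pass in a RocksDB snapshot.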



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

