flink-commits mailing list archives

From tzuli...@apache.org
Subject [14/15] flink git commit: [FLINK-6804] [state] Consistent state migration behaviour across state backends
Date Tue, 13 Jun 2017 05:49:11 GMT
[FLINK-6804] [state] Consistent state migration behaviour across state backends

Prior to this commit, memory and non-memory state backends behaved
differently w.r.t. state migration. For the memory backends, we did
not require the new serializer to be compatible in order for the job to
proceed after restore, because all state has already been deserialized
into objects and the new serializer can simply be used as-is.
Therefore, the compatibility checks were not performed for the memory
backends, resulting in different code paths between the different state
backends.

However, this inconsistent behaviour across backends is confusing for
users. This commit adds the code path that checks the compatibility of
the newly registered serializer in the memory backends (even though it
isn't strictly required there), and deliberately fails the job if the
new serializer is incompatible.

Note that the compatibility code paths will be truly unified and
required for all backends once we have eager state registration.

This closes #4073.
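
For illustration, the following is a minimal sketch (not taken from the patch
itself) of the check pattern described above, written against the
serializer-compatibility API that appears in the diff below (CompatibilityUtil,
CompatibilityResult, UnloadableDummyTypeSerializer, StateMigrationException).
The class and method names of the sketch are hypothetical.

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.util.StateMigrationException;

public final class SerializerCompatibilityCheck {

	private SerializerCompatibilityCheck() {}

	/**
	 * Resolves compatibility between the serializer restored from a checkpoint
	 * and a newly registered serializer, and deliberately fails if migration
	 * would be required, mirroring the behaviour this commit adds to the
	 * memory backends.
	 */
	public static <T> TypeSerializer<T> resolveOrFail(
			TypeSerializer<?> restoredSerializer,
			TypeSerializerConfigSnapshot restoredConfigSnapshot,
			TypeSerializer<T> newSerializer) throws StateMigrationException {

		CompatibilityResult<T> result = CompatibilityUtil.resolveCompatibilityResult(
				restoredSerializer,
				UnloadableDummyTypeSerializer.class,
				restoredConfigSnapshot,
				newSerializer);

		if (result.isRequiresMigration()) {
			// a memory backend could proceed here, since its state already lives as objects,
			// but we fail on purpose to stay consistent with the RocksDB backend
			throw new StateMigrationException("State migration isn't supported, yet.");
		}

		// compatible: the new serializer simply replaces the restored one
		return newSerializer;
	}
}

In the actual patch, this logic appears inline in
DefaultOperatorStateBackend#getListState and
HeapKeyedStateBackend#tryRegisterStateTable, as shown in the diff below.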


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/379be13b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/379be13b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/379be13b

Branch: refs/heads/release-1.3
Commit: 379be13b67948d28be66e071072412c870d6e1f8
Parents: 8a0276a
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Sun Jun 4 22:40:26 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Jun 13 07:48:33 2017 +0200

----------------------------------------------------------------------
 .../api/common/typeutils/CompatibilityUtil.java |   8 +-
 .../state/DefaultOperatorStateBackend.java      | 119 +++++++++++++++----
 .../state/heap/HeapKeyedStateBackend.java       |  54 ++++++++-
 3 files changed, 151 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/379be13b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index 94bb9bd..df7f216 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -66,13 +66,11 @@ public class CompatibilityUtil {
 			} else {
 			if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
 					// if the preceding serializer exists and is not a dummy, use
-					// that for converting instead of the provided convert deserializer
+					// that for converting instead of any provided convert deserializer
 					return CompatibilityResult.requiresMigration((TypeSerializer<T>) precedingSerializer);
-				} else if (initialResult.getConvertDeserializer() != null) {
-					return initialResult;
 				} else {
-					throw new RuntimeException(
-						"State migration required, but there is no available serializer capable of reading previous data.");
+					// requires migration (may or may not have a convert deserializer)
+					return initialResult;
 				}
 			}
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/379be13b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 0f96dac..b16ac06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -36,6 +38,7 @@ import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +95,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	 */
 	private final boolean asynchronousSnapshots;
 
+	/**
+	 * Map of state names to their corresponding restored state meta info.
+	 *
+	 * <p>TODO this map can be removed when eager-state registration is in place.
+	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
+	 */
+	private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredStateMetaInfos;
+
+	/**
+	 * Cache of already accessed states.
+	 *
+	 * <p>In contrast to {@link #registeredStates} and {@link #restoredStateMetaInfos} which may be repopulated
+	 * with restored state, this map is always empty at the beginning.
+	 *
+	 * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
+	 */
+	private final HashMap<String, PartitionableListState<?>> accessedStatesByName;
+
 	public DefaultOperatorStateBackend(
 		ClassLoader userClassLoader,
 		ExecutionConfig executionConfig,
@@ -103,6 +126,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		this.javaSerializer = new JavaSerializer<>();
 		this.registeredStates = new HashMap<>();
 		this.asynchronousSnapshots = asynchronousSnapshots;
+		this.accessedStatesByName = new HashMap<>();
+		this.restoredStateMetaInfos = new HashMap<>();
 	}
 
 	public ExecutionConfig getExecutionConfig() {
@@ -314,6 +339,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 							" not be loaded. This is a temporary restriction that will be fixed in future versions.");
 					}
 
+					restoredStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+
 					PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName());
 
 					if (null == listState) {
@@ -359,7 +386,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		/**
 		 * Meta information of the state, including state name, assignment mode, and serializer
 		 */
-		private final RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
+		private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
 
 		/**
 		 * The internal list the holds the elements of the state
@@ -389,12 +416,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
 		}
 
-		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
-			return stateMetaInfo;
+		public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+			this.stateMetaInfo = stateMetaInfo;
 		}
 
-		public List<S> getInternalList() {
-			return internalList;
+		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+			return stateMetaInfo;
 		}
 
 		public PartitionableListState<S> deepCopy() {
@@ -441,19 +468,32 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}
 
 	private <S> ListState<S> getListState(
-		ListStateDescriptor<S> stateDescriptor,
-		OperatorStateHandle.Mode mode) throws IOException {
+			ListStateDescriptor<S> stateDescriptor,
+			OperatorStateHandle.Mode mode) throws IOException, StateMigrationException {
+
 		Preconditions.checkNotNull(stateDescriptor);
+		String name = Preconditions.checkNotNull(stateDescriptor.getName());
 
-		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+		@SuppressWarnings("unchecked")
+		PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
+		if (previous != null) {
+			checkStateNameAndMode(previous.getStateMetaInfo(), name, mode);
+			return previous;
+		}
 
-		String name = Preconditions.checkNotNull(stateDescriptor.getName());
+		// end up here if it's the first time access after execution for the
+		// provided state name; check compatibility of restored state, if any
+		// TODO with eager registration in place, these checks should be moved to restore()
+
+		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
 		TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
 
 		@SuppressWarnings("unchecked")
 		PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
 
 		if (null == partitionableListState) {
+			// no restored state for the state name; simply create new state holder
+
 			partitionableListState = new PartitionableListState<>(
 				new RegisteredOperatorBackendStateMetaInfo<>(
 					name,
@@ -462,21 +502,38 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 			registeredStates.put(name, partitionableListState);
 		} else {
-			// TODO with eager registration in place, these checks should be moved to restore()
-
-			Preconditions.checkState(
-				partitionableListState.getStateMetaInfo().getName().equals(name),
-				"Incompatible state names. " +
-					"Was [" + partitionableListState.getStateMetaInfo().getName() + "], " +
-					"registered with [" + name + "].");
-
-			Preconditions.checkState(
-				partitionableListState.getStateMetaInfo().getAssignmentMode().equals(mode),
-				"Incompatible state assignment modes. " +
-					"Was [" + partitionableListState.getStateMetaInfo().getAssignmentMode() + "], " +
-					"registered with [" + mode + "].");
+			// has restored state; check compatibility of new state access
+
+			checkStateNameAndMode(partitionableListState.getStateMetaInfo(), name, mode);
+
+			@SuppressWarnings("unchecked")
+			RegisteredOperatorBackendStateMetaInfo.Snapshot<S> restoredMetaInfo =
+				(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) restoredStateMetaInfos.get(name);
+
+			// check compatibility to determine if state migration is required
+			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+					restoredMetaInfo.getPartitionStateSerializer(),
+					UnloadableDummyTypeSerializer.class,
+					restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(),
+					partitionStateSerializer);
+
+			if (!stateCompatibility.isRequiresMigration()) {
+				// new serializer is compatible; use it to replace the old serializer
+				partitionableListState.setStateMetaInfo(
+					new RegisteredOperatorBackendStateMetaInfo<>(name, partitionStateSerializer, mode));
+			} else {
+				// TODO state migration currently isn't possible.
+
+				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
+				// since the state has already been deserialized to objects and we can just continue with
+				// the new serializer; we're deliberately failing here for now to have equal functionality with
+				// the RocksDB backend to avoid confusion for users.
+
+				throw new StateMigrationException("State migration isn't supported, yet.");
+			}
 		}
 
+		accessedStatesByName.put(name, partitionableListState);
 		return partitionableListState;
 	}
 
@@ -497,4 +554,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			}
 		}
 	}
+
+	private static void checkStateNameAndMode(
+			RegisteredOperatorBackendStateMetaInfo previousMetaInfo,
+			String expectedName,
+			OperatorStateHandle.Mode expectedMode) {
+
+		Preconditions.checkState(
+			previousMetaInfo.getName().equals(expectedName),
+			"Incompatible state names. " +
+				"Was [" + previousMetaInfo.getName() + "], " +
+				"registered with [" + expectedName + "].");
+
+		Preconditions.checkState(
+			previousMetaInfo.getAssignmentMode().equals(expectedMode),
+			"Incompatible state assignment modes. " +
+				"Was [" + previousMetaInfo.getAssignmentMode() + "], " +
+				"registered with [" + expectedMode + "].");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/379be13b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 2ab9691..35a70bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -36,6 +37,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
@@ -98,6 +100,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
 
 	/**
+	 * Map of state names to their corresponding restored state meta info.
+	 *
+	 * <p>
+	 * TODO this map can be removed when eager-state registration is in place.
+	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
+	 */
+	private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
+
+	/**
 	 * Determines whether or not we run snapshots asynchronously. This impacts the choice of the underlying
 	 * {@link StateTable} implementation.
 	 */
@@ -115,6 +126,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
 		this.asynchronousSnapshots = asynchronousSnapshots;
 		LOG.info("Initializing heap keyed state backend with stream factory.");
+
+		this.restoredKvStateMetaInfos = new HashMap<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -122,7 +135,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	// ------------------------------------------------------------------------
 
 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
-			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {
 
 		return tryRegisterStateTable(
 				stateDesc.getName(), stateDesc.getType(),
@@ -133,7 +146,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			String stateName,
 			StateDescriptor.Type stateType,
 			TypeSerializer<N> namespaceSerializer,
-			TypeSerializer<V> valueSerializer) {
+			TypeSerializer<V> valueSerializer) throws StateMigrationException {
 
 		final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
 				new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
@@ -163,7 +176,36 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						"registered with [" + newMetaInfo.getStateType() + "].");
 			}
 
-			stateTable.setMetaInfo(newMetaInfo);
+			@SuppressWarnings("unchecked")
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
+				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
+
+			// check compatibility results to determine if state migration is required
+			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+					restoredMetaInfo.getNamespaceSerializer(),
+					MigrationNamespaceSerializerProxy.class,
+					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+					newMetaInfo.getNamespaceSerializer());
+
+			CompatibilityResult<V> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+					restoredMetaInfo.getStateSerializer(),
+					UnloadableDummyTypeSerializer.class,
+					restoredMetaInfo.getStateSerializerConfigSnapshot(),
+					newMetaInfo.getStateSerializer());
+
+			if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
+				// new serializers are compatible; use them to replace the old serializers
+				stateTable.setMetaInfo(newMetaInfo);
+			} else {
+				// TODO state migration currently isn't possible.
+
+				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
+				// since the state has already been deserialized to objects and we can just continue with
+				// the new serializer; we're deliberately failing here for now to have equal functionality with
+				// the RocksDB backend to avoid confusion for users.
+
+				throw new StateMigrationException("State migration isn't supported, yet.");
+			}
 		}
 
 		return stateTable;
@@ -427,6 +469,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							" in future versions.");
 					}
 
+					restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+
 					StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
 
 					//important: only create a new table we did not already create it previously
@@ -528,6 +572,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				MigrationRestoreSnapshot<K, ?, ?> stateSnapshot = (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot;
 				final StateTable rawResultMap =
 						stateSnapshot.deserialize(stateName, this);
+
+				// mimic a restored kv state meta info
+				restoredKvStateMetaInfos.put(stateName, rawResultMap.getMetaInfo().snapshot());
+
 				// add named state to the backend
 				stateTables.put(stateName, rawResultMap);
 			} else {

