flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...
Date Fri, 13 Jul 2018 09:24:05 GMT
GitHub user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202293686
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
     		return Tuple2.of(stateInfo.f0, newMetaInfo);
     	}
     
    +	private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV>
migrateStateIfNecessary(
    +			StateDescriptor<S, SV> stateDesc,
    +			TypeSerializer<N> namespaceSerializer,
    +			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>
stateInfo) throws Exception {
    +
    +		@SuppressWarnings("unchecked")
    +		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot
=
    +			(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
    +				stateDesc.getName());
    +
    +		Preconditions.checkState(
    +			restoredMetaInfoSnapshot != null,
    +			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo,"
+
    +				" but its corresponding restored snapshot cannot be found.");
    +
    +		StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
    +
    +		TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +		RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +			stateDesc.getType(),
    +			stateDesc.getName(),
    +			namespaceSerializer,
    +			stateSerializer);
    +
    +		// check compatibility results to determine if state migration is required
    +		TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +			namespaceSerializer);
    +
    +		TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +			stateSerializer);
    +
    +		if (namespaceCompatibility.isIncompatible()) {
    +			throw new UnsupportedOperationException(
    +				"Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
    +		}
    +
    +		if (stateCompatibility.isIncompatible()) {
    +			if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
    +				throw new UnsupportedOperationException(
    +					"Changing the TypeSerializers of a MapState in an incompatible way is currently
not supported.");
    +			}
    +
    +			LOG.info(
    +				"Performing state migration for state {} because the state serializer changed in
an incompatible way.",
    +				stateDesc);
    +
    +			// we need to get an actual state instance because migration is different
    +			// for different state types. For example, ListState needs to deal with
    +			// individual elements
    +			StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
    +			if (stateFactory == null) {
    +				String message = String.format("State %s is not supported by %s",
    +					stateDesc.getClass(), this.getClass());
    +				throw new FlinkRuntimeException(message);
    +			}
    +
    +			State state = stateFactory.createState(
    +				stateDesc,
    +				Tuple2.of(stateInfo.f0, newMetaInfo),
    +				RocksDBKeyedStateBackend.this);
    +
    +			if (!(state instanceof AbstractRocksDBState)) {
    +				throw new FlinkRuntimeException(
    +					"State should be an AbstractRocksDBState but is " + state);
    +			}
    +
    +			AbstractRocksDBState rocksDBState = (AbstractRocksDBState<?, N, ?, S>) state;
    --- End diff --
    
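    Avoid using a raw type for the `rocksDBState` reference: it is declared as a raw `AbstractRocksDBState` even though the value is cast to `AbstractRocksDBState<?, N, ?, S>`. Declare the variable with the same parameterized type as the cast.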


---

Mime
View raw message