Date: Thu, 25 Oct 2018 16:45:01 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663989#comment-16663989 ]

ASF GitHub Bot commented on FLINK-9808:
---------------------------------------

tzulitai closed pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 7a1675eafba..b8282fe238c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StateMigrationException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -238,7 +239,8 @@
 			} else if (compat.isCompatibleAfterMigration()) {
 				return CompatibilityResult.requiresMigration();
 			} else if (compat.isIncompatible()) {
-				throw new IllegalStateException("The new serializer is incompatible.");
+				throw new RuntimeException(
+					new StateMigrationException("The new serializer is incompatible, meaning that the new serializer can't be used even if state migration is performed."));
 			} else {
 				throw new IllegalStateException("Unidentifiable schema compatibility type. This is a bug, please file a JIRA.");
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eae5a3bccdd..ae4fbaa133b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,9 +24,9 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -228,42 +228,32 @@ public void dispose() {
 
 			final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
 
-			@SuppressWarnings("unchecked")
-			RegisteredBroadcastStateBackendMetaInfo restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo(metaInfoSnapshot);
+			// check whether new serializers are incompatible
+			TypeSerializerSnapshot keySerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
 
-			// check compatibility to determine if state migration is required
-			CompatibilityResult keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getKeySerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-					broadcastStateKeySerializer);
-
-			CompatibilityResult valueCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getValueSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-					broadcastStateValueSerializer);
-
-			if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
-				// new serializer is compatible; use it to replace the old serializer
-				broadcastState.setStateMetaInfo(
-						new RegisteredBroadcastStateBackendMetaInfo<>(
-								name,
-								OperatorStateHandle.Mode.BROADCAST,
-								broadcastStateKeySerializer,
-								broadcastStateValueSerializer));
-			} else {
-				// TODO state migration currently isn't possible.
-
-				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
-				// since the state has already been deserialized to objects and we can just continue with
-				// the new serializer; we're deliberately failing here for now to have equal functionality with
-				// the RocksDB backend to avoid confusion for users.
-
-				throw StateMigrationException.notSupported();
+			TypeSerializerSchemaCompatibility keyCompatibility =
+				keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer);
+			if (keyCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new key serializer for broadcast state must not be incompatible.");
+			}
+
+			TypeSerializerSnapshot valueSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+			TypeSerializerSchemaCompatibility valueCompatibility =
+				valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer);
+			if (valueCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new value serializer for broadcast state must not be incompatible.");
 			}
+
+			// new serializer is compatible; use it to replace the old serializer
+			broadcastState.setStateMetaInfo(
+					new RegisteredBroadcastStateBackendMetaInfo<>(
+							name,
+							OperatorStateHandle.Mode.BROADCAST,
+							broadcastStateKeySerializer,
+							broadcastStateValueSerializer));
 		}
 
 		accessedBroadcastStatesByName.put(name, broadcastState);
@@ -606,27 +596,19 @@ public void addAll(List values) {
 
 			// check compatibility to determine if state migration is required
 			TypeSerializer newPartitionStateSerializer = partitionStateSerializer.duplicate();
-			CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					metaInfo.getPartitionStateSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-					newPartitionStateSerializer);
-
-			if (!stateCompatibility.isRequiresMigration()) {
-				// new serializer is compatible; use it to replace the old serializer
-				partitionableListState.setStateMetaInfo(
-					new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
-			} else {
-				// TODO state migration currently isn't possible.
-
-				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
-				// since the state has already been deserialized to objects and we can just continue with
-				// the new serializer; we're deliberately failing here for now to have equal functionality with
-				// the RocksDB backend to avoid confusion for users.
-
-				throw StateMigrationException.notSupported();
+
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot stateSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+			TypeSerializerSchemaCompatibility stateCompatibility =
+				stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
+			if (stateCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
 			}
+
+			partitionableListState.setStateMetaInfo(
+				new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
 		}
 
 		accessedStatesByName.put(name, partitionableListState);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index 7f95ed70326..d05f31a0c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -19,14 +19,10 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StateMigrationException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -151,79 +147,42 @@ public int hashCode() {
 		return result;
 	}
 
-	/**
-	 * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
-	 * This checks that the descriptor specifies identical names and state types, as well as
-	 * serializers that are compatible for the restored k/v state bytes.
-	 */
 	@Nonnull
-	public static RegisteredKeyValueStateBackendMetaInfo resolveKvStateCompatibility(
-		StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
-		TypeSerializer newNamespaceSerializer,
-		StateDescriptor newStateDescriptor,
-		@Nullable StateSnapshotTransformer snapshotTransformer) throws StateMigrationException {
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
 
-		Preconditions.checkState(restoredStateMetaInfoSnapshot.getBackendStateType()
+	public static void checkStateMetaInfo(StateMetaInfoSnapshot stateMetaInfoSnapshot, StateDescriptor stateDesc) {
+		Preconditions.checkState(
+			stateMetaInfoSnapshot != null,
+			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
+				" but its corresponding restored snapshot cannot be found.");
+
+		Preconditions.checkState(stateMetaInfoSnapshot.getBackendStateType()
 				== StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
 			"Incompatible state types. " +
-				"Was [" + restoredStateMetaInfoSnapshot.getBackendStateType() + "], " +
+				"Was [" + stateMetaInfoSnapshot.getBackendStateType() + "], " +
 				"registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
 
 		Preconditions.checkState(
-			Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
+			Objects.equals(stateDesc.getName(), stateMetaInfoSnapshot.getName()),
 			"Incompatible state names. " +
-				"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
-				"registered with [" + newStateDescriptor.getName() + "].");
+				"Was [" + stateMetaInfoSnapshot.getName() + "], " +
+				"registered with [" + stateDesc.getName() + "].");
 
 		final StateDescriptor.Type restoredType =
 			StateDescriptor.Type.valueOf(
-				restoredStateMetaInfoSnapshot.getOption(
+				stateMetaInfoSnapshot.getOption(
 					StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
 
-		if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
-			&& !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
-
+		if (stateDesc.getType() != StateDescriptor.Type.UNKNOWN && restoredType != StateDescriptor.Type.UNKNOWN) {
 			Preconditions.checkState(
-				newStateDescriptor.getType() == restoredType,
+				stateDesc.getType() == restoredType,
 				"Incompatible key/value state types. " +
 					"Was [" + restoredType + "], " +
-					"registered with [" + newStateDescriptor.getType() + "].");
+					"registered with [" + stateDesc.getType() + "].");
 		}
-
-		// check compatibility results to determine if state migration is required
-		CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			null,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			newNamespaceSerializer);
-
-		TypeSerializer newStateSerializer = newStateDescriptor.getSerializer();
-		CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.restoreTypeSerializer(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			UnloadableDummyTypeSerializer.class,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			newStateSerializer);
-
-		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
-			// TODO state migration currently isn't possible.
-			throw StateMigrationException.notSupported();
-		} else {
-			return new RegisteredKeyValueStateBackendMetaInfo<>(
-				newStateDescriptor.getType(),
-				newStateDescriptor.getName(),
-				newNamespaceSerializer,
-				newStateSerializer,
-				snapshotTransformer);
-		}
-	}
-
-	@Nonnull
-	@Override
-	public StateMetaInfoSnapshot snapshot() {
-		return computeSnapshot();
 	}
 
 	@Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 837e51fafc0..18ac7fcc454 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -28,10 +28,9 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -206,14 +205,15 @@ public HeapKeyedStateBackend(
 			StateMetaInfoSnapshot.CommonSerializerKeys serializerKey =
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
 
-			CompatibilityResult compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-				restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey),
-				null,
-				restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
-				byteOrderedElementSerializer);
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot serializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+			TypeSerializerSchemaCompatibility compatibilityResult =
+				serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
 
-			if (compatibilityResult.isRequiresMigration()) {
-				throw new FlinkRuntimeException(StateMigrationException.notSupported());
+			if (compatibilityResult.isIncompatible()) {
+				throw new FlinkRuntimeException(new StateMigrationException("For heap backends, the new priority queue serializer must not be incompatible."));
 			} else {
 				registeredPQStates.put(
 					stateName,
@@ -257,7 +257,14 @@ public HeapKeyedStateBackend(
 		@SuppressWarnings("unchecked")
 		StateTable stateTable = (StateTable) registeredKVStates.get(stateDesc.getName());
 
-		RegisteredKeyValueStateBackendMetaInfo newMetaInfo;
+		TypeSerializer newStateSerializer = stateDesc.getSerializer();
+		RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
+			stateDesc.getType(),
+			stateDesc.getName(),
+			namespaceSerializer,
+			newStateSerializer,
+			snapshotTransformer);
+
 		if (stateTable != null) {
 			@SuppressWarnings("unchecked")
 			StateMetaInfoSnapshot restoredMetaInfoSnapshot =
@@ -269,21 +276,34 @@ public HeapKeyedStateBackend(
 				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
 					" but its corresponding restored snapshot cannot be found.");
 
-			newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
-				restoredMetaInfoSnapshot,
-				namespaceSerializer,
-				stateDesc,
-				snapshotTransformer);
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot namespaceSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+					StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()));
+
+			TypeSerializerSchemaCompatibility namespaceCompatibility =
+				namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer);
+
+			if (namespaceCompatibility.isIncompatible()) {
+				throw new StateMigrationException("For heap backends, the new namespace serializer cannot be incompatible.");
+			}
+
+			RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot, stateDesc);
+
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot stateSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+					StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()));
+
+			TypeSerializerSchemaCompatibility stateCompatibility =
+				stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
+
+			if (stateCompatibility.isIncompatible()) {
+				throw new StateMigrationException("For heap backends, the new state serializer cannot be incompatible.");
+			}
 
 			stateTable.setMetaInfo(newMetaInfo);
 		} else {
-			newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
-				stateDesc.getType(),
-				stateDesc.getName(),
-				namespaceSerializer,
-				stateDesc.getSerializer(),
-				snapshotTransformer);
-
 			stateTable = snapshotStrategy.newStateTable(newMetaInfo);
 			registeredKVStates.put(stateDesc.getName(), stateTable);
 		}
@@ -415,16 +435,9 @@ private void restorePartitionedState(Collection state) throws
 				if (!keySerializerRestored) {
 					// check for key serializer compatibility; this also reconfigures the
 					// key serializer to be compatible, if it is required and is possible
-					if (CompatibilityUtil.resolveCompatibilityResult(
-							serializationProxy.restoreKeySerializer(),
-							UnloadableDummyTypeSerializer.class,
-							serializationProxy.getKeySerializerConfigSnapshot(),
-							keySerializer)
-						.isRequiresMigration()) {
-
-						// TODO replace with state migration; note that key hashcodes need to remain the same after migration
-						throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
-							"Aborting now since state migration is currently not available");
+					if (!serializationProxy.getKeySerializerConfigSnapshot()
+							.resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
+						throw new StateMigrationException("The new key serializer must be compatible.");
 					}
 
 					keySerializerRestored = true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java
new file mode 100644
index 00000000000..3df294da969
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+
+/**
+ * Tests for the keyed state backend and operator state backend, as created by the
+ * {@link FsStateBackend}.
+ */
+public class FileStateBackendMigrationTest extends StateBackendMigrationTestBase {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Override
+	protected FsStateBackend getStateBackend() throws Exception {
+		File checkpointPath = tempFolder.newFolder();
+		return new FsStateBackend(checkpointPath.toURI(), false);
+	}
+
+	@Override
+	protected BackendSerializationTimeliness getStateBackendSerializationTimeliness() {
+		return BackendSerializationTimeliness.ON_CHECKPOINTS;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index beea0c229c3..d34c784a1e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -42,6 +42,11 @@ protected FsStateBackend getStateBackend() throws Exception {
 		return new FsStateBackend(checkpointPath.toURI(), useAsyncMode());
 	}
 
+	@Override
+	protected BackendSerializationTimeliness getStateBackendSerializationTimeliness() throws Exception {
+		return BackendSerializationTimeliness.ON_CHECKPOINTS;
+	}
+
 	protected boolean useAsyncMode() {
 		return false;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 215d7d36c96..d84f16f5897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -56,6 +56,11 @@ protected MemoryStateBackend getStateBackend() throws Exception {
 		return new MemoryStateBackend(useAsyncMode());
 	}
 
+	@Override
+	protected BackendSerializationTimeliness getStateBackendSerializationTimeliness() throws Exception {
+		return BackendSerializationTimeliness.ON_CHECKPOINTS;
+	}
+
 	protected boolean useAsyncMode() {
 		return false;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
new file mode 100644
index 00000000000..e4b957a4fcc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -0,0 +1,780 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link KeyedStateBackend} and {@link OperatorStateBackend} as produced
+ * by various {@link StateBackend}s.
+ */
+@SuppressWarnings("serial")
+public abstract class StateBackendMigrationTestBase extends TestLogger {
+
+	@Rule
+	public final ExpectedException expectedException = ExpectedException.none();
+
+	// lazily initialized stream storage
+	private CheckpointStorageLocation checkpointStorageLocation;
+
+	/**
+	 * Different "personalities" of {@link CustomStringSerializer}. Instead of creating
+	 * different classes we parameterize the serializer with this and
+	 * {@link CustomStringSerializerSnapshot} will instantiate serializers with the correct
+	 * personality.
+=09 */ +=09public enum SerializerVersion { +=09=09INITIAL, +=09=09RESTORE, +=09=09NEW +=09} + +=09/** +=09 * The compatibility behaviour of {@link CustomStringSerializer}. This = controls what +=09 * type of serializer {@link CustomStringSerializerSnapshot} will creat= e for +=09 * the different methods that return/create serializers. +=09 */ +=09public enum SerializerCompatibilityType { +=09=09COMPATIBLE_AS_IS, +=09=09REQUIRES_MIGRATION +=09} + +=09/** +=09 * The serialization timeliness behaviour of the state backend under te= st. +=09 */ +=09public enum BackendSerializationTimeliness { +=09=09ON_ACCESS, +=09=09ON_CHECKPOINTS +=09} + +=09@Test +=09@SuppressWarnings("unchecked") +=09public void testValueStateWithSerializerRequiringMigration() throws Exc= eption { +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09CheckpointStreamFactory streamFactory =3D createStreamFactory(); +=09=09SharedStateRegistry sharedStateRegistry =3D new SharedStateRegistry(= ); +=09=09AbstractKeyedStateBackend backend =3D createKeyedBackend(In= tSerializer.INSTANCE); + +=09=09ValueStateDescriptor kvId =3D new ValueStateDescriptor<>( +=09=09=09"id", +=09=09=09new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE= _AS_IS, SerializerVersion.INITIAL)); +=09=09ValueState state =3D backend.getPartitionedState(VoidNamespa= ce.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Modifications to the state = =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should result in serializer personality INITIAL having 2 = serialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should not result in any serialize / deserialize calls + +=09=09backend.setCurrentKey(1); +=09=09state.update("1"); +=09=09backend.setCurrentKey(2); +=09=09state.update("2"); +=09=09backend.setCurrentKey(1); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D 
BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Snapshot #1 =3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should result in serializer personality INITIAL having 2 = serialize calls +=09=09KeyedStateHandle snapshot1 =3D runSnapshot( +=09=09=09backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forChec= kpointWithDefaultLocation()), +=09=09=09sharedStateRegistry); +=09=09backend.dispose(); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} else { +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Restore from snapshot #1 =3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09// +=09=09// For lazy serialization 
backends: +=09=09// This should result in serializer personality RESTORE having 2 = deserialize calls +=09=09backend =3D restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.RESTORE)); +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.deserializeCalle= d.get(SerializerVersion.RESTORE)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09ValueStateDescriptor newKvId =3D new ValueStateDescriptor<>(= "id", +=09=09=09new CustomStringSerializer(SerializerCompatibilityType.REQUIRES_M= IGRATION, SerializerVersion.NEW)); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D State registration that trig= gers state migration =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should result in serializer personality RESTORE having 2 = deserialize calls, and NEW having 2 serialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09ValueState restored1 =3D backend.getPartitionedState(VoidNam= espace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, newKvId); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.RESTORE)); +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.deserializeCalle= d.get(SerializerVersion.RESTORE)); + +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.NEW)); +=09=09=09assertEquals(null, 
CustomStringSerializer.deserializeCalled.get(S= erializerVersion.NEW)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.RESTORE)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.RESTORE)); + +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.NEW)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.NEW)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D More modifications to the st= ate =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should result in serializer personality NEW having 2 seri= alize calls and 3 deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09backend.setCurrentKey(1); +=09=09assertEquals("1", restored1.value()); +=09=09restored1.update("1"); // s, NEW +=09=09backend.setCurrentKey(2); +=09=09assertEquals("2", restored1.value()); +=09=09restored1.update("3"); // s, NEW +=09=09assertEquals("3", restored1.value()); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.NEW)); +=09=09=09assertEquals((Integer) 3, CustomStringSerializer.deserializeCalle= d.get(SerializerVersion.NEW)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.NEW)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.NEW)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Snapshot #2 =3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should 
not result in any serialize / deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should result in serializer personality NEW having 2 seri= alize calls +=09=09KeyedStateHandle snapshot2 =3D runSnapshot( +=09=09=09backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forChec= kpointWithDefaultLocation()), +=09=09=09sharedStateRegistry); +=09=09backend.dispose(); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.NEW)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.NEW)); +=09=09} else { +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.NEW)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.NEW)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// and restore once with NEW from NEW so that we see a read using th= e NEW serializer +=09=09// on the file backend +=09=09ValueStateDescriptor newKvId2 =3D new ValueStateDescriptor<>= ( +=09=09=09"id", +=09=09=09new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE= _AS_IS, SerializerVersion.NEW)); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Restore from snapshot #2 =3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should result in serializer personality RESTORE having 2 = deserialize calls +=09=09backend =3D restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.RESTORE)); 
+=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.RESTORE)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.RESTORE)); +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.deserializeCalle= d.get(SerializerVersion.RESTORE)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamesp= aceSerializer.INSTANCE, newKvId2); +=09=09snapshot2.discardState(); +=09=09snapshot1.discardState(); + +=09=09backend.dispose(); +=09} + +=09@Test +=09@SuppressWarnings("unchecked") +=09public void testValueStateWithNewSerializer() throws Exception { +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09CheckpointStreamFactory streamFactory =3D createStreamFactory(); +=09=09SharedStateRegistry sharedStateRegistry =3D new SharedStateRegistry(= ); +=09=09AbstractKeyedStateBackend backend =3D createKeyedBackend(In= tSerializer.INSTANCE); + +=09=09ValueStateDescriptor kvId =3D new ValueStateDescriptor<>( +=09=09=09"id", +=09=09=09new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE= _AS_IS, SerializerVersion.INITIAL)); +=09=09ValueState state =3D backend.getPartitionedState(VoidNamespa= ce.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Modifications to the state = =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should result in serializer personality INITIAL having 2 = serialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should not result in any serialize / deserialize calls + +=09=09backend.setCurrentKey(1); +=09=09state.update("1"); +=09=09backend.setCurrentKey(2); +=09=09state.update("2"); +=09=09backend.setCurrentKey(1); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { 
+=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Snapshot #1 =3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should result in serializer personality INITIAL having 2 = serialize calls +=09=09KeyedStateHandle snapshot1 =3D runSnapshot( +=09=09=09backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forChec= kpointWithDefaultLocation()), +=09=09=09sharedStateRegistry); +=09=09backend.dispose(); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} else { +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Restore from snapshot #1 =3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should result in 
serializer personality RESTORE having 2 = deserialize calls +=09=09backend =3D restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.INITIAL)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.INITIAL)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.RESTORE)); +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.deserializeCalle= d.get(SerializerVersion.RESTORE)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09ValueStateDescriptor newKvId =3D new ValueStateDescriptor<>(= "id", +=09=09=09new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE= _AS_IS, SerializerVersion.NEW)); +=09=09ValueState restored1 =3D backend.getPartitionedState(VoidNam= espace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, newKvId); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D More modifications to the st= ate =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should result in serializer personality NEW having 2 seri= alize calls and 3 deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should not result in any serialize / deserialize calls + +=09=09backend.setCurrentKey(1); +=09=09assertEquals("1", restored1.value()); +=09=09restored1.update("1"); +=09=09backend.setCurrentKey(2); +=09=09assertEquals("2", restored1.value()); +=09=09restored1.update("3"); +=09=09assertEquals("3", restored1.value()); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.NEW)); +=09=09=09assertEquals((Integer) 3, 
CustomStringSerializer.deserializeCalle= d.get(SerializerVersion.NEW)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.NEW)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.NEW)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Snapshot #2 =3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should result in serializer personality NEW having 2 seri= alize calls +=09=09KeyedStateHandle snapshot2 =3D runSnapshot( +=09=09=09backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forChec= kpointWithDefaultLocation()), +=09=09=09sharedStateRegistry); +=09=09snapshot1.discardState(); +=09=09backend.dispose(); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.NEW)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.NEW)); +=09=09} else { +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.serializeCalled.= get(SerializerVersion.NEW)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.NEW)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09// =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D Restore from snapshot #2 =3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D +=09=09// For eager serialization backends: +=09=09// This should not result in any serialize / deserialize calls +=09=09// +=09=09// For lazy serialization backends: +=09=09// This should result in serializer personality RESTORE having 2 = deserialize calls +=09=09ValueStateDescriptor newKvId2 =3D new ValueStateDescriptor<>= ("id", 
+=09=09=09new CustomStringSerializer(SerializerCompatibilityType.COMPATIBLE= _AS_IS, SerializerVersion.NEW)); +=09=09backend =3D restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2); + +=09=09if (getStateBackendSerializationTimeliness() =3D=3D BackendSerializa= tionTimeliness.ON_ACCESS) { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.RESTORE)); +=09=09=09assertEquals(null, CustomStringSerializer.deserializeCalled.get(S= erializerVersion.RESTORE)); +=09=09} else { +=09=09=09assertEquals(null, CustomStringSerializer.serializeCalled.get(Ser= ializerVersion.RESTORE)); +=09=09=09assertEquals((Integer) 2, CustomStringSerializer.deserializeCalle= d.get(SerializerVersion.RESTORE)); +=09=09} +=09=09CustomStringSerializer.resetCountingMaps(); + +=09=09backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamesp= aceSerializer.INSTANCE, newKvId2); +=09=09snapshot2.discardState(); +=09=09backend.dispose(); +=09} + +=09public static class CustomStringSerializer extends TypeSerializer { + +=09=09private static final long serialVersionUID =3D 1L; + +=09=09private static final String EMPTY =3D ""; + +=09=09private SerializerCompatibilityType compatibilityType; +=09=09private SerializerVersion serializerVersion; + +=09=09private static final String CONFIG_PAYLOAD =3D "configPayload"; + +=09=09// for counting how often the methods were called from serializers o= f the different personalities +=09=09public static Map serializeCalled =3D ne= w HashMap<>(); +=09=09public static Map deserializeCalled =3D = new HashMap<>(); + +=09=09static void resetCountingMaps() { +=09=09=09serializeCalled =3D new HashMap<>(); +=09=09=09deserializeCalled =3D new HashMap<>(); +=09=09} + +=09=09CustomStringSerializer( +=09=09=09SerializerCompatibilityType compatibilityType, +=09=09=09SerializerVersion serializerVersion) { +=09=09=09this.compatibilityType =3D compatibilityType; +=09=09=09this.serializerVersion =3D serializerVersion; +=09=09} + 
+=09=09@Override +=09=09public boolean isImmutableType() { +=09=09=09return true; +=09=09} + +=09=09@Override +=09=09public String createInstance() { +=09=09=09return EMPTY; +=09=09} + +=09=09@Override +=09=09public String copy(String from) { +=09=09=09return from; +=09=09} + +=09=09@Override +=09=09public String copy(String from, String reuse) { +=09=09=09return from; +=09=09} + +=09=09@Override +=09=09public int getLength() { +=09=09=09return -1; +=09=09} + +=09=09@Override +=09=09public void serialize(String record, DataOutputView target) throws I= OException { +=09=09=09serializeCalled.compute(serializerVersion, (k, v) -> v =3D=3D nul= l ? 1 : v + 1); +=09=09=09StringValue.writeString(record, target); +=09=09} + +=09=09@Override +=09=09public String deserialize(DataInputView source) throws IOException { +=09=09=09deserializeCalled.compute(serializerVersion, (k, v) -> v =3D=3D n= ull ? 1 : v + 1); +=09=09=09return StringValue.readString(source); +=09=09} + +=09=09@Override +=09=09public String deserialize(String record, DataInputView source) throw= s IOException { +=09=09=09deserializeCalled.compute(serializerVersion, (k, v) -> v =3D=3D n= ull ? 
1 : v + 1); +=09=09=09return deserialize(source); +=09=09} + +=09=09@Override +=09=09public void copy(DataInputView source, DataOutputView target) throws= IOException { +=09=09=09StringValue.copyString(source, target); +=09=09} + +=09=09@Override +=09=09public boolean canEqual(Object obj) { +=09=09=09return obj instanceof CustomStringSerializer; +=09=09} + +=09=09@Override +=09=09public TypeSerializer duplicate() { +=09=09=09return this; +=09=09} + +=09=09@Override +=09=09public boolean equals(Object obj) { +=09=09=09return obj instanceof CustomStringSerializer; +=09=09} + +=09=09@Override +=09=09public int hashCode() { +=09=09=09return getClass().hashCode(); +=09=09} + +=09=09@Override +=09=09public TypeSerializerSnapshot snapshotConfiguration() { +=09=09=09return new CustomStringSerializerSnapshot(CONFIG_PAYLOAD); +=09=09} + +=09=09SerializerCompatibilityType getCompatibilityType() { +=09=09=09return compatibilityType; +=09=09} +=09} + +=09public static class CustomStringSerializerSnapshot implements TypeSeria= lizerSnapshot { + +=09=09private String configPayload; + +=09=09public CustomStringSerializerSnapshot() {} + +=09=09public CustomStringSerializerSnapshot(String configPayload) { +=09=09=09this.configPayload =3D configPayload; +=09=09} + +=09=09@Override +=09=09public void write(DataOutputView out) throws IOException { +=09=09=09out.writeUTF(configPayload); +=09=09} + +=09=09@Override +=09=09public void read(int readVersion, DataInputView in, ClassLoader clas= sLoader) throws IOException { +=09=09=09configPayload =3D in.readUTF(); +=09=09} + +=09=09@Override +=09=09public TypeSerializer restoreSerializer() { +=09=09=09return new CustomStringSerializer(SerializerCompatibilityType.COM= PATIBLE_AS_IS, SerializerVersion.RESTORE); + +=09=09} + +=09=09@Override +=09=09public > TypeSerializerSchemaCompa= tibility resolveSchemaCompatibility(NS newSerializer) { +=09=09=09if (newSerializer instanceof CustomStringSerializer) { +=09=09=09=09SerializerCompatibilityType 
compatibilityType =3D ((CustomStri= ngSerializer) newSerializer).getCompatibilityType(); + +=09=09=09=09if (compatibilityType =3D=3D SerializerCompatibilityType.COMPA= TIBLE_AS_IS) { +=09=09=09=09=09return TypeSerializerSchemaCompatibility.compatibleAsIs(); +=09=09=09=09} else { +=09=09=09=09=09return TypeSerializerSchemaCompatibility.compatibleAfterMig= ration(); +=09=09=09=09} +=09=09=09} else { +=09=09=09=09return TypeSerializerSchemaCompatibility.incompatible(); +=09=09=09} +=09=09} + +=09=09@Override +=09=09public boolean equals(Object obj) { +=09=09=09return obj instanceof CustomStringSerializerSnapshot; +=09=09} + +=09=09@Override +=09=09public int hashCode() { +=09=09=09return 0; +=09=09} + +=09=09@Override +=09=09public int getCurrentVersion() { +=09=09=09return 0; +=09=09} +=09} + +=09public static class CustomVoidNamespaceSerializer extends TypeSerialize= r { + +=09=09private static final long serialVersionUID =3D 1L; + +=09=09public static final CustomVoidNamespaceSerializer INSTANCE =3D new C= ustomVoidNamespaceSerializer(); + +=09=09@Override +=09=09public boolean isImmutableType() { +=09=09=09return true; +=09=09} + +=09=09@Override +=09=09public VoidNamespace createInstance() { +=09=09=09return VoidNamespace.get(); +=09=09} + +=09=09@Override +=09=09public VoidNamespace copy(VoidNamespace from) { +=09=09=09return VoidNamespace.get(); +=09=09} + +=09=09@Override +=09=09public VoidNamespace copy(VoidNamespace from, VoidNamespace reuse) { +=09=09=09return VoidNamespace.get(); +=09=09} + +=09=09@Override +=09=09public int getLength() { +=09=09=09return 0; +=09=09} + +=09=09@Override +=09=09public void serialize(VoidNamespace record, DataOutputView target) t= hrows IOException { +=09=09=09// Make progress in the stream, write one byte. 
+=09=09=09// +=09=09=09// We could just skip writing anything here, because of the way t= his is +=09=09=09// used with the state backends, but if it is ever used somewhere= else +=09=09=09// (even though it is unlikely to happen), it would be a problem. +=09=09=09target.write(0); +=09=09} + +=09=09@Override +=09=09public VoidNamespace deserialize(DataInputView source) throws IOExce= ption { +=09=09=09source.readByte(); +=09=09=09return VoidNamespace.get(); +=09=09} + +=09=09@Override +=09=09public VoidNamespace deserialize(VoidNamespace reuse, DataInputView = source) throws IOException { +=09=09=09source.readByte(); +=09=09=09return VoidNamespace.get(); +=09=09} + +=09=09@Override +=09=09public void copy(DataInputView source, DataOutputView target) throws= IOException { +=09=09=09target.write(source.readByte()); +=09=09} + +=09=09@Override +=09=09public TypeSerializer duplicate() { +=09=09=09return this; +=09=09} + +=09=09@Override +=09=09public boolean canEqual(Object obj) { +=09=09=09return obj instanceof CustomVoidNamespaceSerializer; +=09=09} + +=09=09@Override +=09=09public boolean equals(Object obj) { +=09=09=09return obj instanceof CustomVoidNamespaceSerializer; +=09=09} + +=09=09@Override +=09=09public int hashCode() { +=09=09=09return getClass().hashCode(); +=09=09} + +=09=09@Override +=09=09public TypeSerializerSnapshot snapshotConfiguration()= { +=09=09=09return new CustomVoidNamespaceSerializerSnapshot(); +=09=09} +=09} + +=09public static class CustomVoidNamespaceSerializerSnapshot implements Ty= peSerializerSnapshot { + +=09=09@Override +=09=09public TypeSerializer restoreSerializer() { +=09=09=09return new CustomVoidNamespaceSerializer(); +=09=09} + +=09=09@Override +=09=09public > TypeSerializerSche= maCompatibility resolveSchemaCompatibility(NS newSeriali= zer) { +=09=09=09return TypeSerializerSchemaCompatibility.compatibleAsIs(); +=09=09} + +=09=09@Override +=09=09public void write(DataOutputView out) throws IOException {} + +=09=09@Override 
+=09=09public void read(int readVersion, DataInputView in, ClassLoader user= CodeClassLoader) throws IOException {} + +=09=09@Override +=09=09public boolean equals(Object obj) { +=09=09=09return obj instanceof CustomVoidNamespaceSerializerSnapshot; +=09=09} + +=09=09@Override +=09=09public int hashCode() { +=09=09=09return 0; +=09=09} + +=09=09@Override +=09=09public int getCurrentVersion() { +=09=09=09return 0; +=09=09} +=09} + +=09protected abstract B getStateBackend() throws Exception; + +=09protected abstract BackendSerializationTimeliness getStateBackendSerial= izationTimeliness(); + +=09private CheckpointStreamFactory createStreamFactory() throws Exception = { +=09=09if (checkpointStorageLocation =3D=3D null) { +=09=09=09checkpointStorageLocation =3D getStateBackend() +=09=09=09=09.createCheckpointStorage(new JobID()) +=09=09=09=09.initializeLocationForCheckpoint(1L); +=09=09} +=09=09return checkpointStorageLocation; +=09} + +=09private AbstractKeyedStateBackend createKeyedBackend(TypeSeriali= zer keySerializer) throws Exception { +=09=09return createKeyedBackend(keySerializer, new DummyEnvironment()); +=09} + +=09private AbstractKeyedStateBackend createKeyedBackend(TypeSerial= izer keySerializer, Environment env) throws Exception { +=09=09return createKeyedBackend( +=09=09=09keySerializer, +=09=09=0910, +=09=09=09new KeyGroupRange(0, 9), +=09=09=09env); +=09} + +=09private AbstractKeyedStateBackend createKeyedBackend( +=09=09TypeSerializer keySerializer, +=09=09int numberOfKeyGroups, +=09=09KeyGroupRange keyGroupRange, +=09=09Environment env) throws Exception { +=09=09AbstractKeyedStateBackend backend =3D getStateBackend().createKey= edStateBackend( +=09=09=09env, +=09=09=09new JobID(), +=09=09=09"test_op", +=09=09=09keySerializer, +=09=09=09numberOfKeyGroups, +=09=09=09keyGroupRange, +=09=09=09env.getTaskKvStateRegistry()); +=09=09backend.restore(null); +=09=09return backend; +=09} + +=09private AbstractKeyedStateBackend restoreKeyedBackend(TypeSeria= lizer 
keySerializer, KeyedStateHandle state) throws Exception { +=09=09return restoreKeyedBackend(keySerializer, state, new DummyEnvironmen= t()); +=09} + +=09private AbstractKeyedStateBackend restoreKeyedBackend( +=09=09TypeSerializer keySerializer, +=09=09KeyedStateHandle state, +=09=09Environment env) throws Exception { +=09=09return restoreKeyedBackend( +=09=09=09keySerializer, +=09=09=0910, +=09=09=09new KeyGroupRange(0, 9), +=09=09=09Collections.singletonList(state), +=09=09=09env); +=09} + +=09private AbstractKeyedStateBackend restoreKeyedBackend( +=09=09TypeSerializer keySerializer, +=09=09int numberOfKeyGroups, +=09=09KeyGroupRange keyGroupRange, +=09=09List state, +=09=09Environment env) throws Exception { +=09=09AbstractKeyedStateBackend backend =3D getStateBackend().createKey= edStateBackend( +=09=09=09env, +=09=09=09new JobID(), +=09=09=09"test_op", +=09=09=09keySerializer, +=09=09=09numberOfKeyGroups, +=09=09=09keyGroupRange, +=09=09=09env.getTaskKvStateRegistry()); +=09=09backend.restore(new StateObjectCollection<>(state)); +=09=09return backend; +=09} + +=09private KeyedStateHandle runSnapshot( +=09=09RunnableFuture> snapshotRunnableFut= ure, +=09=09SharedStateRegistry sharedStateRegistry) throws Exception { + +=09=09if (!snapshotRunnableFuture.isDone()) { +=09=09=09snapshotRunnableFuture.run(); +=09=09} + +=09=09SnapshotResult snapshotResult =3D snapshotRunnable= Future.get(); +=09=09KeyedStateHandle jobManagerOwnedSnapshot =3D snapshotResult.getJobMa= nagerOwnedSnapshot(); +=09=09if (jobManagerOwnedSnapshot !=3D null) { +=09=09=09jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry)= ; +=09=09} +=09=09return jobManagerOwnedSnapshot; +=09} +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/Sta= teBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runti= me/state/StateBackendTestBase.java index 0712b641678..41a6ab88247 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBacke= 
ndTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBacke= ndTestBase.java @@ -131,6 +131,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -147,11 +148,21 @@ =09@Rule =09public final ExpectedException expectedException =3D ExpectedException.= none(); =20 +=09/** +=09 * The serialization timeliness behaviour of the state backend under te= st. +=09 */ +=09public enum BackendSerializationTimeliness { +=09=09ON_ACCESS, +=09=09ON_CHECKPOINTS +=09} + =09// lazily initialized stream storage =09private CheckpointStorageLocation checkpointStorageLocation; =20 =09protected abstract B getStateBackend() throws Exception; =20 +=09protected abstract BackendSerializationTimeliness getStateBackendSerial= izationTimeliness() throws Exception; + =09protected abstract boolean isSerializerPresenceRequiredOnRestore(); =20 =09protected CheckpointStreamFactory createStreamFactory() throws Exceptio= n { @@ -803,6 +814,9 @@ public void testKryoRegisteringRestoreResilienceWithReg= isteredSerializer() throw =09@Test =09@SuppressWarnings("unchecked") =09public void testKryoRestoreResilienceWithDifferentRegistrationOrder() t= hrows Exception { + +=09=09assumeThat(getStateBackendSerializationTimeliness(), is(BackendSeria= lizationTimeliness.ON_ACCESS)); + =09=09CheckpointStreamFactory streamFactory =3D createStreamFactory(); =09=09Environment env =3D new DummyEnvironment(); =09=09SharedStateRegistry sharedStateRegistry =3D new SharedStateRegistry(= ); @@ -969,6 +983,9 @@ public void testPojoRestoreResilienceWithDifferentRegis= trationOrder() throws Exc =20 =09@Test =09public void testStateSerializerReconfiguration() throws Exception { + +=09=09assumeThat(getStateBackendSerializationTimeliness(), is(BackendSeria= 
lizationTimeliness.ON_ACCESS)); + =09=09CheckpointStreamFactory streamFactory =3D createStreamFactory(); =09=09SharedStateRegistry sharedStateRegistry =3D new SharedStateRegistry(= ); =09=09Environment env =3D new DummyEnvironment(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl= /TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runti= me/state/ttl/TtlStateTestBase.java index f9f108a3a12..54eedb229c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSta= teTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSta= teTestBase.java @@ -387,6 +387,8 @@ public void testSnapshotChangeRestore() throws Exceptio= n { =09@Test(expected =3D StateMigrationException.class) =09public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws = Exception { =09=09assumeThat(this, not(instanceOf(MockTtlStateTest.class))); +=09=09assumeThat(this, not(instanceOf(HeapAsyncSnapshotTtlStateTest.class)= )); +=09=09assumeThat(this, not(instanceOf(HeapSyncSnapshotTtlStateTest.class))= ); =20 =09=09initTest(); =20 diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/= org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-= state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/co= ntrib/streaming/state/AbstractRocksDBState.java index 8b8fbb23a99..2218bc0d3e0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apa= che/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apa= che/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StateMigrationException; =20 import org.rocksdb.ColumnFamilyHandle; import 
org.rocksdb.RocksDBException; @@ -154,6 +155,20 @@ public void setCurrentNamespace(N namespace) { =09=09return backend.db.get(columnFamily, tmpKeySerializationView.getCopyO= fBuffer()); =09} =20 +=09public void migrateSerializedValue( +=09=09=09DataInputDeserializer serializedOldValueInput, +=09=09=09DataOutputSerializer serializedMigratedValueOutput, +=09=09=09TypeSerializer priorSerializer, +=09=09=09TypeSerializer newSerializer) throws StateMigrationException { + +=09=09try { +=09=09=09V value =3D priorSerializer.deserialize(serializedOldValueInput); +=09=09=09newSerializer.serialize(value, serializedMigratedValueOutput); +=09=09} catch (Exception e) { +=09=09=09throw new StateMigrationException("Error while trying to migratio= n RocksDB state.", e); +=09=09} +=09} + =09byte[] getKeyBytes() { =09=09try { =09=09=09writeCurrentKeyWithGroupAndNamespace(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/= org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/fl= ink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flin= k/contrib/streaming/state/RocksDBKeyedStateBackend.java index 3d66db6e6fc..77fef509c38 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apa= che/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apa= che/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -27,10 +27,9 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer= ; +import 
org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibi= lity; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArray= Serializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; @@ -95,6 +94,7 @@ import org.rocksdb.DBOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.rocksdb.Snapshot; import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -694,16 +694,9 @@ private void restoreKVStateMetaData() throws IOExcepti= on, StateMigrationExceptio =20 =09=09=09// check for key serializer compatibility; this also reconfigures= the =09=09=09// key serializer to be compatible, if it is required and is poss= ible -=09=09=09if (CompatibilityUtil.resolveCompatibilityResult( -=09=09=09=09serializationProxy.restoreKeySerializer(), -=09=09=09=09UnloadableDummyTypeSerializer.class, -=09=09=09=09serializationProxy.getKeySerializerConfigSnapshot(), -=09=09=09=09rocksDBKeyedStateBackend.keySerializer) -=09=09=09=09.isRequiresMigration()) { - -=09=09=09=09// TODO replace with state migration; note that key hash codes= need to remain the same after migration -=09=09=09=09throw new StateMigrationException("The new key serializer is n= ot compatible to read previous keys. " + -=09=09=09=09=09"Aborting now since state migration is currently not availa= ble"); +=09=09=09if (!serializationProxy.getKeySerializerConfigSnapshot() +=09=09=09=09=09.resolveSchemaCompatibility(rocksDBKeyedStateBackend.keySer= ializer).isCompatibleAsIs()) { +=09=09=09=09throw new StateMigrationException("The new key serializer must= be compatible."); =09=09=09} =20 =09=09=09this.keygroupStreamCompressionDecorator =3D serializationProxy.is= UsingKeyGroupCompression() ? 
@@ -1245,16 +1238,9 @@ private void restoreInstanceDirectoryFromPath(Path s= ource) throws IOException { =20 =09=09=09=09// check for key serializer compatibility; this also reconfigu= res the =09=09=09=09// key serializer to be compatible, if it is required and is p= ossible -=09=09=09=09if (CompatibilityUtil.resolveCompatibilityResult( -=09=09=09=09=09serializationProxy.restoreKeySerializer(), -=09=09=09=09=09UnloadableDummyTypeSerializer.class, -=09=09=09=09=09serializationProxy.getKeySerializerConfigSnapshot(), -=09=09=09=09=09stateBackend.keySerializer) -=09=09=09=09=09.isRequiresMigration()) { - -=09=09=09=09=09// TODO replace with state migration; note that key hash co= des need to remain the same after migration -=09=09=09=09=09throw new StateMigrationException("The new key serializer i= s not compatible to read previous keys. " + -=09=09=09=09=09=09"Aborting now since state migration is currently not ava= ilable"); +=09=09=09=09if (!serializationProxy.getKeySerializerConfigSnapshot() +=09=09=09=09=09=09.resolveSchemaCompatibility(stateBackend.keySerializer).= isCompatibleAsIs()) { +=09=09=09=09=09throw new StateMigrationException("The new key serializer m= ust be compatible."); =09=09=09=09} =20 =09=09=09=09return serializationProxy.getStateMetaInfoSnapshots(); @@ -1345,42 +1331,32 @@ private void copyStateDataHandleData( =09 * already have a registered entry for that and return it (after some n= ecessary state compatibility checks) =09 * or create a new one if it does not exist. 
=09 */ -=09private Tuple2> tryRegisterKvStateInformation( -=09=09=09StateDescriptor stateDesc, +=09private Tuple2> tryRegisterKvStateInformation( +=09=09=09StateDescriptor stateDesc, =09=09=09TypeSerializer namespaceSerializer, -=09=09=09@Nullable StateSnapshotTransformer snapshotTransformer) throws= StateMigrationException { +=09=09=09@Nullable StateSnapshotTransformer snapshotTransformer) throw= s Exception { =20 =09=09Tuple2 stateInfo = =3D =09=09=09kvStateInformation.get(stateDesc.getName()); =20 -=09=09RegisteredKeyValueStateBackendMetaInfo newMetaInfo; -=09=09if (stateInfo !=3D null) { - -=09=09=09StateMetaInfoSnapshot restoredMetaInfoSnapshot =3D restoredKvStat= eMetaInfos.get(stateDesc.getName()); +=09=09TypeSerializer stateSerializer =3D stateDesc.getSerializer(); +=09=09RegisteredKeyValueStateBackendMetaInfo newMetaInfo =3D new Re= gisteredKeyValueStateBackendMetaInfo<>( +=09=09=09stateDesc.getType(), +=09=09=09stateDesc.getName(), +=09=09=09namespaceSerializer, +=09=09=09stateSerializer, +=09=09=09snapshotTransformer); =20 -=09=09=09Preconditions.checkState( -=09=09=09=09restoredMetaInfoSnapshot !=3D null, -=09=09=09=09"Requested to check compatibility of a restored RegisteredKeye= dBackendStateMetaInfo," + -=09=09=09=09=09" but its corresponding restored snapshot cannot be found."= ); - -=09=09=09newMetaInfo =3D RegisteredKeyValueStateBackendMetaInfo.resolveKvS= tateCompatibility( -=09=09=09=09restoredMetaInfoSnapshot, -=09=09=09=09namespaceSerializer, +=09=09if (stateInfo !=3D null) { +=09=09=09migrateStateIfNecessary( +=09=09=09=09newMetaInfo, =09=09=09=09stateDesc, -=09=09=09=09snapshotTransformer); +=09=09=09=09namespaceSerializer, +=09=09=09=09stateInfo); =20 =09=09=09stateInfo.f1 =3D newMetaInfo; =09=09} else { -=09=09=09String stateName =3D stateDesc.getName(); - -=09=09=09newMetaInfo =3D new RegisteredKeyValueStateBackendMetaInfo<>( -=09=09=09=09stateDesc.getType(), -=09=09=09=09stateName, -=09=09=09=09namespaceSerializer, 
-=09=09=09=09stateDesc.getSerializer(), -=09=09=09=09snapshotTransformer); - -=09=09=09ColumnFamilyHandle columnFamily =3D createColumnFamily(stateName)= ; +=09=09=09ColumnFamilyHandle columnFamily =3D createColumnFamily(stateDesc.= getName()); =20 =09=09=09stateInfo =3D Tuple2.of(columnFamily, newMetaInfo); =09=09=09kvStateInformation.put(stateDesc.getName(), stateInfo); @@ -1389,6 +1365,119 @@ private void copyStateDataHandleData( =09=09return Tuple2.of(stateInfo.f0, newMetaInfo); =09} =20 +=09private void migrateStateIfNecessary( +=09=09=09RegisteredKeyValueStateBackendMetaInfo newMetaInfo, +=09=09=09StateDescriptor stateDesc, +=09=09=09TypeSerializer namespaceSerializer, +=09=09=09Tuple2 stateInfo= ) throws Exception { + +=09=09StateMetaInfoSnapshot restoredMetaInfoSnapshot =3D restoredKvStateMe= taInfos.get(stateDesc.getName()); + +=09=09RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMe= taInfoSnapshot, stateDesc); + +=09=09@SuppressWarnings("unchecked") +=09=09TypeSerializerSnapshot namespaceSerializerSnapshot =3D Preconditi= ons.checkNotNull( +=09=09=09(TypeSerializerSnapshot) restoredMetaInfoSnapshot.getTypeSeria= lizerConfigSnapshot( +=09=09=09=09StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZE= R.toString())); + +=09=09TypeSerializerSchemaCompatibility namespaceCompatibility =3D +=09=09=09namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceS= erializer); +=09=09if (!namespaceCompatibility.isCompatibleAsIs()) { +=09=09=09throw new StateMigrationException("The new namespace serializer m= ust be compatible."); +=09=09} + +=09=09@SuppressWarnings("unchecked") +=09=09TypeSerializerSnapshot stateSerializerSnapshot =3D Preconditions= .checkNotNull( +=09=09=09(TypeSerializerSnapshot) restoredMetaInfoSnapshot.getTypeSeri= alizerConfigSnapshot( +=09=09=09=09StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.to= String())); + +=09=09TypeSerializer newStateSerializer =3D stateDesc.getSerializer(); 
+=09=09TypeSerializerSchemaCompatibility stateCompatibility =3D +=09=09=09stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerial= izer); + +=09=09if (stateCompatibility.isCompatibleAfterMigration()) { +=09=09=09migrateStateValues(stateDesc, stateInfo, restoredMetaInfoSnapshot= , newMetaInfo); +=09=09} else if (stateCompatibility.isIncompatible()) { +=09=09=09throw new StateMigrationException("The new state serializer is in= compatible with previous state."); +=09=09} +=09} + +=09/** +=09 * Migrate only the state value, that is the "value" that is stored in = RocksDB. We don't migrate +=09 * the key here, which is made up of key group, key, namespace and map = key +=09 * (in case of MapState). +=09 */ +=09private void migrateStateValues( +=09=09StateDescriptor stateDesc, +=09=09Tuple2 stateInfo, +=09=09StateMetaInfoSnapshot restoredMetaInfoSnapshot, +=09=09RegisteredKeyValueStateBackendMetaInfo newMetaInfo) throws Ex= ception { + +=09=09if (stateDesc.getType() =3D=3D StateDescriptor.Type.MAP) { +=09=09=09throw new StateMigrationException("The new serializer for a MapSt= ate requires state migration in order for the job to proceed." + +=09=09=09=09" However, migration for MapState currently isn't supported.")= ; +=09=09} + +=09=09LOG.info( +=09=09=09"Performing state migration for state {} because the state serial= izer's schema, i.e. serialization format, has changed.", +=09=09=09stateDesc); + +=09=09// we need to get an actual state instance because migration is diff= erent +=09=09// for different state types. 
For example, ListState needs to deal w= ith +=09=09// individual elements +=09=09StateFactory stateFactory =3D STATE_FACTORIES.get(stateDesc.getClass= ()); +=09=09if (stateFactory =3D=3D null) { +=09=09=09String message =3D String.format("State %s is not supported by %s= ", +=09=09=09=09stateDesc.getClass(), this.getClass()); +=09=09=09throw new FlinkRuntimeException(message); +=09=09} +=09=09State state =3D stateFactory.createState( +=09=09=09stateDesc, +=09=09=09Tuple2.of(stateInfo.f0, newMetaInfo), +=09=09=09RocksDBKeyedStateBackend.this); +=09=09if (!(state instanceof AbstractRocksDBState)) { +=09=09=09throw new FlinkRuntimeException( +=09=09=09=09"State should be an AbstractRocksDBState but is " + state); +=09=09} + +=09=09@SuppressWarnings("unchecked") +=09=09AbstractRocksDBState rocksDBState =3D (AbstractRocksDBS= tate) state; + +=09=09Snapshot rocksDBSnapshot =3D db.getSnapshot(); +=09=09try ( +=09=09=09RocksIteratorWrapper iterator =3D getRocksIterator(db, stateInfo.= f0); +=09=09=09RocksDBWriteBatchWrapper batchWriter =3D new RocksDBWriteBatchWra= pper(db, getWriteOptions()) +=09=09) { +=09=09=09iterator.seekToFirst(); + +=09=09=09@SuppressWarnings("unchecked") +=09=09=09TypeSerializerSnapshot priorValueSerializerSnapshot =3D (Type= SerializerSnapshot) +=09=09=09=09Preconditions.checkNotNull(restoredMetaInfoSnapshot.getTypeSer= ializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERI= ALIZER)); +=09=09=09TypeSerializer priorValueSerializer =3D priorValueSerializerS= napshot.restoreSerializer(); + +=09=09=09DataInputDeserializer serializedValueInput =3D new DataInputDeser= ializer(); +=09=09=09DataOutputSerializer migratedSerializedValueOutput =3D new DataOu= tputSerializer(1); +=09=09=09while (iterator.isValid()) { +=09=09=09=09serializedValueInput.setBuffer(iterator.value()); + +=09=09=09=09rocksDBState.migrateSerializedValue( +=09=09=09=09=09serializedValueInput, +=09=09=09=09=09migratedSerializedValueOutput, 
+=09=09=09=09=09priorValueSerializer, +=09=09=09=09=09stateDesc.getSerializer()); + +=09=09=09=09batchWriter.put(stateInfo.f0, iterator.key(), migratedSerializ= edValueOutput.getCopyOfBuffer()); + +=09=09=09=09migratedSerializedValueOutput.clear(); +=09=09=09=09iterator.next(); +=09=09=09} +=09=09=09batchWriter.flush(); +=09=09} finally { +=09=09=09db.releaseSnapshot(rocksDBSnapshot); +=09=09=09rocksDBSnapshot.close(); +=09=09} +=09} + =09/** =09 * Creates a column family handle for use with a k/v state. =09 */ @@ -1585,13 +1674,14 @@ public static RocksIteratorWrapper getRocksIterator= ( =09=09=09TypeSerializer metaInfoTypeSerializer =3D restoredMetaInfoSnap= shot.restoreTypeSerializer(serializerKey); =20 =09=09=09if (metaInfoTypeSerializer !=3D byteOrderedElementSerializer) { -=09=09=09=09CompatibilityResult compatibilityResult =3D CompatibilityUt= il.resolveCompatibilityResult( -=09=09=09=09=09metaInfoTypeSerializer, -=09=09=09=09=09null, -=09=09=09=09=09restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(se= rializerKey), -=09=09=09=09=09byteOrderedElementSerializer); +=09=09=09=09@SuppressWarnings("unchecked") +=09=09=09=09TypeSerializerSnapshot serializerSnapshot =3D Preconditions= .checkNotNull( +=09=09=09=09=09(TypeSerializerSnapshot) restoredMetaInfoSnapshot.getTyp= eSerializerConfigSnapshot(serializerKey)); + +=09=09=09=09TypeSerializerSchemaCompatibility compatibilityResult = =3D +=09=09=09=09=09serializerSnapshot.resolveSchemaCompatibility(byteOrderedEl= ementSerializer); =20 -=09=09=09=09if (compatibilityResult.isRequiresMigration()) { +=09=09=09=09if (compatibilityResult.isCompatibleAfterMigration()) { =09=09=09=09=09throw new FlinkRuntimeException(StateMigrationException.not= Supported()); =09=09=09=09} =20 diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/= org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-stat= e-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contri= 
b/streaming/state/RocksDBListState.java index f70c6a57bad..2c8f0d1a1e8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apa= che/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apa= che/flink/contrib/streaming/state/RocksDBListState.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -31,6 +32,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StateMigrationException; =20 import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; @@ -242,6 +244,36 @@ public void addAll(List values) { =09=09} =09} =20 +=09@Override +=09public void migrateSerializedValue( +=09=09=09DataInputDeserializer serializedOldValueInput, +=09=09=09DataOutputSerializer serializedMigratedValueOutput, +=09=09=09TypeSerializer> priorSerializer, +=09=09=09TypeSerializer> newSerializer) throws StateMigrationExcep= tion { + +=09=09Preconditions.checkArgument(priorSerializer instanceof ListSerialize= r); +=09=09Preconditions.checkArgument(newSerializer instanceof ListSerializer)= ; + +=09=09TypeSerializer priorElementSerializer =3D +=09=09=09((ListSerializer) priorSerializer).getElementSerializer(); + +=09=09TypeSerializer newElementSerializer =3D +=09=09=09((ListSerializer) newSerializer).getElementSerializer(); + +=09=09try { +=09=09=09while (serializedOldValueInput.available() > 0) { +=09=09=09=09V element =3D priorElementSerializer.deserialize(serializedOld= 
ValueInput); +=09=09=09=09newElementSerializer.serialize(element, serializedMigratedValu= eOutput); +=09=09=09=09if (serializedOldValueInput.available() > 0) { +=09=09=09=09=09serializedOldValueInput.readByte(); +=09=09=09=09=09serializedMigratedValueOutput.write(DELIMITER); +=09=09=09=09} +=09=09=09} +=09=09} catch (Exception e) { +=09=09=09throw new StateMigrationException("Error while trying to migrate = RocksDB list state.", e); +=09=09} +=09} + =09private static byte[] getPreMergedValue( =09=09List values, =09=09TypeSerializer elementSerializer, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/= org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.j= ava b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apa= che/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java new file mode 100644 index 00000000000..ee9698076cb --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apa= che/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied= . + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.runtime.state.StateBackendMigrationTestBase; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.apache.commons.io.IOUtils; +import org.junit.After; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.rocksdb.RocksObject; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.mockito.Mockito.verify; +import static org.mockito.internal.verification.VerificationModeFactory.ti= mes; + +/** + * Tests for the partitioned state part of {@link RocksDBStateBackend}. + */ +@RunWith(Parameterized.class) +public class RocksDBStateBackendMigrationTest extends StateBackendMigratio= nTestBase { + +=09private RocksDBKeyedStateBackend keyedStateBackend; +=09private List allCreatedCloseables; + +=09@Parameterized.Parameters(name =3D "Incremental checkpointing: {0}") +=09public static Collection parameters() { +=09=09return Arrays.asList(false, true); +=09} + +=09@Parameterized.Parameter +=09public boolean enableIncrementalCheckpointing; + +=09@Rule +=09public final TemporaryFolder tempFolder =3D new TemporaryFolder(); + +=09// Store it because we need it for the cleanup test. 
+=09private String dbPath; + +=09@Override +=09protected RocksDBStateBackend getStateBackend() throws IOException { +=09=09dbPath =3D tempFolder.newFolder().getAbsolutePath(); +=09=09String checkpointPath =3D tempFolder.newFolder().toURI().toString(); +=09=09RocksDBStateBackend backend =3D new RocksDBStateBackend(new FsStateB= ackend(checkpointPath), enableIncrementalCheckpointing); +=09=09backend.setDbStoragePath(dbPath); +=09=09return backend; +=09} + +=09@Override +=09protected BackendSerializationTimeliness getStateBackendSerializationTi= meliness() { +=09=09return BackendSerializationTimeliness.ON_ACCESS; +=09} + +=09// small safety net for instance cleanups, so that no native objects ar= e left +=09@After +=09public void cleanupRocksDB() { +=09=09if (keyedStateBackend !=3D null) { +=09=09=09IOUtils.closeQuietly(keyedStateBackend); +=09=09=09keyedStateBackend.dispose(); +=09=09} +=09=09if (allCreatedCloseables !=3D null) { +=09=09=09for (RocksObject rocksCloseable : allCreatedCloseables) { +=09=09=09=09verify(rocksCloseable, times(1)).close(); +=09=09=09} +=09=09=09allCreatedCloseables =3D null; +=09=09} +=09} +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/= org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/fli= nk-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink= /contrib/streaming/state/RocksDBStateBackendTest.java index 4916251fc1f..6a4faa036f6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apa= che/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apa= che/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -134,6 +134,11 @@ protected RocksDBStateBackend getStateBackend() throws= IOException { =09=09return backend; =09} =20 +=09@Override +=09protected BackendSerializationTimeliness getStateBackendSerializationTi= meliness() throws Exception { +=09=09return 
BackendSerializationTimeliness.ON_ACCESS; +=09} + =09@Override =09protected boolean isSerializerPresenceRequiredOnRestore() { =09=09return false; =20
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: users@infra.apache.org

> Implement state conversion procedure in state backends
> ------------------------------------------------------
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Aljoscha Krettek
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and config snapshots serving as the single source of truth for recreating restore serializers, the next step would be to utilize this when performing a full-pass state conversion (i.e., read with old / restore serializer, write with new serializer).
> For Flink's heap-based backends, state conversion inherently happens, since all state is always deserialized after restore with the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state conversion needs to happen per registered state on its first access if the registered new serializer has a different serialization schema than the previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the new serializer's schema is a) compatible with the serializer as it is, b) compatible after the serializer has been reconfigured, or c) incompatible.
> 2.
Introduce state conversion procedures in the RocksDB state backend. This should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer compatibility checks. That is not required because those backends always perform full-pass state conversions.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
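
The full-pass conversion described in the issue (read with the old / restore serializer, write with the new serializer) can be sketched for a single stored value as follows. This is only an illustration under stated assumptions, not Flink's implementation: `ValueSerializer`, `INT_FORMAT`, `LONG_FORMAT`, and `migrateValue` are hypothetical names introduced here, standing in for Flink's `TypeSerializer` and the per-value migration step, and plain `java.io` streams replace Flink's `DataInputDeserializer` / `DataOutputSerializer`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StateConversionSketch {

    /** Hypothetical stand-in for a serializer; NOT Flink's TypeSerializer interface. */
    interface ValueSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
        T deserialize(DataInputStream in) throws IOException;
    }

    /** Assumed "old" schema: the value is stored as a 4-byte int. */
    static final ValueSerializer<Long> INT_FORMAT = new ValueSerializer<Long>() {
        @Override
        public void serialize(Long value, DataOutputStream out) throws IOException {
            out.writeInt(value.intValue());
        }

        @Override
        public Long deserialize(DataInputStream in) throws IOException {
            return (long) in.readInt();
        }
    };

    /** Assumed "new" schema: the same value is stored as an 8-byte long. */
    static final ValueSerializer<Long> LONG_FORMAT = new ValueSerializer<Long>() {
        @Override
        public void serialize(Long value, DataOutputStream out) throws IOException {
            out.writeLong(value);
        }

        @Override
        public Long deserialize(DataInputStream in) throws IOException {
            return in.readLong();
        }
    };

    /**
     * Converts one serialized value: deserialize with the prior serializer,
     * then re-serialize with the new one. The key bytes are left untouched.
     */
    static <T> byte[] migrateValue(
            byte[] oldBytes,
            ValueSerializer<T> priorSerializer,
            ValueSerializer<T> newSerializer) throws IOException {

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(oldBytes));
        T value = priorSerializer.deserialize(in);

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        newSerializer.serialize(value, out);
        out.flush();
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] oldBytes = {0, 0, 0, 42}; // 42 written in the old int format
        byte[] migrated = migrateValue(oldBytes, INT_FORMAT, LONG_FORMAT);
        System.out.println("migrated length: " + migrated.length);
    }
}
```

In a lazily deserializing backend, a step like `migrateValue` would be applied to every stored value of a registered state the first time that state is accessed with a serializer whose schema differs from the restored one; backends that deserialize everything on restore get the same effect implicitly.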