flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5051) Backwards compatibility for serializers in backend state
Date Tue, 13 Dec 2016 11:01:59 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744875#comment-15744875 ]

ASF GitHub Bot commented on FLINK-5051:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2962#discussion_r92144965
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
    +import org.apache.flink.core.io.IOReadableWritable;
    +import org.apache.flink.core.io.VersionedIOReadableWritable;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
    + * serialization logic here.
    + */
    +public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
    +
    +	private static final int VERSION = 1;
    +
    +	private TypeSerializerSerializationProxy<?> keySerializerProxy;
    +	private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
    +
    +	private ClassLoader userCodeClassLoader;
    +
    +	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
    +		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
    +	}
    +
    +	public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
    +		this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
    +		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
    +		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
    +	}
    +
    +	public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() {
    +		return namedStateSerializationProxies;
    +	}
    +
    +	public TypeSerializerSerializationProxy<?> getKeySerializerProxy() {
    +		return keySerializerProxy;
    +	}
    +
    +	@Override
    +	public int getVersion() {
    +		return VERSION;
    +	}
    +
    +	@Override
    +	public void write(DataOutputView out) throws IOException {
    +		super.write(out);
    +
    +		keySerializerProxy.write(out);
    +
    +		out.writeShort(namedStateSerializationProxies.size());
    +		Map<String, Integer> kVStateToId = new HashMap<>(namedStateSerializationProxies.size());
    --- End diff --
    
    What is this Map used for?


> Backwards compatibility for serializers in backend state
> --------------------------------------------------------
>
>                 Key: FLINK-5051
>                 URL: https://issues.apache.org/jira/browse/FLINK-5051
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> When a new state is registered, e.g. in a keyed backend via `getPartitionedState`, the
caller has to provide all type serializers required for the persistence of state components.
Explicitly passing the serializers on state creation already allows for potential version
upgrades of serializers.
> However, those serializers are currently not part of any snapshot and are only provided
at runtime, when the state is newly registered or restored. For backwards compatibility, this
has strong implications: checkpoints are not self-contained, in that state is currently a black box
without knowledge of its corresponding serializers. Most cases where we would need to
restructure the state are basically lost. We could only convert them lazily at runtime, and
only once the user registers the concrete state, which might happen at unpredictable
points.
> I suggest adapting our solution as follows:
> - As now, all states are registered with their set of serializers.
> - Unlike now, all serializers are written to the snapshot. This makes savepoints self-contained
and also allows us to create inspection tools for savepoints at some point in the future.
> - Introduce an interface {{Versioned}} with {{long getVersion()}} and {{boolean isCompatible(Versioned
v)}} which is then implemented by serializers. Compatible serializers must ensure that they
can deserialize older versions, and can then serialize them in their new format. This is how
we upgrade.
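
A minimal Java sketch of the {{Versioned}} idea from the list above. Only the interface name and its two method signatures come from the issue text. ExampleSerializer and its compatibility rule (same class, equal or older version) are assumptions made up for illustration, not Flink code.

```java
// Interface as proposed in the issue text.
interface Versioned {
    long getVersion();

    boolean isCompatible(Versioned other);
}

// Hypothetical serializer, for illustration only. The assumed rule is that
// it can read data written by the same class at its own or an older version.
final class ExampleSerializer implements Versioned {
    private final long version;

    ExampleSerializer(long version) {
        this.version = version;
    }

    @Override
    public long getVersion() {
        return version;
    }

    @Override
    public boolean isCompatible(Versioned other) {
        return other instanceof ExampleSerializer && other.getVersion() <= version;
    }
}

final class VersionedSketch {
    public static void main(String[] args) {
        Versioned written = new ExampleSerializer(1);  // version found in a snapshot
        Versioned current = new ExampleSerializer(2);  // version registered at runtime

        // The newer serializer accepts older data; the reverse is rejected.
        System.out.println(current.isCompatible(written));
        System.out.println(written.isCompatible(current));
    }
}
```

Under this assumed rule, a restore would check current.isCompatible(written) before reading old state and rewriting it in the new format.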
> We need to find the right tradeoff in how many places we need to store the serializers.
I suggest to write them once per parallel operator instance for each state, i.e. we have a
map with state_name -> tuple3<serializer<KEY>, serializer<NAMESPACE>, serializer<STATE>>.
This could go before all key-groups are written, right at the head of the file. Then, for
each file we see on restore, we can first read the serializer map from the head of the stream,
then go through the key groups by offset.
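
The layout described above (serializer map at the head of the stream, key groups after it) could look roughly like the following sketch. It uses plain java.io streams rather than Flink's DataOutputView, and it stores each serializer as a string identifier, which is a simplification: the real proposal writes the serializers themselves. SerializerHeaderSketch, SerializerTriple and all method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

final class SerializerHeaderSketch {

    // Stand-in for tuple3<serializer<KEY>, serializer<NAMESPACE>, serializer<STATE>>.
    // Strings replace real serializers in this sketch.
    static final class SerializerTriple {
        final String key;
        final String namespace;
        final String state;

        SerializerTriple(String key, String namespace, String state) {
            this.key = key;
            this.namespace = namespace;
            this.state = state;
        }
    }

    // Writes the state_name -> triple map at the head of the stream.
    static void writeHeader(DataOutputStream out, Map<String, SerializerTriple> header)
            throws IOException {
        out.writeShort(header.size());
        for (Map.Entry<String, SerializerTriple> entry : header.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue().key);
            out.writeUTF(entry.getValue().namespace);
            out.writeUTF(entry.getValue().state);
        }
    }

    // Reads the map back; the stream is then positioned at the first key group.
    static Map<String, SerializerTriple> readHeader(DataInputStream in) throws IOException {
        int count = in.readShort();
        Map<String, SerializerTriple> header = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String stateName = in.readUTF();
            String key = in.readUTF();
            String namespace = in.readUTF();
            String state = in.readUTF();
            header.put(stateName, new SerializerTriple(key, namespace, state));
        }
        return header;
    }

    // Writes header plus placeholder key-group bytes, then reads the header back.
    static Map<String, SerializerTriple> roundTrip(Map<String, SerializerTriple> header) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeHeader(out, header);
                out.writeInt(42); // placeholder for the key-group data that would follow
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readHeader(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The header is written once per stream, so on restore the reader knows every state's serializers before it touches any key-group data.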



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
