flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
Date Sun, 07 May 2017 10:32:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999777#comment-15999777 ]

ASF GitHub Bot commented on FLINK-6425:

Github user StefanRRichter commented on a diff in the pull request:

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/MigrationStrategy.java
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.common.typeutils;
    +import org.apache.flink.annotation.PublicEvolving;
    +/**
    + * A {@code MigrationStrategy} contains information about how to perform migration of data written
    + * by an older serializer so that new serializers can continue to work on them.
    + *
    + * @param <T> the type of the data being migrated.
    + */
    +public final class MigrationStrategy<T> {
    +	/** Whether or not migration is required. */
    +	private final boolean requiresStateMigration;
    +	/**
    +	 * The fallback deserializer to use, in the case the preceding serializer cannot be found.
    +	 *
    +	 * <p>This is only relevant if migration is required.
    +	 */
    +	private final TypeSerializer<T> fallbackDeserializer;
    +	/**
    +	 * Returns a strategy that simply signals that no migration needs to be performed.
    +	 *
    +	 * @return a strategy that does not perform migration
    +	 */
    +	public static <T> MigrationStrategy<T> noMigration() {
    +		return new MigrationStrategy<>(false, null);
    +	}
    +	/**
    +	 * Returns a strategy that signals migration to be performed, and in the case that the
    +	 * preceding serializer cannot be found, a provided fallback deserializer can be
    +	 * used.
    +	 *
    +	 * @param fallbackDeserializer a fallback deserializer that can be used to read old data for the migration
    +	 *                             in the case that the preceding serializer cannot be found.
    +	 *
    +	 * @return a strategy that performs migration with a fallback deserializer to read old data.
    +	 */
    +	public static <T> MigrationStrategy<T> migrateWithFallbackDeserializer(TypeSerializer<T> fallbackDeserializer) {
    +		return new MigrationStrategy<>(true, fallbackDeserializer);
    +	}
    +	/**
    +	 * Returns a strategy that signals migration to be performed, without a fallback deserializer.
    +	 * If the preceding serializer cannot be found, the migration fails because the old data cannot be read.
    +	 *
    +	 * @return a strategy that performs migration, without a fallback deserializer.
    +	 */
    +	public static <T> MigrationStrategy<T> migrate() {
    --- End diff --
    Maybe this should be named `migrationRequired`?
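To make the naming question concrete, here is a minimal, self-contained sketch of how the three factory methods quoted in the diff might read at a call site on restore. Everything in it is a stand-in, not Flink code: `OldDataReader` replaces `TypeSerializer`, `MigrationStrategySketch` mirrors only what the diff shows, and the body of `migrate()` (cut off in the diff) is assumed to be `(true, null)`, based on its Javadoc. `RestoreSketch.chooseReader` is a hypothetical helper.

```java
import java.util.Optional;

/** Hypothetical stand-in for Flink's TypeSerializer; only reading is modeled. */
interface OldDataReader<T> {
    T read(byte[] bytes);
}

/** Simplified mirror of the factory methods shown in the quoted diff. */
final class MigrationStrategySketch<T> {
    private final boolean requiresStateMigration;
    private final OldDataReader<T> fallbackDeserializer; // null when no fallback was given

    private MigrationStrategySketch(boolean requiresStateMigration, OldDataReader<T> fallbackDeserializer) {
        this.requiresStateMigration = requiresStateMigration;
        this.fallbackDeserializer = fallbackDeserializer;
    }

    static <T> MigrationStrategySketch<T> noMigration() {
        return new MigrationStrategySketch<>(false, null);
    }

    static <T> MigrationStrategySketch<T> migrateWithFallbackDeserializer(OldDataReader<T> fallback) {
        return new MigrationStrategySketch<>(true, fallback);
    }

    // Assumption: the diff is truncated before this method's body.
    static <T> MigrationStrategySketch<T> migrate() {
        return new MigrationStrategySketch<>(true, null);
    }

    boolean requiresStateMigration() {
        return requiresStateMigration;
    }

    Optional<OldDataReader<T>> fallbackDeserializer() {
        return Optional.ofNullable(fallbackDeserializer);
    }
}

/** Hypothetical restore-side helper, not part of the pull request. */
final class RestoreSketch {
    /**
     * Picks the reader for old data: the preceding serializer if it could be
     * loaded, otherwise the strategy's fallback. Fails if neither exists.
     */
    static <T> OldDataReader<T> chooseReader(
            MigrationStrategySketch<T> strategy, OldDataReader<T> precedingOrNull) {
        if (precedingOrNull != null) {
            return precedingOrNull;
        }
        return strategy.fallbackDeserializer().orElseThrow(
                () -> new IllegalStateException("old data cannot be read: no preceding or fallback deserializer"));
    }
}
```

In this sketch, `migrate()` only states that migration is required and leaves the choice of reader to the caller, which may be why a name such as `migrationRequired` reads more naturally next to `noMigration()`.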

> Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
> --------------------------------------------------------------------------------------------
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as the state's
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer. Deserialization may fail if a) the serializer no longer exists on the classpath, or b) the serializer class is no longer valid (i.e., the implementation changed and resulted in a different serialVersionUID). In this case, use a dummy serializer as a placeholder. This dummy serializer is currently the {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The configuration snapshot must be deserialized successfully; otherwise, the state restore fails.
> 3. When we get the newly registered serializer for the state (which could be a completely new serializer, the same serializer with a different implementation, or the exact same serializer untouched; either way it is treated as a new serializer), we use the configuration snapshot of the old serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer. However, depending on the result of the upgrade, state conversion may need to take place (for now, if state conversion is required, we just fail the job, as this functionality isn't available yet). The possible results are:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but requires state conversion; the old serializer does not need to be present.
> - Incompatible: serializer upgraded but requires state conversion, and the conversion requires the old serializer to be present (i.e., it cannot be the dummy {{ClassNotFoundProxySerializer}}).
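The three outcomes listed in the issue description can be sketched as a small decision function. This is an illustrative model only: `UpgradeResult`, `UpgradeFlowSketch`, `Action`, and `decide` are hypothetical names, not Flink classes, and the sketch models the intended behavior once state conversion exists (the description notes that, for now, the job simply fails whenever conversion is required).

```java
/** Hypothetical names for the three results listed in the issue description. */
enum UpgradeResult {
    COMPATIBLE,
    COMPATIBLE_SCHEMA_CHANGED,
    INCOMPATIBLE
}

final class UpgradeFlowSketch {

    /** What the restore path would do next in this model. */
    enum Action {
        RESTORE,
        CONVERT_STATE,
        FAIL
    }

    /**
     * Maps the reconfiguration result to the next step.
     *
     * @param result the outcome of reconfiguring the new serializer with the old snapshot
     * @param oldSerializerIsPlaceholder true if step 1 fell back to the dummy placeholder serializer
     */
    static Action decide(UpgradeResult result, boolean oldSerializerIsPlaceholder) {
        switch (result) {
            case COMPATIBLE:
                // Restore succeeds with the upgraded serializer.
                return Action.RESTORE;
            case COMPATIBLE_SCHEMA_CHANGED:
                // Conversion is needed, but the old serializer need not be present.
                return Action.CONVERT_STATE;
            case INCOMPATIBLE:
                // Conversion needs the real old serializer to read the old bytes.
                return oldSerializerIsPlaceholder ? Action.FAIL : Action.CONVERT_STATE;
            default:
                throw new AssertionError("unknown result: " + result);
        }
    }
}
```

For example, `UpgradeFlowSketch.decide(UpgradeResult.INCOMPATIBLE, true)` yields `Action.FAIL` in this model, because the placeholder cannot read the old data.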

This message was sent by Atlassian JIRA
