flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
Date Sun, 07 May 2017 10:39:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999782#comment-15999782 ]

ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139530
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java ---
    @@ -161,7 +162,93 @@
     
     	public abstract int hashCode();
     
    -	public boolean canRestoreFrom(TypeSerializer<?> other) {
    -		return equals(other);
    +	// --------------------------------------------------------------------------------------------
    +	// Serializer configuration snapshotting & reconfiguring
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is
    +	 * registered to (if any - this method is only relevant if this serializer is registered for serialization of
    +	 * managed state).
    +	 *
    +	 * <p>The configuration snapshot should contain information about the serializer's parameter settings and its
    +	 * serialization format. When a new serializer is registered to serialize the same managed state that this
    +	 * serializer was registered to, the returned configuration snapshot can be used to check with the new serializer
    +	 * if any data migration needs to take place.
    +	 *
    +	 * <p>Implementations can also return the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}
    +	 * configuration if they guarantee forward compatibility. For example, implementations that use serialization
    +	 * frameworks with built-in serialization compatibility, such as <a href="https://thrift.apache.org/">Thrift</a> or
    +	 * <a href="https://developers.google.com/protocol-buffers/">Protobuf</a>, are suitable for this usage pattern.
    +	 * Returning {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} informs Flink that, when managed state
    +	 * serialized using this serializer is restored, there is no need to check for migration with the new serializer
    +	 * for the same state. In other words, new serializers are always assumed to be fully compatible with the
    +	 * serialized state.
    +	 *
    +	 * @see TypeSerializerConfigSnapshot
    +	 * @see ForwardCompatibleSerializationFormatConfig
    +	 *
    +	 * @return snapshot of the serializer's current configuration.
    +	 */
    +	public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
    +
    +	/**
    +	 * Get the migration strategy for using this serializer, based on the configuration snapshot of a preceding
    +	 * serializer that was registered for serialization of the same managed state (if any - this method is only
    +	 * relevant if this serializer is registered for serialization of managed state).
    +	 *
    +	 * <p>Implementations need to return the resolved migration strategy. The strategy can be one of the following:
    +	 * <ul>
    +	 *     <li>{@link MigrationStrategy#noMigration()}: this signals Flink that this serializer is compatible, or
    +	 *     has been reconfigured to be compatible, to continue reading old data, and that the
    +	 *     serialization schema remains the same. No migration needs to be performed.</li>
    +	 *
    +	 *     <li>{@link MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this signals Flink that
    +	 *     migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
    +	 *     compatible, for old data. Furthermore, in the case that the preceding serializer cannot be found or
    +	 *     restored to read the old data, the provided fallback deserializer can be used.</li>
    +	 *
    +	 *     <li>{@link MigrationStrategy#migrate()}: this signals Flink that migration needs to be performed, because
    +	 *     this serializer is not compatible, or cannot be reconfigured to be compatible, for old data.</li>
    +	 * </ul>
    +	 *
    +	 * <p>This method is guaranteed to be invoked only if the preceding serializer's configuration snapshot is not the
    +	 * singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. If it is that singleton,
    +	 * Flink always assumes that the migration strategy is {@link MigrationStrategy#noMigration()}.
    +	 *
    +	 * @see MigrationStrategy
    +	 *
    +	 * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
    +	 *
    +	 * @return the resolved migration strategy.
    +	 */
    +	protected abstract MigrationStrategy<T> getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
    --- End diff --
    
    This method is currently doing (up to) 3 things: (maybe) reconfiguring the serializer, advising whether we need to convert, and (maybe) providing a backwards compatible serializer. Maybe this could be given as a high-level overview at the beginning of the doc, followed by all the details.
    
    Given this, I think a name like `ensureCompatibility` returning a `CompatibilityResult` could also be a better fit?
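To make the suggestion concrete, here is a minimal, self-contained Java sketch of a serializer whose `ensureCompatibility` does the three things listed above: it reconfigures itself when it can, advises whether migration is needed, and optionally supplies a fallback deserializer for old data. Only the names `ensureCompatibility`, `CompatibilityResult`, and `snapshotConfiguration` come from this thread; `ConfigSnapshot`, `Deserializer`, `StringSerializer`, `StringSerializerConfig`, and the two-version wire format are hypothetical stand-ins, not Flink's actual API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Objects;

/** Hypothetical stand-in for a serializer configuration snapshot (not Flink's class). */
interface ConfigSnapshot {}

/** Hypothetical minimal reader, offered as a fallback for previously written bytes. */
@FunctionalInterface
interface Deserializer<T> {
    T deserialize(byte[] bytes);
}

/** Sketch of the suggested result: "compatible", or "requires migration" plus an optional fallback. */
final class CompatibilityResult<T> {
    private final boolean needsMigration;
    private final Deserializer<T> fallbackDeserializer; // null when no fallback is offered

    private CompatibilityResult(boolean needsMigration, Deserializer<T> fallbackDeserializer) {
        this.needsMigration = needsMigration;
        this.fallbackDeserializer = fallbackDeserializer;
    }

    static <T> CompatibilityResult<T> compatible() {
        return new CompatibilityResult<>(false, null);
    }

    static <T> CompatibilityResult<T> requiresMigration() {
        return new CompatibilityResult<>(true, null);
    }

    static <T> CompatibilityResult<T> requiresMigration(Deserializer<T> fallback) {
        return new CompatibilityResult<>(true, Objects.requireNonNull(fallback));
    }

    boolean needsMigration() { return needsMigration; }

    Deserializer<T> fallbackDeserializer() { return fallbackDeserializer; }
}

/** Configuration written alongside the state: wire-format version and charset name. */
final class StringSerializerConfig implements ConfigSnapshot {
    final int formatVersion;
    final String charsetName;

    StringSerializerConfig(int formatVersion, String charsetName) {
        this.formatVersion = formatVersion;
        this.charsetName = charsetName;
    }
}

/** Assumed formats: version 2 writes a 4-byte length prefix + bytes; version 1 wrote bytes only. */
final class StringSerializer {
    static final int FORMAT_VERSION = 2;
    private Charset charset;

    StringSerializer(Charset charset) { this.charset = charset; }

    ConfigSnapshot snapshotConfiguration() {
        return new StringSerializerConfig(FORMAT_VERSION, charset.name());
    }

    CompatibilityResult<String> ensureCompatibility(ConfigSnapshot previous) {
        if (!(previous instanceof StringSerializerConfig)) {
            // Unknown predecessor: nothing to reconfigure from and no fallback reader to offer.
            return CompatibilityResult.requiresMigration();
        }
        StringSerializerConfig old = (StringSerializerConfig) previous;
        Charset oldCharset = Charset.forName(old.charsetName);
        if (old.formatVersion == FORMAT_VERSION) {
            // (1) Reconfigure: adopt the old charset so existing bytes keep decoding correctly.
            this.charset = oldCharset;
            return CompatibilityResult.compatible();
        }
        if (old.formatVersion == 1) {
            // (2) Advise migration and (3) offer a reader for the assumed version-1 layout,
            // usable if the previous serializer itself cannot be restored.
            return CompatibilityResult.requiresMigration(bytes -> new String(bytes, oldCharset));
        }
        // Any other version: migration is needed, but this sketch knows no reader for it.
        return CompatibilityResult.requiresMigration();
    }

    byte[] serialize(String value) {
        byte[] payload = value.getBytes(charset);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    String deserialize(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] payload = new byte[buffer.getInt()];
        buffer.get(payload);
        return new String(payload, charset);
    }

    Charset charset() { return charset; }
}
```

With this shape, a caller only branches on `needsMigration()`; a serializer that managed to reconfigure itself simply reports `compatible()`, so the reconfiguration step stays hidden behind a single, well-named call.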


> Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer. Deserialization may fail if a) the serializer no longer exists in the classpath, or b) the serializer class is no longer valid (i.e., its implementation changed and resulted in a different serialVersionUID). In either case, use a dummy serializer as a placeholder. This dummy serializer is currently the {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The configuration snapshot must be successfully deserialized; otherwise the state restore fails.
> 3. When we get the newly registered serializer for the state (it could be a completely new serializer, the same serializer with a different implementation, or the exact same serializer untouched; either way it is seen as a new serializer), we use the configuration snapshot of the old serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer. However, depending on the result of the upgrade, state conversion may need to take place (for now, if state conversion is required, we just fail the job, as this functionality isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded, but state conversion is required; the old serializer does not need to be present.
> - Incompatible: serializer upgraded, but state conversion is required and the old serializer must be present (i.e., it cannot be the dummy {{ClassNotFoundProxySerializer}}).
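The upgrade flow above can be sketched in Java as follows. This is an illustration under stated assumptions, not Flink code: `StateSerializer`, `PlaceholderSerializer` (standing in for `ClassNotFoundProxySerializer`), `SerializerReader`, `ConfigSnapshotReader`, `UpgradeOutcome`, and `SerializerUpgrade` are hypothetical names, and, as the description says, every outcome that requires state conversion simply fails the restore for now.

```java
import java.io.IOException;
import java.io.InvalidClassException;

/** The three outcomes listed in the issue description. */
enum UpgradeOutcome { COMPATIBLE, COMPATIBLE_SCHEMA_CHANGED, INCOMPATIBLE }

/** Hypothetical minimal serializer abstraction (not Flink's TypeSerializer). */
interface StateSerializer {
    /** Step 3: reconfigure from the previous serializer's config snapshot and report the outcome. */
    UpgradeOutcome reconfigure(Object previousConfigSnapshot);
}

/** Hypothetical stand-in for the dummy placeholder (the issue names ClassNotFoundProxySerializer). */
final class PlaceholderSerializer implements StateSerializer {
    @Override
    public UpgradeOutcome reconfigure(Object previousConfigSnapshot) {
        throw new UnsupportedOperationException("a placeholder cannot be reconfigured");
    }
}

/** Reads the previous serializer; may fail if its class is missing or has changed. */
@FunctionalInterface
interface SerializerReader {
    StateSerializer read(byte[] bytes) throws ClassNotFoundException, InvalidClassException;
}

/** Reads the previous serializer's configuration snapshot. */
@FunctionalInterface
interface ConfigSnapshotReader {
    Object read(byte[] bytes) throws IOException;
}

final class SerializerUpgrade {
    private SerializerUpgrade() {}

    static StateSerializer restore(byte[] oldSerializerBytes, byte[] configSnapshotBytes,
                                   StateSerializer newSerializer, SerializerReader serializerReader,
                                   ConfigSnapshotReader snapshotReader) {
        // Step 1: restore the previous serializer, or fall back to a placeholder if its class is
        // missing (ClassNotFoundException) or changed (InvalidClassException, e.g. serialVersionUID).
        StateSerializer oldSerializer;
        try {
            oldSerializer = serializerReader.read(oldSerializerBytes);
        } catch (ClassNotFoundException | InvalidClassException e) {
            oldSerializer = new PlaceholderSerializer();
        }

        // Step 2: the configuration snapshot must be readable; otherwise the restore fails.
        Object configSnapshot;
        try {
            configSnapshot = snapshotReader.read(configSnapshotBytes);
        } catch (IOException e) {
            throw new IllegalStateException("cannot read the previous serializer's config snapshot", e);
        }

        // Step 3: reconfigure the new serializer and act on the outcome.
        switch (newSerializer.reconfigure(configSnapshot)) {
            case COMPATIBLE:
                return newSerializer;
            case COMPATIBLE_SCHEMA_CHANGED:
                // Conversion would not need the old serializer, but conversion is not available yet.
                throw new UnsupportedOperationException("state conversion required but not yet supported");
            case INCOMPATIBLE:
                // Conversion would need the real old serializer; a placeholder cannot read old data.
                if (oldSerializer instanceof PlaceholderSerializer) {
                    throw new IllegalStateException("state conversion needs the previous serializer, which could not be restored");
                }
                throw new UnsupportedOperationException("state conversion required but not yet supported");
            default:
                throw new AssertionError("unknown outcome");
        }
    }
}
```

`InvalidClassException` is what standard Java serialization throws on a `serialVersionUID` mismatch, which corresponds to case b) of step 1; a missing class surfaces as `ClassNotFoundException`, case a).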



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
