flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
Date Mon, 29 May 2017 09:58:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028181#comment-16028181 ]

ASF GitHub Bot commented on FLINK-6478:
---------------------------------------

Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4006#discussion_r118900486
  
    --- Diff: docs/dev/stream/state.md ---
    @@ -429,3 +429,120 @@ public static class CounterSource
     
     Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
     
    +## Custom serialization for managed state
    +
    +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
    +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
    +Flink's own serializers, this section is irrelevant and can be skipped. 
    +
    +### Using custom serializers
    +
    +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
    +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
    +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
    +
    +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
    +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
    +
    +{% highlight java %}
    +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    +    new ListStateDescriptor<>(
    +        "state-name",
    +        new TypeSerializer<> {...});
    +
    +checkpointedState = getRuntimeContext().getListState(descriptor);
    +{% endhighlight %}
    +
    +### Handling serializer upgrades and compatibility
    +
    +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
    +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
    +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
    +and is replaced as the new serializer for the state.
    +
    +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
    +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
    +is provided through the following two methods of the `TypeSerializer` interface:
    +
    +{% highlight java %}
    +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
    +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
    +{% endhighlight %}
    +
    +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
    +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
    +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
    +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This
    +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure
    +the new serializer in the case that it is incompatible.
    +
    +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
    +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
    +with their previous configuration.
    +
    +The following subsections illustrate guidelines to implement these two methods when using custom serializers.
    +
    +#### Implementing the `snapshotConfiguration` method
    +
    +The serializer's configuration snapshot should capture enough information such that on restore, the information
    +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
    +This could typically contain information about the serializer's parameters or binary format of the serialized data;
    +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized
    +bytes, and that it writes in the same binary format.
    +
    +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below
    +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`.
    +
    +{% highlight java %}
    +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
    +  public abstract int getVersion();
    +  public void read(DataInputView in) {...}
    +  public void write(DataOutputView out) {...}
    +}
    +{% endhighlight %}
    +
    +The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
    +implementations contain logic to read and write the version of the configuration snapshot, so it should be extended and
    +not completely overridden.
    +
    +The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer
    +configuration snapshot is the means to maintain compatible configurations, as information included in the configuration
    +may change over time. When reading from the checkpoint, you can use the `getReadVersion` method to determine the version
    +of the written configuration and adapt the read logic to the specific version.
    +
    +<span class="label label-danger">Attention</span> Do not mistaken the version of the serializer's configuration snapshot
    +to be related with upgrading the serializer. The exact same serializer can have different implementations of its
    +configuration snapshot, for example when more information is added to the configuration to allow more comprehensive
    +compatibility checks in the future.
    +
    +One limitation of implementing a `TyoeSerializerConfigSnapshot` is that an empty constructor must be present. The empty
    --- End diff --
    
    TyoeSerializerConfigSnapshot => TypeSerializerConfigSnapshot


> Add documentation on how to upgrade serializers for managed state
> -----------------------------------------------------------------
>
>                 Key: FLINK-6478
>                 URL: https://issues.apache.org/jira/browse/FLINK-6478
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.3.0
>
>
> There needs to be a documentation that explains how to use the new serializer upgrade APIs in {{TypeSerializer}}, and how the methods work with checkpoints. This documentation should probably be placed under "Application development --> Streaming --> Working with State".
> Ideally, it should also come with a minimal example for users that perhaps use serialization frameworks that already have built-in backwards compatibility (such as Thrift).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
