flink-dev mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Clarification of TypesSerializers
Date Mon, 17 Jul 2017 05:13:35 GMT
Hi Dawid,

It would be possible to make this work by reconfiguring the serializer so that it can read
both “old” and “new” formats but writes only the new format, and then simply returning `CompatibilityResult.compatible()`.
Note that this typically isn’t recommended: if you’re using the RocksDB backend, which
lazily deserializes and eagerly serializes on each state access, you’ll have mixed formats
in the state (i.e., keys that have not been accessed yet will still be in the old format).
This would mean maintaining a lot of code in the future, just for compatibility.
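To illustrate the mixed-format problem, here is a minimal plain-Java sketch with no Flink dependency. `DualFormatSerializer` and `LazyStoreDemo` are hypothetical names, and the sketch assumes every record starts with a format-version byte so that the reader can tell the two formats apart; if the real serializer has no such marker, a dual-format reader cannot work at all. The map stands in for a RocksDB-style backend that keeps values as bytes and re-serializes a key only when it is accessed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical serializer: reads format 1 (value + legacy long) and
// format 2 (value only), but always writes format 2.
final class DualFormatSerializer {
    static final byte OLD_FORMAT = 1;
    static final byte NEW_FORMAT = 2;

    // Produces a record as the previous version would have written it.
    static byte[] writeOld(String value, long legacyField) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(OLD_FORMAT);  // assumed per-record version tag
            out.writeUTF(value);
            out.writeLong(legacyField); // information the new format drops
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    byte[] serialize(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(NEW_FORMAT);
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    String deserialize(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            byte format = in.readByte();
            String value = in.readUTF();
            if (format == OLD_FORMAT) {
                in.readLong(); // read and discard the legacy field
            }
            return value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte formatOf(byte[] record) {
        return record[0];
    }
}

// Stand-in for a lazily deserializing backend: values stay as bytes until accessed.
final class LazyStoreDemo {
    static Map<String, byte[]> restoredStore() {
        Map<String, byte[]> store = new LinkedHashMap<>();
        store.put("a", DualFormatSerializer.writeOld("alpha", 42L));
        store.put("b", DualFormatSerializer.writeOld("beta", 7L));
        return store;
    }

    // Read-modify-write of a single key: only this key is rewritten in the new format.
    static void update(Map<String, byte[]> store, String key, DualFormatSerializer serializer) {
        String value = serializer.deserialize(store.get(key));
        store.put(key, serializer.serialize(value + "!"));
    }
}
```

After `LazyStoreDemo.update(store, "a", serializer)`, key `a` is stored in format 2 while the untouched key `b` is still in format 1, so the serializer has to keep understanding both formats for as long as such keys may exist.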

Also note that the above approach doesn’t respect the definition of serializer compatibility:
two serializers are compatible iff their written binary formats are identical. That’s why,
in your case, the only reasonable result is `CompatibilityResult.requiresMigration`.
I see from your code snippet that this is what you are doing right now, but keep in mind
that state migration isn’t available yet.
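For contrast, here is a plain-Java model of the compatibility rule above and of what a migration would do once it is supported. All names are hypothetical and this is not Flink’s `CompatibilityResult` API: the format versions are compared, and if they differ, every record is read with a “convert” reader for the old format and rewritten with the new serializer. The sketch assumes untagged formats (old: value plus a legacy long; new: value only), which is workable here only because migration rewrites every record in one pass.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical outcome of checking a restored serializer snapshot; not Flink's API.
enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

final class MigrationSketch {
    // Compatible only if the written binary format is identical.
    static Compatibility check(int snapshotFormatVersion, int currentFormatVersion) {
        return snapshotFormatVersion == currentFormatVersion
                ? Compatibility.COMPATIBLE
                : Compatibility.REQUIRES_MIGRATION;
    }

    // Old format: value followed by a legacy long field.
    static byte[] writeOld(String value, long legacyField) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value);
            out.writeLong(legacyField);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Plays the role of the convert serializer: it only ever reads old data.
    static String readOld(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            String value = in.readUTF();
            in.readLong(); // discard the legacy field
            return value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // New format: value only.
    static byte[] writeNew(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String readNew(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rewrites every record, so no old-format bytes survive the migration.
    static Map<String, byte[]> migrate(Map<String, byte[]> state,
                                       Function<byte[], String> convertReader,
                                       Function<String, byte[]> newWriter) {
        Map<String, byte[]> migrated = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : state.entrySet()) {
            migrated.put(entry.getKey(), newWriter.apply(convertReader.apply(entry.getValue())));
        }
        return migrated;
    }
}
```

Because the migration rewrites everything, the new serializer itself never needs to understand the old format; only the convert reader does, and it can be dropped afterwards.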

Does that answer your question?


On 10 July 2017 at 3:38:16 PM, Dawid Wysakowicz (wysakowicz.dawid@gmail.com) wrote:

Hi Gordon,  

Thanks for your response. Generally speaking, what you wrote in your second mail is what I
wanted to do when I first had a look at the TypeSerializer interface, so I think it really makes
sense :)

As you said, requiresMigration does not work right now, so I looked for a workaround
and found that ensureCompatibility is called before each state deserialization. So
I tried using a flag set in that method, but, as you described, it does not always work for
HeapStateBackend, as the state is loaded eagerly with the old serializer. I managed to create two
serializers: an “old” one that deserialises only the old format, and a “new” one that can deserialise
both, based on a flag set in ensureCompatibility. Do you think such a strategy is valid for now
(as state migration is not there yet)?
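A plain-Java sketch of why a single flag derived from the snapshot version is fragile once the state holds records in both formats (Gordon’s RocksDB point about keys that have not been accessed yet). `FlaggedSerializer` and `configureFromSnapshotVersion` are hypothetical stand-ins for the NFA serializer and for the flag set in ensureCompatibility; the old format is assumed to be the value followed by a legacy long field, with no per-record version tag.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical serializer whose read path depends on one state-wide flag.
final class FlaggedSerializer {
    static final int CURRENT_SNAPSHOT_VERSION = 2;
    private boolean readLegacyField = false;

    // Mimics setting the flag in ensureCompatibility from the restored snapshot version.
    void configureFromSnapshotVersion(int snapshotVersion) {
        readLegacyField = snapshotVersion < CURRENT_SNAPSHOT_VERSION;
    }

    // Produces a record as the previous version would have written it.
    static byte[] writeOld(String value, long legacyField) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value);
            out.writeLong(legacyField);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Always writes the new format: the value only.
    byte[] serialize(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // The flag applies to every record, whichever format it was actually written in.
    String deserialize(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            String value = in.readUTF();
            if (readLegacyField) {
                in.readLong(); // throws EOFException on a new-format record
            }
            return value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

After `configureFromSnapshotVersion(1)`, old records read correctly, but a value that has already been rewritten with `serialize` fails with an `UncheckedIOException` caused by `EOFException`, because the flag still asks for the legacy field.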
(as the state migration is not yet there)  

You can check my code here, in lines 702-1087:


> On 7 Jul 2017, at 15:33, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:  
> Some extra information after understanding your case a bit more:  
> What you would need to do is return `CompatibilityResult#requiresMigration(convertSerializer)`, where `convertSerializer` is a serializer that can read the extra information.
> The `convertSerializer` will only ever be used to read the old data, and the new serializer is used to serialize state so that it is written in the new format.
> Unfortunately, if you return `requiresMigration()` right now, the job will always fail, since this is not supported yet. State migration is currently blocked by [1].
> I’m happy to hear feedback regarding the new compatibility / config snapshot methods on TypeSerializer.
> Let me know if all of this makes sense to you :)  
> Cheers,  
> Gordon  
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-22-Eager-State-Declaration-td18562.html
> On 7 July 2017 at 9:23:32 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org) wrote:  
> Hi Dawid,  
> First of all, one thing to clarify: TypeSerializer#ensureCompatibility is invoked on the newly provided state serializer.
> Also, a reconfigured compatible serializer should NOT have a different serialization format (that would require state migration, i.e. returning CompatibilityResult#requiresMigration).
> Flink continues to use a reconfigured serializer as if nothing has changed: it can read old AND new data, and still writes in the same format.
> In your case, I think you may be interpreting “a reconfigured serializer” incorrectly.
> Unfortunately it does not work for HeapStateBackend, as during restore of the state backend the method TypeSerializer#ensureCompatibility is not invoked and the state value is eagerly deserialized with the non-reconfigured serializer.
> For the HeapStateBackend, this is currently expected. The main reason is that new state serializers are provided lazily (i.e. when the state descriptor is registered), so at restore time there is no new serializer available to be checked against / reconfigured with the previous TypeSerializerConfigSnapshot.
> Therefore, for HeapStateBackend, we use the old serializer that was serialized along with the state (on which #ensureCompatibility is never invoked) to read everything into state objects. This should always work as long as the old serializer itself can be deserialized properly.
> Cheers,  
> Gordon  
> On 7 July 2017 at 5:57:56 PM, Dawid Wysakowicz (wysakowicz.dawid@gmail.com) wrote:  
> Hi devs,  
> Currently I am working on some changes to the serializer for the NFA class in the CEP library. I am trying to understand how the TypeSerializer#ensureCompatibility feature works.
> What I want to do: in a previous version (e.g. 1.3.0), some information was serialized that now shouldn’t be. In TypeSerializer#ensureCompatibility I set a flag, based on the corresponding ConfigSnapshot version, that tells me whether that additional info should be read.
> So let’s get to the point :). Unfortunately it does not work for HeapStateBackend, as during restore of the state backend the method TypeSerializer#ensureCompatibility is not invoked and the state value is eagerly deserialized with the non-reconfigured serializer.
> It does work for RocksDBStateBackend, though, as there is no deserialisation of the value while restoring (lazy deserialization). The value is first deserialized on access (getColumnFamily etc., I suppose), at which point the method ensureCompatibility is called and the serializer is properly reconfigured.
> My questions are:  
> - is my serialization plan (setting the flag) OK?
> - are the different behaviours intended, or is it a bug in HeapStateBackend?
> If it is a bug, I would be willing to fix it (or at least try), but I will probably need some guidance.
> Regards  
> Dawid  
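A simplified plain-Java model of the restore difference discussed in this thread. All names are hypothetical and this is not how Flink’s backends are implemented internally: an eager, heap-style backend turns bytes into objects at restore time using the serializer restored with the snapshot, before any new serializer has been registered, while a lazy, RocksDB-style backend keeps bytes until a state is registered, so the newly registered (and possibly reconfigured) reader is the one that decodes them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Heap-style: decodes everything during restore with the restored (old) reader.
final class EagerBackend {
    final Map<String, String> objects = new HashMap<>();

    void restore(Map<String, byte[]> snapshot, Function<byte[], String> restoredReader) {
        snapshot.forEach((key, bytes) -> objects.put(key, restoredReader.apply(bytes)));
    }

    // By the time a state is registered, values are already objects,
    // so a new serializer never decodes the restored bytes.
    String get(String key) {
        return objects.get(key);
    }
}

// RocksDB-style: keeps raw bytes; decoding waits for a registered reader.
final class LazyBackend {
    private final Map<String, byte[]> raw = new HashMap<>();
    private Function<byte[], String> registeredReader;

    void restore(Map<String, byte[]> snapshot) {
        raw.putAll(snapshot);
    }

    // Stand-in for registering a state descriptor, where a new serializer would be reconfigured.
    void register(Function<byte[], String> newReader) {
        registeredReader = newReader;
    }

    String get(String key) {
        if (registeredReader == null) {
            throw new IllegalStateException("state not registered yet");
        }
        return registeredReader.apply(raw.get(key));
    }
}
```

In this model, a flag set on the new serializer at registration can only influence the lazy backend, which matches the behaviour Dawid observed.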
