flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject RE: KryoException: Encountered unregistered class ID
Date Tue, 06 Jun 2017 16:20:00 GMT
Hi Shai,

Your suggestion makes a lot of sense. I did not realize Kryo allows changing that, thanks
for correcting!
It’s definitely reasonable to provide a way to proxy that setting through the `StreamExecutionEnvironment`,
if Kryo itself has the functionality already.
I’ve filed a JIRA for this feature: https://issues.apache.org/jira/browse/FLINK-6857.


On 6 June 2017 at 6:07:15 PM, Shai Kaplan (shai.kaplan@microsoft.com) wrote:

Actually, Kryo does allow overriding that by calling kryo.setDefaultSerializer(). It would
be nice if Flink provided a way to override getKryoInstance(), or to subscribe a callback
that runs after Kryo is initialized, or simply let me define a default serializer and then
call kryo.setDefaultSerializer() with it, the same way it "forwards" the addDefaultSerializer calls.

I could register the TaggedFieldSerializer for my specific classes, but this is error-prone:
at some point I might add a new class to the state and forget to register it, and
then I won't be able to change that class later.



From: Tzu-Li (Gordon) Tai [mailto:tzulitai@apache.org]  
Sent: Tuesday, June 06, 2017 6:52 PM
To: user@flink.apache.org
Subject: RE: KryoException: Encountered unregistered class ID


Ah, I see what you’re trying to achieve. In that case I don’t think it is possible, simply
because Kryo doesn’t allow overriding that.


But wouldn’t you be able to just, through Flink’s `StreamExecutionEnvironment`, register
the TaggedFieldSerializer for your to-be-migrated specific class?

The downside of course is that you always need that registered as the default serializer for
the class, but I think that’s the only possible way at the moment.


On 6 June 2017 at 5:45:12 PM, Shai Kaplan (shai.kaplan@microsoft.com) wrote:

No. That way you can only set default serializers for specific classes. I want to change the
default serializer that Kryo uses when it can't find a registered serializer for a class.

See Kryo.getDefaultSerializer(), notice the last line calls newDefaultSerializer() which is
hardcoded to be FieldSerializer.
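
The distinction between a per-class default and the global fallback can be sketched with a
toy lookup table. This is a simplified model for illustration only, not Kryo's actual code:
the class DefaultSerializerLookup and its methods addDefault, setFallback and resolve are
hypothetical names, and serializers are represented by plain strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (hypothetical, not Kryo's implementation) of a serializer lookup
// with per-class defaults and one global fallback.
public class DefaultSerializerLookup {
    // Per-class defaults, similar in spirit to addDefaultSerializer(type, serializer).
    private final Map<Class<?>, String> perClassDefaults = new LinkedHashMap<>();
    // Used when no per-class entry matches the requested type.
    private String fallback = "FieldSerializer";

    public void addDefault(Class<?> type, String serializerName) {
        perClassDefaults.put(type, serializerName);
    }

    public void setFallback(String serializerName) {
        fallback = serializerName;
    }

    public String resolve(Class<?> type) {
        // First matching entry wins; subclasses match an entry for their supertype.
        for (Map.Entry<Class<?>, String> entry : perClassDefaults.entrySet()) {
            if (entry.getKey().isAssignableFrom(type)) {
                return entry.getValue();
            }
        }
        return fallback;
    }

    static class Registered {}
    static class Forgotten {}

    public static void main(String[] args) {
        DefaultSerializerLookup lookup = new DefaultSerializerLookup();
        lookup.addDefault(Registered.class, "TaggedFieldSerializer");
        System.out.println(lookup.resolve(Registered.class)); // per-class entry
        System.out.println(lookup.resolve(Forgotten.class));  // falls through to the fallback
        lookup.setFallback("TaggedFieldSerializer");
        System.out.println(lookup.resolve(Forgotten.class));  // fallback now changed
    }
}
```

In this model, a class that was never registered only picks up the tagged serializer once the
fallback itself is changed, which is the behaviour being asked for here.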


From: Tzu-Li (Gordon) Tai [mailto:tzulitai@apache.org]  
Sent: Tuesday, June 06, 2017 6:13 PM
To: user@flink.apache.org
Subject: RE: KryoException: Encountered unregistered class ID


StreamExecutionEnvironment.addDefaultKryoSerializer(YourClass.class, TaggedFieldSerializer.class)
should work.

You can also specify it directly: StreamExecutionEnvironment.registerTypeWithKryoSerializer(YourClass.class,


Does the above work?

On 6 June 2017 at 5:09:21 PM, Shai Kaplan (shai.kaplan@microsoft.com) wrote:

I understand that my problem arises from Kryo using FieldSerializer as the default serializer.

Looking at Kryo's documentation (https://github.com/EsotericSoftware/kryo#compatibility),
this could be easily solved by setting the default serializer to TaggedFieldSerializer. Flink,
however, doesn't let me access Kryo directly (specifically after initialization). I can only
add extra serializers using StreamExecutionEnvironment.addDefaultKryoSerializer, but the
default will still be FieldSerializer. Is there any way I can hook into checkKryoInitialized()
and set the default serializer?
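
Why a tagged layout tolerates an added field can be sketched in plain Java. This is a
simplified illustration and not Kryo's TaggedFieldSerializer or its wire format: the class
TaggedFieldDemo, the tag constants and the int-only field values are all assumptions made
for the example. It only helps if the old data was already written in the tagged layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical tagged encoding: [fieldCount:int] followed by (tag:int, value:int) pairs.
public class TaggedFieldDemo {
    static final int TAG_COUNT = 1;  // field present in the old and the new schema
    static final int TAG_ACTIVE = 2; // field added in the new schema

    public static byte[] write(Map<Integer, Integer> fields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(fields.size());
            for (Map.Entry<Integer, Integer> field : fields.entrySet()) {
                out.writeInt(field.getKey());
                out.writeInt(field.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reader for the new schema; returns {count, active}.
    // Fields that are absent from the data keep their default value of 0.
    public static int[] readNewSchema(byte[] data) {
        int count = 0;
        int active = 0;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int fieldCount = in.readInt();
            for (int i = 0; i < fieldCount; i++) {
                int tag = in.readInt();
                int value = in.readInt();
                if (tag == TAG_COUNT) {
                    count = value;
                } else if (tag == TAG_ACTIVE) {
                    active = value;
                }
                // Unknown tags are ignored; every value is an int here, so nothing else to skip.
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new int[] {count, active};
    }

    public static void main(String[] args) {
        Map<Integer, Integer> oldRecord = new LinkedHashMap<>();
        oldRecord.put(TAG_COUNT, 7); // written before TAG_ACTIVE existed
        int[] restored = readNewSchema(write(oldRecord));
        System.out.println("count=" + restored[0] + ", active=" + restored[1]);
    }
}
```

Because the reader consumes exactly as many fields as the writer recorded, the stream
position stays aligned for whatever follows, even though the new field is missing.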


From: Tzu-Li (Gordon) Tai [mailto:tzulitai@apache.org]  
Sent: Sunday, June 04, 2017 3:28 PM
To: Shai Kaplan <Shai.Kaplan@microsoft.com>; user@flink.apache.org
Subject: Re: KryoException: Encountered unregistered class ID


Hi Shai,


Flink’s Kryo registrations do not allow specifying the registration ID. They simply start
from ID 10 (IDs below 10 are reserved by Kryo for primitive types).


My guess at what you’re observing here is that when trying to deserialize your newly changed
class instance, it also tries to read the extra field, which did not exist before.

This extra read caused the interpreted ID of the next to-be-read instance to be messed up.
Therefore, it isn’t that the ID of that class has changed, but simply that the deserialization
is incorrectly reading extra bytes for the previous instance and the ID of the next instance
is read at the wrong position.
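
The effect can be reproduced in a few lines of plain Java. This is a simplified sketch, not
Kryo's real wire format (Kryo uses variable-length encodings, while this uses fixed-width
ints): the class MisalignedReadDemo, the record layout and the ID value 10 are assumptions
chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical layout: the old schema writes [classId:int][count:int] per record.
// The new schema's reader additionally expects a one-byte boolean after count.
public class MisalignedReadDemo {
    static final int CLASS_ID = 10; // illustrative registration ID

    public static byte[] writeOldRecords(int... counts) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int count : counts) {
                out.writeInt(CLASS_ID);
                out.writeInt(count);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the first record with the old layout, then returns the next class ID.
    public static int secondIdSeenByOldReader(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readInt(); // class ID of the first record
            in.readInt(); // count
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the first record with the new layout, then returns the next class ID.
    public static int secondIdSeenByNewReader(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readInt();  // class ID of the first record
            in.readInt();  // count
            in.readByte(); // new field: was never written, so this eats a byte of the next ID
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] savepoint = writeOldRecords(7, 9);
        System.out.println("old reader sees ID " + secondIdSeenByOldReader(savepoint));
        System.out.println("new reader sees ID " + secondIdSeenByNewReader(savepoint));
    }
}
```

With these inputs the old reader sees ID 10 for the second record, while the new reader is
shifted by one byte and reports 2560. The bogus value depends on how many extra bytes the
new field consumes, which matches the ID changing when the field type changes.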


The main issue here is that Kryo itself doesn’t handle serializer upgrades, i.e. the new
serializer for your class created by Kryo will try to read that extra field even though it
previously did not exist.

I would suggest two possible solutions here:

1. Simply let the original class stay untouched, and have a new class for your updated schema.
When reading the old state, Kryo will use the correct serializer to read instances of the
old class.

2. Directly change the old class, but you have to register a custom serializer for that class,
which can avoid the new fields if necessary when reading (i.e., skip reading that field if
it simply isn’t there).  





On 4 June 2017 at 1:57:01 PM, Shai Kaplan (shai.kaplan@microsoft.com) wrote:


I'm running a job from a savepoint. I've changed one of the classes stored in state. When
I try to load the value from the state I get "com.esotericsoftware.kryo.KryoException: Encountered
unregistered class ID: 97".

I tried to understand whether the problem arises from the nature of the change, or simply
because there was a change in the class, so I changed the class so that the only difference
from the previous version is a new boolean field, and the problem still occurs (the ID number
changes when I change the class). If I revert the class back to its old version, everything is fine.


I'm not sure if the class ID that I'm seeing is the right one, or is it just some random number
received from reading the wrong place in the serialized object, or something like that. When
I change the boolean to String the number in the exception changes to 41188…


What should I do to be able to restore from the state a class that now has a new field? Should
I manually register the class? With what ID?