flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: State serialization problem when we add a new field in the object
Date Wed, 14 Mar 2018 00:27:55 GMT
Hi,

I'm afraid Flink does currently not support changing the schema of state when restoring from
a savepoint.

Best,
Aljoscha

> On 13. Mar 2018, at 07:36, kla <lalafaryan@gmail.com> wrote:
> 
> Hi guys,
> 
> I have the flink streaming job running (1.2.0 version) which has the
> following state:
> 
> private transient ValueState<Map&lt;String, Set&lt;User>>> userState;
> 
> With following configuration:
> 
> final ValueStateDescriptor<Map&lt;String, Set&lt;User>>> descriptor
=
>                new ValueStateDescriptor<>("userState",
> TypeInformation.of(new UserTypeHint()));
>        userState = getRuntimeContext().getState(descriptor);
> And the User class has following:
> 
> public class User {
> 
>    private String id;
> 
>    private String firstName;
> 
>    private String lastName;
> 
> }
> 
> And after some time we tried to add one more field in the user object. (for
> example emailAddress). But apparently I didn't work, I am getting following
> exception:
> 
> 018-03-13 13:26:13,357 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job CountJob
> (cbada55d435571e8b24313196204f8ab) switched from state RUNNING to FAILING.
> com.esotericsoftware.kryo.KryoException:
> java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
> Serialization trace:
> type (com.example.User)
> 	at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> 	at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
> 	at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
> 	at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:82)
> 	at
> com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:33)
> 	at
> com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:27)
> 	at
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
> 	at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
> 	at
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
> 	at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
> 	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> 	at java.util.ArrayList.get(ArrayList.java:429)
> 	at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> 	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> 	at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
> 	at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> 	... 15 more
> 
> 
> Thanks,
> Konstantin
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Mime
View raw message