flink-user mailing list archives

From jelmer <jkupe...@gmail.com>
Subject Re: How to make savepoints more robust in the face of refactorings ?
Date Tue, 30 Jan 2018 11:25:29 GMT
I looked into it a little more. The anonymous-class serializer is being
created here:

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L1247

So far the only strategy I have found for making it less likely to break is
to define the TypeInformation in a trait, like so, and mix it into the
operators:


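import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Declared once here so the generated anonymous class is named after this trait
trait Tuple2TypeInformation {
  implicit val tuple2TypeInformation: TypeInformation[(String, Int)] =
    createTypeInformation[(String, Int)]
}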

Then the inner class that's generated will be named something like
Tuple2TypeInformation$$anon$2$$anon$1 instead of
com.ecg.foo.Main$Operators$$anon$3$$anon$1, and as long as you don't rename
or move Tuple2TypeInformation, everything will work. But it feels very
suboptimal.
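
To illustrate the naming effect without pulling in Flink, here is a minimal
sketch in plain Scala. FakeSerializer, SharedSerializers, and the object names
are hypothetical stand-ins, not Flink types, and the exact suffix of the
generated names depends on the compiler; the point is only which enclosing
name ends up in the anonymous class name.

```scala
// Hypothetical stand-in for a macro-generated serializer; not a Flink type.
trait FakeSerializer { def describe: String }

// Anonymous class declared directly inside the operators holder:
// its generated name is derived from Operators.
object Operators {
  val serializer: FakeSerializer = new FakeSerializer { def describe = "inline" }
}

// Anonymous class declared in a small trait that is unlikely to be renamed:
// its generated name is derived from SharedSerializers.
trait SharedSerializers {
  val sharedSerializer: FakeSerializer = new FakeSerializer { def describe = "shared" }
}

// Renaming this object does not affect the class created inside the trait.
object Operators2 extends SharedSerializers

object AnonymousNames {
  def inlineName: String = Operators.serializer.getClass.getName
  def sharedName: String = Operators2.sharedSerializer.getClass.getName

  def main(args: Array[String]): Unit = {
    println(inlineName) // contains "Operators"
    println(sharedName) // contains "SharedSerializers", not "Operators2"
  }
}
```

This only moves the problem: the trait name itself becomes the thing that must
stay stable.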



On 29 January 2018 at 12:33, Tzu-Li (Gordon) Tai <tzulitai@apache.org>
wrote:

> Hi,
>
> In the Scala API, type serializers may be anonymous classes generated by
> Scala macros, and would therefore contain a reference to the wrapping class
> (i.e., your `Operators` class).
> Flink currently serializes the serializers themselves into the savepoint so
> that they can be used for deserialization on restore, which means their
> classes must be present at restore time. Changing the `Operators` class name
> means the previous anonymous serializer class is no longer on the classpath,
> so deserialization of the written serializer fails.
> This is a limitation caused by how registering serializers for written
> state currently works in Flink.
>
> Generally speaking, to overcome this, you would need to keep the previous
> serializer class on the classpath when restoring; it can only be completely
> removed from user code once the migration is completed.
>
> One thing that I’m not completely certain about yet is where in your
> demonstrated code an anonymous-class serializer is generated for some type.
> From what I see, there shouldn’t be any anonymous-class serializers for
> the code. Is the code you provided a “simplified” version of the actual
> code in which you observed the restore error?
>
> Cheers,
> Gordon
>
>
> On 28 January 2018 at 6:00:32 PM, jelmer (jkuperus@gmail.com) wrote:
>
> Changing the class that operators are nested in can break compatibility with
> existing savepoints. The following piece of code demonstrates this:
>
> https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a
>
> If I change Operators in this file to Operators2, I will not be able to
> recover from a savepoint that was made when this class still had its old
> name.
>
> The error in the Flink UI will be:
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
>
> But the real reason is found in the task manager logs:
>
>
> 2018-01-28 17:03:58,830 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)
> at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)
> at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:454)
> ... 17 more
> Caused by: java.lang.ClassNotFoundException: com.ecg.foo.Main$Operators$$anon$3$$anon$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$FailureTolerantObjectInputStream.readClassDescriptor(TypeSerializerSerializationUtil.java:110)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
> ... 22 more
>
>
>
> Is there any way to make this code more robust? Using Java serialization
> in this way feels very brittle in the face of refactorings.
>
>
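
For reference, the ClassNotFoundException in the quoted log can be reproduced
in miniature with plain Java serialization, outside Flink. This is only a
sketch under stated assumptions: StoredSerializer, OldOperators, and
RenameDemo are hypothetical names, and the rename is simulated by rewriting
the class name inside the serialized bytes (same length, so the stream's
length prefixes stay valid). It does not exercise Flink's actual restore path.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets.ISO_8859_1

// Hypothetical stand-in for a serializer written with Java serialization.
trait StoredSerializer extends java.io.Serializable { def version: Int }

object OldOperators {
  // Anonymous class; its generated name is derived from OldOperators.
  val serializer: StoredSerializer = new StoredSerializer { def version = 1 }
}

object RenameDemo {
  def write(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.toByteArray
  }

  def read(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  // ISO-8859-1 maps every byte to one char, so this round trip is lossless.
  private def asText(bytes: Array[Byte]): String = new String(bytes, ISO_8859_1)

  // The serialized stream records the fully qualified class name.
  def streamContainsClassName: Boolean =
    asText(write(OldOperators.serializer)).contains(OldOperators.serializer.getClass.getName)

  // Simulate a refactoring: the stream now names a class that does not exist.
  def renameInStream(bytes: Array[Byte]): Array[Byte] =
    asText(bytes).replace("OldOperators", "NewOperators").getBytes(ISO_8859_1)

  def restoreFails: Boolean =
    try { read(renameInStream(write(OldOperators.serializer))); false }
    catch { case _: ClassNotFoundException => true }

  def main(args: Array[String]): Unit = {
    println(s"stream contains class name: $streamContainsClassName")
    println(s"restore fails after rename: $restoreFails")
  }
}
```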
