flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT
Date Wed, 21 Sep 2016 19:21:34 GMT
Hi Josh!

The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
now, in order to add the elasticity feature (changing the parallelism of
running jobs while still maintaining exactly-once guarantees).
At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
try and add compatibility towards 1.1 savepoints before the release of
version 1.2.

I think the exception is probably caused by the fact that the old savepoint
stored some serialized user code (the new one is not expected to), and that
code cannot be loaded.
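
For illustration, here is a minimal Scala sketch of why a class name like
the `$$anon$4$$anon$2` one in the exception is fragile. All names in it
(`AnonNames`, `Tokenizer`, `WhitespaceTokenizer`) are hypothetical and not
taken from the job in question. The compiler gives anonymous classes
generated names, so any serialized data that refers to such a name can
only be loaded while a class with exactly that name is on the classpath.
Whether this is what happened in this particular job is not confirmed here.

```scala
// Illustration only: every name in this file is hypothetical.
object AnonNames {
  trait Tokenizer { def split(s: String): Seq[String] }

  // A named class keeps the binary name chosen in the source code.
  class WhitespaceTokenizer extends Tokenizer {
    def split(s: String): Seq[String] = s.split("\\s+").toSeq
  }

  // An anonymous class gets a compiler-generated name containing "$anon$".
  val anonymous: Tokenizer = new Tokenizer {
    def split(s: String): Seq[String] = Seq(s)
  }

  def namedName: String = classOf[WhitespaceTokenizer].getName
  def anonName: String = anonymous.getClass.getName

  def main(args: Array[String]): Unit = {
    println(namedName) // stable, source-defined name
    println(anonName)  // generated name, may change after recompiling
  }
}
```

Printing both names makes the difference visible: the first is fixed by the
source, the second depends on what the compiler generated.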

Adding Aljoscha and Stefan to this, to see if they can add anything.
In any case, this should have a much better error message.

I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
so not really recommended for general use.

Does version 1.1 not work for you?


On Wed, Sep 21, 2016 at 7:44 PM, Josh <jofo90@gmail.com> wrote:

> Hi,
> I have a Flink job which uses the RocksDBStateBackend, which has been
> running on a Flink 1.0 cluster.
> The job is written in Scala, and I previously made some changes to the job
> to ensure that state could be restored. For example, whenever I call `map`
> or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
> MyCustomFlatMapper())` instead of an anonymous function.
> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able
> to restore state. I'm seeing exceptions which look like this when trying to
> restore from a savepoint:
> java.lang.RuntimeException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
> Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
> I'm not passing any anonymous functions to `map` or `flatMap` on Flink
> DataStreams, so it looks like this exception is caused just by using
> Scala functions like `filter`, `map`, `flatMap` on standard Scala
> collections within my class `MyCustomFlatMapper`.
> Are there any changes to the way Flink state is restored or to
> RocksDBStateBackend, in the last 2-3 months, which could cause this to
> happen?
> If so, any advice on fixing it?
> I'm hoping there's a better solution to this than rewriting the Flink job
> in Java.
> Thanks,
> Josh
