flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mrooding <ad...@webresource.nl>
Subject Savepoints and migrating value state data types
Date Wed, 20 Sep 2017 12:16:27 GMT

We've got a situation where we're merging several Kafka streams and for
certain streams, we want to retain up to 6 days of history. We're trying to
figure out how we can migrate savepoint data between application updates
when the data type for a certain state buffer updates.

Let's assume that we have 2 streams with the following data types:

case class A(id: String, name: String)
case class B1(id: String, price: Double)

We have a CoProcessFunction which combines the 2 streams and maintains 2
different buffer states:

MapState[String, A] and ValueState[B1]

In our scenario, we're trying to anticipate the data type of B1 changing in
the future. Let's assume that in the foreseeable future, B1 will change to:

case class B2(id: String, price: Double, date: String)

When we create a snapshot using B1 and then upgrading the application to B2
the obvious attempt would be to try and retrieve the stored ValueState and
the new ValueState:

val oldState = getRuntimeContext.getState(new
ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
val newState = getRuntimeContext.getState(new
ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))

However, as soon as you do the following error occurs:

Unable to restore keyed state [aBuffer]. For memory-backed keyed state, the
previous serializer of the keyed state must be present; the serializer could
have been removed from the classpath, or its implementation have changed and
could not be loaded. This is a temporary restriction that will be fixed in
future versions.

Our assumption is that the process operator which has a specified ID which
Flink uses to save and restore savepoints. The CoProcessorFunction types
changed from CoProcessFunction[A, B1, A] to CoProcessFunction[A, B2, A] and
therefore the savepoint data does not apply to the operator anymore. Is this
assumption correct?

We've been going through the documentation and source code of Flink and it
seems like there's no achieve this kind of migrations. If this is the case,
we'd be interested in contributing to Flink to get this added a.s.a.p. and
would love to get some feedback on how to approach this.

Thanks in advance


Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

View raw message