flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Interesting window behavior with savepoints
Date Wed, 18 May 2016 11:04:50 GMT
Hi Andrew,
the reason why the program doesn't fail (and cannot fail, with the current
architecture) is that the partitioned state is dynamic/lazy. For example,
the count trigger might have a partitioned state called "count" that it
uses to keep track of the count. The time trigger requires no state but
simply reacts to watermark/processing time progress. The system doesn't
know the names and types of state that user functions or internal operators
will access. And the types/names might even change over the runtime of a
program.

If you restore with one window type, the state for the other type will
simply sit around, never being queried.
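
To make that concrete, here is a toy sketch in plain Java. It is not Flink
code: StateStore, countTriggerOnElement and timeTriggerOnWatermark are
hypothetical names I made up for illustration, and the real state backend is
more involved. It only shows the idea that state is looked up lazily by name,
so state written by a count-style trigger can be restored into a program that
uses a time-style trigger without anything noticing.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code): state is looked up lazily, by name, on access. */
public class LazyStateSketch {

    /** Hypothetical stand-in for a partitioned state backend. */
    static class StateStore {
        private final Map<String, Long> states = new HashMap<>();

        /** Returns the named state, or 0 if nobody has written it yet. */
        long get(String name) {
            return states.getOrDefault(name, 0L);
        }

        void put(String name, long value) {
            states.put(name, value);
        }

        /** Copies all state, whatever names happen to be present. */
        Map<String, Long> snapshot() {
            return new HashMap<>(states);
        }

        /** Restores blindly: no check of which names the new program will use. */
        static StateStore restore(Map<String, Long> snapshot) {
            StateStore store = new StateStore();
            store.states.putAll(snapshot);
            return store;
        }
    }

    /** Count-style trigger: keeps its counter in a state called "count". */
    static boolean countTriggerOnElement(StateStore store, long maxCount) {
        long count = store.get("count") + 1;
        if (count >= maxCount) {
            store.put("count", 0L); // reset after firing
            return true;
        }
        store.put("count", count);
        return false;
    }

    /** Time-style trigger: reacts to watermark progress and touches no state. */
    static boolean timeTriggerOnWatermark(long watermark, long windowEnd) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        // Version 1 of the program uses the count trigger and takes a savepoint.
        StateStore v1 = new StateStore();
        countTriggerOnElement(v1, 3);
        countTriggerOnElement(v1, 3);
        Map<String, Long> savepoint = v1.snapshot();

        // Version 2 uses the time trigger. The restore succeeds, and the
        // "count" state is still there, but nothing ever asks for it.
        StateStore v2 = StateStore.restore(savepoint);
        boolean fired = timeTriggerOnWatermark(10L, 5L);
        System.out.println("fired=" + fired + ", leftover count=" + v2.get("count"));
    }
}
```

Because the store never learns up front which names will be requested, there
is no point at which it could reject the restored "count" entry.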


On Mon, 16 May 2016 at 17:42 Andrew Whitaker <
andrew.whitaker@braintreepayments.com> wrote:

> Thanks Ufuk.
> Thanks for explaining. The reasons behind the savepoint being restored
> successfully kind of make sense, but it seems like the window type (count
> vs time) should be taken into account when restoring savepoints. I don't
> actually see anyone doing this, but I would expect Flink to complain about
> changing windowing semantics between program versions.
> On Sat, May 14, 2016 at 3:34 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>> For a WindowedStream the uid would be set on the result of the
>> apply/reduce/fold call. The WindowedStream itself does not represent an
>> operation.
>> On Fri, 13 May 2016 at 00:20 Ufuk Celebi <uce@apache.org> wrote:
>>> On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
>>> <andrew.whitaker@braintreepayments.com> wrote:
>>> > From what I've observed, most of the time when Flink can't successfully
>>> > restore a checkpoint it throws an exception saying as much. I was expecting
>>> > to see that behavior here. Could someone explain why this "works" (as in,
>>> > Flink accepts the program with the savepoint from the first version of the
>>> > program), and if this is a bug?
>>> Hey Andrew! Thanks for reporting this.
>>> Flink generates operator IDs and uses these to map the state back to
>>> the same operator when restoring from a savepoint. We want these IDs
>>> to stay the same as long as the program does not change.
>>> The ID can either be generated automatically by Flink or manually by the
>>> user.
>>> The automatically generated ID is based on certain topology attributes
>>> like parallelism, operator placement, etc. If any of these attributes
>>> changes, the operator ID changes and you can't map the savepoint state
>>> back. If they stay the same, we assume that the program has not changed.
>>> The problem in your example is that to Flink both programs look the
>>> same with respect to how the IDs are generated: the topology didn't
>>> change and both the time and count window are executed by the
>>> WindowOperator with an InternalWindowFunction.
>>> The recommended way to work with savepoints is to skip the automatic
>>> IDs altogether and assign the IDs manually instead. You can do this
>>> via the "uid(String)" method of each operator, which gives you
>>> fine-grained control over the "versioning" of state:
>>> env.addSource(..).uid("my-source")
>>> vs.
>>> env.addSource(..).uid("my-source-2")
>>> The problem I've just noticed is that you can't specify this on
>>> WindowedStreams, but only on DataStreams, which is clearly a bug.
>>> Furthermore, it might be a good idea to special case windows when
>>> automatically generating the IDs.
>>> I hope this helps a little with understanding the core problem. If you
>>> have further questions, feel free to ask. I will make sure to fix this
>>> soon.
>>> – Ufuk
> --
> Andrew Whitaker | andrew.whitaker@braintreepayments.com
