flink-user mailing list archives

From Shannon Carey <sca...@expedia.com>
Subject Re: Firing windows multiple times
Date Wed, 10 Aug 2016 20:15:00 GMT
Hi Aljoscha,

Yes, I am using an Evictor, and I think I have seen the problem you are referring to. However,
that's not what I'm talking about.

If you re-read my first email, the main point is this: if users want updates more often
than the watermark passes the end of a window, window state behaves suboptimally. It
doesn't matter whether there's an evictor or not. Specifically:

  *  If I have a window "A" that I fire multiple times, in order to provide incremental results
     as data comes in instead of waiting for the watermark to purge the window,
  *  and that window's events are gathered into another, bigger window "B",
  *  and I want to keep only the latest event from each upstream window "A" (by timestamp, where
     each window pane has its own timestamp),
  *  then even if I have a fold/reduce function on the bigger window "B" to make sure that each
     updated event from "A" overwrites the previous event (by timestamp),
  *  window "B" will still hold in state all events from windows "A", including all the incremental
     events that were fired by processing-time triggers, even though I don't actually need those
     events, because the reducer discards them.

An example description of execution flow:

  1.  Event x arrives.
  2.  Window A receives the event; its trigger waits for the processing-time delay, then emits
      event x(time=1, count=1).
  3.  Window B receives the event; its trigger waits for the processing-time delay, then executes
      fold() and emits event(time=1 => count=1), but the internal window state looks like
      [x(time=1, count=1)].
  4.  Event y arrives.
  5.  Window A receives the event; its trigger again waits for the processing-time delay, then
      emits event y(time=1, count=2).
  6.  Window B receives the event; its trigger again waits for the processing-time delay, then
      executes fold() and emits event(time=1 => count=2), but the internal window state looks like
      [x(time=1, count=1), y(time=1, count=2)].
  7.  Watermark z arrives.
  8.  Window A receives the watermark; the trigger's event-time timer is reached, so it fires,
      purges, and emits its current state as event z(time=1, count=2).
  9.  Window B receives the event; its trigger waits for the processing-time delay, then executes
      fold() and emits event(time=1 => count=2), but the internal window state looks like
      [x(time=1, count=1), y(time=1, count=2), z(time=1, count=2)].

As you can see, the internal window state continues to grow despite what fold() is doing.
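
For illustration, the flow above can be replayed in a plain-Java simulation with no Flink dependency. The two classes below are hypothetical models, not Flink APIs: `BufferedWindowState` appends every pane fired by window "A" (roughly what window "B" has to do when an Evictor forces it to keep all elements), while `ReducedWindowState` keeps only the latest count per pane timestamp, as an incrementally reduced state would. The `Pane` record needs Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A fired result from upstream window "A": the pane's timestamp and its running count.
record Pane(long time, long count) {}

// Hypothetical model of window "B" state when every element is buffered
// (e.g. because an Evictor requires the operator to keep all elements).
class BufferedWindowState {
    private final List<Pane> elements = new ArrayList<>();

    void add(Pane p) {
        elements.add(p); // every incremental firing from "A" is retained
    }

    // The fold runs over the whole buffer on each firing:
    // keep the latest count per pane timestamp.
    Map<Long, Long> fold() {
        Map<Long, Long> latest = new HashMap<>();
        for (Pane p : elements) {
            latest.put(p.time(), p.count()); // later elements overwrite earlier ones
        }
        return latest;
    }

    int size() {
        return elements.size();
    }
}

// Hypothetical model of incrementally reduced state: only the aggregate is stored.
class ReducedWindowState {
    private final Map<Long, Long> latest = new HashMap<>();

    void add(Pane p) {
        latest.put(p.time(), p.count()); // overwrite the previous result for this pane
    }

    Map<Long, Long> result() {
        return latest;
    }

    int size() {
        return latest.size();
    }
}
```

Feeding the three firings x(time=1, count=1), y(time=1, count=2) and z(time=1, count=2) into both models yields the same result (time=1 => count=2), but the buffered model ends up holding three elements while the reduced model holds one.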

Does that explanation help interpret my original email?

-Shannon


From: Aljoscha Krettek <aljoscha@apache.org>
Date: Wednesday, August 10, 2016 at 12:18 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
from your mail I gather that you are in fact using an Evictor; is that correct? If not,
the window operator should not keep all the elements ever received for a window, only
the aggregated result.

Side note: there seems to be a bug in EvictingWindowOperator that causes evicted elements
not to actually be removed from the state. They are only filtered from the Iterable that is
given to the WindowFunction. I opened a Jira issue for that: https://issues.apache.org/jira/browse/FLINK-4369
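
The behavior described in FLINK-4369 can be pictured with a small plain-Java model. `EvictingBuffer` and its two methods are hypothetical and do not reproduce Flink's EvictingWindowOperator code; they only contrast filtering evicted elements out of the view handed to the window function with actually removing them from state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of an evicting window's element buffer (not Flink's actual code).
class EvictingBuffer<T> {
    private final List<T> state = new ArrayList<>();

    void add(T element) {
        state.add(element);
    }

    // Behavior described in the bug report: evicted elements are only filtered out
    // of the view given to the window function; the underlying state keeps them.
    List<T> fireFilterOnly(Predicate<T> evict) {
        List<T> view = new ArrayList<>();
        for (T e : state) {
            if (!evict.test(e)) {
                view.add(e);
            }
        }
        return view;
    }

    // Intended behavior: evicted elements are removed from state
    // before the window function sees the remaining elements.
    List<T> fireAndRemove(Predicate<T> evict) {
        state.removeIf(evict);
        return new ArrayList<>(state);
    }

    int stateSize() {
        return state.size();
    }
}
```

With elements 1 to 5 and an eviction rule of `x <= 3`, both methods hand the window function [4, 5], but only `fireAndRemove` shrinks the stored state to two elements; the filter-only version still holds all five.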

Cheers,
Aljoscha

On Wed, 10 Aug 2016 at 18:19 Shannon Carey <scarey@expedia.com> wrote:
One unfortunate aspect of using a fold() instead of a window is that the fold function has
no knowledge of the watermarks. As a result, it is difficult to ensure that only items before
the current watermark are included in the aggregation, and that old items are evicted correctly.
This fact lends more support to the idea of using a custom operator (though that is more complex)
or adding support for this use case within Flink.
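
As a rough picture of what a watermark-aware alternative to fold() would have to track, the sketch below (plain Java, not a Flink operator) aggregates only elements whose timestamps are at or before the current watermark and evicts elements older than a retention horizon as the watermark advances. `WatermarkAwareSum`, `retentionMillis`, and the choice of summing are assumptions for illustration; in a real custom operator the buffer would live in Flink managed state and the watermark would arrive through the operator's watermark callback.

```java
import java.util.TreeMap;

// Hypothetical watermark-aware aggregator (plain Java, not a Flink API).
// Values are summed per timestamp; only timestamps <= watermark count toward the result.
class WatermarkAwareSum {
    private final long retentionMillis;
    private final TreeMap<Long, Long> sumsByTimestamp = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    WatermarkAwareSum(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    void onElement(long timestamp, long value) {
        // Drop elements that are already older than the retention horizon.
        if (watermark != Long.MIN_VALUE && timestamp < watermark - retentionMillis) {
            return;
        }
        sumsByTimestamp.merge(timestamp, value, Long::sum);
    }

    void onWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // watermarks only move forward
        }
        watermark = newWatermark;
        // Evict every timestamp strictly older than the retention horizon.
        sumsByTimestamp.headMap(watermark - retentionMillis).clear();
    }

    // Aggregate only over timestamps at or before the current watermark.
    long result() {
        long total = 0;
        for (long v : sumsByTimestamp.headMap(watermark, true).values()) {
            total += v;
        }
        return total;
    }

    int bufferedTimestamps() {
        return sumsByTimestamp.size();
    }
}
```

Keying the buffer by timestamp in a `TreeMap` turns both the "up to the watermark" aggregation and the eviction of old entries into range views (`headMap`) instead of scans over every buffered element.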

-Shannon