flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: Firing windows multiple times
Date Thu, 11 Aug 2016 12:26:47 GMT
Just to add a drawback in solution 2) you may have some issues because window boundaries may
be aligned. For example the elements of a day window may be split between the last day of
a month 
and the first of the next month.


> On Aug 11, 2016, at 2:21 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
> Hi Shanon,
> From what I understand, you want to have your results windowed by different different
durations, e.g. by minute, by day,
> by month and you use the evictor to  decide which elements should go into each window.
If I am correct, then I do not 
> think that you need the evictor which bounds you to keep all the elements that the operator
has seen (because it uses a listState).
> In this case you can do one of the following:
> 1) if you just want to have the big window (by month) and all the smaller ones to appear
as early firings of the big one, then I would 
> suggest you to go with a custom trigger. The trigger has access to watermarks, can register
both event and processing time timers (so you can have firings whenever you want (per minute,
per day, etc), can have state (e.g.element counter), and can decide to FIRE or FIRE_AND_PURGE.
> The only downside is that all intermediate firings will appear to belong to the big window.
This means that the beginning and the end o the by-minute and daily firings will be those
of the month that they belong to. If this is not a problem, I would go for that.
> 2) If the above is a problem, then what you can do, is key your input stream and then
have 3 different windowing strategies, e.g. by minute, by day and by month. This way you will
have also the desired window boundaries. This would look like:
> keyedStream.timeWindow(byMonth).addSink …
> keyedStream.timeWindow(byDay).addSink …
> keyedStream.timeWindow(byMinute).addSink …
> Please let us know if this answers your question and if you need any more help.
> Kostas
>> On Aug 10, 2016, at 10:15 PM, Shannon Carey <scarey@expedia.com <mailto:scarey@expedia.com>>
>> Hi Aljoscha,
>> Yes, I am using an Evictor, and I think I have seen the problem you are referring
to. However, that's not what I'm talking about.
>> If you re-read my first email, the main point is the following: if users desire updates
more frequently than window watermarks are reached, then window state behaves suboptimally.
It doesn't matter if there's an evictor or not. Specifically:
>> If I have a windows "A" that I fire multiple times in order to provide incremental
results as data comes in instead of waiting for the watermark to purge the window
>> And that window's events are gathered into another, bigger window "B"
>> And I want to keep only the latest event from each upstream window "A" (by timestamp,
where each window pane has its own timestamp)
>> Even if I have a fold/reduce method on the bigger window "B" to make sure that each
updated event from "A" overwrites the previous event (by timestamp)
>> Window "B" will hold in state all events from windows "A", including all the incremental
events that were fired by processing-time triggers, even though I don't actually need those
events because the reducer gets rid of them
>> An example description of execution flow:
>> Event x
>> Window A receives event, trigger waits for processing time delay, then emits event
x(time=1, count=1)
>> Window B receives event, trigger waits for processing time delay, then executes fold()
and emits event(time=1 => count=1), but internal Window state looks like [x(time=1, count=1)]
>> Event y
>> Window A receives event, trigger '', then emits event y(time=1, count=2)
>> Window B receives event, trigger '', then executes fold() and emits event(time=1
=> count=2), but internal Window state looks like [x(time=1, count=1), y(time=1, count=2)]
>> Watermark z
>> Window A receives watermark, trigger's event timer is reached, fires and purges and
emits current state as event z(time=1, count=2)
>> Window B receives event, trigger waits for processing time delay, then executes fold()
and emits event(time=1 => count=2), but internal Window state looks like [x(time=1, count=1),
y(time=1, count=2), z(time=1, count=2)]
>> As you can see, the internal window state continues to grow despite what fold() is
>> Does that explanation help interpret my original email?
>> -Shannon
>> From: Aljoscha Krettek <aljoscha@apache.org <mailto:aljoscha@apache.org>>
>> Date: Wednesday, August 10, 2016 at 12:18 PM
>> To: "user@flink.apache.org <mailto:user@flink.apache.org>" <user@flink.apache.org
>> Subject: Re: Firing windows multiple times
>> Hi,
>> from your mail I'm gathering that you are in fact using an Evictor, is that correct?
If not, then the window operator should not keep all the elements ever received for a window
but only the aggregated result.
>> Side note, there seems to be a bug in EvictingWindowOperator that causes evicted
elements to not actually be removed from the state. They are only filtered from the Iterable
that is given to the WindowFunction. I opened a Jira issue for that: https://issues.apache.org/jira/browse/FLINK-4369
>> Cheers,
>> Aljoscha
>> On Wed, 10 Aug 2016 at 18:19 Shannon Carey <scarey@expedia.com <mailto:scarey@expedia.com>>
>> One unfortunate aspect of using a fold() instead of a window is that the fold function
has no knowledge of the watermarks. As a result, it is difficult to ensure that only items
before the current watermark are included in the aggregation, and that old items are evicted
correctly. This fact lends more support to the idea of using a custom operator (though that
is more complex) or adding support for this use case within Flink.
>> -Shannon

View raw message