flink-user mailing list archives

From Shannon Carey <sca...@expedia.com>
Subject Re: Firing windows multiple times
Date Fri, 02 Sep 2016 17:44:32 GMT
Of course! I really appreciate your interest & attention. I hope we will figure out solutions
that other people can use.

I agree with your analysis. Your triggering syntax is particularly nice. I wrote a custom
trigger which does exactly that but without the nice fluent API. As I considered the approach
you mentioned, it was clear that I would not be able to easily solve the problem of multiple
windows with early-firing events causing over-counting. Modifying the windowing system as
you describe would be helpful. Events could either be filtered out, as you describe, or perhaps
the windows themselves could be muted/un-muted depending on whether they are the closest window
(by end time) to the current watermark.
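A minimal sketch of the muting idea, in plain Java with no Flink dependency. It assumes epoch-aligned sliding windows (offset 0) whose size is a multiple of the slide, and it interprets "closest window by end time" as the window with the earliest end strictly after the current watermark. `WindowMuting` and its methods are hypothetical names, not Flink API.

```java
final class WindowMuting {
    // Assumption: epoch-aligned sliding windows (offset 0) where size is a multiple of slide,
    // so every window end is a multiple of slide.

    /** Start of the window whose end is the earliest end strictly after the watermark. */
    static long unmutedWindowStart(long watermark, long size, long slide) {
        long nextEnd = Math.floorDiv(watermark, slide) * slide + slide;
        return nextEnd - size;
    }

    /** Only the window closest to the watermark (by end time) would forward its firings. */
    static boolean isUnmuted(long windowStart, long watermark, long size, long slide) {
        return windowStart == unmutedWindowStart(watermark, size, slide);
    }
}
```

With a 365-day window sliding by one day (time measured in days), only one of the 365 windows containing a given day is unmuted at any watermark, so only its firings would reach the downstream aggregation.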

I'm not clear on the purpose of the late firing you describe. I believe that was added in
Flink 1.1, and it's a new concept to me. I thought late events were handled entirely by the decisions
made in the timestamp & watermark assigner. Does this feature allow events that arrive after the watermark
to still be incorporated into windows that the watermark has already closed? Perhaps
it's intended to allow a window-specific lateness allowance, rather than relying only on the stream-global watermark assigner?
That does sound problematic. I assume there's a reason for closing the window before the allowed
lateness has elapsed? Otherwise, the window (really, the trigger) could just add the lateness
to the watermark and pretend that the watermark hadn't been reached until the lateness had
passed.
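A simplified model of how allowed lateness behaves in event time, as I understand the Flink 1.1 semantics: the on-time firing happens once the watermark reaches the window's max timestamp, the window's state is kept (and late elements cause additional firings) until the watermark passes max timestamp + allowed lateness, and after that late elements are dropped. This is plain Java with hypothetical names, not the actual `WindowOperator` code.

```java
final class LatenessModel {
    enum Outcome { ADDED_BEFORE_ON_TIME_FIRING, LATE_FIRING, DROPPED }

    /** Classifies an arriving element for one window, given the current watermark. */
    static Outcome classify(long windowMaxTimestamp, long allowedLateness, long watermark) {
        if (windowMaxTimestamp + allowedLateness <= watermark) {
            // Past the cleanup time: the window state is gone and the element is discarded.
            return Outcome.DROPPED;
        }
        if (windowMaxTimestamp <= watermark) {
            // The on-time firing already happened; the element updates the kept state and re-fires.
            return Outcome.LATE_FIRING;
        }
        // The watermark has not reached the window end yet; the element simply joins the window.
        return Outcome.ADDED_BEFORE_ON_TIME_FIRING;
    }
}
```

For a window whose max timestamp is 999 with an allowed lateness of 100, an element arriving at watermark 500 is just added, at watermark 999 it causes a late firing, and at watermark 1099 it is dropped.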

I agree that your idea is potentially a lot better than the approach I described, if it can
be implemented! You are right that the approach I described requires that all the events be
retained in the window state so that aggregation can be done repeatedly from the raw events
as new events come in and old events are evicted. In practice, we are currently writing the
first aggregations (day-level) to an external database and then querying that time-series
from the second-level (year) aggregation so that we don't actually need to keep all that data
around in Flink state. Obviously, that approach can have an impact on the processing guarantees
when a failure/recovery occurs if we don't do it carefully. Also, we're not particularly sophisticated
yet with regard to avoiding unnecessary queries to the time series data.


From: Aljoscha Krettek <aljoscha@apache.org>
Date: Friday, September 2, 2016 at 4:02 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

I see. I didn't forget about this; I've just been thinking hard about it.

I think in your case (which I imagine other people also have) we would need an addition
to the windowing system that the original Google Dataflow paper called retractions. The problem
is best explained with an example. Say you have this program:

DataStream input = ...

DataStream firstAggregate = input
  .window(TumblingTimeWindow(1 Day))
  .reduce(new SomeAggregate())

DataStream secondAggregate = firstAggregate
  .window(TumblingTimeWindow(5 Days))
  .reduce(new SomeAggregate())

The problem here is that the second windowing operation sees all the incremental early-firing
updates from the first window operation, so it would over-count. This problem could be overcome
by introducing metadata in the windowing system and filtering out those results that indicate
that they come from an early (speculative) firing. A second problem is that of late firings,
i.e. if you have a window specification like this:

DataStream firstAggregate = input
  .window(TumblingTimeWindow(1 Day))
  .allowedLateness(1 Hour)
  .reduce(new SomeAggregate())

where you can also get late firings after the primary firing, which happens when the watermark passes
the end of the window. That's where retractions come into play: before sending data downstream
from a late firing, the window operator has to send the inverse of the previous firing so that
the downstream operation can "subtract" it from the current aggregate and replace it with
the newly updated aggregate. This is a somewhat thorny problem, though, and to the best of
my knowledge Google never implemented this in the publicly available Dataflow SDK or in what
is now Beam.
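A minimal sketch of the retraction idea in plain Java. The `Update` message type, the upstream/downstream classes, and the choice of a running sum as the downstream aggregate are all assumptions for illustration; this is not an existing Flink or Beam API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RetractionDemo {
    /** One message sent downstream: a value, or the inverse ("retraction") of an earlier value. */
    static final class Update {
        final long value;
        final boolean retraction;

        Update(long value, boolean retraction) {
            this.value = value;
            this.retraction = retraction;
        }
    }

    /** Upstream side: remembers the last emitted result per window so it can be retracted. */
    static final class UpstreamWindow {
        private final Map<String, Long> lastEmitted = new HashMap<>();

        List<Update> fire(String window, long newResult) {
            List<Update> out = new ArrayList<>();
            Long previous = lastEmitted.put(window, newResult);
            if (previous != null) {
                out.add(new Update(previous, true)); // undo the earlier firing first
            }
            out.add(new Update(newResult, false));
            return out;
        }
    }

    /** Downstream side: a running sum that subtracts retracted values. */
    static final class DownstreamSum {
        long total;

        void apply(List<Update> updates) {
            for (Update u : updates) {
                total += u.retraction ? -u.value : u.value;
            }
        }
    }
}
```

If day-1 first fires with 10, day-2 fires with 5, and a late firing then updates day-1 to 12, the downstream total ends at 17; without the retraction it would be 27.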

I'm thinking in this direction, rather than keeping track of the watermark and manually
evicting elements as you go, because I think this approach would be more memory efficient
and easier to understand. I don't understand yet how a single
window computation could keep track of aggregates for differently sized time windows and evict
the correct elements without keeping all the elements in some store. Maybe you could shed
some light on this? I'd be happy if there was a simple solution for this. :-)


On Tue, 30 Aug 2016 at 23:49 Shannon Carey <scarey@expedia.com> wrote:
I appreciate your suggestion!

However, the main problem with your approach is the amount of time that goes by without an
updated value from minuteAggregate and hourlyAggregate (lack of a continuously updated aggregate).

For example, if we use a tumbling window of 1 month duration, then we only get an update for
that value once a month! The values from that stream will be on average 0.5 months stale.
A year-long window is even worse.
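The 0.5-month figure assumes the aggregate is refreshed once per window length T and read at a uniformly random moment (an assumption for this estimate). The age of the latest value is then uniform on [0, T), so:

```latex
\mathbb{E}[\text{staleness}] = \frac{1}{T}\int_{0}^{T} t \, dt = \frac{T}{2},
\qquad T = 1\ \text{month} \Rightarrow 0.5\ \text{months},
\qquad T = 12\ \text{months} \Rightarrow 6\ \text{months}.
```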


From: Aljoscha Krettek <aljoscha@apache.org>
Date: Tuesday, August 30, 2016 at 9:08 AM
To: Shannon Carey <scarey@expedia.com>, user@flink.apache.org
Subject: Re: Firing windows multiple times

I think this can be neatly expressed using something like a tree of windowed aggregations,
i.e. you specify your smallest window computation first and then specify larger window computations
based on the smaller windows. I've written an example that showcases this approach: https://gist.github.com/aljoscha/728ac69361f75c3ca87053b1a6f91fcd

The basic idea in pseudo code is this:

DataStream input = ...
dailyAggregate = input.keyBy(...).timeWindow(Time.days(1)).reduce(new Sum())
weeklyAggregate = dailyAggregate.keyBy(...).timeWindow(Time.days(7)).reduce(new Sum())
monthlyAggregate = weeklyAggregate.keyBy(...).timeWindow(Time.days(30)).reduce(new Sum())

The benefit of this approach is that you don't duplicate computation and that you can aggregate
incrementally using a reduce function. If you instead kept elements manually and evicted
them based on time, the amount of state you would have to keep would be much larger.
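The tree of aggregations can be mimicked without Flink to check that rolling up finer tumbling buckets gives the same result as aggregating the raw events directly. This plain-Java sketch counts events per epoch day for a single key and uses hypothetical helper names; it stands in for the chained windowed reduces, not for Flink's implementation.

```java
import java.util.Map;
import java.util.TreeMap;

final class AggregationTree {
    /** Counts events per tumbling bucket of `size` days (epoch-aligned): the finest level. */
    static TreeMap<Long, Long> countPerBucket(long[] eventDays, long size) {
        TreeMap<Long, Long> counts = new TreeMap<>();
        for (long day : eventDays) {
            counts.merge(Math.floorDiv(day, size) * size, 1L, Long::sum);
        }
        return counts;
    }

    /** Rolls finer buckets (bucket start -> sum) up into coarser tumbling buckets of `size` days. */
    static TreeMap<Long, Long> rollUp(Map<Long, Long> finer, long size) {
        TreeMap<Long, Long> coarser = new TreeMap<>();
        for (Map.Entry<Long, Long> e : finer.entrySet()) {
            long start = Math.floorDiv(e.getKey(), size) * size;
            coarser.merge(start, e.getValue(), Long::sum); // like reduce(new Sum()): one value per bucket
        }
        return coarser;
    }
}
```

The sketch uses sizes that nest (1, 7 and 28 days). With 7-day and 30-day windows the boundaries do not line up, so a week that straddles a 30-day boundary would be counted entirely in one of the two 30-day windows.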

Does that make sense and would it help your use case?


On Mon, 29 Aug 2016 at 23:18 Shannon Carey <scarey@expedia.com> wrote:
Yes, let me describe an example use-case that I'm trying to implement efficiently within Flink.

We've been asked to aggregate per-user data on a daily level, and from there produce aggregates
on a variety of time frames. For example, 7 days, 30 days, 180 days, and 365 days.

We can talk about the hardest one, the 365 day window, with the knowledge that adding the
other time windows magnifies the problem.

I can easily use tumbling time windows of 1-day size for the first aggregation. However, for
the longer aggregation, if I take the naive approach and use a sliding window, the window
size would be 365 days and the slide would be one day. If a user comes back every day, I run
the risk of magnifying the size of the data by a factor of up to 365, because each day of data will be
included in up to 365 year-long window panes. Also, if I want to fire the aggregate information
more often than once a day, then I have to worry about getting 365 different windows fired
at the same time & trying to figure out which one to pay attention to, or coming up with
a hare-brained custom firing trigger. We tried emitting each day-aggregate into a time series
database and doing the final 365 day aggregation as a query, but that was more complicated
than we wanted: in particular, we'd like to have all the logic in the Flink job rather than split across
different technology & infrastructure.
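The duplication factor follows from how sliding windows are assigned: an element belongs to every window whose range covers its timestamp, which is size / slide windows when the size is a multiple of the slide. The loop below is modeled on the assignment logic of Flink's sliding event-time assigner, simplified to offset 0; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingAssignment {
    /** Starts of all sliding windows [start, start + size) that contain `timestamp` (offset 0). */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a timestamp of day 1000, a 365-day window sliding by one day yields 365 windows (one copy of the day's data in each), while sliding by seven days yields 52.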

The work-around I'm thinking of is to use a single window that contains 365 days of data (relative
to the current watermark) on an ongoing basis. The windowing function would be responsible
for evicting old data based on the current watermark.
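A minimal sketch of this work-around, assuming the single long-lived window stores the day-level aggregates (one entry per day, as produced by the first aggregation) rather than raw events, and evicts based on the watermark expressed as an epoch day. Plain Java; `TrailingWindow` and its methods are hypothetical.

```java
import java.util.TreeMap;

final class TrailingWindow {
    private final long lengthDays;
    // epoch day -> day-level aggregate (e.g. the output of the 1-day tumbling window)
    private final TreeMap<Long, Long> dailyAggregates = new TreeMap<>();

    TrailingWindow(long lengthDays) {
        this.lengthDays = lengthDays;
    }

    void add(long day, long aggregate) {
        dailyAggregates.merge(day, aggregate, Long::sum);
    }

    /** Evicts every day at or before watermarkDay - lengthDays. */
    void evict(long watermarkDay) {
        // headMap(k) is a live view of keys < k; clearing it removes them from the backing map
        dailyAggregates.headMap(watermarkDay - lengthDays + 1).clear();
    }

    /** Sums every retained day (including any days already ahead of the watermark). */
    long total() {
        long sum = 0;
        for (long v : dailyAggregates.values()) {
            sum += v;
        }
        return sum;
    }

    int retainedDays() {
        return dailyAggregates.size();
    }
}
```

With a 365-day length, the entry for day 0 is evicted once the watermark reaches day 365, so state stays at roughly one entry per day per key.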

Does that make sense? Does it seem logical, or am I misunderstanding something about how Flink works?


From: Aljoscha Krettek <aljoscha@apache.org>
Date: Monday, August 29, 2016 at 3:56 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

That would certainly be possible. What do you think could be gained by having knowledge of
the current watermark in the WindowFunction? Do you have a specific use case in mind?


On Wed, 24 Aug 2016 at 23:21 Shannon Carey <scarey@expedia.com> wrote:
What do you think about adding the current watermark to the window function metadata in FLIP-2?

From: Shannon Carey <scarey@expedia.com>
Date: Friday, August 12, 2016 at 6:24 PM
To: Aljoscha Krettek <aljoscha@apache.org>, user@flink.apache.org
Subject: Re: Firing windows multiple times

Thanks Aljoscha, I didn't know about those. Yes, they look like handy changes, especially
to enable flexible approaches for eviction. In particular, having the current watermark available
to the evictor via EvictorContext is helpful: it will be able to evict the old data more easily
without needing to rely on Window#maxTimestamp().

However, I think you might still be missing a piece. Specifically, it would still not be possible
for the window function to choose which items to aggregate based on the current watermark.
In particular, it is desirable to be able to aggregate only the items below the watermark,
omitting items which have come in with timestamps larger than the watermark. Does that make sense?
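The selection step described here can be sketched in plain Java: given the elements currently buffered in a window and the current watermark, aggregate only those at or below the watermark and leave the rest for a later firing. The `Element` type and method name are hypothetical, and the sketch simply takes the watermark as a parameter, since the window function did not have access to it at the time of this thread.

```java
import java.util.List;

final class WatermarkAwareSum {
    static final class Element {
        final long timestamp;
        final long value;

        Element(long timestamp, long value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Sums only the elements the watermark has covered; elements ahead of it are skipped for now. */
    static long sumUpTo(List<Element> windowContents, long watermark) {
        long total = 0;
        for (Element e : windowContents) {
            if (e.timestamp <= watermark) {
                total += e.value;
            }
        }
        return total;
    }
}
```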


From: Aljoscha Krettek <aljoscha@apache.org>
Date: Friday, August 12, 2016 at 4:25 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

There is already this FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
which also links to a mailing list discussion. And this FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
The former proposes to enhance the Evictor API a bit; among other things, we propose to give
the evictor access to the current watermark. The latter proposes to extend the amount
of metadata we give to the window function. The first two things we propose to add are a "firing
reason", which would tell you whether this was an early firing, an on-time firing or a late
firing, and a firing counter, which would tell you how many times the trigger
has fired so far for the current window.
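If a firing reason like the one proposed in FLIP-2 were available, a downstream aggregation could avoid over-counting speculative results by ignoring early firings. The enum below is a hypothetical stand-in for that proposal, not a Flink type, and late firings are simply skipped here because handling them correctly needs something like retractions.

```java
import java.util.List;

final class FiringReasonFilter {
    /** Hypothetical stand-in for the "firing reason" proposed in FLIP-2; not an existing Flink type. */
    enum FiringReason { EARLY, ON_TIME, LATE }

    static final class WindowResult {
        final FiringReason reason;
        final long value;

        WindowResult(FiringReason reason, long value) {
            this.reason = reason;
            this.value = value;
        }
    }

    /** Naive downstream sum: counts every firing, including speculative early ones. */
    static long sumAll(List<WindowResult> results) {
        long total = 0;
        for (WindowResult r : results) {
            total += r.value;
        }
        return total;
    }

    /** Filtered downstream sum: counts only on-time firings, so each window contributes once. */
    static long sumOnTimeOnly(List<WindowResult> results) {
        long total = 0;
        for (WindowResult r : results) {
            if (r.reason == FiringReason.ON_TIME) {
                total += r.value;
            }
        }
        return total;
    }
}
```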

Would a combination of these help with your use case?


On Thu, 11 Aug 2016 at 19:19 Shannon Carey <scarey@expedia.com> wrote:
"If Window B is a Folding Window and does not have an evictor then it should not keep the
list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present when using EvictingWindowOperator,
not when using WindowOperator. I misread line 382 of WindowOperator which calls windowState.add():
in actuality, the windowState is a FoldingState which incorporates the user-provided fold
function in order to eagerly fold the data. In contrast, if you use an evictor, EvictingWindowOperator
has the behavior I describe.
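A simplified model of the state-size difference described above: a folding state applies the fold function on every add and keeps only the accumulator, while a list state behind an evictor keeps every element and folds only on fire. Plain Java with hypothetical class names; it models the behavior, not the operator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongBinaryOperator;

final class FoldVsBuffer {
    /** Like a folding state: folds eagerly, so only the accumulator is stored. */
    static final class EagerFold {
        private long acc;
        private final LongBinaryOperator fold;

        EagerFold(long initial, LongBinaryOperator fold) {
            this.acc = initial;
            this.fold = fold;
        }

        void add(long element) { acc = fold.applyAsLong(acc, element); }
        long fire() { return acc; }
        int storedValues() { return 1; }
    }

    /** Like a list state behind an evictor: keeps every element and folds only on fire. */
    static final class BufferThenFold {
        private final List<Long> elements = new ArrayList<>();
        private final long initial;
        private final LongBinaryOperator fold;

        BufferThenFold(long initial, LongBinaryOperator fold) {
            this.initial = initial;
            this.fold = fold;
        }

        void add(long element) { elements.add(element); }

        long fire() {
            long acc = initial;
            for (long e : elements) {
                acc = fold.applyAsLong(acc, e);
            }
            return acc;
        }

        int storedValues() { return elements.size(); }
    }
}
```

Adding the values 1 to 1000 to both gives the same fired result (500500), but the buffering version holds 1000 values versus a single accumulator.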

I am already using a custom Trigger which uses a processing timer to FIRE a short time after
a new event comes in, and an event timer to FIRE_AND_PURGE.
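The decisions of such a trigger can be modeled as a small state machine: each element schedules a processing-time firing a short delay later (FIRE, keeping the contents), and the watermark reaching the window's max timestamp produces the final FIRE_AND_PURGE. This is a plain-Java simulation with a hypothetical `Result` enum that mirrors three of the names in Flink's `TriggerResult`; it does not implement Flink's `Trigger` interface, and keeping at most one pending timer is an assumption of the sketch.

```java
final class DelayedFireTrigger {
    /** Hypothetical mirror of three TriggerResult names; not the Flink enum itself. */
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final long fireDelay;
    private Long pendingFireTime; // assumption: at most one outstanding "fire soon" timer

    DelayedFireTrigger(long fireDelay) {
        this.fireDelay = fireDelay;
    }

    /** A new element schedules an intermediate firing shortly afterwards (processing time). */
    Result onElement(long processingTimeNow) {
        if (pendingFireTime == null) {
            pendingFireTime = processingTimeNow + fireDelay;
        }
        return Result.CONTINUE;
    }

    /** Processing time advanced: emit an intermediate result but keep the window contents. */
    Result onProcessingTime(long now) {
        if (pendingFireTime != null && now >= pendingFireTime) {
            pendingFireTime = null;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    /** Watermark advanced: once it reaches the window's max timestamp, emit the final result and purge. */
    Result onWatermark(long watermark, long windowMaxTimestamp) {
        if (watermark >= windowMaxTimestamp) {
            pendingFireTime = null;
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }
}
```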

It seems that I can achieve the desired effect by avoiding use of an evictor so that the intermediate
events are not retained in an EvictingWindowOperator's state, and perform any necessary eviction
within my fold function. This has the aforementioned drawbacks of the windowed fold function
not knowing about watermarks, and therefore it is difficult to be precise about choosing which
items to evict. However, this seems to be the best choice within the current framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks either. When
a window emits an event, regardless of how it was fired, it is assigned the timestamp given
by its window's maxTimestamp(), which might be much greater than the processing time that
actually fired the event. Then, TimeEvictor compares the max timestamp of all items in the
window against the other ones in order to determine which ones to evict. Basically, it assumes
that the events were emitted due to the window terminating with FIRE_AND_PURGE. What if we
gave more information (specifically, the current watermark) to the evictor in order to allow
it to deal with a mix of intermediate events (fired by processing time) and final events (fired
by event time when the watermark reaches the window)? That value is already available in the
WindowOperator & could be passed to the Evictor very easily. It would be an API change,
of course.
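The difference between the two eviction cutoffs can be sketched in plain Java. `keepRelativeToMax` follows the description above of TimeEvictor (cutoff = max timestamp in the buffer minus the window size), and `keepRelativeToWatermark` is the hypothetical alternative of anchoring the cutoff to the current watermark. Both names are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

final class EvictionCutoffs {
    /** Cutoff anchored to the largest timestamp in the buffer, as described for TimeEvictor. */
    static List<Long> keepRelativeToMax(List<Long> timestamps, long size) {
        if (timestamps.isEmpty()) {
            return new ArrayList<>();
        }
        long max = Long.MIN_VALUE;
        for (long t : timestamps) {
            max = Math.max(max, t);
        }
        return keepNewerThan(timestamps, max - size);
    }

    /** Hypothetical alternative: cutoff anchored to the current watermark. */
    static List<Long> keepRelativeToWatermark(List<Long> timestamps, long size, long watermark) {
        return keepNewerThan(timestamps, watermark - size);
    }

    // Elements at or below the cutoff are evicted; the rest are kept in their original order.
    private static List<Long> keepNewerThan(List<Long> timestamps, long cutoff) {
        List<Long> kept = new ArrayList<>();
        for (long t : timestamps) {
            if (t > cutoff) {
                kept.add(t);
            }
        }
        return kept;
    }
}
```

With buffered timestamps 100, 200 and 1099 (the last being an intermediate result stamped with an upstream window's max timestamp), a size of 365 and a watermark of 400, the max-based cutoff keeps only 1099, while the watermark-based cutoff keeps all three.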

Other than that, is it worth considering a change to EvictingWindowOperator to allow user-supplied
functions to reduce the size of its state when people fire upstream windows repeatedly? From
what I see when I monitor the state with debugger print statements, the EvictingWindowOperator
is definitely holding on to all the elements ever received, not just the aggregated result.
You can see this clearly because EvictingWindowOperator holds a ListState instead of a FoldingState.
The user-provided fold function is only applied upon fire().

