flink-user mailing list archives

From "William Saar" <will...@saar.se>
Subject Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
Date Fri, 25 Nov 2016 07:40:21 GMT
Thanks a lot! Your patched window at least causes events to be fired
when the window expires, but I think I may be better off with a custom
trigger that always fires when a new event arrives in the window (I
figured out how to write one of those).

One more question about grouping streams from different Kafka sources.
Can I skip windowing the separate streams before grouping them, union
the two unwindowed streams, and window only the resulting union stream?
Or will that cause events from the different sources to end up out of
order with each other in the union stream and get dropped when they
arrive at the window? Is there anything special I should do or avoid
when joining events from different Kafka sources so that Flink
processes them in event-time order and does not drop events, even if
one source has many more events spanning many more days and is slower
to read than the other?
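
A note on the union question: as far as I know, a Flink operator with
several inputs advances its event-time clock to the minimum of the
watermarks received on each input, so a window over a union should not
close until the slower source has caught up. The sketch below is a
simplified plain-Java model of that rule, not Flink code.
UnionWindowOperator and its methods are made-up names, and allowed
lateness is assumed to be 0.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model (NOT Flink code) of a tumbling window over two unioned inputs. */
public class UnionWatermarkSketch {

    /** Hypothetical stand-in for a window operator fed by a union of two sources. */
    public static final class UnionWindowOperator {
        private final long windowSizeMs;
        private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
        public final List<Long> accepted = new ArrayList<>();
        public final List<Long> droppedAsLate = new ArrayList<>();

        public UnionWindowOperator(long windowSizeMs) {
            this.windowSizeMs = windowSizeMs;
        }

        /** The operator's event-time clock: the minimum watermark over both inputs. */
        public long currentWatermark() {
            return Math.min(inputWatermarks[0], inputWatermarks[1]);
        }

        /** Watermarks per input only ever move forward. */
        public void onWatermark(int input, long watermark) {
            inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        }

        /** An element is dropped only if its window's last timestamp is already behind the clock. */
        public void onElement(long timestamp) {
            long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
            long windowMaxTimestamp = windowStart + windowSizeMs - 1;
            if (windowMaxTimestamp <= currentWatermark()) {
                droppedAsLate.add(timestamp);
            } else {
                accepted.add(timestamp);
            }
        }
    }

    /** Input 0 is a fast source far ahead in event time, input 1 a slow one. */
    public static UnionWindowOperator demo() {
        UnionWindowOperator op = new UnionWindowOperator(30_000L);
        op.onWatermark(0, 100_000L);
        op.onWatermark(1, 10_000L);
        op.onElement(95_000L);        // fast source: accepted
        op.onElement(12_000L);        // slow source: accepted, clock is still at 10000
        op.onWatermark(1, 100_000L);  // slow source catches up, clock jumps to 100000
        op.onElement(12_500L);        // now behind the clock: dropped as late
        return op;
    }

    public static void main(String[] args) {
        UnionWindowOperator op = demo();
        System.out.println("accepted=" + op.accepted + " dropped=" + op.droppedAsLate);
    }
}
```

If this model matches the real behavior, the main cost of unioning
first is state: events from the faster source stay buffered in their
windows until the slower source's watermark passes them, and a source
that emits no watermarks at all holds every window open.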

----- Original Message -----
From: user@flink.apache.org
To:"user@flink.apache.org" <user@flink.apache.org>
Cc:
Sent:Wed, 23 Nov 2016 15:53:27 +0100
Subject:Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

 The problem here is that the ContinuousEventTimeTrigger is kind of
 broken. It relies on the first element to trigger a future timer but
 the time might not progress this far. It should additionally trigger
 at the end of the window.

 Here is a version with an improved continuous trigger:
 https://gist.github.com/mxm/a1d6b22c772971c98e2ce886dc9818b1?ts=2
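
 Separately from the gist, the failure mode described above can be
 illustrated with a small plain-Java model. This is not Flink code and
 not the contents of the gist: firings() and its parameters are made-up
 names, and the model assumes the window is discarded once the
 watermark passes its last timestamp, so any timer scheduled after
 that point produces no output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified model (NOT Flink code) of a continuous event-time trigger on one window. */
public class ContinuousTriggerSketch {

    /**
     * Returns the timer timestamps at which the window emits a result.
     * The first element schedules a timer at the next multiple of the interval;
     * each firing schedules the next one. With fireAtWindowEnd, an extra timer
     * is registered at the window's last timestamp.
     */
    public static List<Long> firings(long windowStart, long windowSize, long interval,
                                     long firstElementTs, long[] watermarks,
                                     boolean fireAtWindowEnd) {
        long windowMaxTs = windowStart + windowSize - 1;
        TreeSet<Long> timers = new TreeSet<>();
        timers.add(firstElementTs - (firstElementTs % interval) + interval);
        if (fireAtWindowEnd) {
            timers.add(windowMaxTs);
        }
        List<Long> fired = new ArrayList<>();
        for (long watermark : watermarks) {
            while (!timers.isEmpty() && timers.first() <= watermark) {
                long timer = timers.pollFirst();
                if (timer > windowMaxTs) {
                    continue; // window already discarded, nothing left to emit
                }
                fired.add(timer);
                if (timer != windowMaxTs) {
                    timers.add(timer + interval); // schedule the next early firing
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] watermarks = {29_999L, 40_000L};
        // Window [0, 30000), 10 s interval, first element arrives at t=25000.
        System.out.println(firings(0L, 30_000L, 10_000L, 25_000L, watermarks, false));
        System.out.println(firings(0L, 30_000L, 10_000L, 25_000L, watermarks, true));
    }
}
```

 In this model, an element arriving at t=25000 schedules its first
 timer at t=30000, which lies just past the end of the window, so the
 unpatched variant never emits anything for that window.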

 By the way, if you remove the ContinuousEventTimeTrigger (which will
 implicitly set a regular EventTimeTrigger) for the CoGroup, it also
 works fine. I don't know whether you really want early firings there.

 Cheers,
 Max

 PS: Final word on the cleanup. The state should always be cleaned up
 at the end of the window + allowedLateness you have set.

 On Tue, Nov 22, 2016 at 11:08 PM, William Saar <william@saar.se> wrote:
 > Thanks!
 > One difference is that my topology had 2 sources. I have updated your
 > example to also use 2 sources and that breaks the co-group operation in the
 > example as well!
 >
 > https://gist.github.com/saarw/8f9513435a41ab29b36da77c16a8b0ed
 >
 > Nice to know that purging can be added to the event trigger.
 >
 > William
 >
 >
 > ----- Original Message -----
 > From: user@flink.apache.org
 > To: "user@flink.apache.org" <user@flink.apache.org>
 > Cc:
 > Sent: Tue, 22 Nov 2016 11:50:52 +0100
 > Subject: Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
 >
 >
 > Hi William,
 >
 > I've reproduced your example locally with some toy data and everything
 > was working as expected (with the early triggering). So I'm assuming
 > either there is something wrong with your input data or the behavior
 > doesn't always manifest.
 >
 > Here's the example I ran in case you want to try:
 > https://gist.github.com/mxm/6cb1e6e9a572a26df76917176849f405
 >
 > Gyula is right, the ContinuousEventTimeTrigger never purges the window,
 > but you can circumvent that by extending this trigger and purging
 > at the end of the window, similar to what is done in the EventTimeTrigger.
 >
 > -Max
 >
 >
 > On Mon, Nov 21, 2016 at 6:52 PM, William Saar <william@saar.se> wrote:
 >> Thanks!
 >>
 >> Yes, the SlidingEventTimeWindows works, but is there any way to
 >> pre-aggregate things with tumbling windows that emit events more often
 >> than their window size? Perhaps I can do this before I merge the streams?
 >> (But if ContinuousEventTimeTrigger is the only way to do that, it's bad
 >> if it doesn't clean up its state).
 >>
 >> I assume sliding window state will be too large and less efficient
 >> than tumbling windows, as a sliding fold needs to keep all events in the
 >> window and recompute the fold as events slide out of the window, while a
 >> tumbling fold just needs to keep the aggregation and can discard events as
 >> it folds them.
 >>
 >> I am reviewing how one would replace a batch solution based on 3 bucketed
 >> aggregations of different window sizes and it seems tumbling windows would
 >> be a perfect fit and would need to keep only the 3 aggregations in memory,
 >> while sliding windows would need to keep up to 3 copies of all events (for
 >> at least the smallest window size) to compute the same type of results.
 >> Regards!
 >> William
 >>
 >>
 >> ----- Original Message -----
 >> From: user@flink.apache.org
 >> To: <user@flink.apache.org>
 >> Cc:
 >> Sent: Mon, 21 Nov 2016 08:22:16 +0000
 >> Subject: Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
 >>
 >>
 >>
 >> Hi William,
 >>
 >> I am wondering whether the ContinuousEventTimeTrigger is the best choice
 >> here (it never cleans up the state as far as I know).
 >>
 >> Have you tried the simple SlidingEventTimeWindows as your window assigner?
 >>
 >> Cheers,
 >> Gyula
 >>
 >> William Saar <william@saar.se> wrote (on Sat, 19 Nov 2016, 18:28):
 >>>
 >>> Hi!
 >>>
 >>> My topology below seems to work when I comment out all the lines with
 >>> ContinuousEventTimeTrigger, but prints nothing when the line is in there.
 >>> Can I coGroup two large time windows that use a different trigger time
 >>> than the window size? (Even if the ContinuousEventTimeTrigger doesn't
 >>> work for coGroups, I would not expect the result to be completely silent.)
 >>>
 >>> The streams I'm cogrouping are from 2 different Kafka sources and use
 >>> event time with 0 out-of-orderness, and I'm on Flink 1.1.3, if that helps.
 >>>
 >>> DataStream<CommonType> stream1 =
 >>> <stream of event type1>
 >>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
 >>> .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
 >>> .fold(...);
 >>>
 >>>
 >>> DataStream<CommonType> stream2 =
 >>> <stream of event type2>
 >>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
 >>> .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
 >>> .fold(...);
 >>>
 >>>
 >>>
 >>> stream1.coGroup(stream2).where(...).equalTo(...)
 >>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
 >>> .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
 >>> .print()
 >>>
 >>> Thanks,
 >>>
 >>> William
 >>>
 >>

