flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Doubt about window and count trigger
Date Fri, 27 Nov 2015 14:31:56 GMT
Hi Anwar,

You trigger looks good!

I just want to make sure you know what it is exactly happening after a
window was evaluated and purged.
Once a window was purged, the whole window is cleared and removed. If a new
element arrives, that would have fit into the purged window, a new window
with exactly the same time boundaries is created, i.e., if you have a 5 min
time window, that is fired and purged in minute 4 and a new element arrived
immediately after the purging, it is put into a window, that will only
"exist" for 1 more minute (and not starting a new 5 minute window).

Cheers, Fabian


2015-11-27 14:59 GMT+01:00 Anwar Rizal <anrizal05@gmail.com>:

> Thanks Fabian and Aljoscha,
>
> I try to implement the trigger as you described as follow:
>
> https://gist.github.com/anonymous/d0578a4d27768a75bea4
> <https://gist.github.com/anonymous/d0578a4d27768a75bea4>
>
> It works fine , indeed.
>
> Thanks,
> Anwar
>
>
> On Fri, Nov 27, 2015 at 11:49 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Hi Anwar,
>> what Fabian wrote is completely right. I just want to give the reasoning
>> for why the CountTrigger behaves as it does. The idea was to have Triggers
>> that clearly focus on one thing and then at some point add combination
>> triggers. For example, an OrTrigger that triggers if either of it’s sub
>> triggers triggers, or an AndTrigger that triggers after both its sub
>> triggers fire. (There is also more complex stuff that could be thought of
>> here.)
>>
>> Cheers,
>> Aljoscha
>> > On 27 Nov 2015, at 09:59, fhueske@gmail.com wrote:
>> >
>> >
>> > Hi,
>> >
>> > a regular tumbling time window of 5 seconds gets all elements within
>> that period of time (semantics of time varies for processing, ingestion,
>> and event time modes) and triggers the execution after 5 seconds.
>> >
>> > If you define a custom trigger, the assignment policy remains the same,
>> but the trigger condition is overwritten (it is NOT additional but replaces
>> the default condition), i.e., in your implementation, it will only trigger
>> when 100 elements arrived. In order to trigger also when the window time
>> expires, you have to register a timer (processing time or event time timer)
>> via the trigger context.
>> > NOTE: The window assigner will continue to assign elements to the
>> window, even if the window was already evaluated. If you PURGE the window
>> and an element arrives after that, a new window is created.
>> >
>> > To implement your trigger, you have to register a timer in the
>> onEvent() method with:
>> > ctx.registerEventTimeTimer(window.getEnd)
>> > You can to that in every onEvent() call, because the timer is always
>> overwritten.
>> >
>> > NOTE: you should use Flink’s keyed-state (access via triggerContext) if
>> you want to keep state such as the current count.
>> >
>> > Hope this helps. Please let me know if you have further questions.
>> > Fabian
>> >
>> >
>> >
>> >
>> > From: Matthias J. Sax
>> > Sent: Friday, November 27, 2015 08:44
>> > To: user@flink.apache.org
>> > Subject: Re: Doubt about window and count trigger
>> >
>> >
>> > Hi,
>> >
>> > a Trigger is an *additional* condition for intermediate (early)
>> > evaluation of the window. Thus, it is not "or-ed" to the basic window
>> > definition.
>> >
>> > If you want to have an or-ed window condition, you can customize it by
>> > specifying your own window definition.
>> >
>> > > dataStream.window(new MyOwnWindow() extends WindowAssigner { /* put
>> your code here */ );
>> >
>> > -Matthias
>> >
>> >
>> > On 11/26/2015 11:40 PM, Anwar Rizal wrote:
>> > > Hi all,
>> > >
>> > > From the documentation:
>> > > "The |Trigger| specifies when the function that comes after the window
>> > > clause (e.g., |sum|, |count|) is evaluated (“fires”) for each window."
>> > >
>> > > So, basically, if I specify:
>> > >
>> > > |keyedStream
>> > >     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))
>> > >     .trigger(CountTrigger.of(100))|
>> > >
>> > > |
>> > > |
>> > >
>> > > |The execution of the window function is triggered when the count
>> reaches 100 in the time window of 5 seconds. If you have a system that
>> never reaches 100 in 5 seconds, basically you will never have the window
>> fired.|
>> > >
>> > > |
>> > > |
>> > >
>> > > |My question is, what would be the best option to have behavior as
>> follow:|
>> > >
>> > > |The execution of the window function is triggered when 5 seconds is
>> reached or 100 events are received before 5 seconds.|
>> > >
>> > >
>> > > I think of implementing my own trigger that looks like CountTrigger,
>> but that will fire also when the end of time window is reached (at the
>> moment, it just returns Continue, instead of Fired). But maybe there's a
>> better way ?
>> > >
>> > > Is there a reason why CountTrigger is implemented as it is
>> implemented today, and not as I described above (5 seconds or 100 events
>> reached, whichever comes first).
>> > >
>> > >
>> > > Thanks,
>> > >
>> > > Anwar.
>> > >
>>
>>
>

Mime
View raw message