flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: global window trigger
Date Thu, 20 Jul 2017 14:15:38 GMT
Hi,

Yes, you can have state in a WindowFunction if you use Flink’s state abstraction, which you
can access from a RichWindowFunction through the RuntimeContext (or by using a ProcessWindowFunction).

Whether a Trigger purges makes a difference if the Trigger fires repeatedly before the watermark
reaches the end of the window, for example a trigger that fires early, speculatively. In those
cases it can make sense to distinguish between firing and purging (FIRE_AND_PURGE) and just
firing (FIRE), depending on whether you want all accumulated window contents or only the
elements that have accumulated since the last trigger firing.
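A plain-Java model of the difference (no Flink dependency; FirePurgeModel and its method are hypothetical names, not Flink API): one window buffer receives three batches of elements and the trigger fires after each batch. With FIRE the window function sees everything accumulated so far; with FIRE_AND_PURGE it only sees what arrived since the previous firing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of one window buffer; not Flink's implementation.
class FirePurgeModel {

    enum Result { FIRE, FIRE_AND_PURGE }

    // Feeds each batch into the buffer, then "fires": records how many elements
    // the window function would see, and clears the buffer if the result purges.
    static List<Integer> sizesSeenAtEachFiring(List<List<String>> batches, Result onFire) {
        List<String> buffer = new ArrayList<>();
        List<Integer> seen = new ArrayList<>();
        for (List<String> batch : batches) {
            buffer.addAll(batch);        // elements accumulate in window state
            seen.add(buffer.size());     // window function evaluated on the whole buffer
            if (onFire == Result.FIRE_AND_PURGE) {
                buffer.clear();          // purge: contents dropped after evaluation
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<List<String>> batches =
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d"));
        System.out.println(sizesSeenAtEachFiring(batches, Result.FIRE));           // [2, 3, 4]
        System.out.println(sizesSeenAtEachFiring(batches, Result.FIRE_AND_PURGE)); // [2, 1, 1]
    }
}
```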

GlobalWindows is not implemented by setting the allowed lateness very high. It is a WindowAssigner
that assigns every element to a single GlobalWindow whose max timestamp is Long.MAX_VALUE, so the
watermark will never pass the end of that GlobalWindow.
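A plain-Java sketch of the cleanup rule, assuming it works roughly as in Flink's window operator: an event-time window's state is dropped once the watermark reaches the window's max timestamp plus the allowed lateness, with the sum saturating at Long.MAX_VALUE. The class and method names are illustrative only. Because a GlobalWindow already reports Long.MAX_VALUE, its cleanup time can never be reached, whatever the allowed lateness.

```java
// Plain-Java model of event-time window cleanup; names are illustrative, not Flink API.
class CleanupModel {

    // Cleanup time = window max timestamp + allowed lateness,
    // saturating at Long.MAX_VALUE if the addition overflows.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long t = windowMaxTimestamp + allowedLateness;
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE;
    }

    // In this model, a cleanup time of Long.MAX_VALUE means "never collected";
    // otherwise the window is collected once the watermark reaches the cleanup time.
    static boolean isCollected(long windowMaxTimestamp, long allowedLateness, long watermark) {
        long t = cleanupTime(windowMaxTimestamp, allowedLateness);
        return t != Long.MAX_VALUE && watermark >= t;
    }

    public static void main(String[] args) {
        long hourMax = 3_600_000L - 1;   // max timestamp (ms) of the window [0, 1h)
        long globalMax = Long.MAX_VALUE; // what a GlobalWindow reports
        System.out.println(isCollected(hourMax, 0L, 3_600_000L));           // true
        System.out.println(isCollected(hourMax, 60_000L, 3_600_000L));      // false: within lateness
        System.out.println(isCollected(globalMax, 0L, Long.MAX_VALUE - 1)); // false: never
    }
}
```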

Regarding your use case: since you want to keep all data since the start, I would suggest using
GlobalWindows, a custom Trigger that fires periodically, and a ProcessWindowFunction. In the
ProcessWindowFunction, which gives you access to the current event time, you can make sure to
only process the elements you want, based on their timestamps.
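A plain-Java model of this suggestion (no Flink dependency; all names are hypothetical), run on jad's example input from further down the thread, with timestamps as seconds after 2017-07-16 00:00:00. It assumes ascending timestamps with the watermark trailing the largest timestamp by one, fires on every hour boundary once the watermark reaches it, never purges, and counts only elements whose timestamp is at or before the firing time. Since the window function does not receive the timer timestamp, a real ProcessWindowFunction would presumably compare against the current watermark from its Context instead (available in recent Flink versions); Flink's built-in ContinuousEventTimeTrigger may also serve as the periodic trigger.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model: one global window, never purged, fired on each event-time hour boundary.
class CumulativeGlobalWindowModel {
    static final long HOUR = 3_600L; // seconds, to keep the numbers readable

    // Smallest hour boundary >= ts (also correct for negative ts).
    static long ceilToHour(long ts) {
        return -Math.floorDiv(-ts, HOUR) * HOUR;
    }

    // What the window function emits at one firing: the whole buffer is still there,
    // so it filters on timestamp <= firing time itself.
    static String fire(List<Long> buffer, long fireTime) {
        long counted = buffer.stream().filter(ts -> ts <= fireTime).count();
        return "fire=" + fireTime + " buffered=" + buffer.size() + " counted=" + counted;
    }

    static List<String> run(long[] arrivals) {
        List<Long> buffer = new ArrayList<>(); // the single global window
        List<String> out = new ArrayList<>();
        if (arrivals.length == 0) {
            return out;
        }
        long nextFire = ceilToHour(arrivals[0]); // first periodic timer
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivals) {
            buffer.add(ts);
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - 1;        // assumption: ascending-timestamp watermarks
            while (nextFire <= watermark) {      // a timer fires once the watermark reaches it
                out.add(fire(buffer, nextFire));
                nextFire += HOUR;                // re-register the next periodic timer
            }
        }
        // End of input: release the remaining boundaries up to the one covering the last element.
        long lastBoundary = ceilToHour(maxSeen);
        while (nextFire <= lastBoundary) {
            out.add(fire(buffer, nextFire));
            nextFire += HOUR;
        }
        return out;
    }

    public static void main(String[] args) {
        // 00:00:01, 00:00:12, 01:03:06, 02:20:10
        long[] arrivals = {1, 12, 3_786, 8_410};
        run(arrivals).forEach(System.out::println);
    }
}
```

The `buffered` value shows why the filter is needed: in this model, at the 01:00 firing the element from 01:03:06 is already in the window, because the watermark only moved past 01:00 when that element arrived.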

If you don’t want to keep all events indefinitely (which could eventually blow up your state
size), you can use an Evictor to evict some events from the window buffer from time to time.
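A plain-Java sketch of one possible eviction rule, modeled on what Flink's TimeEvictor does as I understand it: find the newest timestamp in the buffer and drop every element at or before (newest - keep). The class name is hypothetical. Evicted elements no longer contribute to any count computed over the window contents.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of time-based eviction; not Flink's implementation.
class TimeEvictionModel {

    // Removes every element whose timestamp is <= (newest timestamp - keep).
    static void evict(List<Long> buffer, long keep) {
        if (buffer.isEmpty()) {
            return;
        }
        long newest = Collections.max(buffer);
        long cutoff = newest - keep;
        buffer.removeIf(ts -> ts <= cutoff);
    }

    public static void main(String[] args) {
        List<Long> buffer = new ArrayList<>(Arrays.asList(1L, 12L, 3_786L, 8_410L));
        evict(buffer, 2 * 3_600L);   // keep roughly the last two hours (seconds)
        System.out.println(buffer);  // [3786, 8410]
    }
}
```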

Best,
Aljoscha

> On 20. Jul 2017, at 12:24, jad mad <jadmad0828@gmail.com> wrote:
> 
> Hello Aljoscha,
> 
> > I’m afraid this will not work well because a WindowAssigner should be stateless
> OK, now I understand this.
> What about inside a custom WindowFunction(...)? Is it a bad idea to have state there as well?
> 
> The default trigger for TumblingEventTimeWindows is EventTimeTrigger(...).
> Looking at its source, there are a few "return TriggerResult.FIRE;" but not a single PURGE.
> Even so, the contents get cleared each time event time passes a window end.
> Is this what you meant by
> > When the watermark passes the end of a window plus the allowed lateness the window contents are being purged.
> ?
> If yes, whether a trigger implementation returns TriggerResult.FIRE or TriggerResult.FIRE_AND_PURGE
> seems less important, because the contents will be cleared anyway,
> and the allowed lateness matters more?
> And is GlobalWindows implemented by setting the "lateness" to a huge number,
> so that it keeps everything?
> 
> So, back to my original question:
> to keep everything from the start like GlobalWindows, let it fire periodically and
> then perform some calculations, what combination of window assigner, trigger and/or custom window function
> should I use? A simple working sample would be even better.
> 
> thank you a lot!
> jad
> 
> On Thu, Jul 20, 2017 at 5:45 PM, Aljoscha Krettek <aljoscha@apache.org <mailto:aljoscha@apache.org>> wrote:
> Hi,
> 
> I’m afraid this will not work well, because a WindowAssigner should be stateless, i.e. it should not keep any state in fields. The reason is that several WindowAssigner instances can be used on the different partitions, and the order in which a WindowAssigner sees the incoming elements is not guaranteed either. That is, you might set a timestamp in the “first_timestamp” field that is not chronologically the “first timestamp”.
> 
> The reason your windows are being purged is probably the allowed lateness, which is zero by default. When the watermark passes the end of a window plus the allowed lateness, the window contents are being purged. You can configure the allowed lateness via WindowedStream.allowedLateness(). Be careful, though: if you set this too high, you might never clean up your window state and therefore end up with ever-growing state.
> 
> Best,
> Aljoscha
> 
>> On 18. Jul 2017, at 15:05, jad mad <jadmad0828@gmail.com <mailto:jadmad0828@gmail.com>> wrote:
>> 
>> Aljoscha,
>> 
>> what a great answer and this is what I'd expected!
>> 
>> As a workaround, I've modified SlidingEventTimeWindows a little into a custom WindowAssigner, like below.
>> The differences are:
>> 1. storing the first timestamp in a variable "first_timestamp",
>> 2. using this timestamp as the start time of all following windows.
>> @PublicEvolving
>> public class MySlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
>>     private static final long serialVersionUID = 1L;
>>     private final long size;
>>     private final long slide;
>>     private final long offset;
>>     private long first_timestamp = -1L; // added by me!
>> 
>>     protected MySlidingEventTimeWindows(long size, long slide, long offset) {
>>         if (offset >= 0L && offset < slide && size > 0L) {
>>             this.size = size;
>>             this.slide = slide;
>>             this.offset = offset;
>>         } else {
>>             throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
>>         }
>>     }
>> 
>>     @Override
>>     public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
>>         if (timestamp <= Long.MIN_VALUE) {
>>             throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
>>         } else {
>>             if (this.first_timestamp == -1L) { // added by me!
>>                 this.first_timestamp = timestamp;
>>                 System.out.println("===================== " + this.first_timestamp + " ========================");
>>             }
>>             List<TimeWindow> windows = new ArrayList<>((int) (this.size / this.slide));
>>             long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide);
>> 
>>             for (long start = lastStart; start > timestamp - this.size; start -= this.slide) {
>>                 //windows.add(new TimeWindow(start, start + this.size)); // original implementation
>>                 windows.add(new TimeWindow(this.first_timestamp, start + this.size)); // modified by me!
>>             }
>>             return windows;
>>         }
>>     }
>> 
>>     // getDefaultTrigger(), getWindowSerializer() and isEventTime() as in SlidingEventTimeWindows (not shown)
>> }
>> The result I get from MyWindowFunction(...) looks like this:
>> 2017-01-01 00:17:39	2017-01-01 00:00:01	2
>> 2017-01-01 00:17:39	2017-01-01 00:00:02	4
>> 2017-01-01 00:17:39	2017-01-01 00:00:03	4
>> 2017-01-01 00:17:39	2017-01-01 00:00:04	10
>> 2017-01-01 00:17:39	2017-01-01 00:00:05	19
>> 2017-01-01 00:17:39	2017-01-01 00:00:06	19
>> 2017-01-01 00:17:39	2017-01-01 00:00:07	20
>> 2017-01-01 00:17:39	2017-01-01 00:00:08	23
>> 2017-01-01 00:17:39	2017-01-01 00:00:09	21
>> 2017-01-01 00:17:39	2017-01-01 00:00:10	7
>> 2017-01-01 00:17:39	2017-01-01 00:00:11	2
>> 2017-01-01 00:17:39	2017-01-01 00:00:12	5
>> 2017-01-01 00:17:39	2017-01-01 00:00:13	12
>> 2017-01-01 00:17:39	2017-01-01 00:00:14	17
>> 2017-01-01 00:17:39	2017-01-01 00:00:15	9
>> 2017-01-01 00:17:39	2017-01-01 00:00:16	8
>> 
>> Things I don't understand:
>> 1. When the timestamp of the first line of my input is 2017-01-01 00:00:00, why does 2017-01-01 00:17:39 show up in my result as
>>      the start time of each sliding window?
>>     Basically, I'm just printing out the timestamp that came with the first element of the Iterable in MyWindowFunction.
>> 2. I made MyWindowAssigner hoping that the start time would be fixed and the contents would not be purged.
>>     However, the results show that it works just like a normal SlidingEventTimeWindows, with the contents
>>     being purged.
>>     How can I make it keep its window contents even after each firing?
>> 3. This MyWindowAssigner(...) attempt arose from your previous advice about using a
>>     different WindowFunction. I wonder whether I'm heading in the right direction.
>>     
>> thank you very much!
>> jad
>> 
>> On Mon, Jul 17, 2017 at 7:22 PM, Aljoscha Krettek <aljoscha@apache.org <mailto:aljoscha@apache.org>> wrote:
>> Ah, I see. The problem is that the watermark has slightly tricky semantics: a watermark T says that there will be no elements with a timestamp <= T in the future. It does not say that there have not yet been elements with a timestamp > T. In your specific case, this means that the GlobalWindow will contain elements whose timestamp is after the firing timestamp of your trigger. If you want to make sure that elements are put into buckets based on their timestamp, then you need to use a different WindowFunction, because GlobalWindows simply puts every element into the same bucket (window).
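Putting elements into buckets by their own timestamp is what a time-based WindowAssigner, for example a tumbling event-time one, does. A plain-Java sketch of that idea on jad's example input (timestamps in seconds after 00:00:00; class and method names are hypothetical): windowStart uses the same arithmetic as Flink's TimeWindow.getWindowStartWithOffset for non-negative timestamps, and a running total turns per-hour counts into cumulative ones. In Flink, that running total would have to live in keyed state, as described at the top of this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model: hourly tumbling buckets plus a running total; not Flink code.
class HourlyBucketModel {
    static final long HOUR = 3_600L; // seconds

    // Start of the tumbling window containing ts (non-negative ts assumed).
    static long windowStart(long ts, long offset, long size) {
        return ts - (ts - offset + size) % size;
    }

    static List<String> cumulative(long[] timestamps) {
        Map<Long, Long> perBucket = new TreeMap<>(); // bucket start -> count, sorted by time
        for (long ts : timestamps) {
            perBucket.merge(windowStart(ts, 0L, HOUR), 1L, Long::sum);
        }
        List<String> out = new ArrayList<>();
        long runningTotal = 0;                       // state carried across buckets
        for (Map.Entry<Long, Long> e : perBucket.entrySet()) {
            runningTotal += e.getValue();
            out.add("end=" + (e.getKey() + HOUR) + " cumulative=" + runningTotal);
        }
        return out;
    }

    public static void main(String[] args) {
        // 00:00:01, 00:00:12, 01:03:06, 02:20:10
        cumulative(new long[]{1, 12, 3_786, 8_410}).forEach(System.out::println);
    }
}
```

Hours without any element produce no line, just as a tumbling window that received no elements never fires.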
>> 
>> Regarding the firing timestamp, it’s currently not possible to get that from within a WindowFunction.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 16. Jul 2017, at 12:16, jad mad <jadmad0828@gmail.com <mailto:jadmad0828@gmail.com>> wrote:
>>> 
>>> Hello Aljoscha,
>>> 
>>> thank you very much for your reply. My issue is two-fold.
>>> First of all,
>>> what I wanted to achieve was having a GlobalWindows and letting it fire
>>> periodically, say every 1 hour or 1 day, and then doing some custom calculation.
>>> The custom trigger part I've implemented seems to work well.
>>> 
>>> Currently, every time my custom trigger fires, the Iterable
>>> passed to my custom WindowFunction contains all inputs from the start to the end, rather than
>>> only those from the start up to the (event-time) timestamp at which the trigger fires.
>>> I have been working on this for a week now but haven't been able to find a solution yet.
>>> 
>>> input example. 
>>> 2017-07-16 00:00:01, x
>>> 2017-07-16 00:00:12, x
>>> 2017-07-16 01:03:06, x
>>> 2017-07-16 02:20:10, x
>>> 
>>> In this case, a GlobalWindows with a 1-hour periodic trigger, with MyWindowFunction designed to count the cumulative records, should emit something like
>>> 2017-07-16 00:00:00 ~ 2017-07-16 01:00:00, 2
>>> 2017-07-16 00:00:00 ~ 2017-07-16 02:00:00, 3
>>> 2017-07-16 00:00:00 ~ 2017-07-16 03:00:00, 4
>>> ↑ the start time stamp doesn't change!
>>> 
>>> Now, what I get is:
>>> 2017-07-16 00:00:00 ~ , 4
>>> 2017-07-16 00:00:00 ~ , 4
>>> 2017-07-16 00:00:00 ~ , 4
>>> ↑ every line has the same result...
>>> 
>>> public class MyWindowFunction<T, W extends Window> implements WindowFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple, W> {
>>> 
>>>     @Override
>>>     public void apply(Tuple tuple, W window, Iterable<Tuple2<String, String>> iterable, Collector<Tuple3<String, String, String>> out) throws Exception {
>>> 
>>>         for(Tuple2<String, String> element : iterable)
>>>         {
>>>             ...
>>>         }
>>>         out.collect(new Tuple3<String, String,  String>("...", "...", "..."));
>>>     }
>>> }
>>> Secondly, for a GlobalWindows firing periodically, how do you get the periodic firing timestamp inside
>>> MyWindowFunction? (The end timestamp missing after the ~ in the example above.)
>>> 
>>> really appreciate the help!
>>> jad
>>> 
>>> 
>>> On Sun, Jul 16, 2017 at 6:15 PM, Aljoscha Krettek <aljoscha@apache.org <mailto:aljoscha@apache.org>> wrote:
>>> Hi,
>>> 
>>> OK, then I misunderstood. Yes, a PurgingTrigger is similar (the same) to always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I thought your problem was that data is never cleared away when using GlobalWindows. Is that not the case?
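A plain-Java model of that equivalence. The enum mirrors the four values of Flink's TriggerResult; the wrapping rule (any firing result becomes FIRE_AND_PURGE, anything else passes through unchanged) is how PurgingTrigger behaves as far as I can tell from its source. The class name is hypothetical.

```java
// Plain-Java model of wrapping a trigger's result the way PurgingTrigger does; not Flink code.
class PurgingWrapperModel {

    enum Result {
        CONTINUE, FIRE, PURGE, FIRE_AND_PURGE;

        boolean isFire() {
            return this == FIRE || this == FIRE_AND_PURGE;
        }
    }

    // Any firing result of the wrapped trigger is turned into FIRE_AND_PURGE.
    static Result purging(Result inner) {
        return inner.isFire() ? Result.FIRE_AND_PURGE : inner;
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            System.out.println(r + " -> " + purging(r));
        }
    }
}
```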
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 14. Jul 2017, at 16:29, jad mad <jadmad0828@gmail.com <mailto:jadmad0828@gmail.com>> wrote:
>>>> 
>>>> Hi Aljoscha
>>>> 
>>>> thanks for the comment. 
>>>> Is wrapping a trigger with PurgingTrigger.of() the same as doing "return TriggerResult.FIRE_AND_PURGE;"
>>>> inside a custom trigger?
>>>> 
>>>> I gave it a test, and the result seems to be the opposite of what I meant...
>>>> instead of throwing away previous windows' contents, I want to keep them
>>>> all the way until the end.
>>>> That way I can get the cumulative counts of all input.
>>>> 
>>>> I wonder how to achieve it.
>>>> Anyone?
>>>> 
>>>> jad
>>>> 
>>>> 
>>>> On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek <aljoscha@apache.org <mailto:aljoscha@apache.org>> wrote:
>>>> Window contents are only purged from state if the Trigger says so or if the watermark passes the garbage-collection horizon for a given window. With GlobalWindows the GC horizon is never reached, which leaves Triggers.
>>>> 
>>>> You can create a Trigger that purges every time it fires by wrapping it in a PurgingTrigger, i.e.
>>>> 
>>>> .trigger(PurgingTrigger.of(<my trigger>))
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 13. Jul 2017, at 14:00, jad mad <jadmad0828@gmail.com <mailto:jadmad0828@gmail.com>> wrote:
>>>>> 
>>>>> Hi Prashant,
>>>>> 
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> 
>>>>> Actually, I could make my custom trigger fire periodically.
>>>>> The problem is that the set of elements in the Iterable
>>>>> is always the same, which is not what I'm expecting...
>>>>> 
>>>>> private static class MyWindowFunction_Window...
>>>>>          ...    
>>>>>        @Override
>>>>>         public void apply(Tuple tuple, W window, Iterable<MyClass> iterable,
>>>>>              ...
>>>>>              for(MyClass element : iterable)
>>>>> 
>>>>> does anyone have any idea on this?
>>>>> thanks a lot in advance,
>>>>> jad
>>>>> 
>>>>> 
>>>>> On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak <prashant@intellifylearning.com <mailto:prashant@intellifylearning.com>> wrote:
>>>>> Hi
>>>>> 
>>>>> We have custom operators using global windows, and we are using event time.
>>>>> 
>>>>> How are you specifying event time as the time characteristic?
>>>>> 
>>>>> Prashant
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html
>>>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
> 
> 

