flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger
Date Wed, 23 Mar 2016 09:24:49 GMT
Hi,
the output at 19:44:44.635 is indeed strange. Is this reproducible?

As for the removal of windows: that is a pitfall a lot of users have fallen into. The timeWindowAll()
call just sets up a window assigner, so in your case the equivalent call would be:

     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) // <-- difference is here
     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
     .fold(Set[String]()){(r,i) => { r + i}}

The window assigner itself does not do any cleanup or triggering of window processing. It
does, however, come with a default Trigger, which is ProcessingTimeTrigger in the case of TumblingProcessingTimeWindows.
This trigger will fire once at the end of a window and then also purge the window contents.
By calling trigger() the default trigger is replaced, and ContinuousProcessingTimeTrigger does
not clean up (purge) windows.
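
To make the difference between firing and purging concrete, here is a minimal toy model in plain Scala. It is not Flink code and does not use the Flink API: ToyWindow, fire and fireAndPurge are hypothetical names that only mimic the two behaviours described above (emit and keep the contents, versus emit and drop the contents).

```scala
// Toy model only: ToyWindow, fire and fireAndPurge are hypothetical names,
// not part of Flink. They mimic "fire" versus "fire and purge" on window state.
object TriggerPurgeSketch {
  final class ToyWindow {
    private var contents = Set.empty[String]

    // Add an element to the window state (distinct words, like the fold above).
    def add(word: String): Unit = contents += word

    // Fire only: emit the current result and keep the contents.
    def fire(): Int = contents.size

    // Fire and purge: emit the current result, then drop the contents.
    def fireAndPurge(): Int = {
      val result = contents.size
      contents = Set.empty[String]
      result
    }

    // Number of elements still held in state.
    def retained: Int = contents.size
  }

  def main(args: Array[String]): Unit = {
    val fireOnly = new ToyWindow // models a trigger that only ever fires
    val purging  = new ToyWindow // models a trigger that fires and purges at window end

    Seq("aaa", "aaa", "bbb").foreach { w => fireOnly.add(w); purging.add(w) }

    println(fireOnly.fire())        // prints 2
    println(purging.fireAndPurge()) // prints 2
    println(fireOnly.retained)      // prints 2: state is still held
    println(purging.retained)       // prints 0: state was cleaned up
  }
}
```

Both windows emit the same result, but only the purging one releases its state afterwards, which is why windows accumulate when the replacement trigger never purges.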

This seems to happen to a lot of people, so I started an initiative
to try and improve windows/triggers: https://mail-archives.apache.org/mod_mbox/flink-dev/201603.mbox/%3c16991435-118A-403B-B766-6349083254AB@apache.org%3e

I created an associated doc to keep track of my proposed changes: https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing

What do you think?

Cheers,
Aljoscha
> On 23 Mar 2016, at 07:52, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
> 
> Aljoscha,
> 
> Thank you for fixing the issue.
> I built both the Flink server and the job with the code you provided, and it
> worked almost as expected.
> The output is below. I am wondering why a value was emitted at
> 19:44:44.635 even though I set
> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
> problem for me.
> 
> ---
> (2016-03-22 19:44:35.002,1)
> (2016-03-22 19:44:44.635,2)
> (2016-03-22 19:44:45.001,2)
> (2016-03-22 19:45:45.001,1)
> ---
> 
> And regarding the removal from the window, you mean the data remains
> in the window even if
> I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
> I thought that ContinuousProcessingTimeTrigger works on top of
> timeWindowAll and timeWindowAll
> takes care of purging data from the window.
> 
> ---
> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
> ---
> 
> Regards,
> Hironori
> 
> 2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljoscha@apache.org>:
>> Hi,
>> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger. The timer
>> is not correctly set. You can try it with this fixed version, that I will also update in the
>> Flink code: https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>> 
>> One more thing: the ContinuousProcessingTimeTrigger will never remove the window.
>> The default EventTimeTrigger will fire a window and purge the contents, while the ContinuousProcessingTimeTrigger
>> will only ever fire for a window. This means that you will have a lot of windows hanging around
>> in your state at some point and they will never be cleaned up. For now, if you require the
>> behavior of continuously firing on a TimeWindow, I would suggest writing a custom Trigger
>> based on EventTimeTrigger (or ProcessingTimeTrigger) that does the firing and purging on time
>> and also has the continuous triggering at earlier times.
>> 
>> Let us know if you need more information about this. Kostas Kloudas also recently
>> looked into writing custom Triggers, so maybe he has some material he could give to you.
>> 
>> Cheers,
>> Aljoscha
>>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
>>> 
>>> Hello,
>>> 
>>> I have a question about TumblingProcessingTimeWindow and
>>> ContinuousProcessingTimeTrigger.
>>> 
>>> The code I tried is below. It outputs the distinct count of the words;
>>> counts are printed every 5 seconds and the window is reset every 1 minute.
>>> 
>>> ---
>>>   val input =
>>> env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>>     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>>     .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>>     .fold(Set[String]()){(r,i) => { r + i}}
>>>     .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>> 
>>>   input print
>>> ---
>>> 
>>> I wrote data to the input file with some interval.
>>> 
>>> ---
>>> echo "aaa" >> input.txt
>>> echo "aaa" >> input.txt
>>> sleep 10
>>> echo "bbb" >> input.txt
>>> sleep 60
>>> echo "ccc" >> input.txt
>>> ---
>>> 
>>> The result I got was just 1 record. The expected output was 1 -> (10+
>>> sec later) 2 -> (60+ sec later) 1 .
>>> ---
>>> (2016-03-18 13:08:59.288,2)
>>> ---
>>> 
>>> Even after several minutes, I never got an additional record. In my
>>> understanding, with
>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
>>> operators (fold, map) in the code above will be evaluated every 5
>>> seconds.
>>> Am I misunderstanding something?
>>> 
>>> Regards,
>>> Hironori
>> 

