flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger
Date Fri, 25 Mar 2016 15:17:44 GMT
Hello Aljoscha and Hironori,

Nice initiative! I totally agree with the proposals in the document. 
I also left some comments and I will soon start working on some 
of the issues there.

Kostas

> On Mar 25, 2016, at 12:53 PM, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
> 
> Aljoscha,
> 
> Yes, it has been reproducible every time I tried. Here is the code and
> procedure: https://gist.github.com/ogibayashi/402153bcd79138c35d6a
> 
> Thank you for your explanation about the removal of windows. I didn't
> know that calling .trigger() replaces the default window trigger.
> I have read your document. Personally, I think the current .window,
> .trigger, and .evictor APIs would be flexible enough if they worked the
> way I expected (that is, if calling .trigger did not replace the
> windowAssigner's default trigger/evictor, but just added an additional
> trigger). However, I am new to Flink and may not understand the
> problem correctly.
> As for the "Testability and Test Coverage" section, I think it's a good
> idea to use an internal clock instead of the system clock.
> It would make it easier to test my streaming jobs.
> 
> Regards,
> Hironori
> 
> 2016-03-23 18:24 GMT+09:00 Aljoscha Krettek <aljoscha@apache.org>:
>> Hi,
>> the output at 19:44:44.635 is indeed strange. Is this reproducible?
>> 
>> As for the removal of windows: that is a pitfall a lot of users have fallen into.
>> The timeWindowAll() call just sets up a window assigner, so in your case the
>> equivalent call would be:
>> 
>>     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) // <-- difference is here
>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>     .fold(Set[String]()){(r,i) => { r + i}}
>> 
>> The window assigner itself does not do any cleanup or triggering of window processing.
>> It does, however, come with a default Trigger, which is ProcessingTimeTrigger in the
>> case of TumblingProcessingTimeWindows. This trigger will fire once at the end of a
>> window and then also purge the window contents. By calling trigger() the default
>> trigger is replaced, and ContinuousProcessingTimeTrigger does not clean up (purge)
>> windows.
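
A toy sketch of the fire vs. fire-and-purge distinction described above. This is plain Scala, not Flink code: TriggerDemo, ToyWindow, and the result objects are hypothetical names that only mimic the behaviour, assuming that a firing emits the current distinct count of the window contents.

```scala
object TriggerDemo {
  // Hypothetical stand-ins for the two trigger outcomes discussed above.
  sealed trait TriggerResult
  case object Fire extends TriggerResult          // emit, keep window contents
  case object FireAndPurge extends TriggerResult  // emit, then drop window contents

  final class ToyWindow {
    private var contents = Set.empty[String]

    def add(word: String): Unit = contents = contents + word

    // Emits the distinct count, then purges only if the result asks for it.
    def onTimer(result: TriggerResult): Int = {
      val emitted = contents.size
      if (result == FireAndPurge) contents = Set.empty[String]
      emitted
    }

    def retained: Int = contents.size
  }

  // Returns (emitted count, number of elements still held by the window).
  def run(result: TriggerResult): (Int, Int) = {
    val w = new ToyWindow
    Seq("aaa", "aaa", "bbb").foreach(w.add)
    (w.onTimer(result), w.retained)
  }
}
```

In this model, a trigger that only ever returns Fire leaves both distinct words in the window after emitting, while FireAndPurge leaves it empty. That retained state is what accumulates when no trigger ever purges.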
>> 
>> This seems to happen to a lot of people, so I started an initiative to try and
>> improve windows/triggers:
>> https://mail-archives.apache.org/mod_mbox/flink-dev/201603.mbox/%3c16991435-118A-403B-B766-6349083254AB@apache.org%3e
>> 
>> I created an associated doc to keep track of my proposed changes: https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
>> 
>> What do you think?
>> 
>> Cheers,
>> Aljoscha
>>> On 23 Mar 2016, at 07:52, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
>>> 
>>> Aljoscha,
>>> 
>>> Thank you for fixing the issue.
>>> I built both the Flink server and the job with the code you provided, and it
>>> worked almost as expected.
>>> The output is below. I am wondering why a value was emitted at
>>> 19:44:44.635 even though I set
>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
>>> problem for me.
>>> 
>>> ---
>>> (2016-03-22 19:44:35.002,1)
>>> (2016-03-22 19:44:44.635,2)
>>> (2016-03-22 19:44:45.001,2)
>>> (2016-03-22 19:45:45.001,1)
>>> ---
>>> 
>>> And regarding the removal from the window, do you mean the data remains
>>> in the window even if
>>> I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
>>> I thought that ContinuousProcessingTimeTrigger works on top of
>>> timeWindowAll and that timeWindowAll
>>> takes care of purging data from the window.
>>> 
>>> ---
>>> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>> ---
>>> 
>>> Regards,
>>> Hironori
>>> 
>>> 2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>> Hi,
>>>> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger.
>>>> The timer is not correctly set. You can try it with this fixed version, which I
>>>> will also update in the Flink code: https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>>>> 
>>>> One more thing: the ContinuousProcessingTimeTrigger will never remove the
>>>> window. The default EventTimeTrigger will fire a window and purge the contents,
>>>> while the ContinuousProcessingTimeTrigger will only ever fire for a window. This
>>>> means that you will have a lot of windows hanging around in your state at some
>>>> point and they will never be cleaned up. For now, if you require the behavior of
>>>> continuously firing on a TimeWindow, I would suggest writing a custom Trigger
>>>> based on EventTimeTrigger (or ProcessingTimeTrigger) that does the firing and
>>>> purging on time and also has the continuous triggering at earlier times.
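
A minimal sketch of the decision logic such a custom trigger might use. It is plain Scala and does not call the Flink Trigger API: ContinuousPurgingSketch, onProcessingTime, nextTimer, and the windowEnd parameter are hypothetical names, and the sketch assumes timers are aligned to multiples of the firing interval.

```scala
object ContinuousPurgingSketch {
  // Hypothetical stand-ins for the trigger outcomes.
  sealed trait Result
  case object Fire extends Result          // early firing, keep contents
  case object FireAndPurge extends Result  // final firing, drop contents

  // A timer before the end of the window only fires;
  // the timer at (or after) the end of the window fires and purges.
  def onProcessingTime(time: Long, windowEnd: Long): Result =
    if (time >= windowEnd) FireAndPurge else Fire

  // Next timer: the next multiple of `interval` after `now`,
  // capped at the window end so the final purge is never skipped.
  def nextTimer(now: Long, interval: Long, windowEnd: Long): Long = {
    val aligned = now - (now % interval) + interval
    math.min(aligned, windowEnd)
  }
}
```

For a 60-second window with a 5-second interval (all values in milliseconds), nextTimer(12000L, 5000L, 60000L) gives 15000, which only fires, and the last timer lands on 60000, which fires and purges. A real implementation would register these timers through the trigger context instead of computing them in isolation.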
>>>> 
>>>> Let us know if you need more information about this. Kostas Kloudas also
>>>> recently looked into writing custom Triggers, so maybe he has some material he
>>>> could give to you.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
>>>>> 
>>>>> Hello,
>>>>> 
>>>>> I have a question about TumblingProcessingTimeWindow and
>>>>> ContinuousProcessingTimeTrigger.
>>>>> 
>>>>> The code I tried is below. It should output the distinct count of words:
>>>>> counts are printed every 5 seconds and the window is reset every minute.
>>>>> 
>>>>> ---
>>>>>  val input =
>>>>> env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>>>>    .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>>>>    .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>>>>    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>>>>    .fold(Set[String]()){(r,i) => { r + i}}
>>>>>    .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>>>> 
>>>>>  input print
>>>>> ---
>>>>> 
>>>>> I wrote data to the input file at intervals.
>>>>> 
>>>>> ---
>>>>> echo "aaa" >> input.txt
>>>>> echo "aaa" >> input.txt
>>>>> sleep 10
>>>>> echo "bbb" >> input.txt
>>>>> sleep 60
>>>>> echo "ccc" >> input.txt
>>>>> ---
>>>>> 
>>>>> The result I got was just 1 record. The expected output was 1 -> (10+
>>>>> sec later) 2 -> (60+ sec later) 1.
>>>>> ---
>>>>> (2016-03-18 13:08:59.288,2)
>>>>> ---
>>>>> 
>>>>> Even after several minutes, I never got any additional records. In my
>>>>> understanding, with
>>>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
>>>>> operators (fold, map) in the code above will be evaluated every 5
>>>>> seconds.
>>>>> Am I misunderstanding something?
>>>>> 
>>>>> Regards,
>>>>> Hironori
>>>> 
>> 

