flink-user mailing list archives

From Hironori Ogibayashi <ogibaya...@gmail.com>
Subject Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger
Date Fri, 25 Mar 2016 11:53:45 GMT
Aljoscha,

Yes, it has been reproducible every time I tried. Here is the code and
procedure: https://gist.github.com/ogibayashi/402153bcd79138c35d6a

Thank you for your explanation about the removal of windows. I didn't
know that calling .trigger() replaces the default window trigger.
I have read your document. Personally, I think the current .window,
.trigger, and .evictor APIs would be flexible enough if they worked the
way I expected (that is, calling .trigger would not replace the
windowAssigner's default trigger/evictor, but just add an additional
trigger). However, I am new to Flink and may not understand the
problem correctly.
As for the "Testability and Test Coverage" section, I think it's a good idea
to use an internal clock instead of the system clock.
It would make it easier to test my streaming jobs.

Regards,
Hironori

2016-03-23 18:24 GMT+09:00 Aljoscha Krettek <aljoscha@apache.org>:
> Hi,
> the output at 19:44:44.635 is indeed strange. Is this reproducible?
>
> As for the removal of windows: that is a pitfall a lot of users have fallen into. The
> timeWindowAll() call just sets up a window assigner, so in your case the equivalent call would
> be:
>
>      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) // <-- difference is here
>      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>      .fold(Set[String]()){(r,i) => { r + i}}
>
> The window assigner itself does not do any cleanup or triggering of window processing.
> It does, however, come with a default Trigger, which is ProcessingTimeTrigger in the case of TumblingProcessingTimeWindows.
> This trigger will fire once at the end of a window and then also purge the window contents.
> By calling trigger() the default trigger is replaced, and ContinuousProcessingTimeTrigger does
> not clean up (purge) windows.
>
> This is something that seems to happen for a lot of people, so I started an initiative
> to try and improve windows/triggers: https://mail-archives.apache.org/mod_mbox/flink-dev/201603.mbox/%3c16991435-118A-403B-B766-6349083254AB@apache.org%3e
>
> I created an associated doc to keep track of my proposed changes: https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
>
> What do you think?
>
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 07:52, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
>>
>> Aljoscha,
>>
>> Thank you for fixing the issue.
>> I built both the Flink server and the job with the code you provided, and it
>> worked almost as expected.
>> The output is below. I am wondering why a value was emitted at
>> 19:44:44.635 even though I set
>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
>> problem for me.
>>
>> ---
>> (2016-03-22 19:44:35.002,1)
>> (2016-03-22 19:44:44.635,2)
>> (2016-03-22 19:44:45.001,2)
>> (2016-03-22 19:45:45.001,1)
>> ---
>>
>> And regarding the removal from the window, do you mean the data remains
>> in the window even if
>> I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
>> I thought that ContinuousProcessingTimeTrigger works on top of
>> timeWindowAll and that timeWindowAll
>> takes care of purging data from the window.
>>
>> ---
>> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>> ---
>>
>> Regards,
>> Hironori
>>
>> 2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljoscha@apache.org>:
>>> Hi,
>>> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger. The
>>> timer is not set correctly. You can try it with this fixed version, which I will also update
>>> in the Flink code: https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>>>
>>> One more thing: the ContinuousProcessingTimeTrigger will never remove the window.
>>> The default EventTimeTrigger will fire a window and purge the contents, while the ContinuousProcessingTimeTrigger
>>> will only ever fire for a window. This means that you will have a lot of windows hanging around
>>> in your state at some point and they will never be cleaned up. For now, if you require the
>>> behavior of continuously firing on a TimeWindow, I would suggest writing a custom Trigger
>>> based on EventTimeTrigger (or ProcessingTimeTrigger) that does the firing and purging on time
>>> and also has the continuous triggering at earlier times.
>>>
>>> Let us know if you need more information about this. Kostas Kloudas also recently
>>> looked into writing custom Triggers, so maybe he has some material he could give to you.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have a question about TumblingProcessingTimeWindow and
>>>> ContinuousProcessingTimeTrigger.
>>>>
>>>> The code I tried is below. It should output the distinct count of words:
>>>> counts are printed every 5 seconds and the window is reset every minute.
>>>>
>>>> ---
>>>>   val input =
>>>> env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>>>     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>>>     .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>>>     .fold(Set[String]()){(r,i) => { r + i}}
>>>>     .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>>>
>>>>   input print
>>>> ---
>>>>
>>>> I wrote data to the input file at intervals.
>>>>
>>>> ---
>>>> echo "aaa" >> input.txt
>>>> echo "aaa" >> input.txt
>>>> sleep 10
>>>> echo "bbb" >> input.txt
>>>> sleep 60
>>>> echo "ccc" >> input.txt
>>>> ---
>>>>
>>>> The result I got was just 1 record. The expected output was 1 -> (10+
>>>> sec later) 2 -> (60+ sec later) 1.
>>>> ---
>>>> (2016-03-18 13:08:59.288,2)
>>>> ---
>>>>
>>>> Even after several minutes, I never got any additional records. In my
>>>> understanding, with
>>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
>>>> operators (fold, map) in the code above will be evaluated every 5
>>>> seconds.
>>>> Am I misunderstanding something?
>>>>
>>>> Regards,
>>>> Hironori
>>>
>
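
The custom-trigger idea discussed in this thread (fire every few seconds, then fire and purge once at the window end) can be illustrated with a toy model. The sketch below is plain Scala and deliberately does not use the Flink Trigger API: the object name TriggerSketch, the functions onTimer and run, and the purgeAtEnd flag are all hypothetical, and time is simulated in whole seconds rather than read from a clock. It only models the difference between firing and firing-with-purge, under the assumption that a timer goes off at every multiple of the interval for each window that holds data.

```scala
// Toy model of tumbling windows with a periodic timer. NOT the Flink API:
// every name here is hypothetical and exists only for illustration.
object TriggerSketch {
  sealed trait Result
  case object Fire extends Result          // emit the window contents, keep the state
  case object FireAndPurge extends Result  // emit the window contents, then drop the state

  // Decision taken when a timer goes off at `time` for a window ending at `windowEnd`.
  // With purgeAtEnd = false the window is never purged, which models the pitfall
  // described in the thread (a continuous trigger that only ever fires).
  def onTimer(time: Long, windowEnd: Long, purgeAtEnd: Boolean): Result =
    if (purgeAtEnd && time >= windowEnd) FireAndPurge else Fire

  // Replays (arrivalSecond, word) events and returns the emitted
  // (timerSecond, distinctCount) pairs plus the number of windows left in state.
  def run(events: Seq[(Long, String)], windowSec: Long, intervalSec: Long,
          untilSec: Long, purgeAtEnd: Boolean): (Seq[(Long, Int)], Int) = {
    var state = Map.empty[Long, Set[String]] // window start -> distinct words
    val out = Seq.newBuilder[(Long, Int)]
    for (t <- intervalSec to untilSec by intervalSec) {
      // Assign every element that arrived since the previous timer to its window.
      for ((ts, word) <- events if ts >= t - intervalSec && ts < t) {
        val start = ts - ts % windowSec
        state = state.updated(start, state.getOrElse(start, Set.empty[String]) + word)
      }
      // Fire for every window that currently holds state, oldest first.
      for ((start, words) <- state.toSeq.sortBy(_._1)) {
        out += ((t, words.size))
        if (onTimer(t, start + windowSec, purgeAtEnd) == FireAndPurge) state -= start
      }
    }
    (out.result(), state.size)
  }

  def main(args: Array[String]): Unit = {
    // Arrival times loosely follow the shell script in the original question.
    val events = Seq((1L, "aaa"), (1L, "aaa"), (11L, "bbb"), (71L, "ccc"))
    val (purged, leftPurged) = run(events, 60, 5, 125, purgeAtEnd = true)
    val (kept, leftKept) = run(events, 60, 5, 125, purgeAtEnd = false)
    println(s"with purge:    ${purged.mkString(" ")} (windows left: $leftPurged)")
    println(s"without purge: ${kept.mkString(" ")} (windows left: $leftKept)")
  }
}
```

With purging enabled the first window disappears from the model's state once its end is reached, so later timers report only the new window. Without purging, the old window stays in state and keeps being reported, which is the "windows hanging around" behavior described above. A real implementation would have to be written against the Trigger interface of the Flink version in use.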
