flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger
Date Mon, 21 Mar 2016 09:56:15 GMT
Hi,
I’m afraid you have discovered a bug in ContinuousProcessingTimeTrigger: the timer is not set
correctly. You can try this fixed version, which I will also put into the Flink code base:
https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930

One more thing: ContinuousProcessingTimeTrigger never removes the window. The default
EventTimeTrigger fires a window and purges its contents, whereas ContinuousProcessingTimeTrigger
only ever fires. This means that windows will accumulate in your state over time and will
never be cleaned up. For now, if you need continuous firing on a TimeWindow, I would suggest
writing a custom Trigger based on EventTimeTrigger (or ProcessingTimeTrigger) that fires and
purges when the window ends and additionally fires continuously at earlier times.
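To make the suggested behaviour concrete, here is a dependency-free Scala model of it. This is a hypothetical sketch, not Flink's Trigger API, and the object, method and parameter names are made up for illustration. Tumbling processing-time windows fold words into a Set, every open window fires at each interval boundary, and with `purgeAtEnd = true` a window's contents are dropped once its end is reached. In a real custom Trigger the two cases would roughly correspond to returning `TriggerResult.FIRE` and `TriggerResult.FIRE_AND_PURGE` from `onProcessingTime`. The model assumes the window size is a multiple of the interval and places the final firing on the window's end boundary.

```scala
import scala.collection.mutable

// Hypothetical model (no Flink dependency) of a processing-time trigger on tumbling windows.
// Elements are (processing time in ms, word); each window folds its words into a Set[String].
object ContinuousTriggerModel {
  final case class Result(emitted: List[(Long, Int)], windowsInState: Int)

  def run(events: Seq[(Long, String)], sizeMs: Long, intervalMs: Long,
          endTime: Long, purgeAtEnd: Boolean): Result = {
    // Assumption: the window end always coincides with one of the interval timers.
    require(sizeMs % intervalMs == 0, "window size must be a multiple of the interval")
    val state = mutable.TreeMap.empty[Long, Set[String]] // window start -> folded contents
    val emitted = mutable.ListBuffer.empty[(Long, Int)]  // (fire time, distinct word count)
    val sorted = events.sortBy(_._1).toVector
    var i = 0
    var timer = intervalMs
    while (timer <= endTime) {
      // Assign every element that arrived before this timer to its tumbling window.
      while (i < sorted.length && sorted(i)._1 < timer) {
        val (ts, word) = sorted(i)
        val start = ts - ts % sizeMs
        state(start) = state.getOrElse(start, Set.empty[String]) + word
        i += 1
      }
      // Fire every window that is still open or ends exactly now (the "continuous" part).
      for ((start, words) <- state.toList if timer <= start + sizeMs) {
        emitted += ((timer, words.size))
        // At the window's end the suggested trigger also purges (like FIRE_AND_PURGE);
        // with purgeAtEnd = false the contents are never removed from state.
        if (purgeAtEnd && timer == start + sizeMs) state.remove(start)
      }
      timer += intervalMs
    }
    Result(emitted.toList, state.size)
  }
}
```

For example, assuming the words from the question below arrive at 1 s ("aaa" twice), 11 s ("bbb") and 71 s ("ccc"), `ContinuousTriggerModel.run(events, 60000L, 5000L, 130000L, purgeAtEnd = true)` emits 22 results whose distinct counts go 1, 2, 1 and leaves no windows in state. The same call with `purgeAtEnd = false` emits the same results but still holds two windows at the end, which is the state growth described above.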

Let us know if you need more information about this. Kostas Kloudas also recently looked into
writing custom Triggers, so he may have some material he could share with you.

Cheers,
Aljoscha
> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
> 
> Hello,
> 
> I have a question about TumblingProcessingTimeWindow and
> ContinuousProcessingTimeTrigger.
> 
> The code I tried is below. It outputs the distinct count of the words:
> counts are printed every 5 seconds and the window is reset every minute.
> 
> ---
>    val input = env.readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>      .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>      .fold(Set[String]()){(r,i) => { r + i}}
>      .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
> 
>    input print
> ---
> 
> I wrote data to the input file at intervals:
> 
> ---
> echo "aaa" >> input.txt
> echo "aaa" >> input.txt
> sleep 10
> echo "bbb" >> input.txt
> sleep 60
> echo "ccc" >> input.txt
> ---
> 
> The result I got was just one record. The expected output was 1 -> (10+
> sec later) 2 -> (60+ sec later) 1.
> ---
> (2016-03-18 13:08:59.288,2)
> ---
> 
> Even after several minutes, I never got another record. My
> understanding is that, with
> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
> operators (fold, map) in the code above should be evaluated every 5
> seconds.
> Am I misunderstanding something?
> 
> Regards,
> Hironori

