flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: ProcessFunction example
Date Thu, 09 Mar 2017 09:30:03 GMT
Hi Philippe,

You are right! 
Thanks for reporting it!
We will fix it asap.

Kostas

> On Mar 9, 2017, at 8:38 AM, Philippe Caparroy <philippe.caparroy@orange.fr> wrote:
> 
> I think there is an error in the code snippet describing the ProcessFunction timeout example: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
> 
> 
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
>         throws Exception {
>
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>
>     // check if this is an outdated timer or the latest timer
>     if (timestamp == result.lastModified) {
>         // emit the state
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
>
> If, as stated in the example, the CountWithTimeoutFunction should emit a key/count pair when no further update has occurred during the minute elapsed since the last update, the test should be:
> 
> if (timestamp == result.lastModified + 60000) {
>     // emit the state on timeout
>     out.collect(new Tuple2<String, Long>(result.key, result.count));
> }
> 
> As stated in the Javadoc of ProcessFunction, the timestamp argument of the onTimer method is the timestamp of the firing timer.
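
To make the point above concrete, here is a minimal plain-Java sketch of the comparison, with no Flink dependency. It assumes that the timer for each update is registered at lastModified + 60000, as the quoted fix implies. The names TimeoutCheckSketch, TIMEOUT_MS, update and shouldEmit are hypothetical and exist only for illustration; CountWithTimestamp mirrors the state class from the documentation example.

```java
// Illustrative sketch only: simulates the timer bookkeeping without Flink.
class TimeoutCheckSketch {

    // Assumed timeout of one minute, matching the 60000 in the quoted fix.
    static final long TIMEOUT_MS = 60_000L;

    // Mirrors the per-key state used in the documentation example.
    static final class CountWithTimestamp {
        String key;
        long count;
        long lastModified;
    }

    // Hypothetical helper: applies one update to the state and returns the
    // timestamp at which the corresponding timer would be registered.
    static long update(CountWithTimestamp state, String key, long eventTime) {
        state.key = key;
        state.count++;
        state.lastModified = eventTime;
        return eventTime + TIMEOUT_MS;
    }

    // Hypothetical helper: the check onTimer would perform. Only the timer
    // belonging to the most recent update satisfies it.
    static boolean shouldEmit(CountWithTimestamp state, long timerTimestamp) {
        return timerTimestamp == state.lastModified + TIMEOUT_MS;
    }

    public static void main(String[] args) {
        CountWithTimestamp state = new CountWithTimestamp();
        long firstTimer = update(state, "a", 1_000L);   // timer at 61000
        long secondTimer = update(state, "a", 5_000L);  // timer at 65000

        // The first timer is outdated because a later update arrived.
        System.out.println("first timer emits:  " + shouldEmit(state, firstTimer));
        // The second timer belongs to the latest update.
        System.out.println("second timer emits: " + shouldEmit(state, secondTimer));
        // The uncorrected check never matches a timer registered this way.
        System.out.println("old check matches:  " + (secondTimer == state.lastModified));
    }
}
```

Under this assumption, comparing the firing timestamp with lastModified alone can never succeed, since every registered timer lies exactly TIMEOUT_MS after the update that scheduled it.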