flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Process Function
Date Tue, 05 Sep 2017 15:17:33 GMT
Hi,

This is mostly correct, but you cannot register a timer in open() because we don't have an
active key there. Only in process() and onTimer() can you register a timer.

In your case, I would suggest to somehow clamp the timestamp to the nearest 2 minute (or whatever)
interval or to keep an extra ValueState that tells you whether you already registered a timer.

Best,
Aljoscha

> On 5. Sep 2017, at 16:55, Kien Truong <duckientruong@gmail.com> wrote:
> 
> Hi,
> 
> You can register a processing time timer inside the onTimer and the open function to
have a timer that run periodically.
> Pseudo-code example:
> 
> ValueState<Long> lastRuntime;
> 
> void open() {
>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
> }
> 
> void onTimer() {
>   // Run the periodic task
>   if (lastRuntime.get() + 60000 == timeStamp) {
>     periodicTask();
>   }
>   // Re-register the processing time timer timer
>   lastRuntime.setValue(timeStamp);
>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
> }
> 
> void periodicTask()
> 
> For the second question, timer are already scoped by key, so you can keep a lastModified
variable as a ValueState, 
> then compare it to the timestamp provided by the timer to see if the current key should
be evicted. 
> Checkout the example on the ProcessFunction page. 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html>
> 
> Best regards,
> Kien
> 
> On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>> Hi All,
>> 
>> I have a streaming pipeline which is keyed by userid and then to a flatmap function.
I need to clear the state after sometime and I was looking at process function for it.
>> 
>> Inside the process element function if I register a timer wouldn't it create a timer
for each incoming message?
>> // schedule the next timer 60 seconds from the current event time
>>         ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>> How can I get something like a clean up task that runs every 2 mins and evicts all
stale data? Also is there a way to get the key inside onTimer function so that I know which
key has to be evicted?
>> 
>> Thanks,
>> Navneeth


Mime
View raw message