flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kien Truong <duckientru...@gmail.com>
Subject Re: Process Function
Date Tue, 05 Sep 2017 14:55:31 GMT

You can register a processing time timer inside the onTimer and the open 
function to have a timer that run periodically.

Pseudo-code example:

|ValueState<Long> lastRuntime; void open() { 
ctx.timerService().registerProcessingTimeTimer(current.timestamp + 
60000); } void onTimer() { // Run the periodic task if 
(lastRuntime.get() + 60000 == timeStamp) { periodicTask(); } // 
Re-register the processing time timer timer 
lastRuntime.setValue(timeStamp); |   ||ctx.timerService().registerProcessingTimeTimer(current.timestamp
60000);| } void periodicTask() |

For the second question, timer are already scoped by key, so you can 
keep a lastModified variable as a ValueState,
then compare it to the timestamp provided by the timer to see if the 
current key should be evicted.
Checkout the example on the ProcessFunction page.


Best regards,

On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
> Hi All,
> I have a streaming pipeline which is keyed by userid and then to a 
> flatmap function. I need to clear the state after sometime and I was 
> looking at process function for it.
> Inside the process element function if I register a timer wouldn't it 
> create a timer for each incoming message?
> |// schedule the next timer 60 seconds from the current event time 
> ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);|
> How can I get something like a clean up task that runs every 2 mins 
> and evicts all stale data? Also is there a way to get the key inside 
> onTimer function so that I know which key has to be evicted?
> Thanks,
> Navneeth

View raw message