flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Process Function
Date Wed, 06 Sep 2017 08:50:27 GMT
Hi,

I'm actually not very familiar with the current Table API implementation, but Fabian or Timo
(cc'ed) should know more. I strongly suspect that it is implemented like this, yes.

Best,
Aljoscha

> On 5. Sep 2017, at 21:14, Johannes Schulte <johannes.schulte@gmail.com> wrote:
> 
> Hi,
> 
> One short question that fits here: when using the higher-level streaming APIs (Table API / SQL),
> we can set a minimum and maximum retention time [1], which is probably used to reduce the number
> of timers registered under the hood. How is this implemented? By registering a "clamped" timer?
> 
> Thanks,
> 
> Johannes
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#idle-state-retention-time
> 
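A framework-free sketch of the mechanism Johannes describes may help. It assumes, without confirmation from this thread, that the runtime keeps one cleanup timestamp per key and only registers a new timer when the stored one would fire earlier than now + minRetention; the new timer is then set to now + maxRetention, and records arriving in between register nothing. The class and method names are hypothetical and no Flink API is used; the map stands in for a keyed ValueState<Long>.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of min/max idle-state retention; not Flink code. */
class RetentionSketch {
    private final long minRetention;
    private final long maxRetention;
    // Per-key cleanup timestamp, standing in for a keyed ValueState<Long>
    private final Map<String, Long> cleanupTime = new HashMap<>();
    // Number of cleanup timers "registered" so far
    int timersRegistered = 0;

    RetentionSketch(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    /** Called for every record of `key` arriving at processing time `now`. */
    void onRecord(String key, long now) {
        Long current = cleanupTime.get(key);
        // Register only if no timer exists yet, or the existing one would
        // clean up before the minimum retention time has passed
        if (current == null || now + minRetention > current) {
            long next = now + maxRetention;
            cleanupTime.put(key, next);
            timersRegistered++;
        }
    }

    Long cleanupTimeOf(String key) {
        return cleanupTime.get(key);
    }
}
```

With a 1-minute minimum and a 2-minute maximum, seven records in the first minute cost one timer; only a record at 61 s, which would otherwise leave less than the minimum retention, registers a second one. The stale timer at 120 s still fires, so its handler would compare its timestamp with the stored cleanup time and ignore it if they differ, the same lastModified-style check Kien describes further down.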
> On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> 
> This is mostly correct, but you cannot register a timer in open() because there is no active key
> there. You can only register timers in processElement() and onTimer().
> 
> In your case, I would suggest either clamping the timestamp to the nearest 2-minute (or whatever)
> interval or keeping an extra ValueState that tells you whether you have already registered a timer.
> 
> Best,
> Aljoscha
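Aljoscha's clamping suggestion can be illustrated without Flink. Each record's deadline (event time + 60 s, as in Navneeth's snippet) is rounded up to the next 2-minute boundary, so all records of a key whose deadlines fall into the same interval map to one timer timestamp. Flink's timer service keeps at most one timer per key and timestamp, so those registrations collapse into a single timer. Here a Set stands in for that timer service; the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: clamp timer timestamps to fixed intervals. */
class ClampedTimers {
    static final long INTERVAL_MS = 2 * 60 * 1000; // 2 minutes
    static final long TIMEOUT_MS = 60_000;         // deadline after an event

    /** Round a non-negative `ts` up to the next multiple of INTERVAL_MS (strictly later). */
    static long clamp(long ts) {
        return (ts / INTERVAL_MS + 1) * INTERVAL_MS;
    }

    // Stand-in for the timer service: one entry per (key, timestamp) pair
    private final Set<String> timers = new HashSet<>();

    void register(String key, long eventTs) {
        timers.add(key + "@" + clamp(eventTs + TIMEOUT_MS));
    }

    int timerCount() {
        return timers.size();
    }
}
```

Sixty records for one user in the first minute produce a single timer at 120 000 ms. The trade-off is precision: a key is cleaned up more than 60 s but at most 180 s after its last record, instead of exactly 60 s after it. The extra-ValueState alternative avoids that imprecision by remembering whether a timer is already pending and skipping registration if so.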
> 
>> On 5. Sep 2017, at 16:55, Kien Truong <duckientruong@gmail.com> wrote:
>> 
>> Hi,
>> 
>> You can register a processing-time timer inside the onTimer and open functions to get a timer
>> that runs periodically.
>> Pseudo-code example:
>> 
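>> // Keyed state: processing time of the last periodic run for the current key
>> private transient ValueState<Long> lastRuntime;
>> 
>> @Override
>> public void open(Configuration parameters) {
>>   // No key is active in open(): create state handles here, but register no timers
>>   lastRuntime = getRuntimeContext().getState(
>>       new ValueStateDescriptor<>("lastRuntime", Long.class));
>> }
>> 
>> @Override
>> public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
>>   // Event is a placeholder element type; start the timer on the key's first element
>>   if (lastRuntime.value() == null) {
>>     long now = ctx.timerService().currentProcessingTime();
>>     lastRuntime.update(now);
>>     ctx.timerService().registerProcessingTimeTimer(now + 60000);
>>   }
>> }
>> 
>> @Override
>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
>>   // Run the periodic task only for the timer scheduled 60 s after the last run
>>   if (lastRuntime.value() + 60000 == timestamp) {
>>     periodicTask();
>>   }
>>   // Re-register the processing-time timer
>>   lastRuntime.update(timestamp);
>>   ctx.timerService().registerProcessingTimeTimer(timestamp + 60000);
>> }
>> 
>> void periodicTask() { /* ... */ }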
>> 
>> For the second question: timers are already scoped by key, so you can keep a lastModified
>> variable as a ValueState, then compare it to the timestamp provided by the timer to see whether
>> the current key should be evicted.
>> Check out the example on the ProcessFunction page:
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>> 
>> Best regards,
>> Kien
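Kien's lastModified approach can be simulated in plain Java to show why a stale timer does no harm. Each record updates lastModified and schedules a timer at lastModified + 60 s; when a timer fires, the key is evicted only if the timer's timestamp still matches the latest update. The maps stand in for keyed ValueState and the TreeMap for the timer service; the names and the 60 s timeout are assumptions, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of per-key eviction with a lastModified check. */
class IdleEviction {
    static final long TIMEOUT_MS = 60_000;

    // Stand-ins for keyed ValueState: last update time and the user's data
    final Map<String, Long> lastModified = new HashMap<>();
    final Map<String, Integer> counts = new HashMap<>();
    // Stand-in for the timer service: timestamp -> keys with a timer at that time
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    void processElement(String key, long ts) {
        counts.merge(key, 1, Integer::sum);
        lastModified.put(key, ts);
        timers.computeIfAbsent(ts + TIMEOUT_MS, t -> new ArrayList<>()).add(key);
    }

    /** Fire all timers with timestamp <= now, in timestamp order. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, List<String>> e = timers.pollFirstEntry();
            for (String key : e.getValue()) {
                onTimer(key, e.getKey());
            }
        }
    }

    private void onTimer(String key, long timestamp) {
        Long last = lastModified.get(key);
        // Evict only if this timer belongs to the latest update of the key
        if (last != null && timestamp == last + TIMEOUT_MS) {
            lastModified.remove(key);
            counts.remove(key);
        }
    }
}
```

For a key with records at 0 s and 30 s, the timer at 60 s is ignored because a newer record arrived, and the timer at 90 s evicts it. In a real keyed ProcessFunction, onTimer already runs with the timer's key as the current key, so state accessed there belongs to that key without passing it explicitly.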
>> 
>> On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>>> Hi All,
>>> 
>>> I have a streaming pipeline that is keyed by userid and then goes to a flatMap function.
>>> I need to clear the state after some time, and I was looking at ProcessFunction for that.
>>> 
>>> Inside processElement(), if I register a timer, wouldn't that create a timer for every
>>> incoming message?
>>>     // schedule the next timer 60 seconds from the current event time
>>>     ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>>> How can I get something like a clean-up task that runs every 2 minutes and evicts all stale
>>> data? Also, is there a way to get the key inside the onTimer function, so that I know which
>>> key has to be evicted?
>>> 
>>> Thanks,
>>> Navneeth
> 
> 

