flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Process Function
Date Wed, 06 Sep 2017 10:01:14 GMT
Hi Johannes,

you can find the implementation of the state cleanup here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala

and an example usage here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
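For readers following Johannes's question further down, here is a rough plain-Java model of the min/max retention idea as we read it from the linked class. It is not the Flink code itself: the maps are hypothetical stand-ins for keyed ValueState, and `RetentionCleanupModel`, `onElement` and `timersRegistered` are illustrative names. The idea sketched is: register a new cleanup timer only when the currently registered one would fire before `now + minRetention`, and let only the most recent cleanup timer clear the state.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (no Flink dependency) of a min/max idle-retention cleanup timer.
// The maps are hypothetical stand-ins for keyed ValueState; in a real job the timer
// would be registered via ctx.timerService().registerProcessingTimeTimer(...).
class RetentionCleanupModel {
    final long minRetention;
    final long maxRetention;

    // Stand-in for ValueState<Long>: the cleanup time currently registered per key.
    final Map<String, Long> cleanupTime = new HashMap<>();
    // Stand-in for the user's keyed state that should be dropped when the key is idle.
    final Map<String, String> userState = new HashMap<>();
    // Counts how many timers would have been registered with the timer service.
    int timersRegistered = 0;

    RetentionCleanupModel(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    // Think processElement(): update state, then push the cleanup time out only if needed.
    void onElement(String key, String value, long now) {
        userState.put(key, value);
        Long current = cleanupTime.get(key);
        // Re-register only if the existing timer would fire before now + minRetention.
        if (current == null || now + minRetention > current) {
            cleanupTime.put(key, now + maxRetention);
            timersRegistered++;
        }
    }

    // Think onTimer(): only the most recently registered cleanup timer clears state;
    // older timers that were superseded by a later cleanup time are ignored.
    void onTimer(String key, long timestamp) {
        Long current = cleanupTime.get(key);
        if (current != null && current == timestamp) {
            userState.remove(key);
            cleanupTime.remove(key);
        }
    }
}
```

With `minRetention = 60_000` and `maxRetention = 120_000`, elements at 0, 30 000 and 70 000 ms register only two timers (at 120 000 and 190 000 ms), and the first of them fires without clearing anything. In this model an idle key is kept for at least `minRetention` and at most `maxRetention` after its last element.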

Regards,
Timo


On 06.09.17 at 10:50, Aljoscha Krettek wrote:
> Hi,
>
> I'm actually not very familiar with the current Table API
> implementations, but Fabian or Timo (cc'ed) should know more. I strongly
> suspect that this is how it is implemented, yes.
>
> Best,
> Aljoscha
>
>> On 5. Sep 2017, at 21:14, Johannes Schulte
>> <johannes.schulte@gmail.com> wrote:
>>
>> Hi,
>>
>> One short question that fits here: when using the higher-level
>> streaming APIs, we can set a minimum and maximum idle state retention
>> time [1], which is probably used to reduce the number of timers
>> registered under the hood. How is this implemented? By registering a
>> "clamped" timer?
>>
>> Thanks,
>>
>> Johannes
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#idle-state-retention-time
>>
>> On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>     Hi,
>>
>>     This is mostly correct, but you cannot register a timer in open()
>>     because there is no active key there. You can only register a timer
>>     in processElement() and onTimer().
>>
>>     In your case, I would suggest either clamping the timestamp to the
>>     nearest 2-minute (or whatever) interval, or keeping an extra
>>     ValueState that tells you whether you have already registered a timer.
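Clamping helps because Flink's timer service deduplicates timers per key and timestamp, so rounding every timer up to a coarse interval collapses many registrations into one. A minimal plain-Java model, assuming a 1-minute timeout and a 2-minute interval; `ClampedTimerModel` and `roundUp` are hypothetical names, and the map of sets only stands in for the per-key timer service.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java model of "clamped" timers. The map of sets is a hypothetical stand-in for
// Flink's per-key timer service, which keeps at most one timer per key and timestamp.
class ClampedTimerModel {
    static final long TIMEOUT = 60_000L;   // evict after roughly 1 minute of inactivity
    static final long INTERVAL = 120_000L; // align timers to 2-minute boundaries

    final Map<String, Set<Long>> timers = new HashMap<>();

    // Round a non-negative timestamp up to the next multiple of interval.
    static long roundUp(long ts, long interval) {
        return ((ts + interval - 1) / interval) * interval;
    }

    // Think processElement(): register the rounded-up time instead of ts + TIMEOUT.
    // In Flink: ctx.timerService().registerEventTimeTimer(roundUp(ts + TIMEOUT, INTERVAL));
    void processElement(String key, long ts) {
        long timer = roundUp(ts + TIMEOUT, INTERVAL);
        // Adding an equal timestamp again is a no-op, mirroring timer deduplication.
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(timer);
    }
}
```

Elements at 0, 10 000 and 50 000 ms all map to a single timer at 120 000 ms; an element at 70 000 ms adds a second one at 240 000 ms. A clamped timer fires between `TIMEOUT` and `TIMEOUT + INTERVAL` after the element, and because a newer element may have pushed the deadline out, the timer callback should still check when the key was last modified before evicting anything.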
>>
>>     Best,
>>     Aljoscha
>>
>>>     On 5. Sep 2017, at 16:55, Kien Truong <duckientruong@gmail.com>
>>>     wrote:
>>>
>>>     Hi,
>>>
>>>     You can register a processing-time timer inside onTimer() and
>>>     open() to have a timer that runs periodically.
>>>
>>>     Pseudo-code example:
>>>
>>>         ValueState<Long> lastRuntime;
>>>         void open() {
>>>             ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
>>>         }
>>>         void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) {
>>>             // Run the periodic task
>>>             if (lastRuntime.value() + 60000 == timestamp) {
>>>                 periodicTask();
>>>             }
>>>             // Re-register the processing-time timer
>>>             lastRuntime.update(timestamp);
>>>             ctx.timerService().registerProcessingTimeTimer(timestamp + 60000);
>>>         }
>>>         void periodicTask() { ... }
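As Aljoscha points out, the first timer cannot be registered in open() because no key is active there. The sketch below is a plain-Java model of the same periodic-timer idea that starts the chain from processElement() instead, guarded by a per-key "next timer" value so each key has only one pending timer. The maps are hypothetical stand-ins for ValueState and the timer service, and `PeriodicTimerModel` and its fields are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a per-key periodic timer in a keyed ProcessFunction.
// The maps are hypothetical stand-ins for ValueState<Long> and the timer service.
class PeriodicTimerModel {
    static final long INTERVAL = 120_000L; // 2 minutes, as in the original question

    // Stand-in for ValueState<Long>: timestamp of the timer currently registered per key.
    final Map<String, Long> nextTimer = new HashMap<>();
    int timersRegistered = 0;
    int periodicRuns = 0;

    // Think processElement(): the first element of a key starts the timer chain.
    void processElement(String key, long now) {
        if (!nextTimer.containsKey(key)) { // i.e. ValueState.value() == null
            register(key, now + INTERVAL);
        }
    }

    // Think onTimer(): run the periodic task for this key and re-arm the timer.
    void onTimer(String key, long timestamp) {
        Long expected = nextTimer.get(key);
        if (expected == null || expected != timestamp) {
            return; // not the timer this key is tracking
        }
        periodicRuns++; // stand-in for periodicTask()
        register(key, timestamp + INTERVAL);
    }

    private void register(String key, long time) {
        // In Flink: ctx.timerService().registerProcessingTimeTimer(time); state.update(time);
        nextTimer.put(key, time);
        timersRegistered++;
    }
}
```

Three elements for the same key register just one timer; when it fires, the task runs once and the next timer is set one interval later. Note that in Flink the periodic task only sees the state of the key whose timer fired, not all keys at once.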
>>>
>>>     For the second question, timers are already scoped by key, so you
>>>     can keep a lastModified variable as a ValueState and compare it to
>>>     the timestamp provided by the timer to see whether the current key
>>>     should be evicted.
>>>     Check out the example on the ProcessFunction page.
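In Flink, keyed state accessed inside onTimer() is automatically scoped to the key of the timer that fired, so clearing that state evicts exactly that key without needing the key itself. The plain-Java model below passes the key explicitly only because it has no implicit key context; `LastModifiedEvictionModel` and its maps are hypothetical stand-ins, loosely in the spirit of the ProcessFunction documentation example rather than a copy of it.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of lastModified-based eviction. The maps are hypothetical
// stand-ins for keyed ValueState; in Flink they would be implicitly per key.
class LastModifiedEvictionModel {
    static final long TIMEOUT = 60_000L;

    final Map<String, Long> lastModified = new HashMap<>(); // ValueState<Long>
    final Map<String, Integer> counts = new HashMap<>();    // the state to evict

    // Think processElement(): update state and remember when the key was last touched.
    // In Flink one would also call ctx.timerService().registerEventTimeTimer(ts + TIMEOUT).
    void processElement(String key, long ts) {
        counts.merge(key, 1, Integer::sum);
        lastModified.put(key, ts);
    }

    // Think onTimer(): evict only if no newer element arrived since this timer was set.
    // Using >= instead of == also works when timers are rounded up to an interval.
    boolean onTimer(String key, long timestamp) {
        Long last = lastModified.get(key);
        if (last != null && timestamp >= last + TIMEOUT) {
            counts.remove(key);       // in Flink: countState.clear()
            lastModified.remove(key); // in Flink: lastModifiedState.clear()
            return true;
        }
        return false;
    }
}
```

With elements at 0 and 30 000 ms, the timer at 60 000 ms finds a newer modification and leaves the state alone; the timer at 90 000 ms evicts it.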
>>>
>>>     https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>>>
>>>     Best regards,
>>>     Kien
>>>
>>>     On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>>>>     Hi All,
>>>>
>>>>     I have a streaming pipeline which is keyed by userid and then
>>>>     passed to a flatMap function. I need to clear the state after some
>>>>     time, and I was looking at ProcessFunction for this.
>>>>
>>>>     Inside processElement(), if I register a timer, wouldn't it create
>>>>     a timer for each incoming message?
>>>>
>>>>         // schedule the next timer 60 seconds from the current event time
>>>>         ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>>>>
>>>>     How can I get something like a clean-up task that runs every 2
>>>>     minutes and evicts all stale data? Also, is there a way to get the
>>>>     key inside the onTimer function so that I know which key has to be
>>>>     evicted?
>>>>
>>>>     Thanks,
>>>>     Navneeth
>>
>>
>

