flink-user mailing list archives

From Johannes Schulte <johannes.schu...@gmail.com>
Subject Re: Process Function
Date Wed, 06 Sep 2017 19:26:36 GMT
Thanks, that helped us see how we could implement this!

On Wed, Sep 6, 2017 at 12:01 PM, Timo Walther <twalthr@apache.org> wrote:

> Hi Johannes,
>
> you can find the implementation of the state cleanup here:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
>
> and an example usage here:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
>
> Regards,
> Timo
>
>
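Timo's first link answers Johannes's "clamped timer" question further down. As we read the linked ProcessFunctionWithCleanupState code, it does not clamp: it keeps the latest cleanup time in a ValueState and registers a new timer at now + max retention only when the existing one would fire before now + min retention. Older timers still fire, but they no longer match the stored cleanup time and are ignored. The plain-Java sketch below simulates that rule for a single key. The class, its methods, and the TreeSet standing in for Flink's timer service are hypothetical illustrations, not Flink API.

```java
import java.util.TreeSet;

/** Hypothetical single-key simulation of min/max idle-state retention. */
final class RetentionCleanupSketch {
    private final long minRetentionMs;
    private final long maxRetentionMs;
    // Stands in for Flink's timer service: pending processing-time timers.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Stands in for a ValueState<Long> holding the latest registered cleanup time.
    private Long cleanupTime = null;
    private int registrations = 0;
    private boolean stateCleared = false;

    RetentionCleanupSketch(long minRetentionMs, long maxRetentionMs) {
        this.minRetentionMs = minRetentionMs;
        this.maxRetentionMs = maxRetentionMs;
    }

    /** Called per element: register a later timer only if the current one fires too early. */
    void onElement(long now) {
        stateCleared = false; // the key is active again
        if (cleanupTime == null || now + minRetentionMs > cleanupTime) {
            cleanupTime = now + maxRetentionMs;
            timers.add(cleanupTime);
            registrations++;
        }
    }

    /** Fires all timers due by 'now'; only the latest registered one clears state. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long firedAt = timers.pollFirst();
            if (cleanupTime != null && firedAt == cleanupTime) {
                stateCleared = true;
                cleanupTime = null;
            }
            // Otherwise the timer was superseded by a later one and is ignored.
        }
    }

    int registrations() { return registrations; }

    boolean isStateCleared() { return stateCleared; }
}
```

With min = 10 s and max = 20 s, twenty elements one second apart (t = 0 to 19 s) register only two timers: one at t = 0 for 20 s and one at t = 11 s for 31 s. The 20 s timer fires but is ignored, and the state is cleared at 31 s.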
> On 06.09.17 at 10:50, Aljoscha Krettek wrote:
>
> Hi,
>
> I'm actually not very familiar with the current Table API implementation,
> but Fabian or Timo (cc'ed) should know more. I strongly suspect that it is
> implemented like this, yes.
>
> Best,
> Aljoscha
>
> On 5. Sep 2017, at 21:14, Johannes Schulte <johannes.schulte@gmail.com>
> wrote:
>
> Hi,
>
> One short question I had that fits here: when using the higher-level
> streaming APIs, we can set a min and max retention time [1], which is probably
> used to reduce the number of timers registered under the hood. How is this
> implemented? By registering a "clamped" timer?
>
> Thanks,
>
> Johannes
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#idle-state-retention-time
>
> On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Hi,
>>
>> This is mostly correct, but you cannot register a timer in open() because
>> we don't have an active key there. You can only register a timer in
>> processElement() and onTimer().
>>
>> In your case, I would suggest either clamping the timestamp to the nearest
>> 2-minute (or whatever) interval, or keeping an extra ValueState that tells
>> you whether you have already registered a timer.
>>
>> Best,
>> Aljoscha
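Aljoscha's clamping suggestion works because Flink's timer service keeps at most one timer per key and timestamp, so every element that rounds to the same boundary re-registers the same timer instead of adding a new one. The plain-Java sketch below simulates that effect. ClampedTimerSketch, nextBoundary(), and the per-key HashSet standing in for the timer service are hypothetical; the rounding assumes non-negative millisecond timestamps.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical simulation of clamping timer timestamps to interval boundaries. */
final class ClampedTimerSketch {
    // Stands in for Flink's timer service: at most one timer per key and timestamp.
    private final Map<String, Set<Long>> timersByKey = new HashMap<>();

    /** Rounds a non-negative timestamp up to the next multiple of intervalMs. */
    static long nextBoundary(long timestamp, long intervalMs) {
        return (timestamp / intervalMs + 1) * intervalMs;
    }

    /** Registering the same (key, timestamp) pair twice keeps a single timer. */
    void registerTimer(String key, long timestamp) {
        timersByKey.computeIfAbsent(key, k -> new HashSet<>()).add(timestamp);
    }

    /** Total number of distinct timers across all keys. */
    int timerCount() {
        int total = 0;
        for (Set<Long> timers : timersByKey.values()) {
            total += timers.size();
        }
        return total;
    }
}
```

Sixty elements for one key, one second apart starting at t = 0, produce sixty timers when each registers `t + 60000`, but a single timer at 120 000 ms when clamped to a 2-minute boundary. The cost is precision: a key may be cleaned up up to one interval later than an exact per-element timer, so onTimer() should still check a last-modified value before evicting.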
>>
>> On 5. Sep 2017, at 16:55, Kien Truong <duckientruong@gmail.com> wrote:
>>
>> Hi,
>>
>> You can register a processing time timer inside the onTimer() and open()
>> functions to get a timer that runs periodically.
>>
>> Pseudo-code example:
>>
>> ValueState<Long> lastRuntime;
>>
>> void open() {
>>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
>> }
>>
>> void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) {
>>   // Run the periodic task if this is the first run or the timer set one period ago
>>   Long last = lastRuntime.value();
>>   if (last == null || last + 60000 == timestamp) {
>>     periodicTask();
>>   }
>>   // Re-register the processing time timer
>>   lastRuntime.update(timestamp);
>>   ctx.timerService().registerProcessingTimeTimer(timestamp + 60000);
>> }
>>
>> void periodicTask() { ... }
>>
>>
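Because timers are keyed and open() has no active key, the periodic pattern from Kien's pseudocode has to be started from processElement(), guarded by a per-key ValueState so it starts only once, and then kept alive by re-registering from onTimer(). The plain-Java simulation below shows that cycle for one key. PeriodicTimerSketch, its methods, and the TreeSet standing in for Flink's timer service are hypothetical; in Flink each key runs its own cycle, not one global task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical single-key simulation of a self-re-registering periodic timer. */
final class PeriodicTimerSketch {
    static final long PERIOD_MS = 60_000L;
    // Stands in for Flink's timer service: pending processing-time timers.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Stands in for a ValueState<Long>: null means the timer cycle has not started yet.
    private Long nextTimer = null;
    // Timestamps at which the periodic task ran, recorded for inspection.
    private final List<Long> runs = new ArrayList<>();

    /** Like processElement(): start the timer cycle on the first element only. */
    void processElement(long now) {
        if (nextTimer == null) {
            nextTimer = now + PERIOD_MS;
            timers.add(nextTimer);
        }
    }

    /** Fires due timers; like onTimer(), each firing runs the task and re-registers. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long firedAt = timers.pollFirst();
            runs.add(firedAt); // periodicTask() would run here
            nextTimer = firedAt + PERIOD_MS;
            timers.add(nextTimer);
        }
    }

    List<Long> runs() { return runs; }
}
```

Two elements at 5 s and 10 s start a single cycle, so advancing processing time to 200 s runs the task at 65 s, 125 s and 185 s.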
>> For the second question: timers are already scoped by key, so you can keep
>> a lastModified variable as a ValueState, then compare it to the timestamp
>> provided by the timer to see whether the current key should be evicted.
>> Check out the example on the ProcessFunction page:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>>
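Kien's lastModified check can be illustrated without Flink: each element refreshes the key's last-modified time and sets a timer one timeout later, and a firing timer evicts the key only if no newer element arrived since, which is the same `timestamp == lastModified + 60000` comparison used in the ProcessFunction documentation example. In Flink the timer runs in its key's context, so state accessors return that key's state; the simulation below models this by storing the key with each timer. IdleKeyEvictionSketch, its methods, and the map-based timer store are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical simulation of per-key idle eviction via a lastModified value. */
final class IdleKeyEvictionSketch {
    static final long TIMEOUT_MS = 60_000L;
    // Stands in for a per-key ValueState<Long> lastModified.
    private final Map<String, Long> lastModified = new HashMap<>();
    // Stands in for keyed timers: timestamp -> keys with a timer at that time.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    /** Like processElement(): refresh lastModified and set a timeout timer. */
    void processElement(String key, long now) {
        lastModified.put(key, now);
        timers.computeIfAbsent(now + TIMEOUT_MS, t -> new HashSet<>()).add(key);
    }

    /** Like onTimer(): evict a key only if no newer element arrived since the timer was set. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            long firedAt = due.getKey();
            for (String key : due.getValue()) {
                Long last = lastModified.get(key);
                if (last != null && firedAt == last + TIMEOUT_MS) {
                    lastModified.remove(key);
                }
            }
        }
    }

    boolean isTracked(String key) { return lastModified.containsKey(key); }
}
```

If keys "a" and "b" both arrive at 0 s and "a" arrives again at 30 s, the timer at 60 s evicts only "b"; "a" is evicted by its own timer at 90 s.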
>> Best regards,
>> Kien
>>
>> On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>>
>> Hi All,
>>
>> I have a streaming pipeline that is keyed by userid and then goes to a
>> flatMap function. I need to clear the state after some time, and I was
>> looking at ProcessFunction for it.
>>
>> If I register a timer inside processElement(), wouldn't it create a timer
>> for each incoming message?
>>
>> // schedule the next timer 60 seconds from the current event time
>> ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>>
>> How can I get something like a cleanup task that runs every 2 minutes and
>> evicts all stale data? Also, is there a way to get the key inside the onTimer
>> function so that I know which key has to be evicted?
>>
>> Thanks,
>> Navneeth
>>
>>
>>
>
>
>
