flink-user mailing list archives

From Yun Tang <myas...@live.com>
Subject Re: Process Function's timers "postponing"
Date Tue, 25 Jun 2019 17:14:43 GMT
If you are using processing time, one possible way is to track the timestamp of the last
registered timer in a separate ValueState<Long>. When you register a new timer and find that
the previous timer stored in the ValueState has a smaller timestamp (T1) than the new timer's
timestamp (T2), you can call #deleteProcessingTimeTimer(T1). Once that processing-time timer
is deleted, T1 will not trigger any action. You could refer to [1] and its usage for similar
ideas.
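
The idea above can be sketched roughly as follows. This is a plain-Java model rather than
Flink code, so that it runs without any dependencies. The class PostponingTimers and its
methods are hypothetical: the map lastTimer stands in for the keyed ValueState<Long>, and
registerTimer/deleteTimer stand in for registerProcessingTimeTimer and
deleteProcessingTimeTimer on the timer service. It also assumes a fixed delay per record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java model of "one active timer per key"; hypothetical, not Flink code. */
class PostponingTimers {
    // Stands in for a keyed ValueState<Long>: last registered timer per key.
    private final Map<String, Long> lastTimer = new HashMap<>();
    // Stands in for the timer service: pending timers, timestamp -> keys.
    private final TreeMap<Long, Set<String>> pending = new TreeMap<>();

    /** Called for each record; "now" is the current processing time. */
    void onElement(String key, long now, long delay) {
        long newTime = now + delay;           // T2
        Long previous = lastTimer.get(key);   // T1, or null if no timer is active
        if (previous != null) {
            if (previous >= newTime) {
                return;                       // assumption: keep a timer that fires at or after T2
            }
            deleteTimer(key, previous);       // T1 will no longer fire
        }
        registerTimer(key, newTime);
        lastTimer.put(key, newTime);
    }

    /** Advances the clock and returns the timers that fire, as "key@timestamp". */
    List<String> advanceTo(long time) {
        List<String> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() <= time) {
            Map.Entry<Long, Set<String>> entry = pending.pollFirstEntry();
            for (String key : entry.getValue()) {
                lastTimer.remove(key);        // clear the state once the timer has fired
                fired.add(key + "@" + entry.getKey());
            }
        }
        return fired;
    }

    private void registerTimer(String key, long timestamp) {
        pending.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    private void deleteTimer(String key, long timestamp) {
        Set<String> keys = pending.get(timestamp);
        if (keys != null) {
            keys.remove(key);
            if (keys.isEmpty()) {
                pending.remove(timestamp);
            }
        }
    }

    public static void main(String[] args) {
        PostponingTimers timers = new PostponingTimers();
        timers.onElement("K", 1_000L, 5_000L);         // R1 at t1: registers T1 = 6000
        timers.onElement("K", 3_000L, 5_000L);         // R2 at t2: deletes T1, registers T2 = 8000
        System.out.println(timers.advanceTo(6_000L));  // prints []
        System.out.println(timers.advanceTo(8_000L));  // prints [K@8000]
    }
}
```

In an actual KeyedProcessFunction the same steps would go into processElement, and onTimer
should clear the ValueState once the timer has fired.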


[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/CleanupState.java

________________________________
From: Andrea Spina <andrea.spina@radicalbit.io>
Sent: Tuesday, June 25, 2019 23:40
To: Yun Tang
Cc: user
Subject: Re: Process Function's timers "postponing"

Hi Yun, thank you for your answer. I'm not sure I got your point. My question is:
for the same key K, I process two records: R1 at t1 and R2 at t2.
When I process R1, I set a timer to be triggered at T1, which is > t2.
When I process R2, I set a timer to be triggered at T2, which is > T1; to do that, I want
to remove the previous timer T1 in order to "postpone" the triggering.

In other words, I would like a single key to have just one active timer: if a new timer
is requested, the old one should be deleted.

Thank you,

On Tue, 25 Jun 2019 at 17:31, Yun Tang <myasuka@live.com> wrote:
Hi Andrea

If my understanding is correct, you just want to know when the timer is eventually deleted.
When you register your timer, it is stored in 'processingTimeTimersQueue' at [1], and the
'SystemProcessingTimeService' then schedules a runnable TriggerTask to run after the
"postpone" delay at [2]. When the scheduled runnable is triggered, it polls the timer from
'processingTimeTimersQueue' [3], which means the timer is finally removed. I hope this
helps.

Best
Yun Tang

[1] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
[2] https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
[3] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237

________________________________
From: Andrea Spina <andrea.spina@radicalbit.io>
Sent: Tuesday, June 25, 2019 2:06
To: user
Subject: Process Function's timers "postponing"

Dear Community,
I am using Flink (processing-time) timers along with a Process Function. What I would like
to do is to "postpone" any previously registered timer for the given key. I would like to do
this because I might process plenty of events in a row (think of it as a session), so that I
will be able to trigger the computation "just after" this session somehow stops.

I wondered about deleting any existing timers, but AFAIU I need to know the previous timer's
triggering time, which I guess is not possible for me since I use processing-time timers.

I also read [1], but I am not really able to understand whether it is useful to me; for
instance, I don't understand what "Since Flink maintains only one timer per key and
timestamp..." means. Does this imply that a new PT timer will automatically overwrite any
previously existing one?

Thank you for your precious help,

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
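
On the coalescing passage quoted above: "one timer per key and timestamp" means that only
registrations with the same key and the same timestamp are deduplicated; a timer with a
different timestamp does not overwrite an earlier one, and both would fire. Coalescing
reduces the number of timers by rounding the target time to a coarser resolution. A minimal
sketch of that arithmetic in plain Java, where the class TimerCoalescing, the method
coalesce, and the 1-second granularity are assumptions for illustration:

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper illustrating timer coalescing by rounding down. */
class TimerCoalescing {
    /** Rounds (now + delay) down to a multiple of granularityMs. */
    static long coalesce(long now, long delay, long granularityMs) {
        return ((now + delay) / granularityMs) * granularityMs;
    }

    public static void main(String[] args) {
        // A set models "one timer per key and timestamp" for a single key.
        Set<Long> timersForKey = new TreeSet<>();
        timersForKey.add(coalesce(10_100L, 5_000L, 1_000L)); // 15100 rounds to 15000
        timersForKey.add(coalesce(10_700L, 5_000L, 1_000L)); // 15700 rounds to 15000
        System.out.println(timersForKey);                    // prints [15000]

        // Without rounding, the two registrations remain two distinct timers.
        Set<Long> uncoalesced = new TreeSet<>();
        uncoalesced.add(10_100L + 5_000L);
        uncoalesced.add(10_700L + 5_000L);
        System.out.println(uncoalesced);                     // prints [15100, 15700]
    }
}
```

Coalescing alone therefore does not postpone a timer; it only merges registrations that
land in the same rounded slot.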
--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


