flink-user mailing list archives

From Álvaro Vilaplana García <alvaro.vilapl...@gmail.com>
Subject Re: Problem with ProcessFunction timeout feature
Date Fri, 23 Jun 2017 08:55:00 GMT
Hi Stefan,

You meant this method:

/**
 * Registers a timer to be fired when processing time passes the given time.
 *
 * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
 * in a keyed context, such as in an operation on
 * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
 * will also be active when you receive the timer notification.
 */
void registerProcessingTimeTimer(long time);


Am I right?


Cheers


2017-06-23 9:51 GMT+01:00 Álvaro Vilaplana García <alvaro.vilaplana@gmail.com>:

> Hi Stefan,
>
> Thank you for sharing your knowledge; it is much appreciated.
>
> According to the documentation:
>
> void registerEventTimeTimer(long time); -> 'Registers a timer to be fired
> when the event time watermark passes the given time.'
>
> Don't we have the same problem? We would need an event (and that event may
> not come soon) to advance the watermark and trigger the timer.
>
> Or is there another way of setting the watermark based on processing time
> instead of event time?
>
> Cheers
>
> 2017-06-23 9:24 GMT+01:00 Stefan Richter <s.richter@data-artisans.com>:
>
>> Hi,
>>
>> Yes, I think you understood the basic concept of watermarks. Events are
>> basically driving "the event time clock", so it can only advance when you
>> see events. I am not sure if I got the part about partitions correctly, but
>> the watermark (event time) is a global notion; it is not tracked per key.
>> For example, if you have multiple Kafka partitions that your source reads,
>> each partition can have a different current watermark. However, the source
>> must determine the current event time of the stream, e.g. as the minimum of
>> the watermarks from all the Kafka partitions it reads.
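A minimal framework-free sketch of the "minimum over partitions" rule. PartitionWatermarks, update and current are hypothetical names, and this is not the Kafka connector's actual implementation; it only illustrates why a single partition that stops making progress holds back the event time of the whole source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Flink/Kafka connector code): a source instance
// tracks one watermark per partition and exposes their minimum.
final class PartitionWatermarks {
    // partition id -> latest watermark reported by that partition
    private final Map<Integer, Long> perPartition = new HashMap<>();

    PartitionWatermarks(int... partitions) {
        // A partition that has produced nothing yet starts at Long.MIN_VALUE,
        // so it holds the combined watermark back until it reports progress.
        for (int p : partitions) {
            perPartition.put(p, Long.MIN_VALUE);
        }
    }

    // Records a new watermark for one partition; a partition's watermark
    // never moves backwards.
    void update(int partition, long watermark) {
        perPartition.merge(partition, watermark, Long::max);
    }

    // The source's current event time: the minimum over all partitions.
    long current() {
        long min = Long.MAX_VALUE;
        for (long w : perPartition.values()) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

For example, with partition watermarks 100, 50 and 80 the source's watermark is 50, and it only moves once partition 1 advances.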
>>
>> One thing that might help for your use case is a combination of event
>> time and processing time. In the process function, after each device
>> event, you could register a processing-time timer far enough ahead that,
>> if it fires, it signals that you have not received events for a long time
>> and should check for timeouts.
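The processing-time fallback described above can be simulated without Flink as follows. This is a sketch under stated assumptions, not Flink code: DeviceTimeoutSketch, onEvent, advanceTo and the 60-second TIMEOUT_MS are hypothetical. In a Flink job the map would roughly correspond to keyed state, onEvent to processElement calling ctx.timerService().registerProcessingTimeTimer(now + timeout), and the check inside the loop to onTimer. The sketch never cancels timers; instead each firing compares itself with the device's latest event and ignores stale timers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch (not Flink API): per-device timeout detection driven by
// processing-time timers.
final class DeviceTimeoutSketch {
    static final long TIMEOUT_MS = 60_000L; // assumed timeout

    // A pending processing-time timer for one device.
    private static final class Timer {
        final long fireAt;
        final String deviceId;

        Timer(long fireAt, String deviceId) {
            this.fireAt = fireAt;
            this.deviceId = deviceId;
        }
    }

    // deviceId -> processing time of that device's most recent event
    private final Map<String, Long> lastSeen = new HashMap<>();

    // Pending timers, earliest first.
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.fireAt));

    // Counterpart of processElement: remember the arrival time and register
    // a timer TIMEOUT_MS ahead.
    void onEvent(String deviceId, long now) {
        lastSeen.put(deviceId, now);
        timers.add(new Timer(now + TIMEOUT_MS, deviceId));
    }

    // Moves processing time forward to 'now' and returns the devices whose
    // timeout fired, in firing order.
    List<String> advanceTo(long now) {
        List<String> timedOut = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().fireAt <= now) {
            Timer t = timers.poll();
            // Counterpart of onTimer: only the timer set by the latest event
            // counts; older timers are stale because the device sent
            // something after they were registered.
            Long seen = lastSeen.get(t.deviceId);
            if (seen != null && t.fireAt == seen + TIMEOUT_MS) {
                timedOut.add(t.deviceId);
                // Forget the device until it sends again, so duplicate
                // timers for the same event do not report it twice.
                lastSeen.remove(t.deviceId);
            }
        }
        return timedOut;
    }
}
```

For example, if device "a" sends at 0 and again at 30,000 while device "b" sends only at 0, advancing to 60,000 reports only "b"; "a" is reported once time reaches 90,000.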
>>
>> Best,
>> Stefan
>>
>> On 23.06.2017 at 09:51, Álvaro Vilaplana García <alvaro.vilaplana@gmail.com> wrote:
>>
>> Hi Stefan,
>>
>> Thank you so much for your answer.
>>
>> Regarding the 'artificial events', our main problem is that we have no
>> control at all over the devices.
>>
>> I have been reading more about event time and watermarks, and what I
>> understood is that when we use event time (device time), Flink has no
>> notion of time of its own, and the watermark is a way to tell Flink the
>> current time of the stream (no more events with an event time earlier than
>> the watermark are expected). That would explain why we always need an event
>> to advance the watermark. Does that make sense?
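The point that the watermark only moves when an event arrives can be seen in a sketch of a bounded-out-of-orderness watermark: it trails the largest device timestamp seen so far by a fixed bound, so without new events it never changes. Flink ships a similar assigner, BoundedOutOfOrdernessTimestampExtractor; the class below is a hypothetical stand-alone version, and the 5-second bound is an assumed value.

```java
// Hypothetical sketch (not Flink API): a watermark derived purely from the
// event timestamps seen so far.
final class EventDrivenWatermark {
    // Assumed maximum delay between a device timestamp and the largest
    // timestamp already seen.
    static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L;

    // Start low enough that the subtraction in currentWatermark() cannot
    // overflow.
    private long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    // Called for every event with its device (event) timestamp.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // No more events with a timestamp at or below this value are expected.
    // Only onEvent changes it, so if no events arrive, it stays put no matter
    // how often it is queried.
    long currentWatermark() {
        return maxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }
}
```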
>>
>>
>> I understood that the watermarks are per partition (ByKey(deviceId)). Is
>> that right?
>>
>>
>> Cheers
>>
>> 2017-06-22 16:26 GMT+01:00 Stefan Richter <s.richter@data-artisans.com>:
>>
>>> Hi,
>>>
>>> If I understand correctly, your problem is that event time does not
>>> progress when you don't receive events, so you cannot detect device
>>> timeouts. Would it make sense to have your source periodically send
>>> artificial events that advance the watermark in the absence of device
>>> events, using a gap large enough that you can safely assume you will no
>>> longer receive events with a smaller timestamp from any device?
>>> After all, how else could Flink advance event time without receiving
>>> further events?
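The artificial-event idea can be sketched as follows, assuming the source emits a heartbeat stamped with its wall-clock time minus a safety gap (SAFETY_GAP_MS, a hypothetical value) and that heartbeats only move the watermark. Event-time timers fire once the watermark reaches their timestamp, and because the watermark is not per key, one heartbeat can fire the pending timers of every silent device. HeartbeatWatermarkSketch and its methods are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch (not Flink API): heartbeats advance event time so that
// event-time timers can fire even when no device sends anything.
final class HeartbeatWatermarkSketch {
    // Assumed gap: a heartbeat claims that no device event older than
    // (wall clock - SAFETY_GAP_MS) will arrive any more.
    static final long SAFETY_GAP_MS = 10_000L;

    private long watermark = Long.MIN_VALUE;

    // timer timestamp -> devices with an event-time timer at that timestamp
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    void registerEventTimeTimer(String deviceId, long time) {
        timers.computeIfAbsent(time, t -> new ArrayList<>()).add(deviceId);
    }

    // A heartbeat emitted by the source at wall-clock time wallClockNow.
    List<String> onHeartbeat(long wallClockNow) {
        return advanceWatermark(wallClockNow - SAFETY_GAP_MS);
    }

    // Moves the watermark forward (never back) and returns the devices whose
    // timers fired, i.e. whose timestamp is <= the new watermark.
    List<String> advanceWatermark(long candidate) {
        watermark = Math.max(watermark, candidate);
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            fired.addAll(timers.pollFirstEntry().getValue());
        }
        return fired;
    }

    long currentWatermark() {
        return watermark;
    }
}
```

Calling advanceWatermark(Long.MAX_VALUE) fires every pending timer, which mirrors the final Long.MaxValue watermark observed in the unit tests mentioned in the original question and shows why a finite test input hides the problem.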
>>>
>>> Best,
>>> Stefan
>>>
>>> > On 22.06.2017 at 16:35, Álvaro Vilaplana García <alvaro.vilaplana@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Could you please help me with a problem? I have summarised it in the
>>> > points below; I hope it is clear enough to get some help.
>>> >
>>> >
>>> > a) We have devices, each with its own ID, over which we have no control.
>>> >
>>> > b) These devices send messages with an internally generated timestamp
>>> > that is not synchronised with the other devices.
>>> >
>>> > c) We want to detect when each device stops sending messages.
>>> >
>>> > d) For that, we are using a ProcessFunction
>>> >
>>> > e) The devices put the messages in a Kafka topic, partitioned by ID.
>>> >
>>> > f) We are struggling with the ProcessFunction timeout feature:
>>> >
>>> > We cannot rely on real time (processing time), since the messages from
>>> > the devices may be delayed (even though their timestamps do not show
>>> > these delays), so we rely on device timestamps instead.
>>> > In our case, each incoming event registers an event-time timer ("Registers
>>> > a timer to be fired when the event time watermark passes the given
>>> > time"). The problem is that in some cases we do not get another event
>>> > after the first one, which means that the timeout of that first event is
>>> > never triggered.
>>> >
>>> > As a side note, we have seen in unit tests that Flink seems to emit a
>>> > watermark of Long.MaxValue (9223372036854775807) after the last event,
>>> > which hides the above problem.
>>> >
>>> > I am using Scala 2.11 / Flink 1.2.0.
>>> >
>>> > Regards
>>> > --
>>> > ______________________________
>>> >
>>> > Álvaro Vilaplana García
>>>
>>>
>>
>>
>
>



