flink-user mailing list archives

From Yassine MARZOUGUI <y.marzou...@mindlytix.com>
Subject Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness
Date Mon, 17 Oct 2016 14:59:22 GMT
Hi Fabian,

Thank you very much for the great answer and example, I appreciate it!
It is all clear now.

Best,
Yassine

2016-10-17 16:29 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> I have to extend my answer:
>
> The behavior of allowedLateness that I described applies only if the window
> trigger calls FIRE when the window is evaluated (this is the default
> behavior of most triggers).
>
> In case the trigger calls FIRE_AND_PURGE, the state of the window is
> purged when the function is evaluated and late events are processed alone,
> i.e., in my example <12:09, G> would be processed without [A, B, C, D].
> When the allowed lateness is passed, all window state is purged regardless
> of the trigger.
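
The difference between the two trigger results can be illustrated with a small plain-Java sketch. It does not use Flink: the class `TriggerPurgeSketch`, the enum `OnFire`, and the method `evaluate` are hypothetical names introduced only for illustration, and the sketch assumes the late element arrives before the allowed lateness has passed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TriggerPurgeSketch {
    /** Stand-in for the two trigger results discussed above; not Flink's own enum. */
    enum OnFire { FIRE, FIRE_AND_PURGE }

    /**
     * Evaluates one window twice: once on time, and once more when a late
     * element arrives within the allowed lateness. Returns the window
     * contents seen by each evaluation.
     */
    static List<String> evaluate(OnFire onFire, List<String> onTime, String late) {
        List<String> state = new ArrayList<>(onTime);
        List<String> firings = new ArrayList<>();

        firings.add(String.join(",", state));               // on-time evaluation
        if (onFire == OnFire.FIRE_AND_PURGE) state.clear();  // purge drops the window state

        state.add(late);                                     // late element is still accepted
        firings.add(String.join(",", state));                // late evaluation
        return firings;
    }

    public static void main(String[] args) {
        List<String> onTime = Arrays.asList("A", "B", "C", "D");
        for (OnFire mode : OnFire.values()) {
            System.out.println(mode + ": " + evaluate(mode, onTime, "G"));
        }
    }
}
```

With `FIRE`, the second evaluation sees the earlier elements plus G. With `FIRE_AND_PURGE`, it sees G alone, which is the case described above.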
>
> Best, Fabian
>
> 2016-10-17 16:24 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>
>> Hi Yassine,
>>
>> the difference is the following:
>>
>> 1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp
>> extractor and watermark assigner.
>> A timestamp extractor tells Flink when an event happened, i.e., it
>> extracts a timestamp from the event. A watermark assigner tells Flink what
>> the current logical time is.
>> The BoundedOutOfOrdernessTimestampExtractor works as follows: When Flink
>> asks what the current time is, it returns the latest observed timestamp
>> minus a configurable bound. This is the safety margin for late data.
>> A record whose timestamp is lower than the last watermark is considered
>> to be late.
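
The watermark rule described above can be sketched in plain Java without any Flink dependency. The class `BoundedWatermarkSketch` and its methods are hypothetical names used only for illustration, and timestamps are assumed to be minutes since midnight to keep the numbers readable.

```java
/** Plain-Java sketch of "latest observed timestamp minus a configurable bound". */
final class BoundedWatermarkSketch {
    private final long bound;                     // allowed out-of-orderness, in minutes
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private boolean seenAny = false;

    BoundedWatermarkSketch(long bound) {
        this.bound = bound;
    }

    /** Remember the highest event timestamp observed so far. */
    void observe(long timestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
        seenAny = true;
    }

    /** The answer to "what time is it?": the highest timestamp minus the bound. */
    long currentWatermark() {
        return seenAny ? maxSeenTimestamp - bound : Long.MIN_VALUE;
    }

    /** A record is late if its timestamp is lower than the last watermark. */
    static boolean isLate(long timestamp, long lastWatermark) {
        return timestamp < lastWatermark;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(2);
        wm.observe(12 * 60 + 1);                  // 12:01
        wm.observe(12 * 60 + 4);                  // 12:04
        long watermark = wm.currentWatermark();   // 12:04 minus 2 minutes
        System.out.println(watermark + " minutes since midnight");
        System.out.println("12:01 late? " + isLate(12 * 60 + 1, watermark));
    }
}
```

Note that an out-of-order record never moves the watermark backwards, because only the highest timestamp seen so far is tracked.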
>>
>> 2) The allowedLateness parameter of time windows tells Flink how long to
>> keep state around after the window was evaluated.
>> If data arrives after the evaluation and before the allowedLateness has
>> passed, the window function is applied again and an update is sent out.
>>
>> Let's look at an example.
>> Assume you have a BoundedOutOfOrdernessTimestampExtractor with a 2 minute
>> bound and a 10 minute tumbling window that starts at 12:00 and ends at 12:10:
>>
>> If you have the following data:
>>
>> 12:01, A
>> 12:04, B
>> WM, 12:02 // 12:04 - 2 minutes
>> 12:02, C
>> 12:08, D
>> 12:14, E
>> WM, 12:12 // 12:14 - 2 minutes
>> 12:16, F
>> WM, 12:14 // 12:16 - 2 minutes
>> 12:09, G
>>
>> == no allowed lateness
>> The window operator advances its logical time to 12:12 when it receives
>> <WM, 12:12>, evaluates the window, which contains [A, B, C, D] at this
>> time, and finally purges its state. <12:09, G> arrives later and is ignored.
>>
>> == allowed lateness of 3 minutes
>> The window operator evaluates the window when <WM, 12:12> is received,
>> but its state is not purged yet. The state is purged when <WM, 12:14> is
>> received (window end time 12:10 + 3 minutes allowed lateness = 12:13).
>> <12:09, G> is again ignored.
>>
>> == allowed lateness of 5 minutes
>> The window operator evaluates the window when <WM, 12:12> is received,
>> but its state is not purged yet. When <12:09, G> is received, the window is
>> again evaluated but this time with [A, B, C, D, G] and an update is sent
>> out. The state is purged when a watermark of >=12:15 is received.
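
The three scenarios above can be replayed with a small plain-Java simulation. It is only a sketch under simplifying assumptions: it does not use Flink, it models a single window from 12:00 to 12:10 with a trigger that keeps state on firing, it works at minute granularity, and `AllowedLatenessSketch`, `run`, and `minutes` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class AllowedLatenessSketch {
    static final int WINDOW_START = 12 * 60;       // 12:00, inclusive
    static final int WINDOW_END = 12 * 60 + 10;    // 12:10, exclusive

    static int minutes(String hhmm) {
        String[] p = hhmm.split(":");
        return Integer.parseInt(p[0]) * 60 + Integer.parseInt(p[1]);
    }

    /** Items look like "12:01 A" (event) or "WM 12:02" (watermark). Returns each evaluation's contents. */
    static List<String> run(int allowedLateness, String... input) {
        List<String> firings = new ArrayList<>();
        List<String> state = new ArrayList<>();
        int cleanupTime = WINDOW_END + allowedLateness;
        int watermark = Integer.MIN_VALUE;
        boolean fired = false;

        for (String item : input) {
            String[] parts = item.split(" ");
            if (parts[0].equals("WM")) {
                watermark = Math.max(watermark, minutes(parts[1]));
                if (!fired && !state.isEmpty() && watermark >= WINDOW_END) {
                    firings.add(String.join(",", state));   // on-time evaluation
                    fired = true;
                }
                if (watermark >= cleanupTime) state.clear(); // allowed lateness has passed
            } else {
                int ts = minutes(parts[0]);
                if (ts < WINDOW_START || ts >= WINDOW_END) continue; // belongs to another window
                if (watermark >= cleanupTime) continue;              // too late: ignored
                state.add(parts[1]);
                if (fired) firings.add(String.join(",", state));     // update for a late element
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        String[] stream = {"12:01 A", "12:04 B", "WM 12:02", "12:02 C", "12:08 D",
                           "12:14 E", "WM 12:12", "12:16 F", "WM 12:14", "12:09 G"};
        for (int lateness : new int[] {0, 3, 5}) {
            System.out.println("allowed lateness " + lateness + " min:");
            for (String firing : run(lateness, stream)) System.out.println("  " + firing);
        }
    }
}
```

Only the run with 5 minutes of allowed lateness produces a second evaluation that includes G, because the watermark of 12:14 has not yet reached the cleanup time of 12:15 when G arrives.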
>>
>> So, watermarks tell Flink what time it is, and allowed lateness tells
>> the system when state should be discarded and all later-arriving data
>> ignored.
>> These issues are related but not exactly the same thing. For instance, you
>> can counter late data by increasing either the watermark bound or the
>> allowed lateness.
>> Increasing the watermark bound yields higher latency, because windows are
>> evaluated later.
>> Configuring allowedLateness allows for earlier results, but you have
>> to cope with the updates downstream.
>>
>> Please let me know if you have questions.
>>
>> Best, Fabian
>>
>> 2016-10-17 11:52 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
>>
>>> Hi,
>>>
>>> I'm a bit confused about how Flink deals with late elements after the
>>> introduction of allowedLateness to windows. What is the difference between
>>> using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
>>> allowedLateness(Time.seconds(X))? What if one is used and the other is
>>> not? And what if a different lateness is used in each one? Could you please
>>> clarify it with a simple example? Thank you.
>>>
>>> Best,
>>> Yassine
>>>
>>
>>
>
