flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness
Date Mon, 17 Oct 2016 14:24:04 GMT
Hi Yassine,

the difference is the following:

1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp
extractor and watermark assigner.
A timestamp extractor tells Flink when an event happened, i.e., it extracts
a timestamp from the event. A watermark assigner tells Flink what the
current logical time is.
The BoundedOutOfOrdernessTimestampExtractor works as follows: When Flink
asks what the current time is, it returns the latest observed timestamp
minus a configurable bound. This bound is the safety margin for late data.
A record whose timestamp is lower than the last watermark is considered to
be late.
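
To make the watermark rule concrete, here is a rough sketch in plain Python.
It is a toy model, not Flink code: the class and function names are made up
for illustration, and it only mirrors the rule described above (latest
observed timestamp minus the bound; a record is late if its timestamp is
lower than the watermark).

```python
from datetime import datetime, timedelta


class BoundedOutOfOrdernessWatermarks:
    """Toy model of a bounded-out-of-orderness watermark assigner (hypothetical name)."""

    def __init__(self, bound: timedelta):
        self.bound = bound
        self.max_timestamp = None  # latest event timestamp observed so far

    def observe(self, timestamp: datetime) -> None:
        # Track the highest timestamp seen; out-of-order events never lower it.
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp

    def current_watermark(self):
        # Latest observed timestamp minus the configurable bound.
        if self.max_timestamp is None:
            return None
        return self.max_timestamp - self.bound


def is_late(timestamp, watermark):
    # Late, per the definition above: timestamp lower than the last watermark.
    return watermark is not None and timestamp < watermark


def t(hhmm):
    """Parse 'HH:MM' into a datetime (the date part is irrelevant here)."""
    return datetime.strptime(hhmm, "%H:%M")


assigner = BoundedOutOfOrdernessWatermarks(timedelta(minutes=2))
assigner.observe(t("12:01"))
assigner.observe(t("12:04"))
wm = assigner.current_watermark()  # 12:04 minus the 2 minute bound
```

With a 2 minute bound, a record stamped 12:03 that shows up now is out of
order but not late, while a record stamped 12:01 would be late.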

2) The allowedLateness parameter of time windows tells Flink how long to
keep state around after the window was evaluated.
If data arrives after the evaluation and before the allowedLateness has
passed, the window function is applied again and an update is sent out.

Let's look at an example.
Assume you have a BoundedOutOfOrdernessTimestampExtractor (BOOTE) with a
2 minute bound and a 10 minute tumbling
window that starts at 12:00 and ends at 12:10:

If you have the following data:

12:01, A
12:04, B
WM, 12:02 // 12:04 - 2 minutes
12:02, C
12:08, D
12:14, E
WM, 12:12 // 12:14 - 2 minutes
12:16, F
WM, 12:14 // 12:16 - 2 minutes
12:09, G

== no allowed lateness
The window operator advances its logical time to 12:12 when it receives
<WM, 12:12>, evaluates the window, which contains [A, B, C, D] at this
time, and then purges its state. <12:09, G> arrives later and is ignored.

== allowed lateness of 3 minutes
The window operator evaluates the window when <WM, 12:12> is received, but
its state is not purged yet. The state is purged when <WM, 12:14> is
received, because the cleanup time (window end 12:10 + 3 minutes allowed
lateness = 12:13) has then passed. <12:09, G> arrives after that and is
again ignored.

== allowed lateness of 5 minutes
The window operator evaluates the window when <WM, 12:12> is received, but
its state is not purged yet. When <12:09, G> is received, the window is
again evaluated but this time with [A, B, C, D, G] and an update is sent
out. The state is purged when a watermark of >=12:15 is received.
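
The three cases can be replayed with a small plain-Python simulation. This
is only a sketch under simplifying assumptions, not how Flink implements
its window operator: times are minutes since midnight, only the 12:00 to
12:10 window is modeled, the window fires when a watermark reaches the
window end, and state is purged when a watermark reaches window end plus
allowed lateness. The names (simulate, STREAM, m) are invented for this
example.

```python
def m(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, mins = hhmm.split(":")
    return int(hours) * 60 + int(mins)


# The stream from the example, in arrival order.
STREAM = [
    ("EV", m("12:01"), "A"),
    ("EV", m("12:04"), "B"),
    ("WM", m("12:02"), None),
    ("EV", m("12:02"), "C"),
    ("EV", m("12:08"), "D"),
    ("EV", m("12:14"), "E"),
    ("WM", m("12:12"), None),
    ("EV", m("12:16"), "F"),
    ("WM", m("12:14"), None),
    ("EV", m("12:09"), "G"),
]


def simulate(stream, window_end, allowed_lateness):
    """Replay the stream for one window; return (firings, dropped records)."""
    contents, firings, dropped = [], [], []
    fired = purged = False
    for kind, ts, label in stream:
        if kind == "WM":
            # First evaluation once the watermark passes the window end.
            if not fired and ts >= window_end:
                firings.append(list(contents))
                fired = True
            # State is purged once the allowed lateness has passed as well.
            if not purged and ts >= window_end + allowed_lateness:
                contents.clear()
                purged = True
        elif ts < window_end:  # records >= window_end belong to later windows
            if purged:
                dropped.append(label)  # too late, state is gone
            else:
                contents.append(label)
                if fired:
                    # Late but within allowed lateness: emit an update.
                    firings.append(list(contents))
    return firings, dropped


for lateness in (0, 3, 5):
    print(lateness, simulate(STREAM, m("12:10"), lateness))
```

With 0 and 3 minutes the window fires once with [A, B, C, D] and G is
dropped. With 5 minutes there is a second firing with [A, B, C, D, G] and
nothing is dropped.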

So, watermarks tell Flink what time it is, and allowed lateness tells
the system when state should be discarded and all later arriving data be
ignored.
These issues are related but not exactly the same thing. For instance, you
can counter late data by increasing the bound or the lateness parameter.
Increasing the watermark bound will yield higher latencies as windows are
evaluated later.
Configuring allowedLateness will allow for earlier results, but you have to
cope with the updates downstream.
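
The latency cost of a larger bound can be seen with a bit of arithmetic on
the example stream. The sketch below is plain Python, not Flink code, and
makes two assumptions that are not in the example: a watermark is computed
after every record (the example emits them only now and then), and a
hypothetical extra record <12:18, H> arrives after G. The function name is
made up.

```python
def minutes(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    h, mm = hhmm.split(":")
    return int(h) * 60 + int(mm)


def first_firing(arrivals, window_end, bound):
    """Return (record that triggers the first firing, window contents then)."""
    max_ts = None
    contents = []
    for ts, label in arrivals:
        if ts < window_end:
            contents.append(label)
        max_ts = ts if max_ts is None else max(max_ts, ts)
        # Watermark = latest observed timestamp minus the bound.
        if max_ts - bound >= window_end:
            return label, contents
    return None, contents  # the window never fired on this stream


# Records of the example in arrival order, plus the hypothetical H.
ARRIVALS = [
    (minutes("12:01"), "A"), (minutes("12:04"), "B"), (minutes("12:02"), "C"),
    (minutes("12:08"), "D"), (minutes("12:14"), "E"), (minutes("12:16"), "F"),
    (minutes("12:09"), "G"), (minutes("12:18"), "H"),
]

small_bound = first_firing(ARRIVALS, minutes("12:10"), 2)
large_bound = first_firing(ARRIVALS, minutes("12:10"), 7)
```

With a 2 minute bound the window fires as soon as E arrives, without G.
With a 7 minute bound G makes it into the result, but nothing is emitted
until H arrives.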

Please let me know if you have questions.

Best, Fabian

2016-10-17 11:52 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:

> Hi,
> I'm a bit confused about how Flink deals with late elements after the
> introduction of allowedlateness to windows. What is the difference between
> using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
> allowedlateness(Time.seconds(X))? What if one is used and the other is
> not? and what if a different lateness is used in each one? Could you please
> clarify it on basis of a simple example? Thank you.
> Best,
> Yassine
