flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Windows and Watermarks Clarification
Date Fri, 02 Sep 2016 05:54:25 GMT
Just one clarification: even with a specified allowed lateness the window
will still be evaluated once the watermark passes the end of the window.
It's just that with allowed lateness the window contents and state will be
kept around a bit longer so that any late elements can still update the
results. What happens when late elements arrive depends on the trigger.
With the default EventTimeTrigger you will get a new firing that processes
the previously available elements along with the new (late-arriving)
element.

Cheers,
Aljoscha
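The behaviour Aljoscha describes can be traced with a small plain-Java simulation of a single tumbling window. This is a simplified model written for illustration, not Flink's WindowOperator. The class LateWindowSim and its methods are hypothetical, and the rules it encodes are an assumed reading of the default EventTimeTrigger semantics: fire once the watermark reaches the window's last millisecond, fire again for each late element, and discard the state once the watermark passes that last millisecond plus the allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-window model, not Flink code. Times are milliseconds.
class LateWindowSim {
    final long start;            // inclusive window start
    final long end;              // exclusive window end
    final long allowedLateness;  // how long state is kept after the window closes
    final List<Long> contents = new ArrayList<>();
    final List<Integer> firings = new ArrayList<>(); // element count seen by each firing
    long watermark = Long.MIN_VALUE;
    int dropped = 0;

    LateWindowSim(long start, long end, long allowedLateness) {
        this.start = start;
        this.end = end;
        this.allowedLateness = allowedLateness;
    }

    static long minutes(long m) {
        return m * 60_000L;
    }

    long maxTimestamp() {
        return end - 1; // last millisecond that belongs to the window
    }

    long cleanupTime() {
        return maxTimestamp() + allowedLateness;
    }

    void onElement(long timestamp) {
        if (timestamp < start || timestamp >= end) {
            throw new IllegalArgumentException("timestamp outside this window");
        }
        // Past the allowed lateness: the window state is gone, so the element is dropped
        if (cleanupTime() <= watermark) {
            dropped++;
            return;
        }
        contents.add(timestamp);
        // Late but within the allowed lateness: fire again over old + new elements
        if (maxTimestamp() <= watermark) {
            firings.add(contents.size());
        }
    }

    void onWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // watermarks never move backwards
        }
        long previous = watermark;
        watermark = newWatermark;
        // On-time firing: the watermark passes the end of the window
        if (previous < maxTimestamp() && maxTimestamp() <= watermark) {
            firings.add(contents.size());
        }
        // Allowed lateness expired: discard the window contents
        if (cleanupTime() <= watermark) {
            contents.clear();
        }
    }
}
```

Replaying the 12:00 to 12:10 window with 5 minutes of allowed lateness (times relative to 12:00): two elements followed by a watermark at 12:10 give one firing over 2 elements; a late element at 12:05 then triggers a second firing over 3 elements; after a watermark at 12:15, an element at 12:07 is dropped.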

On Thu, 1 Sep 2016 at 21:15 Fabian Hueske <fhueske@gmail.com> wrote:

> A 10-minute tumbling window that starts at 12:00 is evaluated after a
> watermark is observed that is > 12:10.
> If the same tumbling window has an allowed lateness of 5 minutes, it is
> evaluated once a watermark > 12:15 is observed. However, only elements with
> timestamps 12:00 <= x < 12:10 are in the window.
> Elements that arrive even after the allowed lateness period are simply
> dropped.
>
> Best, Fabian
>
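The membership rule in Fabian's example (12:00 <= x < 12:10) amounts to rounding a timestamp down to a multiple of the window size. A minimal sketch, assuming windows aligned to the epoch with no offset and non-negative millisecond timestamps; the class TumblingWindowTimes is hypothetical and only mirrors the arithmetic, not Flink's TumblingEventTimeWindows API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper; assumes epoch-aligned windows and timestamps >= 0 (in ms).
class TumblingWindowTimes {
    // Inclusive start of the tumbling window that contains the timestamp
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    // Exclusive end: an element stamped exactly at the end belongs to the next window
    static long windowEnd(long timestamp, long sizeMs) {
        return windowStart(timestamp, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(10).toMillis();
        long ts = Instant.parse("2016-09-01T12:07:30Z").toEpochMilli();
        // prints 2016-09-01T12:00:00Z .. 2016-09-01T12:10:00Z
        System.out.println(Instant.ofEpochMilli(windowStart(ts, size))
                + " .. " + Instant.ofEpochMilli(windowEnd(ts, size)));
    }
}
```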
> 2016-09-01 20:42 GMT+02:00 Paul Joireman <paul.joireman@physiq.com>:
>
>> Thanks Fabian,
>>
>>
>> This is making more sense. Is allowedLateness(Time.seconds(x)) then
>> evaluated relative to maxEventTime - lastWaterMarkTime? That is, if
>> (maxEventTime - lastWaterMarkTime) > x * 1000, is the window evaluated?
>>
>>
>> Paul
>> ------------------------------
>> *From:* Fabian Hueske <fhueske@gmail.com>
>> *Sent:* Thursday, September 1, 2016 1:25:55 PM
>> *To:* user@flink.apache.org
>> *Subject:* Re: Windows and Watermarks Clarification
>>
>> Hi Paul,
>>
>> BoundedOutOfOrdernessTimestampExtractor implements the
>> AssignerWithPeriodicWatermarks interface.
>> This means Flink will ask the assigner at regular intervals
>> (configurable via
>> StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval()) for the
>> current watermark.
>> The watermark will be 10 seconds earlier than the highest observed timestamp
>> so far.
>>
>> An event-time window is evaluated when the current watermark is higher /
>> later than the window's end time. With allowedLateness() the window
>> evaluation can be deferred to allow late elements (elements whose timestamp
>> is before the current watermark) to join the window before it is evaluated.
>>
>> Let me know if you have further questions,
>> Fabian
>>
>>
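The distinction Fabian draws (timestamps are extracted per element, but the watermark is only requested periodically) can be modeled in plain Java. The class below is a hypothetical stand-in written for illustration, not Flink's BoundedOutOfOrdernessTimestampExtractor source. It assumes only what the thread states, plus the usual rule that an emitted watermark never moves backwards; calling getCurrentWatermark() stands in for Flink polling the assigner every auto-watermark interval.

```java
// Hypothetical model of a periodic, bounded-out-of-orderness assigner (not Flink code).
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that subtracting the bound cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Called once per element: only remembers the highest timestamp seen so far
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    // Called periodically, not per element; the result never decreases
    long getCurrentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }
}
```

With a 10-second bound, two elements stamped 100 s and 95 s that arrive between polls yield a single watermark at 90 s, not one watermark per element.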
>> 2016-09-01 20:16 GMT+02:00 Paul Joireman <paul.joireman@physiq.com>:
>>
>>> Hi all,
>>>
>>>
>>> Just a point of clarification on how watermarks are generated. I'd
>>> like to use a sliding event-time window of, say, 5 minutes with a
>>> 30-second slide. The incoming data stream has elements from which I can
>>> extract the timestamp, but they may arrive out of order, so I chose to
>>> implement the following timestamp assigner.
>>>
>>>
>>>     // Use the returned stream: it is the one that carries timestamps and watermarks
>>>     DataStream<MyElement> withTimestamps = my_stream.assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor<MyElement>(Time.seconds(10)) {
>>>             @Override
>>>             public long extractTimestamp(final MyElement element) {
>>>                 return element.getTimestamp();
>>>             }
>>>         });
>>>
>>> With this definition and the code for
>>> BoundedOutOfOrdernessTimestampExtractor, my understanding is that for each
>>> incoming element a watermark will be generated that is 10 seconds behind
>>> the current timestamp. If the end time of any of the sliding windows is
>>> earlier than an emitted watermark, that window (or those windows) will
>>> fire, initiating processing of the window(s). Is this correct?
>>>
>>> Paul
>>>
>>>
>>
>
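For Paul's setup, a 5-minute window sliding every 30 seconds means each element lands in size / slide = 10 overlapping windows, and a watermark that advances by more than one slide can complete several of them at once. The helper below is a hypothetical sketch, not Flink's SlidingEventTimeWindows API. It assumes epoch-aligned windows with no offset, non-negative millisecond timestamps, and that "evaluated once the watermark passes the end of the window" means the watermark has reached the window's last millisecond.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; assumes epoch-aligned windows and timestamps >= 0 (in ms).
class SlidingWindows {
    // Start times of every window [start, start + size) that contains the timestamp,
    // newest first
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Assumed firing rule: the watermark has reached the window's last millisecond
    static boolean fires(long windowStart, long size, long watermark) {
        return windowStart + size - 1 <= watermark;
    }
}
```

For an element stamped 12:07:45, the windows start at 12:07:30, 12:07:00, and so on back to 12:03:00; the earliest of them, [12:03:00, 12:08:00), is the first to fire as the watermark advances.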
