flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Allowed Lateness in Flink
Date Mon, 30 May 2016 08:33:44 GMT
I created a new doc specifically about the interplay of lateness and window
state garbage collection:

Some things still need to be figured out, both in the new doc and in the
existing one. For example, we need to decide whether to make
accumulating/discarding behavior global for a window operation or
controllable by triggers. Initially, I suggested making
accumulating/discarding a global setting for the window operation because
we can get away with keeping less state if we know that we always discard
when firing. Please take a look at the new doc to see what I'm talking
about there.

Feedback very welcome!


On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek <aljoscha@apache.org> wrote:

> Hi Max,
> thanks for the feedback and suggestions! I'll try and address each
> paragraph separately.
>
> I'm afraid deciding based on the "StreamTimeCharacteristic" is not possible
> since a user can use processing-time windows in their job even though they
> set the characteristic to event-time. Enabling event time does not disable
> processing time; it just enables an additional feature. (IMHO, the handling
> of the StreamTimeCharacteristic is still somewhat problematic.)
>
> Making the decision based purely on the class of the WindowAssigner is
> also not possible since we don't know in advance which WindowAssigners
> users will write and which time characteristic they will use.
>
> Regarding your third suggestion: removing 'System.currentTimeMillis()' is
> very desirable and part of my proposal. However, it is still meant to be
> separate from "event-time", since a Trigger/WindowAssigner might need both.
> For example, a Trigger might want to do early triggering a few
> (processing-time) seconds after the first elements arrive and proper
> triggering once the watermark for the end of the window arrives.
>
> These are good ideas, but I'm afraid we still don't have a good solution.
> This whole processing-time/event-time business is just very tricky.
>
> Cheers,
> Aljoscha
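The early/on-time trigger mentioned in the reply above needs both clocks at once. A minimal sketch, assuming a hypothetical `EarlyThenOnTimeTrigger` driven by explicit callbacks (`onElement`, `onProcessingTime`, `onWatermark`) rather than by Flink's actual `Trigger` interface; the window is identified by its last event-time timestamp, and the fire-once policy for each firing is an illustrative assumption:

```java
// Hypothetical sketch; names are illustrative and this is not Flink's Trigger API.
enum TriggerResult { CONTINUE, FIRE }

class EarlyThenOnTimeTrigger {
    private static final long NO_TIMER = Long.MIN_VALUE;

    private final long windowMaxTimestamp; // last event-time timestamp belonging to the window
    private final long earlyDelayMillis;   // processing-time delay before the early firing
    private long earlyTimer = NO_TIMER;
    private boolean earlyFired = false;
    private boolean onTimeFired = false;

    EarlyThenOnTimeTrigger(long windowMaxTimestamp, long earlyDelayMillis) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.earlyDelayMillis = earlyDelayMillis;
    }

    // Called per element with the current processing time; the first element arms the early timer.
    TriggerResult onElement(long processingTimeNow) {
        if (earlyTimer == NO_TIMER) {
            earlyTimer = processingTimeNow + earlyDelayMillis;
        }
        return TriggerResult.CONTINUE;
    }

    // Processing-time clock advanced: fire once if the early timer is due
    // and the on-time firing has not already happened.
    TriggerResult onProcessingTime(long now) {
        if (!earlyFired && !onTimeFired && earlyTimer != NO_TIMER && now >= earlyTimer) {
            earlyFired = true;
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    // Event-time watermark advanced: fire once when it reaches the end of the window.
    TriggerResult onWatermark(long watermark) {
        if (!onTimeFired && watermark >= windowMaxTimestamp) {
            onTimeFired = true;
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
}
```

The early firing is decided purely by processing time and the final firing purely by the watermark, so neither clock can be dropped from the trigger's view of time.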
> On Tue, 26 Apr 2016 at 16:26 Maximilian Michels <mxm@apache.org> wrote:
>> Hi Aljoscha,
>> Thank you for the detailed design document.
>>
>> Wouldn't it be ok to allow these new concepts regardless of the time
>> semantics? For Event Time and Ingestion Time, "Lateness" and
>> "Accumulating/Discarding" make sense. If the user chooses Processing
>> Time, then these can be ignored during translation of the StreamGraph
>> (possibly with a warning).
>>
>> Detecting when these concepts make sense should be possible by
>> checking the "Stream Characteristics" of the ExecutionEnvironment or
>> the involved classes (e.g. SlidingProcessingTimeWindows) in the
>> StreamGraph. If the user uses a custom WindowAssigner, then the user
>> has to take care that it is used correctly. I don't like the
>> "isEventTime()" method. Even with the additional method, users could
>> return 'true' there although they meant 'false', right? So this does
>> not really solve the problem that it is hard to distinguish Event Time
>> and Processing Time semantics in Flink.
>>
>> Another approach that I could think of is getting rid of
>> 'System.currentTimeMillis()' and only allowing time to be obtained via a
>> special interface that WindowAssigners implement. Then we could determine
>> which time is assigned and also verify that it is actually used (in
>> contrast to the isEventTime() method). Would that be an option or would
>> it break the API?
>>
>> Cheers,
>> Max
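One possible reading of the "special interface" idea, sketched under assumptions: assigners never call `System.currentTimeMillis()` and instead read time only through a hypothetical `TimeService` handed in by the framework, which records which time domain was actually queried. `TimeService`, `TimeAwareAssigner`, `RecordingTimeService` and `TimeDomain` are illustrative names, not Flink types:

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical types illustrating the idea; none of them exist in Flink.
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

// The only way an assigner may read time (no direct System.currentTimeMillis()).
interface TimeService {
    long elementTimestamp();      // event time of the element being assigned
    long currentProcessingTime(); // wall-clock time supplied by the framework
}

// Hypothetical assigner contract: compute the start of the tumbling window for one element.
interface TimeAwareAssigner {
    long windowStart(TimeService time);
}

// Framework-side TimeService that records which time domains an assigner actually read.
class RecordingTimeService implements TimeService {
    private final long eventTs;
    private final long procTs;
    private final EnumSet<TimeDomain> used = EnumSet.noneOf(TimeDomain.class);

    RecordingTimeService(long eventTs, long procTs) {
        this.eventTs = eventTs;
        this.procTs = procTs;
    }

    @Override
    public long elementTimestamp() {
        used.add(TimeDomain.EVENT_TIME);
        return eventTs;
    }

    @Override
    public long currentProcessingTime() {
        used.add(TimeDomain.PROCESSING_TIME);
        return procTs;
    }

    Set<TimeDomain> usedDomains() {
        return Collections.unmodifiableSet(used);
    }
}

class TumblingEventTimeAssigner implements TimeAwareAssigner {
    private final long size;

    TumblingEventTimeAssigner(long size) { this.size = size; }

    @Override
    public long windowStart(TimeService time) {
        long ts = time.elementTimestamp();
        return ts - Math.floorMod(ts, size); // align to a multiple of the window size
    }
}

class TumblingProcessingTimeAssigner implements TimeAwareAssigner {
    private final long size;

    TumblingProcessingTimeAssigner(long size) { this.size = size; }

    @Override
    public long windowStart(TimeService time) {
        long now = time.currentProcessingTime();
        return now - Math.floorMod(now, size); // align to a multiple of the window size
    }
}
```

A recorder like this only sees the code paths an assigner actually executes, so it can confirm which time was used for a given element but cannot prove what the assigner would do on other inputs.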
>> On Tue, Apr 5, 2016 at 12:29 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> > By the way, one way I see to fix this is to extend WindowAssigner with
>> > an "isEventTime()" method and then to allow accumulating/lateness in the
>> > WindowOperator only if it returns true.
>> >
>> > But it seems a bit hacky because it special-cases event-time. But then
>> > again, maybe we need to special-case it ...
>> >
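The `isEventTime()` proposal could look roughly like the following sketch. `WindowAssignerInfo` and `LatenessSettings` are hypothetical names, and failing fast with an `IllegalArgumentException` is only one possible policy; the alternative raised in the thread is to ignore the settings with a warning:

```java
// Hypothetical sketch of the isEventTime() proposal; these are not Flink classes.
interface WindowAssignerInfo {
    boolean isEventTime();
}

final class LatenessSettings {
    final long allowedLateness;
    final boolean accumulating;

    private LatenessSettings(long allowedLateness, boolean accumulating) {
        this.allowedLateness = allowedLateness;
        this.accumulating = accumulating;
    }

    // Accepts allowed lateness / accumulating mode only for event-time assigners.
    static LatenessSettings validate(WindowAssignerInfo assigner, long allowedLateness, boolean accumulating) {
        if (allowedLateness < 0) {
            throw new IllegalArgumentException("allowed lateness must be >= 0");
        }
        boolean usesLatenessFeatures = allowedLateness > 0 || accumulating;
        if (usesLatenessFeatures && !assigner.isEventTime()) {
            throw new IllegalArgumentException(
                "allowed lateness and accumulating mode only make sense for event-time windows");
        }
        return new LatenessSettings(allowedLateness, accumulating);
    }
}
```

As pointed out in the thread, the check trusts the assigner: an implementation that returns `true` for what is really a processing-time window passes validation.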
>> > On Tue, 5 Apr 2016 at 12:23 Aljoscha Krettek <aljoscha@apache.org> wrote:
>> >
>> >> Hi Folks,
>> >> as part of my effort to improve the windowing in Flink [1] I also thought
>> >> about lateness, accumulating/discarding and window cleanup. I have some
>> >> ideas on this, but I would love to get feedback from the community as I
>> >> think that these things are important for everyone doing event-time
>> >> windowing on Flink.
>> >>
>> >> The basic problem is this: some elements can arrive behind the watermark
>> >> if the watermark is not 100% correct (which, I would assume, it is not in
>> >> most cases). We need to provide an API that lets users specify what
>> >> happens when these late elements arrive. There are two main knobs for
>> >> the user here:
>> >>
>> >> - Allowed Lateness: How late can an element be before it is completely
>> >> ignored, i.e. simply discarded?
>> >>
>> >> - Accumulating/Discarding Fired Windows: When we fire a window, do we
>> >> purge the contents or do we keep them around until the watermark passes
>> >> the end of the window plus the allowed lateness? If we keep the window,
>> >> a late element will be added to the window and the window will be
>> >> emitted again. If we don't keep the window, the late element will
>> >> essentially trigger emission of a one-element window.
>> >>
>> >> This is somewhat straightforward to implement: if accumulating, set a
>> >> timer for the end of the window plus the allowed lateness, and clean up
>> >> the window when that timer fires (basically). All of this happens in
>> >> event-time with watermarks.
>> >>
>> >> My problem is only this: what should happen if the user specifies some
>> >> allowed lateness and/or accumulating mode but uses processing-time
>> >> windowing? For processing-time windows these don't make sense because
>> >> elements cannot be late by definition. The problem is that we cannot
>> >> figure out, by looking at a WindowAssigner or the Windows that it
>> >> assigns to elements, whether these windows are in the event-time or the
>> >> processing-time domain. At the API level this is also not easily
>> >> visible, since a user might have set the "stream-time-characteristic" to
>> >> event-time but still use a processing-time window (plus trigger) in the
>> >> program.
>> >>
>> >> Any ideas for solving this are extremely welcome. :-)
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> [1]
>> >> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp
>> >>
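The two knobs and the cleanup timer described in the 5 April message can be simulated in a few dozen lines. This is a minimal sketch under stated assumptions, not Flink's `WindowOperator`: `LatenessSketch` and `LateOutcome` are hypothetical names, a window is identified by its last event-time timestamp, an element is dropped once the watermark has reached that timestamp plus the allowed lateness, and watermarks are fed in explicitly through `advanceWatermark`. Following the message, the cleanup timer is only registered in accumulating mode, because discarding mode already purges the window at every firing:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical names throughout; this is not Flink's WindowOperator.
enum LateOutcome { BUFFERED, LATE_FIRING, DROPPED }

class LatenessSketch {
    private final long allowedLateness;
    private final boolean accumulating;
    private long watermark = Long.MIN_VALUE;
    // Window contents, keyed by the window's last event-time timestamp.
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();
    // Cleanup timers: cleanup time (window end + allowed lateness) -> windows to purge.
    private final TreeMap<Long, Set<Long>> cleanupTimers = new TreeMap<>();
    // Output of every firing, in emission order.
    final List<List<Long>> emitted = new ArrayList<>();

    LatenessSketch(long allowedLateness, boolean accumulating) {
        this.allowedLateness = allowedLateness;
        this.accumulating = accumulating;
    }

    LateOutcome add(long windowMaxTimestamp, long value) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        if (cleanupTime <= watermark) {
            return LateOutcome.DROPPED; // later than the allowed lateness: ignore completely
        }
        windows.computeIfAbsent(windowMaxTimestamp, k -> new ArrayList<>()).add(value);
        if (accumulating) {
            // Only accumulating windows keep state after firing, so only they need a cleanup timer.
            cleanupTimers.computeIfAbsent(cleanupTime, k -> new HashSet<>()).add(windowMaxTimestamp);
        }
        if (windowMaxTimestamp <= watermark) {
            fire(windowMaxTimestamp); // window already fired once: emit it again right away
            return LateOutcome.LATE_FIRING;
        }
        return LateOutcome.BUFFERED;
    }

    void advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // watermarks only move forward
        }
        // On-time firing for every window whose end the watermark has just passed.
        for (long w : new ArrayList<>(windows.subMap(watermark, false, newWatermark, true).keySet())) {
            fire(w);
        }
        // Fire due cleanup timers and drop the state of their windows.
        while (!cleanupTimers.isEmpty() && cleanupTimers.firstKey() <= newWatermark) {
            for (long w : cleanupTimers.pollFirstEntry().getValue()) {
                windows.remove(w);
            }
        }
        watermark = newWatermark;
    }

    private void fire(long window) {
        List<Long> contents = windows.get(window);
        if (contents == null || contents.isEmpty()) {
            return;
        }
        emitted.add(new ArrayList<>(contents));
        if (!accumulating) {
            windows.remove(window); // discarding: purge contents after every firing
        }
    }
}
```

With an allowed lateness of 1000 and a window ending at 999, a late element arriving at watermark 999 re-emits the whole window in accumulating mode but only a one-element window in discarding mode, and once the watermark reaches 1999 any further element for that window is dropped.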
