flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Allowed Lateness in Flink
Date Mon, 30 May 2016 09:23:48 GMT
Thanks for the feedback! :-) I already read the comments on the file.

On Mon, 30 May 2016 at 11:10 Gyula Fóra <gyula.fora@gmail.com> wrote:

> Thanks Aljoscha :) I added some comments that might seem relevant from the
> user's point of view.
>
> Gyula
>
> Aljoscha Krettek <aljoscha@apache.org> wrote on Mon, 30 May 2016 at 10:33:
>
> > Hi,
> > I created a new doc specifically about the interplay of lateness and
> > window state garbage collection:
> >
> > https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing
> >
> > There is still some stuff that needs to be figured out, both in the new
> > doc and the existing doc. For example, we need to decide whether to make
> > accumulating/discarding behavior global for a window operation or
> > controllable by triggers. Initially, I suggested making
> > accumulating/discarding a global setting for the window operation because
> > we can get away with keeping less state if we know that we always discard
> > when firing. Please take a look at the new doc to see what I'm talking
> > about there.
> >
> > Feedback very welcome!
> >
> > Cheers,
> > Aljoscha
> >
> > On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek <aljoscha@apache.org> wrote:
> >
> >> Hi Max,
> >> thanks for the feedback and suggestions! I'll try to address each
> >> paragraph separately.
> >>
> >> I'm afraid deciding based on the "StreamTimeCharacteristic" is not
> >> possible since a user can use processing-time windows in their job even
> >> though they set the characteristic to event-time. Enabling event time does
> >> not disable processing time, it just enables an additional feature. (IMHO,
> >> the handling of the StreamTimeCharacteristic is still somewhat problematic.)
> >>
> >> Making the decision based purely on the class of the WindowAssigner is
> >> also not possible since we don't know in advance which WindowAssigners the
> >> users will write and what time characteristic they will use.
> >>
> >> Regarding the third proposition: removing 'System.currentTimeMillis()' is
> >> very desirable and part of my proposal. However, it is still meant to be
> >> separate from "event-time" since a Trigger/WindowAssigner might need both.
> >> For example, a Trigger might want to do early triggering a few
> >> (processing-time) seconds after the first elements arrive and proper
> >> triggering once the watermark for the end of the window arrives.
> >>
> >> These are good ideas but I'm afraid we still don't have a good solution.
> >> This whole processing time/event time business is just very tricky.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Tue, 26 Apr 2016 at 16:26 Maximilian Michels <mxm@apache.org> wrote:
> >>
> >>> Hi Aljoscha,
> >>>
> >>> Thank you for the detailed design document.
> >>>
> >>> Wouldn't it be ok to allow these new concepts regardless of the time
> >>> semantics? For Event Time and Ingestion Time "Lateness" and
> >>> "Accumulating/Discarding" make sense. If the user chooses Processing
> >>> time then these can be ignored during translation of the StreamGraph
> >>> (possibly with a warning).
> >>>
> >>> Detecting when these concepts make sense should be possible by
> >>> checking the "Stream Characteristics" of the ExecutionEnvironment or
> >>> the involved classes (e.g. SlidingProcessingTimeWindows) in the
> >>> StreamGraph. If the user uses a custom WindowAssigner then the user
> >>> has to take care that it is used correctly. I don't like the
> >>> "isEventTime()" method. Even with the additional method, users could
> >>> return 'true' there although they meant 'false', right? So this does
> >>> not really solve the problem that it is hard to distinguish Event Time
> >>> and Processing Time semantics in Flink.
> >>>
> >>> Another approach that I could think of is getting rid of
> >>> 'System.currentTimeMillis()' and only allowing time to be obtained via a
> >>> special interface that WindowAssigners implement. Then we could determine what
> >>> time is assigned and also verify that it is actually used (in contrast
> >>> to the isEventTime() method). Would that be an option or would it
> >>> break the API?
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On Tue, Apr 5, 2016 at 12:29 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>> > By the way, the way I see to fix this is extending WindowAssigner with
> >>> > an "isEventTime()" method and then allowing accumulating/lateness in the
> >>> > WindowOperator only if this is true.
> >>> >
> >>> > But it seems a bit hacky because it special-cases event-time. But then
> >>> > again, maybe we need to special-case it ...
> >>> >
> >>> > On Tue, 5 Apr 2016 at 12:23 Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>> >
> >>> >> Hi Folks,
> >>> >> as part of my effort to improve the windowing in Flink [1] I also thought
> >>> >> about lateness, accumulating/discarding and window cleanup. I have some
> >>> >> ideas on this but I would love to get feedback from the community as I
> >>> >> think that these things are important for everyone doing event-time
> >>> >> windowing on Flink.
> >>> >>
> >>> >> The basic problem is this: Some elements can arrive behind the watermark
> >>> >> if the watermark is not 100 % correct (which it is not, in most cases, I
> >>> >> would assume). We need to provide an API that allows users to specify what
> >>> >> happens when these late elements arrive. There are two main knobs for the
> >>> >> user here:
> >>> >>
> >>> >> - Allowed Lateness: How late can an element be before it is completely
> >>> >> ignored, i.e. simply discarded?
> >>> >>
> >>> >> - Accumulating/Discarding Fired Windows: When we fire a window, do we
> >>> >> purge the contents or do we keep them around until the watermark passes the
> >>> >> end of the window plus the allowed lateness? If we keep the window, a late
> >>> >> element will be added to the window and the window will be emitted again.
> >>> >> If we don't keep the window, then the late element will essentially trigger
> >>> >> emission of a one-element window.
> >>> >>
> >>> >> This is somewhat straightforward to implement: If accumulating, set a timer
> >>> >> for the end of the window plus the allowed lateness. Clean up the window
> >>> >> when that fires (basically). All in event-time with watermarks.
> >>> >>
> >>> >> My problem is only this: what should happen if the user specifies some
> >>> >> allowed lateness and/or accumulating mode but uses processing-time
> >>> >> windowing? For processing-time windows these don't make sense because
> >>> >> elements cannot be late by definition. The problem is that we cannot
> >>> >> figure out, by looking at a WindowAssigner or the Windows that it assigns
> >>> >> to elements, whether these windows are in the event-time or processing-time
> >>> >> domain. At the API level this is also not easily visible, since a user
> >>> >> might have set the "stream-time-characteristic" to event-time but still use
> >>> >> a processing-time window (plus trigger) in the program.
> >>> >>
> >>> >> Any ideas for solving this are extremely welcome. :-)
> >>> >>
> >>> >> Cheers,
> >>> >> Aljoscha
> >>> >>
> >>> >> [1] https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp
> >>
>
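
The two knobs from the original proposal quoted above (allowed lateness, and accumulating vs. discarding fired windows) can be illustrated with a toy model. This is only a minimal sketch under stated assumptions, not Flink code: the class LatenessSketch and all of its methods are hypothetical, windows are tumbling, timestamps are non-negative, and a window is assumed to fire once the watermark reaches its end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a tumbling event-time window operator. Hypothetical, not Flink's API. */
final class LatenessSketch {
    private final long windowSize;
    private final long allowedLateness;
    private final boolean accumulating;
    // Window start -> buffered values; a TreeMap keeps the firing order deterministic.
    private final Map<Long, List<Integer>> state = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    LatenessSketch(long windowSize, long allowedLateness, boolean accumulating) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
        this.accumulating = accumulating;
    }

    /** Adds an element (assumption: timestamps are non-negative). */
    void element(long timestamp, int value) {
        long start = timestamp - (timestamp % windowSize);
        long end = start + windowSize;
        if (watermark >= end + allowedLateness) {
            return; // Later than the allowed lateness: simply dropped.
        }
        state.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
        if (watermark >= end) {
            fire(start); // Late, but within the allowed lateness: emit right away.
        }
    }

    /** Moves the watermark forward, fires newly completed windows, then cleans up. */
    void advanceWatermark(long newWatermark) {
        long old = watermark;
        watermark = newWatermark;
        // Iterate over a copy because fire() may remove entries in discarding mode.
        for (Long start : new ArrayList<>(state.keySet())) {
            long end = start + windowSize;
            if (old < end && newWatermark >= end) {
                fire(start);
            }
        }
        // Garbage-collect state once the end of the window plus lateness has passed.
        state.keySet().removeIf(start -> newWatermark >= start + windowSize + allowedLateness);
    }

    private void fire(long start) {
        emitted.add(start + ":" + state.get(start));
        if (!accumulating) {
            state.remove(start); // Discarding mode: purge contents on every firing.
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        for (boolean mode : new boolean[] {true, false}) {
            LatenessSketch op = new LatenessSketch(10, 5, mode);
            op.element(1, 1);
            op.element(3, 2);
            op.advanceWatermark(10); // First firing of window [0, 10).
            op.element(4, 3);        // Late, but within the allowed lateness.
            op.advanceWatermark(15); // State for window [0, 10) is cleaned up.
            op.element(5, 4);        // Too late: dropped.
            System.out.println((mode ? "accumulating " : "discarding ") + op.emitted());
        }
    }
}
```

In this model the accumulating variant emits the window [0, 10) twice, first with the contents [1, 2] and then with [1, 2, 3], while the discarding variant emits [1, 2] and then the one-element window [3]. The discarding variant holds no state between firings, which is the state saving mentioned in the thread.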

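The "isEventTime()" idea discussed in the thread could look roughly like the following. Again, this is a hedged sketch rather than the actual design: EventTimeGuardSketch, Assigner, WindowSettings and effectiveSettings are hypothetical names invented here, and silently ignoring the settings for processing-time windows is just one of the options raised (a warning or an error would be others).

```java
/** Hypothetical sketch of the isEventTime() guard; these names are not Flink's API. */
final class EventTimeGuardSketch {

    /** Stand-in for a window assigner that declares its own time domain. */
    interface Assigner {
        boolean isEventTime();
    }

    /** Settings that only make sense for event-time windows. */
    static final class WindowSettings {
        final long allowedLateness;
        final boolean accumulating;

        WindowSettings(long allowedLateness, boolean accumulating) {
            this.allowedLateness = allowedLateness;
            this.accumulating = accumulating;
        }
    }

    /** Returns the settings the operator would actually use for this assigner. */
    static WindowSettings effectiveSettings(Assigner assigner, WindowSettings requested) {
        if (assigner.isEventTime()) {
            return requested;
        }
        // Processing-time windows: elements cannot be late by definition,
        // so lateness and accumulation are ignored in this sketch.
        return new WindowSettings(0L, false);
    }

    public static void main(String[] args) {
        WindowSettings requested = new WindowSettings(5000L, true);
        WindowSettings eventTime = effectiveSettings(() -> true, requested);
        WindowSettings processingTime = effectiveSettings(() -> false, requested);
        System.out.println(eventTime.allowedLateness + " " + processingTime.allowedLateness);
    }
}
```

As pointed out in the thread, a guard like this relies on the assigner reporting its time domain truthfully; it does not verify which clock the assigner actually uses.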