flink-user mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: [DISCUSS] Allowed Lateness in Flink
Date Mon, 30 May 2016 09:10:25 GMT
Thanks Aljoscha :) I added some comments that might be relevant from the
users' point of view.

Gyula

Aljoscha Krettek <aljoscha@apache.org> wrote (on Mon, 30 May 2016, 10:33):

> Hi,
> I created a new doc specifically about the interplay of lateness and
> window state garbage collection:
> https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing
>
> There is still some stuff that needs to be figured out, both in the new
> doc and the existing doc. For example, we need to decide whether to make
> accumulating/discarding behavior global for a window operation or
> controllable by triggers. Initially, I suggested making
> accumulating/discarding a global setting for the window operation because
> we can get away with keeping less state if we know that we always discard
> when firing. Please take a look at the new doc to see what I'm talking
> about there.
>
> Feedback very welcome!
>
> Cheers,
> Aljoscha
>
> On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek <aljoscha@apache.org> wrote:
>
>> Hi Max,
>> thanks for the feedback and suggestions! I'll try to address each
>> paragraph separately.
>>
>> I'm afraid deciding based on the "StreamTimeCharacteristic" is not
>> possible since a user can use processing-time windows in their job even
>> though they set the characteristic to event-time. Enabling event time does
>> not disable processing time, it just enables an additional feature. (IMHO,
>> the handling of the StreamTimeCharacteristic is still somewhat problematic.)
>>
>> Making the decision based purely on the class of the WindowAssigner is
>> also not possible since we don't know in advance which WindowAssigners the
>> users will write and what time characteristic they will use.
>>
>> Regarding the third proposition: removing 'System.currentTimeMillis()' is
>> very desirable and part of my proposal. However, it is still meant to be
>> separate from "event-time" since a Trigger/WindowAssigner might need both.
>> For example, a Trigger might want to do early triggering a few
>> (processing-time) seconds after the first elements arrive and proper
>> triggering once the watermark for the end of the window arrives.
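As a rough illustration of such a trigger, the following Java sketch keeps the two clocks separate: an early firing a fixed processing-time delay after the first element, and an on-time firing once the watermark reaches the end of the window. `EarlyAndOnTimeTrigger`, its `Result` enum and its callback methods are hypothetical names chosen for this sketch, not Flink's `Trigger` API, and the caller is assumed to pass processing times and watermarks in explicitly.

```java
// Hypothetical sketch, not Flink's Trigger API: one trigger that reads two
// separate clocks, processing time for the early firing and the event-time
// watermark for the on-time firing.
final class EarlyAndOnTimeTrigger {
    enum Result { CONTINUE, FIRE }

    private final long windowMaxTimestamp;     // last event-time timestamp in the window
    private final long earlyDelayMillis;       // processing-time delay before the early firing
    private long earlyFireAt = Long.MAX_VALUE; // unset until the first element arrives
    private boolean earlyFired = false;
    private boolean onTimeFired = false;

    EarlyAndOnTimeTrigger(long windowMaxTimestamp, long earlyDelayMillis) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.earlyDelayMillis = earlyDelayMillis;
    }

    /** Called per element with the current processing time; arms the early firing once. */
    Result onElement(long processingNow) {
        if (earlyFireAt == Long.MAX_VALUE) {
            earlyFireAt = processingNow + earlyDelayMillis;
        }
        return Result.CONTINUE;
    }

    /** Called when the processing-time clock advances; fires early at most once. */
    Result onProcessingTime(long processingNow) {
        if (!earlyFired && processingNow >= earlyFireAt) {
            earlyFired = true;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    /** Called when a new watermark arrives; fires once the watermark reaches the window end. */
    Result onWatermark(long watermark) {
        if (!onTimeFired && watermark >= windowMaxTimestamp) {
            onTimeFired = true;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }
}
```

Because the processing-time and event-time callbacks are separate methods, one trigger can depend on both clocks, which is why a single "event-time or processing-time" label cannot fully describe it.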
>>
>> These are good ideas but I'm afraid we still don't have a good solution.
>> This whole processing time/event time business is just very tricky.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 26 Apr 2016 at 16:26 Maximilian Michels <mxm@apache.org> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> Thank you for the detailed design document.
>>>
>>> Wouldn't it be ok to allow these new concepts regardless of the time
>>> semantics? For Event Time and Ingestion Time "Lateness" and
>>> "Accumulating/Discarding" make sense. If the user chooses processing
>>> time, then these can be ignored during translation of the StreamGraph
>>> (possibly with a warning).
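A minimal sketch of this "ignore with a warning" translation step, assuming the time domain of each window operation is already known at translation time (which is exactly what the rest of this thread finds hard to determine). `WindowSettingsTranslator`, `TimeDomain` and `Settings` are hypothetical names; warnings are collected into a list instead of being logged so the behavior is easy to check.

```java
import java.util.List;

// Hypothetical translation step, not part of Flink's StreamGraph generation.
final class WindowSettingsTranslator {
    enum TimeDomain { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME }

    /** Effective settings after translation. */
    static final class Settings {
        final long allowedLatenessMillis;
        final boolean accumulating;

        Settings(long allowedLatenessMillis, boolean accumulating) {
            this.allowedLatenessMillis = allowedLatenessMillis;
            this.accumulating = accumulating;
        }
    }

    private WindowSettingsTranslator() {}

    /**
     * Keeps the lateness/accumulating settings for event and ingestion time;
     * for processing time it drops them and records a warning instead of failing.
     */
    static Settings translate(TimeDomain domain, long allowedLatenessMillis,
                              boolean accumulating, List<String> warnings) {
        if (domain == TimeDomain.PROCESSING_TIME && (allowedLatenessMillis > 0 || accumulating)) {
            warnings.add("Ignoring allowed lateness/accumulating mode for a processing-time window");
            return new Settings(0L, false);
        }
        return new Settings(allowedLatenessMillis, accumulating);
    }
}
```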
>>>
>>> Detecting when these concepts make sense should be possible by
>>> checking the "Stream Characteristics" of the ExecutionEnvironment or
>>> the involved classes (e.g. SlidingProcessingTimeWindows) in the
>>> StreamGraph. If the user uses a custom WindowAssigner, then the user
>>> has to take care that it is used correctly. I don't like the
>>> "isEventTime()" method. Even with the additional method, users could
>>> return 'true' there although they meant 'false', right? So this does
>>> not really solve the problem that it is hard to distinguish Event Time
>>> and Processing Time semantics in Flink.
>>>
>>> Another approach that I could think of is getting rid of
>>> 'System.currentTimeMillis()' and only allowing time to be obtained via a special
>>> interface that WindowAssigners implement. Then we could determine what
>>> time is assigned and also verify that it is actually used (in contrast
>>> to the isEventTime() method). Would that be an option or would it
>>> break the API?
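One possible reading of the "special interface" idea is that user code never calls `System.currentTimeMillis()` directly and instead reads time through a framework-provided object, which can record which clock was actually consulted. The Java sketch below assumes a hypothetical `TimeProvider` interface, a `RecordingTimeProvider` implementation and a toy tumbling assigner; none of these are existing Flink classes.

```java
import java.util.EnumSet;

// Hypothetical interface: the only sanctioned way for user code to read time.
interface TimeProvider {
    long currentProcessingTime();
    long currentWatermark();
}

// Hypothetical implementation that remembers which clocks were consulted.
final class RecordingTimeProvider implements TimeProvider {
    enum Clock { PROCESSING_TIME, EVENT_TIME }

    private final long processingTime;
    private final long watermark;
    private final EnumSet<Clock> used = EnumSet.noneOf(Clock.class);

    RecordingTimeProvider(long processingTime, long watermark) {
        this.processingTime = processingTime;
        this.watermark = watermark;
    }

    @Override
    public long currentProcessingTime() {
        used.add(Clock.PROCESSING_TIME);
        return processingTime;
    }

    @Override
    public long currentWatermark() {
        used.add(Clock.EVENT_TIME);
        return watermark;
    }

    /** Snapshot of the clocks read so far. */
    EnumSet<Clock> usedClocks() {
        return EnumSet.copyOf(used);
    }
}

// Toy tumbling assigner that asks the provider for "now" instead of System.currentTimeMillis().
final class ProcessingTimeTumblingAssigner {
    private final long sizeMillis;

    ProcessingTimeTumblingAssigner(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Start of the tumbling window containing the current processing time (assumes non-negative times). */
    long assignWindowStart(TimeProvider time) {
        long now = time.currentProcessingTime();
        return now - (now % sizeMillis);
    }
}
```

Unlike a self-declared flag, the recorded set reflects what the assigner actually did, although it is only known after the assigner has run.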
>>>
>>> Cheers,
>>> Max
>>>
>>> On Tue, Apr 5, 2016 at 12:29 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>> > By the way, one way I see of fixing this is extending WindowAssigner with
>>> > an "isEventTime()" method and then allowing accumulating/lateness in the
>>> > WindowOperator only if this is true.
>>> >
>>> > But it seems a bit hacky because it special-cases event-time. But then
>>> > again, maybe we need to special-case it ...
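The `isEventTime()` idea could look roughly like the sketch below, in which an operator builder rejects a non-zero allowed lateness when the assigner reports processing time. All classes here (`SketchWindowAssigner`, `SketchEventTimeAssigner`, `SketchProcessingTimeAssigner`, `SketchWindowOperatorBuilder`) are hypothetical; throwing an exception is only one option, and ignoring the setting with a warning, as Max suggests in his reply, would be the alternative.

```java
// Hypothetical sketch of the isEventTime() proposal; not Flink's classes.
abstract class SketchWindowAssigner {
    /** True if the windows produced by this assigner live in the event-time domain. */
    abstract boolean isEventTime();
}

final class SketchEventTimeAssigner extends SketchWindowAssigner {
    @Override
    boolean isEventTime() { return true; }
}

final class SketchProcessingTimeAssigner extends SketchWindowAssigner {
    @Override
    boolean isEventTime() { return false; }
}

final class SketchWindowOperatorBuilder {
    private final SketchWindowAssigner assigner;
    private long allowedLatenessMillis = 0L;

    SketchWindowOperatorBuilder(SketchWindowAssigner assigner) {
        this.assigner = assigner;
    }

    /** Accepts lateness only for event-time assigners; zero is always allowed. */
    SketchWindowOperatorBuilder allowedLateness(long millis) {
        if (millis < 0) {
            throw new IllegalArgumentException("allowed lateness must be >= 0");
        }
        if (millis > 0 && !assigner.isEventTime()) {
            throw new IllegalStateException("allowed lateness only applies to event-time windows");
        }
        this.allowedLatenessMillis = millis;
        return this;
    }

    long allowedLatenessMillis() {
        return allowedLatenessMillis;
    }
}
```

As Max points out, this only moves the problem: a custom assigner can still return the wrong value from `isEventTime()`.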
>>> >
>>> > On Tue, 5 Apr 2016 at 12:23 Aljoscha Krettek <aljoscha@apache.org> wrote:
>>> >
>>> >> Hi Folks,
>>> >> as part of my effort to improve the windowing in Flink [1] I also thought
>>> >> about lateness, accumulating/discarding and window cleanup. I have some
>>> >> ideas on this but I would love to get feedback from the community as I
>>> >> think that these things are important for everyone doing event-time
>>> >> windowing on Flink.
>>> >>
>>> >> The basic problem is this: some elements can arrive behind the watermark
>>> >> if the watermark is not 100% correct (which, I would assume, it is not in
>>> >> most cases). We need to provide an API that allows users to specify what
>>> >> happens when these late elements arrive. There are two main knobs for the
>>> >> user here:
>>> >>
>>> >> - Allowed Lateness: How late can an element be before it is completely
>>> >> ignored, i.e. simply discarded
>>> >>
>>> >> - Accumulating/Discarding Fired Windows: When we fire a window, do we
>>> >> purge the contents or do we keep them around until the watermark passes
>>> >> the end of the window plus the allowed lateness? If we keep the window, a
>>> >> late element will be added to the window and the window will be emitted
>>> >> again. If we don't keep the window, then the late element will essentially
>>> >> trigger emission of a one-element window.
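To make the difference between the two modes concrete, the hypothetical `FiringModeDemo` class below models the contents of a single window in plain Java, without any Flink types; it assumes that firing means emitting a copy of the current contents.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one window's contents under the two firing modes.
final class FiringModeDemo {
    enum Mode { ACCUMULATING, DISCARDING }

    private final Mode mode;
    private final List<Integer> contents = new ArrayList<>();

    FiringModeDemo(Mode mode) {
        this.mode = mode;
    }

    /** Adds an element (on time or late, as long as it is within the allowed lateness). */
    void add(int element) {
        contents.add(element);
    }

    /** Emits the current contents; in DISCARDING mode the contents are purged afterwards. */
    List<Integer> fire() {
        List<Integer> emitted = new ArrayList<>(contents);
        if (mode == Mode.DISCARDING) {
            contents.clear();
        }
        return emitted;
    }
}
```

If two elements fire on time and one late element arrives afterwards, a second firing in accumulating mode emits all three elements, while in discarding mode it emits only the late element, i.e. the one-element window described above.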
>>> >>
>>> >> This is somewhat straightforward to implement: if accumulating, set a
>>> >> timer for the end of the window plus the allowed lateness, and clean up
>>> >> the window when that timer fires (basically). All in event-time with
>>> >> watermarks.
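The cleanup rule can be written down as a few pure functions. This sketch assumes that timestamps and the allowed lateness are `long` milliseconds and that a window is identified by its last timestamp (`windowMaxTimestamp`); the class name `LatenessRules` is hypothetical. It saturates the cleanup time at `Long.MAX_VALUE`, since adding the lateness to a window that ends near the maximum timestamp would otherwise overflow.

```java
// Hypothetical helper for the cleanup rule: window state lives until the
// watermark passes the window end plus the allowed lateness.
final class LatenessRules {
    private LatenessRules() {}

    /** Event-time timestamp at which the window's state may be cleaned up (saturating). */
    static long cleanupTime(long windowMaxTimestamp, long allowedLatenessMillis) {
        long cleanup = windowMaxTimestamp + allowedLatenessMillis;
        // With non-negative lateness, a result below the window end means the sum overflowed.
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    /** An element for this window is late once the watermark has reached the window end. */
    static boolean isLate(long windowMaxTimestamp, long currentWatermark) {
        return currentWatermark >= windowMaxTimestamp;
    }

    /** A late element is dropped entirely once the cleanup time has been reached. */
    static boolean isDropped(long windowMaxTimestamp, long allowedLatenessMillis, long currentWatermark) {
        return currentWatermark >= cleanupTime(windowMaxTimestamp, allowedLatenessMillis);
    }
}
```

For a window ending at 9999 with 5000 ms of allowed lateness, the cleanup timer is due at 14999: an element arriving when the watermark is at 10500 is late but still added, while one arriving at watermark 15000 is dropped.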
>>> >>
>>> >> My problem is only this: what should happen if the user specifies some
>>> >> allowed lateness and/or accumulating mode but uses processing-time
>>> >> windowing? For processing-time windows these don't make sense because
>>> >> elements cannot be late by definition. The problem is that we cannot
>>> >> figure out, by looking at a WindowAssigner or the Windows that it assigns
>>> >> to elements, whether these windows are in the event-time or
>>> >> processing-time domain. At the API level this is also not easily visible,
>>> >> since a user might have set the "stream-time-characteristic" to event-time
>>> >> but still use a processing-time window (plus trigger) in the program.
>>> >>
>>> >> Any ideas for solving this are extremely welcome. :-)
>>> >>
>>> >> Cheers,
>>> >> Aljoscha
>>> >>
>>> >> [1] https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp
>>> >>
>>>
>>
