flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Allowed Lateness in Flink
Date Mon, 25 Jul 2016 12:22:49 GMT
These are some very interesting thoughts! I have some more, based on these:

What happens if you have for example this Trigger:
All(Watermark.pastEndOfWindow(), Count.atLeast(10))

When would this even fire, i.e. what are the steps that lead to this
combined trigger firing with the Trigger system that we currently have in
place?

I have some thoughts but they are not compatible with the way we currently
handle triggers. I have to think some more, but please shoot if you have
any ideas.
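
To make the question concrete, here is a minimal, self-contained sketch of one
possible reading of All(Watermark.pastEndOfWindow(), Count.atLeast(10)). It
does not use Flink's Trigger API: SimpleTrigger, CountAtLeast,
PastEndOfWindow, All and Operator are hypothetical names invented for this
illustration. It also assumes that every child remembers whether its condition
currently holds and is re-evaluated on each element and watermark, which is
not how triggers are handled today.

```java
import java.util.Arrays;
import java.util.List;

public class AllTriggerSketch {

    public enum TriggerResult { CONTINUE, FIRE }

    /** Hypothetical, simplified trigger interface; not Flink's actual Trigger API. */
    public interface SimpleTrigger {
        TriggerResult onElement();
        TriggerResult onWatermark(long watermark);
        /** Called only when the outermost trigger actually fires. */
        void onFire();
    }

    /** Satisfied once at least `threshold` elements arrived since the last firing. */
    public static class CountAtLeast implements SimpleTrigger {
        private final int threshold;
        private int count;

        public CountAtLeast(int threshold) { this.threshold = threshold; }

        public TriggerResult onElement() { count++; return status(); }
        public TriggerResult onWatermark(long watermark) { return status(); }
        // State cleanup is postponed until the parent decides to fire.
        public void onFire() { count = 0; }

        private TriggerResult status() {
            return count >= threshold ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }
    }

    /** Satisfied once a watermark at or past `windowEnd` was seen (simplified). */
    public static class PastEndOfWindow implements SimpleTrigger {
        private final long windowEnd;
        private boolean passed;

        public PastEndOfWindow(long windowEnd) { this.windowEnd = windowEnd; }

        public TriggerResult onElement() { return status(); }
        public TriggerResult onWatermark(long watermark) {
            if (watermark >= windowEnd) { passed = true; }
            return status();
        }
        // Nothing to clean up: the watermark stays past the end of the window.
        public void onFire() { }

        private TriggerResult status() {
            return passed ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }
    }

    /** Fires only when every child currently proposes FIRE. */
    public static class All implements SimpleTrigger {
        private final List<SimpleTrigger> children;

        public All(SimpleTrigger... children) { this.children = Arrays.asList(children); }

        public TriggerResult onElement() {
            boolean all = true;
            // Visit every child (no short-circuit) so each one updates its own state.
            for (SimpleTrigger c : children) { all &= c.onElement() == TriggerResult.FIRE; }
            return all ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        public TriggerResult onWatermark(long watermark) {
            boolean all = true;
            for (SimpleTrigger c : children) { all &= c.onWatermark(watermark) == TriggerResult.FIRE; }
            return all ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        public void onFire() { for (SimpleTrigger c : children) { c.onFire(); } }
    }

    /** Stand-in for the window operator: calls onFire() only on a real firing. */
    public static class Operator {
        private final SimpleTrigger trigger;

        public Operator(SimpleTrigger trigger) { this.trigger = trigger; }

        public boolean element() { return react(trigger.onElement()); }
        public boolean watermark(long wm) { return react(trigger.onWatermark(wm)); }

        private boolean react(TriggerResult result) {
            if (result != TriggerResult.FIRE) { return false; }
            trigger.onFire();
            return true;
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator(new All(new CountAtLeast(10), new PastEndOfWindow(100L)));
        for (int i = 0; i < 10; i++) {
            System.out.println("element " + (i + 1) + " fired: " + op.element());
        }
        System.out.println("watermark 100 fired: " + op.watermark(100L));
    }
}
```

In this reading the count child proposes FIRE from the tenth element on, but
the composite only fires once the watermark child is satisfied as well, and
the count is reset only at that point.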

Cheers,
Aljoscha

On Fri, 22 Jul 2016 at 13:10 Kostas Kloudas <k.kloudas@data-artisans.com>
wrote:

> Forgot to say that the signature for the onFire() that I think fits should
> be:
>
> void onFire(Window window, TriggerContext ctx) throws Exception;
>
> > On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
> >
> > Hi,
> >
> > I started working on the new triggers proposed here and so far I can see
> > two shortcomings in the current state of the triggers that do not play well
> > with the new proposals, and more specifically the composite triggers All
> > and Any.
> >
> > So here it goes:
> >
> > 1) In the document posted above, there are some new useful trigger
> > combinations (like Any and All) which allow for combining primitive
> > triggers. This decouples the TriggerResult of each individual trigger from
> > the action that is actually going to be executed. For example, an All
> > trigger may have one child proposing FIRE while the other proposes
> > CONTINUE, and the final result will be CONTINUE.
> >
> > In this case, any action that should be taken by each individual trigger
> > upon firing, e.g. cleaning its state as in the case of CountTrigger, should
> > be postponed until the parent trigger (All) decides to fire.
> >
> > For this, there should be an onFire() method in each trigger that does
> > exactly that. This method should be called in the fireOrCleanup() of the
> > windowOperator, when the firing is successful.
> >
> > 2) In the current implementation, when stateful triggers, like the
> > CountTrigger, are combined in a composite Trigger (with Any or All), their
> > state is shared because the stateHandle is the same for both. To solve
> > this, the handle should become unique, BUT consistent for the same Trigger.
> > The latter implies that the handle for the same trigger after a node
> > failure should be the same as that of its predecessor (before the failure).
> >
> > Let me know your thoughts on these.
> >
> > Kostas
> >
> >
> >> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>
> >> I'm proposing to get this small change into 1.1:
> >> https://issues.apache.org/jira/browse/FLINK-4239 This will make our lives
> >> easier with the future proposed changes.
> >>
> >> What do you think?
> >> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>
> >>> Hi,
> >>> these new features should make it into the 1.2 release. We are already
> >>> working on releasing 1.1, so they won't make it into that one,
> >>> unfortunately.
> >>>
> >>> Cheers,
> >>> Aljoscha
> >>>
> >>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnchen@gmail.com> wrote:
> >>>
> >>>> BTW, do you have a rough timeline for rolling this out to production?
> >>>>
> >>>> Thanks,
> >>>> Chen
> >>>>
> >>>>
> >>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljoscha@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>> Chen left this comment on the doc (I'm mirroring it here so everyone can
> >>>>> follow):
> >>>>> "It would be cool to be able to access the last snapshot of the window
> >>>>> state before it gets purged. A pipeline author might consider putting it
> >>>>> into external storage and dealing with late-arriving events by restoring
> >>>>> the corresponding window."
> >>>>>
> >>>>> My answer:
> >>>>> This is partially covered by the section called "What Happens at
> >>>>> Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
> >>>>> is that the window can have one final emission if there is new data in
> >>>>> the buffers at cleanup time.
> >>>>>
> >>>>> The work on this will also depend on this proposal:
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> >>>>>
> >>>>> With this, the WindowFunction can get meta data about the window firing
> >>>>> so it could be informed that this is the last firing before a cleanup and
> >>>>> that there already was an earlier, on-time firing.
> >>>>>
> >>>>> Does this cover your concerns, Chen?
> >>>>>
> >>>>> Cheers,
> >>>>> Aljoscha
> >>>>>
> >>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnchen@gmail.com> wrote:
> >>>>>
> >>>>>> Sure. Currently, it looks like any element assigned to a window that is
> >>>>>> too late will be dropped silently 😓?
> >>>>>>
> >>>>>> Having a late window stream implies that Flink somehow needs to add one
> >>>>>> more state to the window and split window state cleanup from window
> >>>>>> retirement. I would suggest something as simple as adding a function to
> >>>>>> the trigger called OnLateElement that always results in fire_purge; it
> >>>>>> would make the developer aware of this case and able to handle it.
> >>>>>>
> >>>>>> Chen
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljoscha@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> @Chen I added a section at the end of the document regarding access to
> >>>>>>> the elements that are dropped as late. Right now, the section just
> >>>>>>> mentions that we have to do this but there is no real proposal yet for
> >>>>>>> how to do it. Only a rough sketch so that we don't forget about it.
> >>>>>>>
> >>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnchen@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> +1 for the allowedLateness scenario.
> >>>>>>>>
> >>>>>>>> The rationale is that backfills or data issues can hold back in-window
> >>>>>>>> data until the watermark has passed the window's end time. This causes
> >>>>>>>> the sink to write partial output.
> >>>>>>>>
> >>>>>>>> Allowing a high allowedLateness threshold makes it easier to merge
> >>>>>>>> those results and overwrite the partial output with the correct output
> >>>>>>>> at the sink. But yeah, the pipeline author risks blowing up the state
> >>>>>>>> backend with huge state.
> >>>>>>>>
> >>>>>>>> Alternatively, in some cases, if the sink allows a read-check-merge
> >>>>>>>> operation, the window can explicitly call out events ingested after
> >>>>>>>> allowedLateness. This asks the pipeline author to mitigate these events
> >>>>>>>> outside of the Flink ecosystem. The catch is that, since any part of a
> >>>>>>>> Flink job can replay and checkpoint, the notification should somehow
> >>>>>>>> include this information as well.
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>> Chen
> >>>>>>>>
> >>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas
> >>>>>>>> <k.kloudas@data-artisans.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> In the effort to move the discussion to the mailing list, rather than
> >>>>>>>>> the doc, there was a comment in the doc:
> >>>>>>>>>
> >>>>>>>>> "It seems this proposal marries the allowed lateness of events and the
> >>>>>>>>> discarding of window state. In most use cases this should be
> >>>>>>>>> sufficient, but there are instances where having independent control
> >>>>>>>>> of these may be useful.
> >>>>>>>>>
> >>>>>>>>> For instance, you may have a job that computes some aggregate, like a
> >>>>>>>>> sum. You may want to keep the window state around for a while, but not
> >>>>>>>>> too long. Yet you may want to continue processing late events after
> >>>>>>>>> you discarded the window state. It is possible that your stream sinks
> >>>>>>>>> can make use of this data. For instance, they may be writing to a data
> >>>>>>>>> store that returns an error if a row already exists, which allows the
> >>>>>>>>> sink to read the existing row and update it with the new data."
> >>>>>>>>>
> >>>>>>>>> To which I would like to reply:
> >>>>>>>>>
> >>>>>>>>> If I understand your use-case correctly, I believe that the proposed
> >>>>>>>>> binding of the allowed lateness to the state purging does not pose any
> >>>>>>>>> problem. The lateness specifies the upper time bound, after which the
> >>>>>>>>> state will be discarded. Between the start of a window and its (end +
> >>>>>>>>> allowedLateness) you can write custom triggers that fire, purge the
> >>>>>>>>> state, or do nothing. Given this, I suppose that, in the most extreme
> >>>>>>>>> case, you can specify an allowed lateness of Long.MaxValue and do the
> >>>>>>>>> purging of the state "manually". By doing this, you remove the
> >>>>>>>>> safeguard of letting the system purge the state at some point in time,
> >>>>>>>>> and you can do your own custom state management that fits your needs.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Kostas
> >>>>>>>>>
> >>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljoscha@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> @Vishnu Funny you should ask that because I have a design doc lying
> >>>>>>>>>> around. I'll open a new mail thread to not hijack this one.
> >>>>>>>>>>
> >>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath
> >>>>>>>>>> <vishnu.viswanath25@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> I was going through the suggested improvements to windows, and I have
> >>>>>>>>>>> a few questions/suggestions regarding the Evictor.
> >>>>>>>>>>>
> >>>>>>>>>>> 1) I have a use case where I have to create a custom Evictor that
> >>>>>>>>>>> will evict elements from the window based on their value (e.g., if my
> >>>>>>>>>>> elements are of case class Item(id: Int, type:String), then evict the
> >>>>>>>>>>> elements that have type="a"). I believe this is not currently
> >>>>>>>>>>> possible.
> >>>>>>>>>>> 2) This is somewhat related to 1): there should be an option to evict
> >>>>>>>>>>> elements from anywhere in the window, not only from the beginning of
> >>>>>>>>>>> the window (e.g., apply the delta function to all elements and remove
> >>>>>>>>>>> all those that don't pass). I checked the code and the evict method
> >>>>>>>>>>> just returns the number of elements to be removed, and
> >>>>>>>>>>> processTriggerResult just skips that many elements from the
> >>>>>>>>>>> beginning.
> >>>>>>>>>>> 3) Add an option that enables the user to decide if the eviction
> >>>>>>>>>>> should happen before or after the apply function. Currently it is
> >>>>>>>>>>> before the apply function, but I have a use case where I need to
> >>>>>>>>>>> first apply the function and evict afterward.
> >>>>>>>>>>>
> >>>>>>>>>>> I am doing these for a POC, so I think I can modify the Flink code
> >>>>>>>>>>> base to make these changes and build, but I would appreciate any
> >>>>>>>>>>> suggestions on whether these are viable changes or whether there will
> >>>>>>>>>>> be any performance issues if these are done. Also, any pointers on
> >>>>>>>>>>> where to start (e.g., do I create a new class similar to
> >>>>>>>>>>> EvictingWindowOperator that extends WindowOperator?)
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks and Regards,
> >>>>>>>>>>> Vishnu Viswanath,
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <aljoscha@apache.org>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I did:
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3cCANMXwW0AbTTjjg9EWdxRUGxkjM7jscBeNmVRZOHPt2qO3pQMwA@mail.gmail.com%3e
> >>>>>>>>>>>> ;-)
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <uce@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <aljoscha@apache.org>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>> In the future, it might be good to do discussions directly on the ML
> >>>>>>>>>>>>>> and then change the document accordingly. This way everyone can
> >>>>>>>>>>>>>> follow the discussion on the ML. I also feel that Google Doc
> >>>>>>>>>>>>>> comments often don't give enough space for expressing more complex
> >>>>>>>>>>>>>> opinions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I agree! Would you mind raising this point as a separate discussion
> >>>>>>>>>>>>> on dev@?
