flink-dev mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: [DISCUSS] Allowed Lateness in Flink
Date Mon, 25 Jul 2016 12:33:10 GMT
Hi Aljoscha,

This was exactly one of the problems I also found.

The way I was thinking about it is the following:

Conceptually, time (event and processing) advances but state is a 
fixed property of the window.

Given this, I modified the Count trigger to also ask for the
current state (count) of the window in all methods (e.g. onEventTime and
onProcessingTime). This way the trigger can be composed and plays
well with the other triggers.

If you have any more ideas on that and the rest of the problems I
sent in the previous email, please let me know.

Kostas

> On Jul 25, 2016, at 2:22 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> These are some very interesting thoughts! I have some more, based on these:
> 
> What happens if you have for example this Trigger:
> All(Watermark.pastEndOfWindow(), Count.atLeast(10))
> 
> When would this even fire, i.e. what are the steps that lead to this
> combined trigger firing with the Trigger system that we currently have in
> place?
> 
> I have some thoughts but they are not compatible with the way we currently
> handle triggers. I have to think some more, but please shoot if you have
> any ideas.
> 
> Cheers,
> Aljoscha
> 
> On Fri, 22 Jul 2016 at 13:10 Kostas Kloudas <k.kloudas@data-artisans.com>
> wrote:
> 
>> Forgot to say that the signature I think fits best for onFire() is:
>> 
>> void onFire(Window window, TriggerContext ctx) throws Exception;
>> 
>>> On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I started working on the new triggers proposed here and so far I can see
>>> two shortcomings in the current state of the triggers that do not play
>>> well with the new proposals, and more specifically with the composite
>>> triggers All and Any.
>>> 
>>> So here it goes:
>>> 
>>> 1) In the document posted above, there are some new useful trigger
>>> combinations (like Any and All) which allow for combining primitive
>>> triggers. This decouples the TriggerResult of each individual trigger
>>> from the action that is actually going to be executed. For example, in
>>> an All trigger one child may propose FIRE while the other proposes
>>> CONTINUE, and the final result will be CONTINUE.
>>> 
>>> In this case, any action that should be taken by each individual
>>> trigger upon firing, e.g. cleaning its state as in the case of
>>> CountTrigger, should be postponed until the parent trigger (All)
>>> decides to fire.
>>> 
>>> For this, there should be an onFire() method in each trigger that does
>>> exactly that. This method should be called in the fireOrCleanup() of
>>> the windowOperator, when the firing is successful.
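The onFire() idea can be sketched as follows, assuming a simplified model that is not Flink's API (Proposal, DeferredCleanupTrigger, CountTrigger, AllTrigger and MiniOperator are hypothetical names). Children only propose a result; their cleanup happens in onFire(), which the operator calls only when the root trigger really fires, roughly the role the email assigns to fireOrCleanup(). The no-argument onFire() is a simplification of the proposed onFire(Window, TriggerContext) signature.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model for illustration only; NOT Flink's Trigger API.
enum Proposal { CONTINUE, FIRE }

interface DeferredCleanupTrigger {
    Proposal onElement(); // propose a result; must not clean up yet
    void onFire();        // called only after the root trigger actually fired
}

final class CountTrigger implements DeferredCleanupTrigger {
    private final long threshold;
    private long count = 0;
    CountTrigger(long threshold) { this.threshold = threshold; }
    public Proposal onElement() {
        count++;
        return count >= threshold ? Proposal.FIRE : Proposal.CONTINUE;
    }
    public void onFire() { count = 0; } // deferred cleanup
    long count() { return count; }
}

final class AllTrigger implements DeferredCleanupTrigger {
    private final List<DeferredCleanupTrigger> children;
    AllTrigger(DeferredCleanupTrigger... children) { this.children = Arrays.asList(children); }
    public Proposal onElement() {
        boolean allFire = true;
        // No short-circuit: every stateful child must see every element.
        for (DeferredCleanupTrigger t : children) {
            if (t.onElement() != Proposal.FIRE) allFire = false;
        }
        return allFire ? Proposal.FIRE : Proposal.CONTINUE;
    }
    public void onFire() {
        for (DeferredCleanupTrigger t : children) t.onFire();
    }
}

// Stand-in for the operator: cleanup runs only after a successful firing.
final class MiniOperator {
    private final DeferredCleanupTrigger root;
    MiniOperator(DeferredCleanupTrigger root) { this.root = root; }
    // Returns true if the window fired for this element.
    boolean processElement() {
        if (root.onElement() == Proposal.FIRE) {
            // (emit the window result here), then let the triggers clean up
            root.onFire();
            return true;
        }
        return false;
    }
}
```

With All(CountTrigger(2), CountTrigger(3)) this fires on the 3rd and 6th elements. If each CountTrigger instead reset itself as soon as it proposed FIRE, the two counters would drift out of step and the composite would first fire on the 6th element.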
>>> 
>>> 2) In the current implementation, when stateful triggers, like the
>>> CountTrigger, are combined in a composite Trigger (with Any or All),
>>> their state is shared because the stateHandle is the same for both. To
>>> solve this, the handle should become unique, BUT consistent for the
>>> same Trigger. The latter implies that the handle for the same trigger
>>> after a node failure should be the same as that of its predecessor
>>> (before the failure).
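One possible way to make the handles unique but consistent, sketched under assumptions: derive each trigger's state name from its position in the composite trigger tree. TriggerNode and FakeStateStore are hypothetical stand-ins for a trigger tree and a keyed-state backend, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, NOT Flink's API: each trigger in a composite tree gets
// a state name derived from its position (e.g. "root.1"). The name is unique
// within the tree and depends only on the tree's structure, so a job rebuilt
// after a failure derives the same names again.
final class TriggerNode {
    final String kind; // informational only, e.g. "All" or "Count"
    final List<TriggerNode> children = new ArrayList<>();
    String stateName;

    TriggerNode(String kind, TriggerNode... kids) {
        this.kind = kind;
        for (TriggerNode k : kids) children.add(k);
    }

    // Depth-first naming: the root gets `path`, its i-th child `path + "." + i`.
    void assignStateNames(String path) {
        this.stateName = path;
        for (int i = 0; i < children.size(); i++) {
            children.get(i).assignStateNames(path + "." + i);
        }
    }
}

// Fake keyed-state store: one counter per state name.
final class FakeStateStore {
    final Map<String, Long> counters = new HashMap<>();
    long increment(String name) { return counters.merge(name, 1L, Long::sum); }
}
```

Because the names depend only on the shape of the tree, rebuilding the same trigger after a failure yields the same names, so the restored state is found again. The trade-off is that changing the trigger structure between job versions changes the names.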
>>> 
>>> Let me know your thoughts on these.
>>> 
>>> Kostas
>>> 
>>> 
>>>> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>> 
>>>> I'm proposing to get this small change into 1.1:
>>>> https://issues.apache.org/jira/browse/FLINK-4239
>>>> This will make our lives easier with the future proposed changes.
>>>> 
>>>> What do you think?
>>>> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>> 
>>>>> Hi,
>>>>> these new features should make it into the 1.2 release. We are already
>>>>> working on releasing 1.1, so unfortunately they won't make it into that
>>>>> one.
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>> 
>>>>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnchen@gmail.com> wrote:
>>>>> 
>>>>>> BTW, do you have a rough timeline in terms of rolling it out to production?
>>>>>> 
>>>>>> Thanks,
>>>>>> Chen
>>>>>> 
>>>>>> 
>>>>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> Chen left this comment on the doc (I'm mirroring it here so everyone
>>>>>>> can follow):
>>>>>>> "It would be cool to be able to access the last snapshot of the window
>>>>>>> state before it gets purged. A pipeline author might consider putting
>>>>>>> it into external storage and dealing with late-arriving events by
>>>>>>> restoring the corresponding window."
>>>>>>> 
>>>>>>> My answer:
>>>>>>> This is partially covered by the section called "What Happens at
>>>>>>> Window-Cleanup Time, Who Decides When to Purge". What I want to
>>>>>>> introduce is that the window can have one final emission if there is
>>>>>>> new data in the buffers at cleanup time.
>>>>>>> 
>>>>>>> The work on this will also depend on this proposal:
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>>>>>>>
>>>>>>> With this, the WindowFunction can get metadata about the window firing,
>>>>>>> so it could be informed that this is the last firing before a cleanup
>>>>>>> and that there already was an earlier, on-time firing.
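A rough Java sketch of the "one final emission at cleanup time" rule, with metadata telling the function whether a firing is the last one and whether an on-time firing already happened. FiringInfo and WindowBuffer are hypothetical; this is neither Flink's API nor the FLIP-2 interface, only an illustration of the rule described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical firing metadata a window function could inspect
// (illustration only; not the FLIP-2 interface).
final class FiringInfo {
    final boolean isFinal;         // last firing before the window state is cleaned up
    final boolean hadOnTimeFiring; // an earlier, on-time firing already happened

    FiringInfo(boolean isFinal, boolean hadOnTimeFiring) {
        this.isFinal = isFinal;
        this.hadOnTimeFiring = hadOnTimeFiring;
    }
}

// Hypothetical window buffer that sums integers and records each emission.
final class WindowBuffer {
    private final List<Integer> elements = new ArrayList<>();
    private int newSinceLastFiring = 0;
    private boolean hadOnTimeFiring = false;
    final List<String> emitted = new ArrayList<>();

    void add(int value) {
        elements.add(value);
        newSinceLastFiring++;
    }

    // Regular firing, e.g. when the watermark passes the end of the window.
    void fireOnTime() {
        emit(new FiringInfo(false, hadOnTimeFiring));
        hadOnTimeFiring = true;
    }

    // Cleanup time: emit one last time only if new data is still buffered,
    // then drop the window contents.
    void cleanup() {
        if (newSinceLastFiring > 0) {
            emit(new FiringInfo(true, hadOnTimeFiring));
        }
        elements.clear();
    }

    private void emit(FiringInfo info) {
        int sum = 0;
        for (int v : elements) sum += v;
        emitted.add("sum=" + sum + " final=" + info.isFinal + " afterOnTime=" + info.hadOnTimeFiring);
        newSinceLastFiring = 0;
    }
}
```

A window that receives 1 and 2 before its on-time firing and 4 afterwards emits "sum=3" on time and a final "sum=7" at cleanup; a window with no new data after its on-time firing emits nothing extra.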
>>>>>>> 
>>>>>>> Does this cover your concerns, Chen?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>> 
>>>>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnchen@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Sure. Currently, it looks like any element assigned to a window that
>>>>>>>> is already too late will be dropped silently? 😓
>>>>>>>>
>>>>>>>> Having a late-window stream implies that Flink somehow needs to add
>>>>>>>> one more state to the window and split window-state cleanup from
>>>>>>>> window retirement.
>>>>>>>> I would suggest something as simple as adding a function to the
>>>>>>>> trigger called OnLateElement that always returns FIRE_AND_PURGE; it
>>>>>>>> would make the developer aware of this case and able to handle it.
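A minimal sketch of the OnLateElement suggestion, assuming tumbling event-time windows and a hypothetical LateAwareAssigner class (not Flink's API): an element whose window has already passed its cleanup time (max timestamp plus allowed lateness) goes to a user callback instead of being dropped. Whether such a callback should also fire and purge, as suggested above, is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical sketch (not Flink's API): tumbling event-time windows of
// `windowSize` time units, where an element whose window is already past its
// cleanup time is passed to a user callback instead of being dropped silently.
final class LateAwareAssigner {
    private final long windowSize;
    private final long allowedLateness;
    private final LongConsumer onLateElement;
    final List<Long> accepted = new ArrayList<>();

    LateAwareAssigner(long windowSize, long allowedLateness, LongConsumer onLateElement) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
        this.onLateElement = onLateElement;
    }

    void process(long timestamp, long currentWatermark) {
        // Window [start, start + windowSize); its last valid timestamp is start + windowSize - 1.
        long start = Math.floorDiv(timestamp, windowSize) * windowSize;
        long maxTimestamp = start + windowSize - 1;
        // Assumes allowedLateness is small enough that this sum does not overflow.
        long cleanupTime = maxTimestamp + allowedLateness;
        if (cleanupTime <= currentWatermark) {
            onLateElement.accept(timestamp); // too late: hand to the user instead of dropping
        } else {
            accepted.add(timestamp);
        }
    }
}
```

With windows of size 10 and an allowed lateness of 5, an element with timestamp 3 belongs to the window ending at 9, so it is still accepted at watermark 13 but routed to the callback at watermark 14.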
>>>>>>>> 
>>>>>>>> Chen
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>> 
>>>>>>>>> @Chen I added a section at the end of the document regarding access
>>>>>>>>> to the elements that are dropped as late. Right now, the section just
>>>>>>>>> mentions that we have to do this, but there is no real proposal yet
>>>>>>>>> for how to do it. It is only a rough sketch so that we don't forget
>>>>>>>>> about it.
>>>>>>>>> 
>>>>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnchen@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> +1 for the allowedLateness scenario.
>>>>>>>>>>
>>>>>>>>>> The rationale is that backfills or data issues can hold back
>>>>>>>>>> in-window data until after the watermark has passed the window's end
>>>>>>>>>> time. This causes the sink to write partial output.
>>>>>>>>>>
>>>>>>>>>> Allowing a high allowedLateness threshold makes it easier to merge
>>>>>>>>>> those results and overwrite the partial output with the correct
>>>>>>>>>> output at the sink. But yeah, the pipeline author is then at risk of
>>>>>>>>>> blowing up the state backend with huge state.
>>>>>>>>>>
>>>>>>>>>> Alternatively, in some cases, if the sink allows a read-check-merge
>>>>>>>>>> operation, the window can explicitly call out events ingested after
>>>>>>>>>> allowedLateness. This asks the pipeline author to mitigate these
>>>>>>>>>> events in a way outside of the Flink ecosystem. The catch is that,
>>>>>>>>>> since any part of a Flink job can be replayed and checkpointed, the
>>>>>>>>>> notification should somehow include this information as well.
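A small sketch of the read-check-merge pattern on the sink side. FakeStore, RowExistsException and MergingSink are hypothetical stand-ins for a real data store and sink; the merge simply adds a late partial sum to the stored row.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store error: the row for this key already exists.
final class RowExistsException extends Exception {
    RowExistsException(String key) { super("row already exists: " + key); }
}

// In-memory stand-in for a data store whose insert fails on an existing row.
final class FakeStore {
    private final Map<String, Long> rows = new HashMap<>();
    void insert(String key, long value) throws RowExistsException {
        if (rows.containsKey(key)) throw new RowExistsException(key);
        rows.put(key, value);
    }
    long read(String key) { return rows.get(key); }
    void update(String key, long value) { rows.put(key, value); }
}

// Sink that tries an insert first and falls back to read-check-merge.
final class MergingSink {
    private final FakeStore store;
    MergingSink(FakeStore store) { this.store = store; }
    void write(String windowKey, long partialSum) {
        try {
            store.insert(windowKey, partialSum);
        } catch (RowExistsException e) {
            // Late result for a window that was already written: merge by adding.
            store.update(windowKey, store.read(windowKey) + partialSum);
        }
    }
}
```

Summing is not idempotent: if the job replays from a checkpoint, the same partial result could be merged twice, which is the replay/checkpoint catch raised above. An idempotent merge, such as overwriting the row with a recomputed full result, avoids that.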
>>>>>>>>>> 
>>>>>>>>>> Thanks
>>>>>>>>>> Chen
>>>>>>>>>> 
>>>>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> In an effort to move the discussion from the doc to the mailing
>>>>>>>>>>> list, here is a comment that was left in the doc:
>>>>>>>>>>> 
>>>>>>>>>>> “It seems this proposal marries the allowed lateness of events and
>>>>>>>>>>> the discarding of window state. In most use cases this should be
>>>>>>>>>>> sufficient, but there are instances where having independent
>>>>>>>>>>> control of these may be useful.
>>>>>>>>>>>
>>>>>>>>>>> For instance, you may have a job that computes some aggregate, like
>>>>>>>>>>> a sum. You may want to keep the window state around for a while, but
>>>>>>>>>>> not too long. Yet you may want to continue processing late events
>>>>>>>>>>> after you discarded the window state. It is possible that your
>>>>>>>>>>> stream sinks can make use of this data. For instance, they may be
>>>>>>>>>>> writing to a data store that returns an error if a row already
>>>>>>>>>>> exists, which allows the sink to read the existing row and update
>>>>>>>>>>> it with the new data.”
>>>>>>>>>>> 
>>>>>>>>>>> To which I would like to reply:
>>>>>>>>>>> 
>>>>>>>>>>> If I understand your use case correctly, I believe that the
>>>>>>>>>>> proposed binding of the allowed lateness to the state purging does
>>>>>>>>>>> not pose any problem. The lateness specifies the upper time bound
>>>>>>>>>>> after which the state will be discarded. Between the start of a
>>>>>>>>>>> window and its (end + allowedLateness), you can write custom
>>>>>>>>>>> triggers that fire, purge the state, or do nothing. Given this, I
>>>>>>>>>>> suppose that, in the most extreme case, you can specify an allowed
>>>>>>>>>>> lateness of Long.MaxValue and do the purging of the state
>>>>>>>>>>> "manually". By doing this, you remove the safeguard of letting the
>>>>>>>>>>> system purge the state at some point in time, and you can do your
>>>>>>>>>>> own custom state management that fits your needs.
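The bound described above can be written down as a small helper. This is a standalone sketch, not Flink's API; LatenessBound and its Phase values are hypothetical names. With an allowed lateness of Long.MaxValue (Long.MAX_VALUE in Java), the naive sum end + allowedLateness overflows, so the sketch saturates at Long.MAX_VALUE, meaning the system never purges the state on its own.

```java
// Hypothetical standalone sketch (not Flink's API) of the lateness bound.
final class LatenessBound {
    enum Phase { BEFORE_END, WITHIN_LATENESS, PURGEABLE }

    // Time at which the system would purge the window state: the window's
    // max timestamp plus the allowed lateness. Saturates at Long.MAX_VALUE on
    // overflow, so a lateness of Long.MAX_VALUE means "never purge
    // automatically". Assumes allowedLateness >= 0.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long t = windowMaxTimestamp + allowedLateness;
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE;
    }

    static Phase phase(long windowMaxTimestamp, long allowedLateness, long watermark) {
        if (watermark < windowMaxTimestamp) return Phase.BEFORE_END;
        return watermark < cleanupTime(windowMaxTimestamp, allowedLateness)
                ? Phase.WITHIN_LATENESS
                : Phase.PURGEABLE;
    }
}
```

Until the watermark reaches the cleanup time, custom triggers decide whether to fire or purge; from then on the system may purge the state. With a lateness of 10, a window whose max timestamp is 99 becomes purgeable at watermark 109, while with Long.MAX_VALUE it never does.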
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kostas
>>>>>>>>>>> 
>>>>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> @Vishnu Funny you should ask that, because I have a design doc lying
>>>>>>>>>>>> around. I'll open a new mail thread so as not to hijack this one.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I was going through the suggested improvements to windows, and I
>>>>>>>>>>>>> have a few questions/suggestions for improvements regarding the
>>>>>>>>>>>>> Evictor.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 1) I have a use case where I need to create a custom Evictor that
>>>>>>>>>>>>> evicts elements from the window based on their value (e.g., if the
>>>>>>>>>>>>> elements are of case class Item(id: Int, `type`: String), then
>>>>>>>>>>>>> evict the elements that have type="a"). I believe this is not
>>>>>>>>>>>>> currently possible.
>>>>>>>>>>>>> 2) This is somewhat related to 1): there should be an option to
>>>>>>>>>>>>> evict elements from anywhere in the window, not only from the
>>>>>>>>>>>>> beginning of the window (e.g., apply the delta function to all
>>>>>>>>>>>>> elements and remove all those that don't pass). I checked the code:
>>>>>>>>>>>>> the evict method just returns the number of elements to be removed,
>>>>>>>>>>>>> and processTriggerResult just skips that many elements from the
>>>>>>>>>>>>> beginning.
>>>>>>>>>>>>> 3) Add an option that enables the user to decide whether the
>>>>>>>>>>>>> eviction should happen before or after the apply function.
>>>>>>>>>>>>> Currently it happens before the apply function, but I have a use
>>>>>>>>>>>>> case where I need to first apply the function and evict afterward.
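A sketch of points 1) to 3) under stated assumptions: eviction by value from anywhere in the window, configurable to run before or after the window function. ValueEvictingWindow and EvictionPoint are hypothetical names, and Item is a plain Java rendering of the case class above; this is not Flink's Evictor interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Java rendering of the Item case class from the email.
final class Item {
    final int id;
    final String type;
    Item(int id, String type) { this.id = id; this.type = type; }
}

// Whether eviction runs before or after the window function.
enum EvictionPoint { BEFORE_FUNCTION, AFTER_FUNCTION }

// Hypothetical window contents with value-based eviction (not Flink's Evictor API).
final class ValueEvictingWindow<T> {
    private final List<T> contents = new ArrayList<>();

    void add(T element) { contents.add(element); }

    int size() { return contents.size(); }

    <R> R fire(Function<List<T>, R> windowFunction, Predicate<T> evict, EvictionPoint when) {
        // removeIf drops every matching element, wherever it sits in the window.
        if (when == EvictionPoint.BEFORE_FUNCTION) contents.removeIf(evict);
        // Pass a copy so the function cannot modify the window contents.
        R result = windowFunction.apply(new ArrayList<>(contents));
        if (when == EvictionPoint.AFTER_FUNCTION) contents.removeIf(evict);
        return result;
    }
}
```

Evicting the type="a" items from a three-item window before the function leaves it one element to see; evicting afterwards lets it see all three. In both cases the window keeps only the type="b" item.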
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am doing this for a POC, so I think I can modify the Flink code
>>>>>>>>>>>>> base to make these changes and build it, but I would appreciate any
>>>>>>>>>>>>> suggestions on whether these are viable changes or whether there
>>>>>>>>>>>>> would be any performance issues if they were made. I would also
>>>>>>>>>>>>> appreciate any pointers on where to start (e.g., do I create a new
>>>>>>>>>>>>> class similar to EvictingWindowOperator that extends
>>>>>>>>>>>>> WindowOperator?).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>>>> Vishnu Viswanath
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I did:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3cCANMXwW0AbTTjjg9EWdxRUGxkjM7jscBeNmVRZOHPt2qO3pQMwA@mail.gmail.com%3e
>>>>>>>>>>>>>> ;-)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <uce@apache.org> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>>>>>>>> In the future, it might be good to have discussions directly on
>>>>>>>>>>>>>>>> the ML and then change the document accordingly. This way everyone
>>>>>>>>>>>>>>>> can follow the discussion on the ML. I also feel that Google Doc
>>>>>>>>>>>>>>>> comments often don't give enough space for expressing more complex
>>>>>>>>>>>>>>>> opinions.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I agree! Would you mind raising this point as a separate discussion
>>>>>>>>>>>>>>> on dev@?