flink-dev mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: [DISCUSS] Allowed Lateness in Flink
Date Fri, 22 Jul 2016 11:10:28 GMT
Forgot to say that the signature for the onFire() that I think fits should be:

void onFire(Window window, TriggerContext ctx) throws Exception;
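
To make the intent a bit more concrete, here is a minimal, self-contained sketch of how an All trigger could defer the cleanup of its children to onFire(). It does not use Flink's actual Trigger API: SimpleTrigger, AllTrigger, processElement and the two-value TriggerResult are simplified stand-ins made up for illustration, and onFire() is shown without the Window and TriggerContext arguments from the signature above. processElement only plays the role of the operator calling onFire() once the firing is successful.

```java
import java.util.Arrays;
import java.util.List;

class CompositeTriggerSketch {

    // Simplified stand-in for a trigger result (assumption: only two outcomes).
    enum TriggerResult { CONTINUE, FIRE }

    // Hypothetical trigger interface, not Flink's Trigger class.
    interface SimpleTrigger {
        // Only proposes a result; state cleanup must not happen here.
        TriggerResult onElement(Object element) throws Exception;

        // Called only after the firing has actually been decided.
        void onFire() throws Exception;
    }

    // Proposes FIRE once at least maxCount elements have been seen.
    static class CountTrigger implements SimpleTrigger {
        private final long maxCount;
        private long count;

        CountTrigger(long maxCount) { this.maxCount = maxCount; }

        @Override
        public TriggerResult onElement(Object element) {
            count++;
            return count >= maxCount ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        // The counter is reset here, not in onElement().
        @Override
        public void onFire() { count = 0; }

        long currentCount() { return count; }
    }

    // Fires only when every child proposes FIRE.
    static class AllTrigger implements SimpleTrigger {
        private final List<SimpleTrigger> children;

        AllTrigger(List<SimpleTrigger> children) { this.children = children; }

        @Override
        public TriggerResult onElement(Object element) throws Exception {
            boolean allFire = true;
            for (SimpleTrigger child : children) {
                // Every child has to see the element, so no short-circuiting.
                if (child.onElement(element) != TriggerResult.FIRE) {
                    allFire = false;
                }
            }
            return allFire ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        // Propagate the cleanup to the children once the composite fires.
        @Override
        public void onFire() throws Exception {
            for (SimpleTrigger child : children) {
                child.onFire();
            }
        }
    }

    // Stand-in for the operator: call onFire() only when the firing is successful.
    static boolean processElement(SimpleTrigger root, Object element) throws Exception {
        if (root.onElement(element) == TriggerResult.FIRE) {
            root.onFire();
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        CountTrigger two = new CountTrigger(2);
        CountTrigger three = new CountTrigger(3);
        AllTrigger all = new AllTrigger(Arrays.<SimpleTrigger>asList(two, three));
        for (int i = 1; i <= 3; i++) {
            boolean fired = processElement(all, "element-" + i);
            System.out.println("element " + i + ": fired=" + fired
                    + ", counts=" + two.currentCount() + "/" + three.currentCount());
        }
    }
}
```

With a count-of-2 and a count-of-3 child, the second element leaves the All trigger at CONTINUE and the count-of-2 child keeps its count. Only on the third element does the composite fire, and only then are both counters reset.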

> On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
> 
> Hi,
> 
> I started working on the new triggers proposed here and so far I can see 
> two shortcomings in the current state of the triggers that do not play well 
> with the new proposals, and more specifically the composite triggers All 
> and Any. 
> 
> So here it goes:
> 
> 1) In the document posted above, there are some new useful trigger combinations
> (like Any and All) which allow for combining primitive triggers. This decouples
> the TriggerResult of each individual trigger from the action that is actually
> going to be executed. For example, an All trigger may have one proposing FIRE
> while the other CONTINUE and the final result will be CONTINUE.
> 
> In this case, any action that should be taken by each individual trigger upon
> firing, e.g. cleaning its state as in the case of CountTrigger, should be
> postponed until the parent trigger (All) decides to fire.
> 
> For this, there should be an onFire() method in each trigger that does exactly
> that. This method should be called in the fireOrCleanup() of the
> windowOperator, when the firing is successful.
> 
> 2) In the current implementation, when stateful triggers, like the CountTrigger,
> are combined in a composite Trigger (with Any or All) their state is shared
> because the stateHandle is the same for both. To solve this, the handle should
> become unique, BUT consistent for the same Trigger. The latter implies that the
> handle for the same trigger after a node failure, should be the same as that of
> its predecessor (before the failure).
> 
> Let me know your thoughts on these.
> 
> Kostas
> 
> 
>> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> 
>> I'm proposing to get this small change into 1.1:
>> https://issues.apache.org/jira/browse/FLINK-4239 This will make our lives
>> easier with the future proposed changes.
>> 
>> What do you think?
>> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljoscha@apache.org> wrote:
>> 
>>> Hi,
>>> these new features should make it into the 1.2 release. We are already
>>> working on releasing 1.1, so it won't make it for that one, unfortunately.
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnchen@gmail.com> wrote:
>>> 
>>>> BTW, do you have a rough timeline in terms of rolling it out to production?
>>>> 
>>>> Thanks,
>>>> Chen
>>>> 
>>>> 
>>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>> 
>>>>> Hi,
>>>>> Chen commented this on the doc (I'm mirroring here so everyone can follow):
>>>>> "It would be cool to be able to access last snapshot of window states
>>>>> before it get purged. Pipeline author might consider put it to external
>>>>> storage and deal with late arriving events by restore corresponding
>>>>> window."
>>>>> 
>>>>> My answer:
>>>>> This is partially covered by the section called "What Happens at
>>>>> Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
>>>>> is that the window can have one final emission if there is new data in the
>>>>> buffers at cleanup time.
>>>>> 
>>>>> The work on this will also depend on this proposal:
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>>>>> With this, the WindowFunction can get meta data about the window firing so it
>>>>> could be informed that this is the last firing before a cleanup and that
>>>>> there already was an earlier, on-time firing.
>>>>> 
>>>>> Does this cover your concerns, Chen?
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>> 
>>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnchen@gmail.com> wrote:
>>>>> 
>>>>>> Sure. Currently, it looks like any element assigned to a too-late window
>>>>>> will be dropped silently 😓?
>>>>>>
>>>>>> Having a late window stream implies that Flink somehow needs to add one more
>>>>>> state to the window and split window state cleanup from window retirement.
>>>>>> I would suggest something as simple as adding a function to the trigger
>>>>>> called OnLateElement that always fires and purges; it would make the
>>>>>> developer aware of this case and able to handle it.
>>>>>> 
>>>>>> Chen
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>> 
>>>>>>> @Chen I added a section at the end of the document regarding access to the
>>>>>>> elements that are dropped as late. Right now, the section just mentions
>>>>>>> that we have to do this but there is no real proposal yet for how to do it.
>>>>>>> Only a rough sketch so that we don't forget about it.
>>>>>>> 
>>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnchen@gmail.com> wrote:
>>>>>>> 
>>>>>>>> +1 for the allowedLateness scenario.
>>>>>>>>
>>>>>>>> The rationale is that backfills or data issues can hold back in-window
>>>>>>>> data until the watermark has passed the window end time. This causes the
>>>>>>>> sink to write partial output.
>>>>>>>>
>>>>>>>> Allowing a high allowedLateness threshold makes it easier to merge those
>>>>>>>> results and overwrite the partial output with the correct output at the
>>>>>>>> sink. But yeah, the pipeline author is at risk of blowing up the state
>>>>>>>> backend with huge state.
>>>>>>>>
>>>>>>>> Alternatively, in some cases, if the sink allows a read-check-merge
>>>>>>>> operation, the window can explicitly call out events ingested after
>>>>>>>> allowedLateness. This asks the pipeline author to mitigate these events
>>>>>>>> outside of the Flink ecosystem. The catch is that, since any part of a
>>>>>>>> Flink job can replay and checkpoint, the notification should somehow
>>>>>>>> include this information as well.
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> Chen
>>>>>>>> 
>>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> In the effort to move the discussion to the mailing list, rather than the
>>>>>>>>> doc, there was a comment in the doc:
>>>>>>>>> 
>>>>>>>>> "It seems this proposal marries the allowed lateness of events and the
>>>>>>>>> discarding of window state. In most use cases this should be sufficient,
>>>>>>>>> but there are instances where having independent control of these may be
>>>>>>>>> useful.
>>>>>>>>>
>>>>>>>>> For instance, you may have a job that computes some aggregate, like a sum.
>>>>>>>>> You may want to keep the window state around for a while, but not too long.
>>>>>>>>> Yet you may want to continue processing late events after you discarded the
>>>>>>>>> window state. It is possible that your stream sinks can make use of this
>>>>>>>>> data. For instance, they may be writing to a data store that returns an
>>>>>>>>> error if a row already exists, which allows the sink to read the existing
>>>>>>>>> row and update it with the new data."
>>>>>>>>> 
>>>>>>>>> To which I would like to reply:
>>>>>>>>> 
>>>>>>>>> If I understand your use-case correctly, I believe that the proposed
>>>>>>>>> binding of the allowed lateness to the state purging does not pose any
>>>>>>>>> problem. The lateness specifies the upper time bound, after which the state
>>>>>>>>> will be discarded. Between the start of a window and its (end +
>>>>>>>>> allowedLateness) you can write custom triggers that fire, purge the state,
>>>>>>>>> or do nothing. Given this, I suppose that, in the most extreme case, you
>>>>>>>>> can specify an allowed lateness of Long.MaxValue and do the purging of the
>>>>>>>>> state "manually". By doing this, you remove the safeguard of letting the
>>>>>>>>> system purge the state at some point in time, and you can do your own
>>>>>>>>> custom state management that fits your needs.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Kostas
>>>>>>>>> 
>>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> @Vishnu Funny you should ask that because I have a design doc lying around.
>>>>>>>>>> I'll open a new mail thread to not hijack this one.
>>>>>>>>>> 
>>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> I was going through the suggested improvements in window, and I have a
>>>>>>>>>>> few questions/suggestions on improvements regarding the Evictor.
>>>>>>>>>>>
>>>>>>>>>>> 1) I have a use case where I have to create a custom Evictor that will
>>>>>>>>>>> evict elements from the window based on their value (e.g., if the elements
>>>>>>>>>>> are of case class Item(id: Int, type: String), then evict the elements
>>>>>>>>>>> that have type="a"). I believe this is not currently possible.
>>>>>>>>>>> 2) This is somewhat related to 1): there should be an option to evict
>>>>>>>>>>> elements from anywhere in the window, not only from the beginning of the
>>>>>>>>>>> window (e.g., apply the delta function to all elements and remove all
>>>>>>>>>>> those that don't pass). I checked the code, and the evict method just
>>>>>>>>>>> returns the number of elements to be removed, and processTriggerResult
>>>>>>>>>>> just skips that many elements from the beginning.
>>>>>>>>>>> 3) Add an option that enables the user to decide whether the eviction
>>>>>>>>>>> should happen before or after the apply function. Currently it is before
>>>>>>>>>>> the apply function, but I have a use case where I need to first apply
>>>>>>>>>>> the function and evict afterward.
>>>>>>>>>>>
>>>>>>>>>>> I am doing these for a POC, so I think I can modify the Flink code base to
>>>>>>>>>>> make these changes and build, but I would appreciate any suggestions on
>>>>>>>>>>> whether these are viable changes or whether there will be any performance
>>>>>>>>>>> issues if these are done. Also, any pointers on where to start (e.g., do I
>>>>>>>>>>> create a new class similar to EvictingWindowOperator that extends
>>>>>>>>>>> WindowOperator?)
>>>>>>>>>>> 
>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>> Vishnu Viswanath,
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> I did:
>>>>>>>>>>>>
>>>>>>>>>>>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3cCANMXwW0AbTTjjg9EWdxRUGxkjM7jscBeNmVRZOHPt2qO3pQMwA@mail.gmail.com%3e
>>>>>>>>>>>> ;-)
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <uce@apache.org> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>>>>>> In the future, it might be good to do discussions directly on the ML and
>>>>>>>>>>>>>> then change the document accordingly. This way everyone can follow the
>>>>>>>>>>>>>> discussion on the ML. I also feel that Google Doc comments often don't give
>>>>>>>>>>>>>> enough space for expressing more complex opinions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I agree! Would you mind raising this point as a separate discussion on dev@?
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
> 

