flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Enhance Window Evictor in Flink
Date Mon, 18 Jul 2016 10:32:48 GMT
Hi,
about TimeEvictor, yes, I think there should be specific evictors for
processing time and event time. Also, the current time should be
retrievable from the EvictorContext.
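A minimal sketch of that split, using a simplified model rather than Flink's real classes. `Timestamped`, `EvictorContext`, `FixedContext`, `AbstractTimeEvictor`, `EventTimeEvictor`, `ProcessingTimeEvictor` and `remainingAfter` are all hypothetical names. Each element is assumed to carry a single timestamp whose meaning matches the evictor flavor. The sketch shows that the class chosen at the call site fixes what "now" means, and that "now" is read from the context instead of being inferred from the elements.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of the proposal; not Flink's real API.
final class TimeEvictorSketch {

    /** An element plus one timestamp (hypothetical stand-in for a timestamped record). */
    static final class Timestamped<T> {
        final T value;
        final long timestamp;

        Timestamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical context exposing both notions of "current time". */
    interface EvictorContext {
        long getCurrentProcessingTime();

        long getCurrentWatermark();
    }

    /** Context with fixed clock values, for demonstration only. */
    static final class FixedContext implements EvictorContext {
        private final long processingTime;
        private final long watermark;

        FixedContext(long processingTime, long watermark) {
            this.processingTime = processingTime;
            this.watermark = watermark;
        }

        @Override public long getCurrentProcessingTime() { return processingTime; }

        @Override public long getCurrentWatermark() { return watermark; }
    }

    /** Keeps only elements with timestamp >= now - windowSize; subclasses define "now". */
    abstract static class AbstractTimeEvictor<T> {
        private final long windowSize;

        AbstractTimeEvictor(long windowSize) { this.windowSize = windowSize; }

        abstract long currentTime(EvictorContext ctx);

        void evict(List<Timestamped<T>> elements, EvictorContext ctx) {
            long cutoff = currentTime(ctx) - windowSize;
            for (Iterator<Timestamped<T>> it = elements.iterator(); it.hasNext(); ) {
                if (it.next().timestamp < cutoff) {
                    it.remove(); // evict in place, single pass
                }
            }
        }
    }

    /** "Now" is the current watermark. */
    static final class EventTimeEvictor<T> extends AbstractTimeEvictor<T> {
        EventTimeEvictor(long windowSize) { super(windowSize); }

        @Override long currentTime(EvictorContext ctx) { return ctx.getCurrentWatermark(); }
    }

    /** "Now" is the wall clock of the processing machine. */
    static final class ProcessingTimeEvictor<T> extends AbstractTimeEvictor<T> {
        ProcessingTimeEvictor(long windowSize) { super(windowSize); }

        @Override long currentTime(EvictorContext ctx) { return ctx.getCurrentProcessingTime(); }
    }

    /** Builds a window with timestamps 1, 5, 9, evicts, and returns how many elements remain. */
    static int remainingAfter(AbstractTimeEvictor<String> evictor, EvictorContext ctx) {
        List<Timestamped<String>> window = new ArrayList<>();
        window.add(new Timestamped<>("x", 1));
        window.add(new Timestamped<>("y", 5));
        window.add(new Timestamped<>("z", 9));
        evictor.evict(window, ctx);
        return window.size();
    }

    public static void main(String[] args) {
        EvictorContext ctx = new FixedContext(20, 10); // processing time 20, watermark 10
        System.out.println(remainingAfter(new EventTimeEvictor<>(5), ctx));      // cutoff 5: 2 remain
        System.out.println(remainingAfter(new ProcessingTimeEvictor<>(5), ctx)); // cutoff 15: 0 remain
    }
}
```

With a watermark of 10 and a processing time of 20, the event-time variant keeps the elements at 5 and 9. The processing-time variant evicts all three.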

For the wiki you will need permissions. This was recently changed because
there was too much spam. I gave you permission to add pages. Can you please
try and check if it works?

Cheers,
Aljoscha

On Fri, 15 Jul 2016 at 13:28 Vishnu Viswanath <vishnu.viswanath25@gmail.com>
wrote:

> Hi all,
>
> How do we create a FLIP page? Is there any permission setup required? I
> don't see any "Create" page option in the header (after logging in), as
> mentioned in
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
>
> Thanks,
> Vishnu
>
> On Wed, Jul 13, 2016 at 10:22 PM, Vishnu Viswanath <
> vishnu.viswanath25@gmail.com> wrote:
>
> > Hi Aljoscha,
> >
> > I agree: looking at the code, the user will know exactly whether they are
> > creating an event-time or a processing-time evictor.
> > So do you think it would be OK to have two versions of TimeEvictor
> > (one for event time and one for processing time) and also two versions of
> > DeltaEvictor (again, one for event time and one for processing time)?
> >
> > Please note that the existing behavior of TimeEvictor/DeltaEvictor does
> > not consider whether it is event time or processing time. For example, in
> > TimeEvictor the current time is taken to be the timestamp of the last
> > element in the window:
> >
> >     long currentTime = Iterables.getLast(elements).getTimestamp();
> >
> > and not the highest timestamp of all elements.
> > What I am trying to achieve is something like:
> >
> >     long currentTime;
> >     if (ctx.isEventTime()) {
> >         currentTime = getMaxTimestamp(elements);
> >     } else {
> >         currentTime = Iterables.getLast(elements).getTimestamp();
> >     }
> >
> > Similarly, in DeltaEvictor `lastElement` is `Iterables.getLast(elements)`,
> > and I think we should treat the element with the maximum timestamp as the
> > last element instead of simply taking the last inserted element.
> >
> > Do you think that is the right thing to do, or should we leave the
> > behavior of the evictors as is with respect to choosing the last element?
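Here is a small sketch that contrasts the two choices of "last element" when elements arrive out of order. The class and helper names (`LastElementSketch`, `Timestamped`, `lastInserted`, `maxTimestampElement`) are hypothetical. `getMaxTimestamp` is only named in the pseudocode above, so the one-pass implementation here is an assumption. The sketch also assumes a non-empty window.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helpers; getMaxTimestamp is only named (not defined) in the thread.
final class LastElementSketch {

    static final class Timestamped {
        final String value;
        final long timestamp;

        Timestamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Current behaviour: the most recently inserted element (assumes a non-empty window). */
    static Timestamped lastInserted(List<Timestamped> elements) {
        return elements.get(elements.size() - 1);
    }

    /** Proposed event-time behaviour: the element with the highest timestamp, in one pass. */
    static Timestamped maxTimestampElement(List<Timestamped> elements) {
        Timestamped best = elements.get(0);
        for (Timestamped e : elements) {
            if (e.timestamp > best.timestamp) {
                best = e; // ties keep the earliest-inserted element
            }
        }
        return best;
    }

    static long getMaxTimestamp(List<Timestamped> elements) {
        return maxTimestampElement(elements).timestamp;
    }

    public static void main(String[] args) {
        // Arrival order differs from event-time order: "c" arrives last but is not the newest.
        List<Timestamped> window = Arrays.asList(
                new Timestamped("a", 10), new Timestamped("b", 30), new Timestamped("c", 20));
        System.out.println(lastInserted(window).timestamp); // 20
        System.out.println(getMaxTimestamp(window));        // 30
    }
}
```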
> >
> > Thanks,
> > Vishnu
> >
> > On Wed, Jul 13, 2016 at 11:07 AM, Aljoscha Krettek <aljoscha@apache.org>
> > wrote:
> >
> >> I still think it should be explicit in the class. For example, if you have
> >> this code:
> >>
> >> input
> >>   .keyBy(...)
> >>   .window(...)
> >>   .trigger(EventTimeTrigger.create())
> >>   .evictor(TimeEvictor.of(...))
> >>
> >> the time behavior of the trigger is explicitly specified while the evictor
> >> would dynamically adapt based on internal workings that the user might not
> >> be aware of. Having the behavior explicit at the call site is very
> >> important, in my opinion.
> >>
> >> On Wed, 13 Jul 2016 at 16:28 Vishnu Viswanath <
> >> vishnu.viswanath25@gmail.com>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > I was hoping to use the isEventTime method in the WindowAssigner to set
> >> > that information in the EvictorContext.
> >> > What do you think?
> >> >
> >> > Thanks and Regards,
> >> > Vishnu Viswanath,
> >> >
> >> > On Wed, Jul 13, 2016 at 10:09 AM, Aljoscha Krettek <aljoscha@apache.org>
> >> > wrote:
> >> >
> >> > > Hi,
> >> > > I think the way to go here is to add both an EventTimeEvictor and a
> >> > > ProcessingTimeEvictor. The problem is that "isEventTime" cannot really
> >> > > be determined. That's also the reason why there is an EventTimeTrigger
> >> > > and a ProcessingTimeTrigger. It was just an oversight that the
> >> > > TimeEvictor does not also have these two versions.
> >> > >
> >> > > About EvictingWindowOperator, I think you can make the two methods
> >> > > non-final in WindowOperator, yes.
> >> > >
> >> > > Cheers,
> >> > > Aljoscha
> >> > >
> >> > > On Wed, 13 Jul 2016 at 14:32 Vishnu Viswanath <
> >> > > vishnu.viswanath25@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Hi Aljoscha,
> >> > > >
> >> > > > I am thinking of adding a method `boolean isEventTime();` to the
> >> > > > EvictorContext, in addition to
> >> > > >
> >> > > >     long getCurrentProcessingTime();
> >> > > >     MetricGroup getMetricGroup();
> >> > > >     long getCurrentWatermark();
> >> > > >
> >> > > > This method can be used so that TimeEvictor does not have to iterate
> >> > > > through all the elements. There will be a few changes in the existing
> >> > > > behavior of TimeEvictor and DeltaEvictor (I have mentioned this in the
> >> > > > design doc).
> >> > > >
> >> > > > Also, is there any specific reason why the open and close methods in
> >> > > > WindowOperator are final? Since the EvictorContext will be in the
> >> > > > EvictingWindowOperator, I need to override open and close in
> >> > > > EvictingWindowOperator to set the reference to the EvictorContext to
> >> > > > null.
> >> > > >
> >> > > > Thanks and Regards,
> >> > > > Vishnu Viswanath,
> >> > > >
> >> > > > On Fri, Jul 8, 2016 at 7:40 PM, Vishnu Viswanath <
> >> > > > vishnu.viswanath25@gmail.com> wrote:
> >> > > >
> >> > > > > My thought process when asking whether we can use the state backend
> >> > > > > in the window function was: can we add the elements to be evicted to
> >> > > > > some state and allow evictAfter to read it from some context and
> >> > > > > remove them from the window?
> >> > > > >
> >> > > > >
> >> > > > > On Fri, Jul 8, 2016 at 7:30 PM, Vishnu Viswanath <
> >> > > > > vishnu.viswanath25@gmail.com> wrote:
> >> > > > >
> >> > > > >> Hi Aljoscha,
> >> > > > >>
> >> > > > >> Thanks for the explanation, and sorry for the late reply; I was busy
> >> > > > >> with work.
> >> > > > >>
> >> > > > >> I did think about this scenario; in fact, I thought of posting this
> >> > > > >> question in my previous mail. Then I understood that this problem
> >> > > > >> will be there whichever method we choose (Trigger looking for the
> >> > > > >> pattern or window function looking for the pattern).
> >> > > > >>
> >> > > > >> I do have a pretty good watermark, but my concern is that it changes
> >> > > > >> based on the key of these messages (I don't know if that is
> >> > > > >> possible; I haven't started coding that yet, so maybe you could tell
> >> > > > >> me). Even if it is, some of these watermarks will be long (in days),
> >> > > > >> and I don't want the trigger to wait that long.
> >> > > > >>
> >> > > > >> It looks like it is not easy to have an evictAfter based on the
> >> > > > >> window function (without introducing coupling), but can the current
> >> > > > >> window apply function be modified to allow it to change the elements
> >> > > > >> in the window, maybe using some state backend? (I don't know exactly
> >> > > > >> how the internals of these work, so this might be a wrong question.)
> >> > > > >>
> >> > > > >> Thanks and Regards,
> >> > > > >> Vishnu Viswanath,
> >> > > > >>
> >> > > > >> On Fri, Jul 8, 2016 at 10:20 AM, Aljoscha Krettek <aljoscha@apache.org>
> >> > > > >> wrote:
> >> > > > >>
> >> > > > >>> Hi Vishnu,
> >> > > > >>> how long would these patterns be? The Trigger would not have to sort
> >> > > > >>> the elements for every new element but just insert the new element
> >> > > > >>> into an internal data structure. Only when it sees that the watermark
> >> > > > >>> is past a certain point would it check whether the pattern matches
> >> > > > >>> and actually fire.
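One possible reading of this, sketched under assumptions. The trigger keeps elements in a timestamp-ordered map (a `TreeMap`, chosen here for illustration), so each new element costs a logarithmic insert instead of a re-sort. The pattern is only checked when the watermark advances. `BufferingPatternTrigger` and its methods are hypothetical, as are using single characters as elements and treating a match as a contiguous run in timestamp order. This is not Flink's `Trigger` interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch; not Flink's Trigger API. Characters stand in for elements.
final class BufferingPatternTrigger {
    // Timestamp -> symbols seen with that timestamp, kept ordered by the TreeMap.
    private final TreeMap<Long, List<Character>> buffer = new TreeMap<>();
    private final String pattern;

    BufferingPatternTrigger(String pattern) {
        this.pattern = pattern;
    }

    /** Per element: an O(log n) insert, no re-sort of the whole window. */
    void onElement(char symbol, long timestamp) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(symbol);
    }

    /** On watermark: read elements up to it in timestamp order; true means "fire". */
    boolean onWatermark(long watermark) {
        StringBuilder ordered = new StringBuilder();
        for (List<Character> symbols : buffer.headMap(watermark, true).values()) {
            for (char c : symbols) {
                ordered.append(c);
            }
        }
        return ordered.indexOf(pattern) >= 0;
    }

    public static void main(String[] args) {
        BufferingPatternTrigger trigger = new BufferingPatternTrigger("aba");
        trigger.onElement('a', 3); // arrives out of order
        trigger.onElement('a', 1);
        trigger.onElement('b', 2);
        System.out.println(trigger.onWatermark(1)); // false: only "a" is covered
        System.out.println(trigger.onWatermark(3)); // true: "aba" in event-time order
    }
}
```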
> >> > > > >>>
> >> > > > >>> A general note regarding order and event time: I think relying on
> >> > > > >>> this for computation is very tricky unless the watermark is 100 %
> >> > > > >>> correct or you completely discard elements that arrive after the
> >> > > > >>> watermark, i.e. elements that would break the watermark's promise
> >> > > > >>> that no elements with an earlier timestamp will ever arrive. The
> >> > > > >>> reason for this is that new elements could always arrive that end up
> >> > > > >>> between already seen elements. For example, let's say we have this
> >> > > > >>> sequence of elements when the trigger fires:
> >> > > > >>>
> >> > > > >>> a-b-a
> >> > > > >>>
> >> > > > >>> This is the sequence that you are looking for and you emit some
> >> > > > >>> result from the WindowFunction. Now, new elements arrive that fall
> >> > > > >>> in between the elements we already have:
> >> > > > >>>
> >> > > > >>> a-d-e-b-f-g-a
> >> > > > >>>
> >> > > > >>> This is an updated, sorted view of the actual event-time stream, and
> >> > > > >>> we didn't realize before that the stream actually looks like this.
> >> > > > >>> Does this still match the original pattern, or should we now
> >> > > > >>> consider it non-matching? If no, then the earlier successful match
> >> > > > >>> for a-b-a was wrong and we should never have processed it, but we
> >> > > > >>> didn't know that at the time. If yes, then pattern matching like
> >> > > > >>> this can be done in the Trigger by having something like pattern
> >> > > > >>> slots: you don't have to store all elements in the Trigger, you just
> >> > > > >>> need to store possible candidates that could match the pattern and
> >> > > > >>> ignore the other (in-between) elements.
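A sketch of the "pattern slots" idea under the "yes, it still matches" reading. Only elements whose symbol occurs in the pattern are stored, and the match is checked as a subsequence, so in-between elements are skipped. `PatternSlots` and its methods are hypothetical. The sketch assumes elements are fed in event-time order, which is exactly the assumption questioned above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "pattern slots": keep only candidate symbols, ignore the rest.
final class PatternSlots {
    private final String pattern;
    // Assumed to be filled in event-time order.
    private final List<Character> candidates = new ArrayList<>();

    PatternSlots(String pattern) {
        this.pattern = pattern;
    }

    /** Stores the element only if it could take part in the pattern. */
    void onElement(char symbol) {
        if (pattern.indexOf(symbol) >= 0) {
            candidates.add(symbol);
        }
    }

    int storedCount() {
        return candidates.size();
    }

    /** Greedy scan: true if the pattern occurs as a subsequence of the candidates. */
    boolean matches() {
        int next = 0;
        for (char c : candidates) {
            if (next < pattern.length() && c == pattern.charAt(next)) {
                next++;
            }
        }
        return next == pattern.length();
    }

    public static void main(String[] args) {
        PatternSlots slots = new PatternSlots("aba");
        for (char c : "adebfga".toCharArray()) {
            slots.onElement(c);
        }
        System.out.println(slots.storedCount()); // 3: d, e, f, g are never stored
        System.out.println(slots.matches());     // true
    }
}
```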
> >> > > > >>>
> >> > > > >>> Cheers,
> >> > > > >>> Aljoscha
> >> > > > >>>
> >> > > > >>> On Fri, 8 Jul 2016 at 14:10 Vishnu Viswanath <
> >> > > > >>> vishnu.viswanath25@gmail.com>
> >> > > > >>> wrote:
> >> > > > >>>
> >> > > > >>> > Hi Aljoscha,
> >> > > > >>> >
> >> > > > >>> > That is a good idea. Trying to tie it back to the use case: suppose
> >> > > > >>> > the trigger is looking for the pattern a-b-a. When it sees such a
> >> > > > >>> > pattern, it fires the window. Since it knows that the Evictor is now
> >> > > > >>> > going to evict the element b, the trigger updates its state to a-a
> >> > > > >>> > (even before the window function and evictor complete) and then
> >> > > > >>> > looks for the rest of the pattern, i.e., b-a. But I can think of one
> >> > > > >>> > problem here:
> >> > > > >>> >
> >> > > > >>> >    - the events can arrive out of order, i.e., the trigger might see
> >> > > > >>> >      the pattern a-a-b while the actual event-time order is a-b-a.
> >> > > > >>> >      Then the trigger would have to sort the elements in the window
> >> > > > >>> >      every time it sees an element. (I was planning to do this
> >> > > > >>> >      sorting in the window function, which would happen less often:
> >> > > > >>> >      only when the trigger fires.)
> >> > > > >>> >
> >> > > > >>> > Thanks and Regards,
> >> > > > >>> > Vishnu Viswanath,
> >> > > > >>> >
> >> > > > >>> > On Fri, Jul 8, 2016 at 6:04 AM, Aljoscha Krettek <aljoscha@apache.org>
> >> > > > >>> > wrote:
> >> > > > >>> >
> >> > > > >>> > > Hi,
> >> > > > >>> > > come to think of it, the right place to put such checks is
> >> > > > >>> > > actually the Trigger. It would have to be a custom trigger that
> >> > > > >>> > > observes time but also keeps some internal state machine to decide
> >> > > > >>> > > when it has observed the right pattern in the window. Then the
> >> > > > >>> > > window function would just have to do the processing, and you have
> >> > > > >>> > > good separation of concerns. Does that make sense?
> >> > > > >>> > >
> >> > > > >>> > > I'm ignoring time and sorting by time for now because we probably
> >> > > > >>> > > need another design document for that. To me it seems like a
> >> > > > >>> > > bigger thing.
> >> > > > >>> > >
> >> > > > >>> > > Cheers,
> >> > > > >>> > > Aljoscha
> >> > > > >>> > >
> >> > > > >>> > > On Thu, 7 Jul 2016 at 23:56 Vishnu Viswanath <vishnu.viswanath25@gmail.com>
> >> > > > >>> > > wrote:
> >> > > > >>> > >
> >> > > > >>> > > > Hi,
> >> > > > >>> > > >
> >> > > > >>> > > > Regarding the evictAfter function that evicts based on some
> >> > > > >>> > > > decision made by the window function: I think it would be nice if
> >> > > > >>> > > > we could come up with something that is LESS coupled, because I
> >> > > > >>> > > > can think of several use cases that depend on it.
> >> > > > >>> > > >
> >> > > > >>> > > > This matters especially when there are late-arriving messages:
> >> > > > >>> > > > only after the window function is applied can we tell what to do
> >> > > > >>> > > > with the elements in the window. You could apply your business
> >> > > > >>> > > > logic there to determine whether the window function was able to
> >> > > > >>> > > > do what it is supposed to do. If yes, evict those elements;
> >> > > > >>> > > > otherwise (since the elements you are looking for haven't arrived
> >> > > > >>> > > > yet) wait and try again the next time the trigger fires.
> >> > > > >>> > > >
> >> > > > >>> > > > Thanks and Regards,
> >> > > > >>> > > > Vishnu Viswanath,
> >> > > > >>> > > >
> >> > > > >>> > > >
> >> > > > >>> > > > On Thu, Jul 7, 2016 at 9:19 AM, Radu Tudoran <radu.tudoran@huawei.com>
> >> > > > >>> > > > wrote:
> >> > > > >>> > > >
> >> > > > >>> > > > > Hi,
> >> > > > >>> > > > >
> >> > > > >>> > > > > @Aljoscha - I can understand why you are hesitant to introduce
> >> > > > >>> > > > > "slower" windows, such as ones that maintain sorted items, or
> >> > > > >>> > > > > windows with bindings between the different entities (evictor,
> >> > > > >>> > > > > trigger, window, apply function). However, I think it's possible
> >> > > > >>> > > > > simply to create more types of windows. The existing ones (time
> >> > > > >>> > > > > windows, global windows, ...) can remain, and we can add some more
> >> > > > >>> > > > > flavors of windows where more features or more functionality are
> >> > > > >>> > > > > enabled (e.g., access to each element in the evictor; the
> >> > > > >>> > > > > possibility to delete, or mark for eviction, elements in the
> >> > > > >>> > > > > function, ...).
> >> > > > >>> > > > >
> >> > > > >>> > > > > Regarding the specific case of sorted windows, I think the
> >> > > > >>> > > > > N log N complexity of sorting (the worst case) is very unlikely.
> >> > > > >>> > > > > In fact, you have almost-sorted items/arrays. Moreover, if you
> >> > > > >>> > > > > consider that in iteration X all elements were sorted, then in
> >> > > > >>> > > > > iteration X+1 you only need to sort the newly arrived elements (M).
> >> > > > >>> > > > > I would expect this number M to be significantly smaller than N
> >> > > > >>> > > > > (the elements that already exist). Then, using an insertion sort
> >> > > > >>> > > > > for these new elements, you would have M * N complexity, and if
> >> > > > >>> > > > > M << N the complexity is O(N). Alternatively, you can use a binary
> >> > > > >>> > > > > search to find each insertion point, which further reduces the
> >> > > > >>> > > > > comparisons per new element to O(log N).
> >> > > > >>> > > > > If M is proportional to N, then you can sort the M new elements
> >> > > > >>> > > > > and merge them with the existing ones, as in merge sort.
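The two strategies can be sketched as follows, with timestamps standing in for window elements. `IncrementalSort`, `insertEach` and `sortAndMerge` are hypothetical names. With an array-backed list, the binary search only reduces comparisons. `ArrayList.add(index, x)` may still shift up to N elements per insertion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helpers illustrating the two strategies; timestamps stand in for elements.
final class IncrementalSort {

    /** Small M: binary-search each insertion point (O(log N) comparisons per element). */
    static void insertEach(List<Long> sorted, List<Long> arrivals) {
        for (Long t : arrivals) {
            int idx = Collections.binarySearch(sorted, t);
            if (idx < 0) {
                idx = -idx - 1; // binarySearch returns -(insertionPoint) - 1 when absent
            }
            sorted.add(idx, t); // may still shift elements in an ArrayList
        }
    }

    /** M proportional to N: sort the arrivals, then merge in O(N + M). */
    static List<Long> sortAndMerge(List<Long> sorted, List<Long> arrivals) {
        List<Long> fresh = new ArrayList<>(arrivals);
        Collections.sort(fresh);
        List<Long> out = new ArrayList<>(sorted.size() + fresh.size());
        int i = 0;
        int j = 0;
        while (i < sorted.size() && j < fresh.size()) {
            if (sorted.get(i) <= fresh.get(j)) {
                out.add(sorted.get(i++));
            } else {
                out.add(fresh.get(j++));
            }
        }
        while (i < sorted.size()) {
            out.add(sorted.get(i++));
        }
        while (j < fresh.size()) {
            out.add(fresh.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> window = new ArrayList<>(Arrays.asList(1L, 3L, 5L));
        insertEach(window, Arrays.asList(4L, 2L));
        System.out.println(window); // [1, 2, 3, 4, 5]
        System.out.println(sortAndMerge(Arrays.asList(1L, 3L, 5L), Arrays.asList(6L, 2L, 4L)));
        // [1, 2, 3, 4, 5, 6]
    }
}
```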
> >> > > > >>> > > > >
> >> > > > >>> > > > >
> >> > > > >>> > > > > Dr. Radu Tudoran
> >> > > > >>> > > > > Research Engineer - Big Data Expert
> >> > > > >>> > > > > IT R&D Division
> >> > > > >>> > > > >
> >> > > > >>> > > > >
> >> > > > >>> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >> > > > >>> > > > > European Research Center
> >> > > > >>> > > > > Riesstrasse 25, 80992 München
> >> > > > >>> > > > >
> >> > > > >>> > > > > E-mail: radu.tudoran@huawei.com
> >> > > > >>> > > > > Mobile: +49 15209084330
> >> > > > >>> > > > > Telephone: +49 891588344173
> >> > > > >>> > > > >
> >> > > > >>> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >> > > > >>> > > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> >> > > > >>> > > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> >> > > > >>> > > > > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> >> > > > >>> > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> >> > > > >>> > > > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> >> > > > >>> > > > > This e-mail and its attachments contain confidential information
> >> > > > >>> > > > > from HUAWEI, which is intended only for the person or entity whose
> >> > > > >>> > > > > address is listed above. Any use of the information contained
> >> > > > >>> > > > > herein in any way (including, but not limited to, total or partial
> >> > > > >>> > > > > disclosure, reproduction, or dissemination) by persons other than
> >> > > > >>> > > > > the intended recipient(s) is prohibited. If you receive this e-mail
> >> > > > >>> > > > > in error, please notify the sender by phone or email immediately
> >> > > > >>> > > > > and delete it!
> >> > > > >>> > > > >
> >> > > > >>> > > > >
> >> > > > >>> > > > > -----Original Message-----
> >> > > > >>> > > > > From: 吕文龙(吕文龙) [mailto:wenlong.lwl@alibaba-inc.com]
> >> > > > >>> > > > > Sent: Thursday, July 07, 2016 11:59 AM
> >> > > > >>> > > > > To: dev@flink.apache.org
> >> > > > >>> > > > > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> >> > > > >>> > > > >
> >> > > > >>> > > > > Hi,
> >> > > > >>> > > > > I think it is necessary to support sorted windows, which can avoid
> >> > > > >>> > > > > scanning all the elements of a window when trying to evict
> >> > > > >>> > > > > elements; such scans may cost many I/O operations, such as
> >> > > > >>> > > > > querying DBs to get elements from state.
> >> > > > >>> > > > > What's more, when a window aggregation function is invertible,
> >> > > > >>> > > > > such as sum, which can be updated by adding or removing a single
> >> > > > >>> > > > > record, window results can be calculated incrementally. In this
> >> > > > >>> > > > > kind of case, we can dramatically improve the performance of
> >> > > > >>> > > > > window aggregation if the evictor can trigger an update of the
> >> > > > >>> > > > > window aggregation state by some mechanism.
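A minimal sketch of an invertible aggregate. It assumes some hook through which the evictor reports each removed element. No such hook exists in the API discussed here, and `InvertibleSum`, `add` and `evict` are hypothetical names. The approach only works for invertible functions such as sum or count. An aggregate like max cannot be updated this way after an eviction.

```java
// Hypothetical sketch: an invertible aggregate kept up to date as elements are added and evicted.
final class InvertibleSum {
    private long sum;
    private long count;

    /** Called when an element enters the window. */
    void add(long value) {
        sum += value;
        count++;
    }

    /** Called when the evictor removes an element: the inverse of add, O(1), no rescan. */
    void evict(long value) {
        sum -= value;
        count--;
    }

    long sum() { return sum; }

    long count() { return count; }

    public static void main(String[] args) {
        InvertibleSum agg = new InvertibleSum();
        agg.add(3);
        agg.add(4);
        agg.add(5);
        System.out.println(agg.sum()); // 12
        agg.evict(3);                  // the evictor notifies the aggregate
        System.out.println(agg.sum()); // 9
    }
}
```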
> >> > > > >>> > > > >
> >> > > > >>> > > > > Best Wishes!
> >> > > > >>> > > > > ---
> >> > > > >>> > > > > wenlong.lwl
> >> > > > >>> > > > >
> >> > > > >>> > > > > -----Original Message-----
> >> > > > >>> > > > > From: Aljoscha Krettek [mailto:aljoscha@apache.org]
> >> > > > >>> > > > > Sent: July 7, 2016 17:32
> >> > > > >>> > > > > To: dev@flink.apache.org
> >> > > > >>> > > > > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> >> > > > >>> > > > >
> >> > > > >>> > > > > Hi,
> >> > > > >>> > > > > regarding "sorting the window by event time": I also considered
> >> > > > >>> > > > > this, but in the end I don't think it's necessary. Sorting is
> >> > > > >>> > > > > rather expensive, and making decisions based on the order of
> >> > > > >>> > > > > elements can be tricky. An extreme example of why this can be
> >> > > > >>> > > > > problematic is the case where all elements in the window have the
> >> > > > >>> > > > > same timestamp. Now, if you decide to evict the first 5 elements
> >> > > > >>> > > > > based on timestamp order, you basically evict 5 arbitrary
> >> > > > >>> > > > > elements. I think the better solution for time-based eviction is
> >> > > > >>> > > > > to do one pass over the elements to get an overview of the
> >> > > > >>> > > > > timestamp distribution, then do a second pass and evict elements
> >> > > > >>> > > > > based on what was learned in the first pass. This has complexity
> >> > > > >>> > > > > 2*n, compared to n*log n (plus the work of actually deciding what
> >> > > > >>> > > > > to evict) for the sort-based strategy.
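A sketch of the two-pass approach. It assumes the "overview" from the first pass is simply the maximum timestamp and that the eviction rule is "keep the last `keepSpan` time units". Both choices, and the names `TwoPassEviction` and `evictOlderThan`, are illustrative. Eviction uses `Iterator.remove()`. In the all-equal case from the example above, the result is deterministic instead of arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of two-pass, sort-free time-based eviction; timestamps stand in for elements.
final class TwoPassEviction {

    /** Evicts elements older than (max timestamp - keepSpan); returns how many were evicted. */
    static int evictOlderThan(List<Long> timestamps, long keepSpan) {
        if (timestamps.isEmpty()) {
            return 0;
        }
        // Pass 1: learn about the timestamp distribution (here only the maximum).
        long max = Long.MIN_VALUE;
        for (long t : timestamps) {
            max = Math.max(max, t);
        }
        // Pass 2: evict based on what pass 1 learned; no sorting needed.
        long cutoff = max - keepSpan;
        int evicted = 0;
        for (Iterator<Long> it = timestamps.iterator(); it.hasNext(); ) {
            if (it.next() < cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        List<Long> window = new ArrayList<>(Arrays.asList(10L, 3L, 7L, 12L));
        System.out.println(evictOlderThan(window, 5)); // 1 (cutoff 7: only 3 is evicted)
        System.out.println(window);                    // [10, 7, 12]

        // All-equal timestamps: nothing is evicted arbitrarily.
        List<Long> same = new ArrayList<>(Arrays.asList(5L, 5L, 5L));
        System.out.println(evictOlderThan(same, 0));   // 0
    }
}
```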
> >> > > > >>> > > > >
> >> > > > >>> > > > > I might be wrong, though, and there could be a valid use case not
> >> > > > >>> > > > > covered by the above idea.
> >> > > > >>> > > > >
> >> > > > >>> > > > > Regarding Vishnu's other use case of evicting based on some
> >> > > > >>> > > > > decision in the WindowFunction: could this be solved by doing the
> >> > > > >>> > > > > check for the pattern in the evictor itself instead of in the
> >> > > > >>> > > > > window function? I'm very hesitant to introduce a coupling between
> >> > > > >>> > > > > the different components of the windowing system, i.e. assigner,
> >> > > > >>> > > > > trigger, evictor and window function. The reason is that using an
> >> > > > >>> > > > > evictor has a huge performance impact, since the system always has
> >> > > > >>> > > > > to keep all elements and cannot do incremental aggregation of
> >> > > > >>> > > > > window results. I therefore don't want to put eviction-specific
> >> > > > >>> > > > > features into the other components.
> >> > > > >>> > > > >
> >> > > > >>> > > > > Cheers,
> >> > > > >>> > > > > Aljoscha
> >> > > > >>> > > > >
> >> > > > >>> > > > > On Thu, 7 Jul 2016 at 10:00 Radu Tudoran <radu.tudoran@huawei.com>
> >> > > > >>> > > > > wrote:
> >> > > > >>> > > > >
> >> > > > >>> > > > > > Hi,
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > I think the situation Vishnu raised is something that should be
> >> > > > >>> > > > > > accounted for. It can indeed happen that you want what you evict
> >> > > > >>> > > > > > from the window to depend on the result of the function to be
> >> > > > >>> > > > > > applied.
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > My 2 cents...
> >> > > > >>> > > > > > I would suggest adding a list for the elements of the stream
> >> > > > >>> > > > > > where you can MARK them to be deleted. Alternatively, the
> >> > > > >>> > > > > > iterator can be extended with a function
> >> > > > >>> > > > > > Iterator.markForEviction(int). These can also be made available
> >> > > > >>> > > > > > in the apply function. Moreover, we can use this to extend the
> >> > > > >>> > > > > > functionality so that you add MARKs either for eviction after
> >> > > > >>> > > > > > the function has finished executing or for eviction in the next
> >> > > > >>> > > > > > iteration.
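A sketch of the MARK idea. It assumes marks are recorded by index during the window function and applied in a single pass once the function has finished. `MarkedWindow`, `markForEviction` and `evictMarked` are hypothetical names, and `Iterator.markForEviction(int)` above is a proposal, not an existing method. The "evict in the next iteration" variant would simply defer the call to `evictMarked`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the MARK proposal; markForEviction is not an existing API.
final class MarkedWindow<T> {
    private final List<T> elements;
    private final BitSet marked = new BitSet();

    MarkedWindow(List<T> elements) {
        this.elements = new ArrayList<>(elements);
    }

    List<T> elements() {
        return elements;
    }

    /** Called from the window (apply) function: only records the decision. */
    void markForEviction(int index) {
        marked.set(index);
    }

    /** Called after the function has finished: removes marked elements in one pass. */
    int evictMarked() {
        int removed = 0;
        int index = 0; // position in the original layout
        for (Iterator<T> it = elements.iterator(); it.hasNext(); index++) {
            it.next();
            if (marked.get(index)) {
                it.remove();
                removed++;
            }
        }
        marked.clear(); // indices referred to the old layout, so reset them
        return removed;
    }

    public static void main(String[] args) {
        MarkedWindow<String> window = new MarkedWindow<>(Arrays.asList("a", "b", "a", "b"));
        // A window function that decides the "b" elements should go.
        for (int i = 0; i < window.elements().size(); i++) {
            if (window.elements().get(i).equals("b")) {
                window.markForEviction(i);
            }
        }
        System.out.println(window.evictMarked()); // 2
        System.out.println(window.elements());    // [a, a]
    }
}
```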
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > -----Original Message-----
> >> > > > >>> > > > > > From: Vishnu Viswanath [mailto:vishnu.viswanath25@gmail.com]
> >> > > > >>> > > > > > Sent: Thursday, July 07, 2016 1:28 AM
> >> > > > >>> > > > > > To: Dev
> >> > > > >>> > > > > > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > Thank you Maxim and Aljoscha.
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > Yes, the beforeEvict and afterEvict should be able to address
> >> > > > >>> > > > > > point 3.
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > I have one more use case in mind (which I might have to do in
> >> > > > >>> > > > > > the later stages of the POC).
> >> > > > >>> > > > > > What if `evictAfter` should behave differently based on the
> >> > > > >>> > > > > > window function?
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > For example:
> >> > > > >>> > > > > > I have a window that got triggered, and my evict function is
> >> > > > >>> > > > > > called after the apply function. In such cases I should be able
> >> > > > >>> > > > > > to decide what to evict based on the window function.
> >> > > > >>> > > > > > E.g., let the window have elements of type
> >> > > > >>> > > > > > `case class Item(id: String, type: String)` and let the types be
> >> > > > >>> > > > > > `type1` and `type2`.
> >> > > > >>> > > > > > If the window function is able to find a sequence
> >> > > > >>> > > > > > `type1 type2 type1`, then evict all elements of type type2;
> >> > > > >>> > > > > > or if the window function is able to find a sequence
> >> > > > >>> > > > > > `type2 type2 type1`, then evict all elements of type type1;
> >> > > > >>> > > > > > else don't evict any elements.
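A sketch of this decision logic. It assumes "find a sequence" means consecutive elements in window order. The class `DecideThenEvict`, its `Item` type (a Java stand-in for the Scala case class) and the helper methods are hypothetical. The sketch makes the coupling under discussion visible: `evictAfter` needs the result of the window function's analysis.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the example; "sequence" is assumed to mean consecutive elements.
final class DecideThenEvict {

    static final class Item {
        final String id;
        final String type;

        Item(String id, String type) {
            this.id = id;
            this.type = type;
        }
    }

    /** True if the given types occur consecutively, in this order, somewhere in the window. */
    static boolean containsRun(List<Item> items, String... run) {
        for (int start = 0; start + run.length <= items.size(); start++) {
            boolean ok = true;
            for (int k = 0; k < run.length && ok; k++) {
                ok = items.get(start + k).type.equals(run[k]);
            }
            if (ok) {
                return true;
            }
        }
        return false;
    }

    /** The window function's decision: which type to evict, or null for "evict nothing". */
    static String typeToEvict(List<Item> items) {
        if (containsRun(items, "type1", "type2", "type1")) {
            return "type2";
        }
        if (containsRun(items, "type2", "type2", "type1")) {
            return "type1";
        }
        return null;
    }

    /** evictAfter applies that decision once the window function has run. */
    static void evictAfter(List<Item> items) {
        String victim = typeToEvict(items);
        if (victim != null) {
            items.removeIf(item -> item.type.equals(victim));
        }
    }

    public static void main(String[] args) {
        List<Item> window = new ArrayList<>(Arrays.asList(
                new Item("1", "type1"), new Item("2", "type2"), new Item("3", "type1")));
        evictAfter(window);
        System.out.println(window.size()); // 2: the type2 element was evicted
    }
}
```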
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > Is this possible? Or could we at least let the window function
> >> > > > >>> > > > > > choose between two Evictor functions (one for the success case
> >> > > > >>> > > > > > and one for the failure case)?
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > @Maxim:
> >> > > > >>> > > > > > regarding the sorted window, I actually wanted my elements to be
> >> > > > >>> > > > > > sorted not for the eviction but while applying the window
> >> > > > >>> > > > > > function (so I thought this could be done easily). But it would
> >> > > > >>> > > > > > be good to have the window sorted based on event time.
> >> > > > >>> > > > > >
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > Thanks and Regards,
> >> > > > >>> > > > > > Vishnu Viswanath,
> >> > > > >>> > > > > >
> >> > > > >>> > > > > >
> >> > > > >>> > > > > >
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > On Wed, Jul 6, 2016 at 3:55 PM, Maxim <mfateev@gmail.com> wrote:
> >> > > > >>> > > > > >
> >> > > > >>> > > > > > > Actually, for such an evictor to be useful, the window should
> >> > > > >>> > > > > > > be sorted by some field, usually event time. What do you think
> >> > > > >>> > > > > > > about adding a sorted window abstraction?
> >> > > > >>> > > > > > >
> >> > > > >>> > > > > > > On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek
> >> > > > >>> > > > > > > <aljoscha@apache.org>
> >> > > > >>> > > > > > > wrote:
> >> > > > >>> > > > > > >
> >> > > > >>> > > > > > > > @Maxim: That's perfect, I didn't think about using
> >> > > > >>> > > > > > > > Iterator.remove() for that. I'll update the doc. What do you
> >> > > > >>> > > > > > > > think, Vishnu? This should also cover your before/after case
> >> > > > >>> > > > > > > > nicely.
> >> > > > >>> > > > > > > >
> >> > > > >>> > > > > > > > @Vishnu: The steps would be these:
> >> > > > >>> > > > > > > >  - Converge on a design in this discussion
> >> > > > >>> > > > > > > >  - Add a Jira issue here:
> >> > > > >>> > > > > > > > https://issues.apache.org/jira/browse/FLINK
> >> > > > >>> > > > > > > >  - Work on the code and create a pull request on GitHub
> >> > > > >>> > > > > > > >
> >> > > > >>> > > > > > > > The steps are also outlined here
> >> > > > >>> > > > > > > > http://flink.apache.org/how-to-contribute.html
> >> and
> >> > > here
> >> > > > >>> > > > > > > > http://flink.apache.org/contribute-code.html.
> >> > > > >>> > > > > > > >
> >> > > > >>> > > > > > > > -
> >> > > > >>> > > > > > > > Aljoscha
> >> > > > >>> > > > > > > >
> >> > > > >>> > > > > > > > On Wed, 6 Jul 2016 at 19:45 Maxim <mfateev@gmail.com> wrote:
> >> > > > >>> > > > > > > >
> >> > > > >>> > > > > > > > > The new API forces iteration through every element of the buffer
> >> > > > >>> > > > > > > > > even if only a single value is to be evicted. What about
> >> > > > >>> > > > > > > > > implementing the Iterator.remove() method for elements? The API
> >> > > > >>> > > > > > > > > would look like:
> >> > > > >>> > > > > > > > >
> >> > > > >>> > > > > > > > > import java.io.Serializable;
> >> > > > >>> > > > > > > > >
> >> > > > >>> > > > > > > > > import org.apache.flink.streaming.api.windowing.windows.Window;
> >> > > > >>> > > > > > > > >
> >> > > > >>> > > > > > > > > public interface Evictor<T, W extends Window> extends Serializable {
> >> > > > >>> > > > > > > > >
> >> > > > >>> > > > > > > > >    /**
> >> > > > >>> > > > > > > > >     * Optionally evicts elements. Called before the windowing function.
> >> > > > >>> > > > > > > > >     *
> >> > > > >>> > > > > > > > >     * @param elements The elements currently in the pane. Use
> >> > > > >>> > > > > > > > >     *                 Iterator.remove() to evict.
> >> > > > >>> > > > > > > > >     * @param size     The current number of elements in the pane.
> >> > > > >>> > > > > > > > >     * @param ctx      The {@link EvictorContext}.
> >> > > > >>> > > > > > > > >     */
> >> > > > >>> > > > > > > > >    void evictBefore(Iterable<T> elements, int size, EvictorContext ctx);
> >> > > > >>> > > > > > > > >
> >> > > > >>> > > > > > > > >    /**
> >> > > > >>> > > > > > > > >     * Optionally evicts elements. Called after the windowing function.
> >> > > > >>> > > > > > > > >     *
> >> > > > >>> > > > > > > > >     * @param elements The elements currently in the pane. Use
> >> > > > >>> > > > > > > > >     *                 Iterator.remove() to evict.
> >> > > > >>> > > > > > > > >     * @param size     The current number of elements in the pane.
> >> > > > >>> > > > > > > > >     * @param ctx      The {@link EvictorContext}.
> >> > > > >>> > > > > > > > >     */
> >> > > > >>> > > > > > > > >    void evictAfter(Iterable<T> elements, int size, EvictorContext ctx);
> >> > > > >>> > > > > > > > > }
> >> > > > >>> > > > > > > > >
> >> > > > >>> > > > > > > > > Such an API allows aborting the iteration at any point and
> >> > > > >>> > > > > > > > > evicting elements in any order.
> >> > > > >>> > > > > > > > >
> >> > > > >>> > > > > > > > > Thanks,
> >> > > > >>> > > > > > > > >
> >> > > > >>> > > > > > > > > Maxim.
> >> > > > >>> > > > > > > > >
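Here is a minimal sketch of how an evictor could use `Iterator.remove()` under Maxim's proposal, stopping as soon as it has removed enough elements. This is not the Flink API: `SimpleEvictor`, `KeepLastCountEvictor`, and `WindowFiring` are hypothetical names, the window type parameter and `EvictorContext` are dropped so the example compiles on its own, and a `java.util.List` plays the role of the pane's element buffer.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical stand-in for the proposed Evictor interface.
// Eviction happens by calling Iterator.remove() on the supplied elements.
interface SimpleEvictor<T> {
    void evictBefore(Iterable<T> elements, int size);

    void evictAfter(Iterable<T> elements, int size);
}

// Keeps only the last maxCount elements and stops iterating as soon as
// enough elements have been removed, instead of visiting the whole buffer.
final class KeepLastCountEvictor<T> implements SimpleEvictor<T> {
    private final int maxCount;

    KeepLastCountEvictor(int maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public void evictBefore(Iterable<T> elements, int size) {
        int toEvict = size - maxCount;
        Iterator<T> it = elements.iterator();
        while (toEvict > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toEvict--;
        }
    }

    @Override
    public void evictAfter(Iterable<T> elements, int size) {
        // This example evicts nothing after the window function.
    }
}

final class WindowFiring {
    // Toy window firing: evictBefore, then the window function, then evictAfter,
    // all operating on the same mutable buffer.
    static <T, R> R fire(List<T> buffer, SimpleEvictor<T> evictor,
                         Function<List<T>, R> windowFunction) {
        evictor.evictBefore(buffer, buffer.size());
        R result = windowFunction.apply(buffer);
        evictor.evictAfter(buffer, buffer.size());
        return result;
    }
}
```

`fire` only illustrates the call order given in the Javadoc (evictBefore, window function, evictAfter). With five buffered elements and `maxCount = 3`, the evictor removes the first two and never visits the remaining three.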
> >> > > > >>> > > > > > > > > On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:
> >> > > > >>> > > > > > > > > >
> >> > > > >>> > > > > > > > > > Hi Aljoscha,
> >> > > > >>> > > > > > > > > >
> >> > > > >>> > > > > > > > > > Thanks. Yes, the new interface seems to address points 1 and 2 of:
> >> > > > >>> > > > > > > > > >
> >> > > > >>> > > > > > > > > > *1) I have a use case where I have to create a custom Evictor that
> >> > > > >>> > > > > > > > > > will evict elements from the window based on their value (e.g., if
> >> > > > >>> > > > > > > > > > the elements are of case class Item(id: Int, type: String), evict
> >> > > > >>> > > > > > > > > > the elements that have type="a"). I believe this is not currently
> >> > > > >>> > > > > > > > > > possible.*
> >> > > > >>> > > > > > > > > > *2) This is somewhat related to 1): there should be an option to
> >> > > > >>> > > > > > > > > > evict elements from anywhere in the window, not only from the
> >> > > > >>> > > > > > > > > > beginning of the window (e.g., apply the delta function to all
> >> > > > >>> > > > > > > > > > elements and remove all those that don't pass). I checked the code:
> >> > > > >>> > > > > > > > > > the evict method just returns the number of elements to be removed,
> >> > > > >>> > > > > > > > > > and processTriggerResult just skips that many elements from the
> >> > > > >>> > > > > > > > > > beginning.*
> >> > > > >>> > > > > > > > > > *3) Add an option that enables the user to decide whether the
> >> > > > >>> > > > > > > > > > eviction should happen before or after the apply function.
> >> > > > >>> > > > > > > > > > Currently it happens before the apply function, but I have a use
> >> > > > >>> > > > > > > > > > case where I need to apply the function first and evict afterward.*
> >> > > > >>> > > > > > > > > >
> >> > > > >>> > > > > > > > > > I would be interested in contributing to the code base. Please let
> >> > > > >>> > > > > > > > > > me know the steps.
> >> > > > >>> > > > > > > > > >
> >> > > > >>> > > > > > > > > > Thanks and Regards,
> >> > > > >>> > > > > > > > > > Vishnu Viswanath
> >> > > > >>> > > > > > > > > >
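Vishnu's points 1 and 2 can be contrasted in a short sketch. It assumes a Java class `Item` as a stand-in for the Scala case class in the mail and a `java.util.List` as the window buffer; `ValueBasedEviction`, `skipFirst`, and `evictByType` are hypothetical names. `skipFirst` mimics the count-based behavior described in point 2 (drop the first n elements). `evictByType` removes matching elements wherever they sit, which is what an `Iterator.remove()` style interface would allow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Java stand-in for the Scala case class Item(id: Int, type: String) from the mail.
final class Item {
    final int id;
    final String type;

    Item(int id, String type) {
        this.id = id;
        this.type = type;
    }
}

final class ValueBasedEviction {

    // Count-based style: the evictor can only say "drop the first n",
    // so elements are always removed from the beginning of the window.
    static List<Item> skipFirst(List<Item> window, int n) {
        return new ArrayList<>(window.subList(Math.min(n, window.size()), window.size()));
    }

    // Iterator.remove() style: inspect every element and remove the ones whose
    // value matches, wherever they are in the window. Returns how many were removed.
    static int evictByType(Iterable<Item> window, String typeToEvict) {
        int removed = 0;
        for (Iterator<Item> it = window.iterator(); it.hasNext(); ) {
            if (it.next().type.equals(typeToEvict)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

For a window [1:a, 2:b, 3:a, 4:c], `skipFirst(window, 1)` still keeps item 3 even though its type is "a", whereas `evictByType(window, "a")` leaves exactly items 2 and 4.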
> >> > > > >>> > > > > > > > > > On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >> > > > >>> > > > > > > > > >
> >> > > > >>> > > > > > > > > > > Hi,
> >> > > > >>> > > > > > > > > > > as mentioned in the thread on improving the Windowing API, I also
> >> > > > >>> > > > > > > > > > > have a design doc just for improving WindowEvictors. I had this in
> >> > > > >>> > > > > > > > > > > my head for a while but was hesitant to publish it; since people
> >> > > > >>> > > > > > > > > > > are asking about this now, it might be a good time to post it.
> >> > > > >>> > > > > > > > > > > Here's the doc:
> >> > > > >>> > > > > > > > > > > https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing
> >> > > > >>> > > > > > > > > > >
> >> > > > >>> > > > > > > > > > > Feedback/Suggestions are very welcome! Please let me know what you
> >> > > > >>> > > > > > > > > > > think.
> >> > > > >>> > > > > > > > > > >
> >> > > > >>> > > > > > > > > > > @Vishnu: Are you interested in contributing a solution for this to
> >> > > > >>> > > > > > > > > > > the Flink code base? I'd be very happy to work with you on this.
> >> > > > >>> > > > > > > > > > >
> >> > > > >>> > > > > > > > > > > Cheers,
> >> > > > >>> > > > > > > > > > > Aljoscha
> >> > > > >>> > > > > > > > > > >
> >> > > > >>> > > > > > > > > > > P.S. I think it would be best to keep discussions to the ML,
> >> > > > >>> > > > > > > > > > > because comments on the doc will not be visible here for everyone.
