flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Enhance Window Evictor in Flink
Date Fri, 08 Jul 2016 10:04:14 GMT
Hi,
come to think of it, the right place to put such checks is actually the
Trigger. It would have to be a custom trigger that observes time but also
keeps some internal state machine to decide when it has observed the right
pattern in the window. Then the window function would just have to do the
processing and you have good separation of concerns. Does that make sense?
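
To make the "internal state machine" idea concrete, here is a minimal plain-Java sketch. It does not use Flink's Trigger API; the class PatternStateMachine and its methods are hypothetical names chosen for illustration, and the pattern (type1, type2, type1) is borrowed from Vishnu's example further down the thread. In a real custom trigger, the boolean result would presumably map to TriggerResult.FIRE versus TriggerResult.CONTINUE, and the state would have to be kept in Flink's partitioned state rather than in a field.

```java
import java.util.List;

/** Hypothetical helper (not a Flink class): tracks progress toward type1, type2, type1. */
final class PatternStateMachine {
    private enum State { START, SAW_T1, SAW_T1_T2 }

    private State state = State.START;

    /** Feeds one element type; returns true exactly when the pattern completes. */
    boolean onElement(String type) {
        boolean isT1 = "type1".equals(type);
        boolean isT2 = "type2".equals(type);
        switch (state) {
            case START:
                state = isT1 ? State.SAW_T1 : State.START;
                return false;
            case SAW_T1:
                // A repeated type1 keeps the first step of the pattern matched.
                state = isT2 ? State.SAW_T1_T2 : (isT1 ? State.SAW_T1 : State.START);
                return false;
            default: // SAW_T1_T2
                // Start over after either a match or a mismatch.
                state = State.START;
                return isT1;
        }
    }

    /** Returns the 0-based index at which the pattern first completes, or -1. */
    static int firstMatchIndex(List<String> types) {
        PatternStateMachine machine = new PatternStateMachine();
        for (int i = 0; i < types.size(); i++) {
            if (machine.onElement(types.get(i))) {
                return i;
            }
        }
        return -1;
    }
}
```

With this split, the window function only processes the elements once the trigger has decided that the pattern is present.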

I'm ignoring time and sorting by time for now because we probably need
another design document for that. To me it seems like a bigger thing.

Cheers,
Aljoscha

On Thu, 7 Jul 2016 at 23:56 Vishnu Viswanath <vishnu.viswanath25@gmail.com>
wrote:

> Hi,
>
> Regarding the evictAfter function, that evicts based on some decision made
> by the window function:  I think it will be nice if we can come up with
> something that is LESS coupled, because I can think of several use cases
> that depend on it.
>
> Especially in the case where there are late-arriving messages. Only after
> the window function is applied can we tell what to do with the elements
> in the window. You could apply your business logic there to determine whether
> the window function was able to do what it is supposed to do: if yes, evict
> those elements; else (since the elements you are looking for haven't arrived
> yet) wait and try again the next time the trigger fires.
>
> Thanks and Regards,
> Vishnu Viswanath,
>
>
> On Thu, Jul 7, 2016 at 9:19 AM, Radu Tudoran <radu.tudoran@huawei.com>
> wrote:
>
> > Hi,
> >
> > @Aljoscha - I can understand the reason why you are hesitant to introduce
> > "slower" windows such as the ones that would maintain sorted items or
> > windows with bindings between the different entities (evictor, trigger,
> > window, apply function). However, I think it's possible just to create
> > more types of windows. The existing ones (time windows, global windows ...)
> > can remain, and we just add some more flavors of windows where more
> > features or more functionality are enabled (e.g., access to each element
> > in the evictor; possibility to delete or mark for eviction elements in the
> > function...)
> >
> > Regarding the specific case of sorted windows, I think the N log N
> > complexity to sort (the worst case) is very unlikely. In fact you have
> > almost sorted items/arrays. Moreover, if you consider that in iteration X
> > all elements were sorted, then in iteration X+1 you will need to sort just
> > the newly arrived elements (M). I would expect that this number M might be
> > significantly smaller than N (the elements that already exist). Then, using
> > an insertion sort for these new elements, you would have M * N complexity,
> > and if M << N then the complexity is O(N). Alternatively you can use a
> > binary search to find each insertion point, which reduces the search cost
> > to O(log N) per element.
> > If M is proportional to N then you can sort M and use the merge step of
> > merge sort for combining.
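
The insertion idea in the paragraph above can be sketched in plain Java. This is only an illustration under assumptions: SortedBufferSketch and insertAll are hypothetical names, elements are reduced to bare timestamps, and the buffer is an in-memory list rather than Flink state. Collections.binarySearch finds each insertion point in O(log N); with an ArrayList the insert itself still shifts elements, so the cost per new element stays linear in the worst case.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: keeps an already sorted buffer sorted as new elements arrive. */
final class SortedBufferSketch {

    /** Inserts each newly arrived timestamp at its sorted position. */
    static void insertAll(List<Long> sorted, List<Long> arrivals) {
        for (Long timestamp : arrivals) {
            int index = Collections.binarySearch(sorted, timestamp);
            // binarySearch returns -(insertionPoint) - 1 when the key is absent.
            int position = index >= 0 ? index : -index - 1;
            sorted.add(position, timestamp);
        }
    }
}
```

A structure with cheaper inserts (for example a tree or a skip list) would be needed to bring the whole operation, and not just the search, down to logarithmic cost.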
> >
> >
> > Dr. Radu Tudoran
> > Research Engineer - Big Data Expert
> > IT R&D Division
> >
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudoran@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > This e-mail and its attachments contain confidential information from
> > HUAWEI, which is intended only for the person or entity whose address is
> > listed above. Any use of the information contained herein in any way
> > (including, but not limited to, total or partial disclosure, reproduction,
> > or dissemination) by persons other than the intended recipient(s) is
> > prohibited. If you receive this e-mail in error, please notify the sender
> > by phone or email immediately and delete it!
> >
> >
> > -----Original Message-----
> > From: 吕文龙(吕文龙) [mailto:wenlong.lwl@alibaba-inc.com]
> > Sent: Thursday, July 07, 2016 11:59 AM
> > To: dev@flink.apache.org
> > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> >
> > Hi,
> > I think it is necessary to support sorted windows, which can avoid scanning
> > all the elements of a window when evicting elements; such a scan may cost
> > many IO operations, such as querying DBs to get elements from state.
> > What's more, when a window aggregation function is invertible, such as
> > sum, which can be updated by adding or removing a single record, window
> > results can be calculated incrementally. In this kind of case, we can
> > dramatically improve the performance of window aggregation if the evictor
> > can trigger an update of the window aggregation state by some mechanism.
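
A minimal sketch of what "invertible" means for sum, in plain Java. InvertibleSumSketch and its methods are hypothetical names for illustration, not an existing Flink interface: adding an element and retracting an evicted element are both O(1), so the result never has to be recomputed from all buffered elements.

```java
/** Hypothetical sketch of an invertible aggregate: a sum that supports retraction. */
final class InvertibleSumSketch {
    private long sum;
    private long count;

    /** Called when an element enters the window. */
    void add(long value) {
        sum += value;
        count++;
    }

    /** Called when an element is evicted; undoes the effect of add(value). */
    void retract(long value) {
        sum -= value;
        count--;
    }

    long sum() {
        return sum;
    }

    long count() {
        return count;
    }
}
```

Aggregates such as min or max are not invertible in this sense, because removing the current extreme requires looking at the remaining elements.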
> >
> > Best Wishes!
> > ---
> > wenlong.lwl
> >
> > -----Original Message-----
> > From: Aljoscha Krettek [mailto:aljoscha@apache.org]
> > Sent: July 7, 2016 17:32
> > To: dev@flink.apache.org
> > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> >
> > Hi,
> > regarding "sorting the window by event time": I also considered this but
> > in the end I don't think it's necessary. Sorting is rather expensive and
> > making decisions based on the order of elements can be tricky. An extreme
> > example of why this can be problematic is the case where all elements in
> > the window have the same timestamp. Now, if you decide to evict the first 5
> > elements based on timestamp order you basically arbitrarily evict 5
> > elements. I think the better solution for doing time-based eviction is to
> > do one pass over the elements to get an overview of the timestamp
> > distribution, then do a second pass and evict elements based on what was
> > learned in the first pass. This has complexity 2*n compared to the n*log n
> > (plus the work of actually deciding what to evict) of the sort based
> > strategy.
> >
> > I might be wrong, though, and there could be a valid use-case not covered
> > by the above idea.
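
One possible reading of the two-pass idea, sketched in plain Java. The names TwoPassEvictionSketch, evictOlderThan and keepMillis are hypothetical, and elements are reduced to bare timestamps. The first pass learns the newest timestamp; the second pass evicts everything older than that timestamp minus keepMillis. No sorting is involved, and when all elements share one timestamp nothing is evicted arbitrarily.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of time-based eviction in two linear passes, without sorting. */
final class TwoPassEvictionSketch {

    /** Removes every timestamp older than (newest - keepMillis); returns the number evicted. */
    static int evictOlderThan(List<Long> timestamps, long keepMillis) {
        if (timestamps.isEmpty()) {
            return 0;
        }
        // Pass 1: learn the newest timestamp in the buffer.
        long newest = Long.MIN_VALUE;
        for (long timestamp : timestamps) {
            newest = Math.max(newest, timestamp);
        }
        long cutoff = newest - keepMillis;

        // Pass 2: evict based on what was learned in pass 1.
        int evicted = 0;
        for (Iterator<Long> it = timestamps.iterator(); it.hasNext();) {
            if (it.next() < cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }
}
```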
> >
> > regarding Vishnu's other use case of evicting based on some decision in the
> > WindowFunction: could this be solved by doing the check for the pattern in
> > the evictor itself instead of in the window function? I'm very hesitant to
> > introduce a coupling between the different components of the windowing
> > system, i.e. assigner, trigger, evictor and window function. The reason is
> > that using an evictor has a huge performance impact since the system always
> > has to keep all elements and cannot do incremental aggregation of window
> > results, and I therefore don't want to put specific features regarding
> > eviction into the other components.
> >
> > Cheers,
> > Aljoscha
> >
> > On Thu, 7 Jul 2016 at 10:00 Radu Tudoran <radu.tudoran@huawei.com>
> wrote:
> >
> > > Hi,
> > >
> > > I think the situation Vishnu raised is something that should be
> > > accounted for. It can indeed happen that you want to condition what you
> > > evict from the window on the result of the function to be applied.
> > >
> > > My 2 cents...
> > > I would suggest adding a list for the elements of the stream where you
> > > can MARK them to be deleted. Alternatively the iterator can be extended
> > > to have a function Iterator.markForEviction(int). These can also be made
> > > available in the apply function. Moreover, we can use this to
> > > extend the functionality such that you add MARKs either for eviction
> > > after the function has finished triggering or for eviction in the next
> > > iteration.
> > >
> > >
> > > Dr. Radu Tudoran
> > > Research Engineer - Big Data Expert
> > > IT R&D Division
> > >
> > >
> > > -----Original Message-----
> > > From: Vishnu Viswanath [mailto:vishnu.viswanath25@gmail.com]
> > > Sent: Thursday, July 07, 2016 1:28 AM
> > > To: Dev
> > > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> > >
> > > Thank you Maxim and Aljoscha.
> > >
> > > Yes, the beforeEvict and afterEvict should be able to address point 3.
> > >
> > > I have one more use case in my mind (which I might have to do in the
> > > later stages of POC).
> > > What if the `evictAfter` should behave differently based on the window
> > > function?
> > >
> > > For example.
> > > I have a window that got triggered and my evict function is being
> > > called after the apply function. In such cases I should be able to
> > > decide on what I should evict based on the window function.
> > > e.g.,
> > > let the window have elements of type `case class Item(id: String, type:
> > > String)`  and let the types be `type1` and `type2`.
> > > If the window function is able to find the sequence `type1 type2 type1`,
> > > then evict all elements of type type2;
> > > or if the window function is able to find the sequence `type2 type2
> > > type1`, then evict all elements of type type1; else don't evict any
> > > elements.
> > >
> > > Is this possible? or at least let the window function choose between
> > > two Evictor functions -(one for success case and one failure case)
> > >
> > > @Maxim:
> > > regarding the sorted window, I actually wanted my elements to be
> > > sorted not for the eviction but for applying the window function
> > > (so I thought this could be done easily). But it would be good to have
> > > the window sorted based on EventTime.
> > >
> > >
> > > Thanks and Regards,
> > > Vishnu Viswanath,
> > >
> > >
> > >
> > >
> > > On Wed, Jul 6, 2016 at 3:55 PM, Maxim <mfateev@gmail.com> wrote:
> > >
> > > > Actually, for such an evictor to be useful the window should be sorted
> > > > by some field, usually event time. What do you think about adding
> > > > a sorted window abstraction?
> > > >
> > > > On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek
> > > > <aljoscha@apache.org>
> > > > wrote:
> > > >
> > > > > @Maxim: That's perfect, I didn't think about using
> > > > > Iterator.remove() for that. I'll update the doc. What do you think,
> > > > > Vishnu? This should also cover your before/after case nicely.
> > > > >
> > > > > @Vishnu: The steps would be these:
> > > > >  - Converge on a design in this discussion
> > > > >  - Add a Jira issue here: https://issues.apache.org/jira/browse/FLINK
> > > > >  - Work on the code and create a pull request on GitHub
> > > > >
> > > > > The steps are also outlined here
> > > > > http://flink.apache.org/how-to-contribute.html and here
> > > > > http://flink.apache.org/contribute-code.html.
> > > > >
> > > > > -
> > > > > Aljoscha
> > > > >
> > > > > On Wed, 6 Jul 2016 at 19:45 Maxim <mfateev@gmail.com> wrote:
> > > > >
> > > > > > The new API forces iteration through every element of the buffer
> > > > > > even if only a single value is to be evicted. What about implementing
> > > > > > the Iterator.remove() method for elements? The API would look like:
> > > > > >
> > > > > > public interface Evictor<T, W extends Window> extends Serializable {
> > > > > >
> > > > > >    /**
> > > > > >     * Optionally evicts elements. Called before the windowing function.
> > > > > >     *
> > > > > >     * @param elements The elements currently in the pane. Use
> > > > > >     *                 Iterator.remove to evict.
> > > > > >     * @param size The current number of elements in the pane.
> > > > > >     * @param ctx The {@link EvictorContext}
> > > > > >     */
> > > > > >    void evictBefore(Iterable<T> elements, int size, EvictorContext ctx);
> > > > > >
> > > > > >    /**
> > > > > >     * Optionally evicts elements. Called after the windowing function.
> > > > > >     *
> > > > > >     * @param elements The elements currently in the pane. Use
> > > > > >     *                 Iterator.remove to evict.
> > > > > >     * @param size The current number of elements in the pane.
> > > > > >     * @param ctx The {@link EvictorContext}
> > > > > >     */
> > > > > >    void evictAfter(Iterable<T> elements, int size, EvictorContext ctx);
> > > > > > }
> > > > > >
> > > > > > Such an API allows aborting iteration at any point and evicting
> > > > > > elements in any order.
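
How eviction through Iterator.remove could look from the user's side, as a plain-Java sketch. It deliberately does not implement the proposed Evictor interface; IteratorEvictionSketch, Item, evictByType and evictFirstOfType are hypothetical names, with Item loosely modeled on the case class from Vishnu's use case. The first method evicts by value from anywhere in the buffer, the second one aborts the iteration as soon as a single element has been evicted.

```java
import java.util.Iterator;

/** Hypothetical sketch of value-based eviction through Iterator.remove. */
final class IteratorEvictionSketch {

    /** Stand-in for the element type used in the thread's example. */
    static final class Item {
        final int id;
        final String type;

        Item(int id, String type) {
            this.id = id;
            this.type = type;
        }
    }

    /** Evicts every element of the given type, wherever it sits; returns the count. */
    static int evictByType(Iterable<Item> elements, String type) {
        int evicted = 0;
        for (Iterator<Item> it = elements.iterator(); it.hasNext();) {
            if (it.next().type.equals(type)) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    /** Evicts only the first element of the given type, then stops iterating. */
    static boolean evictFirstOfType(Iterable<Item> elements, String type) {
        for (Iterator<Item> it = elements.iterator(); it.hasNext();) {
            if (it.next().type.equals(type)) {
                it.remove();
                return true; // abort early, the rest of the buffer is not visited
            }
        }
        return false;
    }
}
```

This only works if the Iterable handed to the evictor is backed by an iterator that supports remove(), which is an assumption of the sketch.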
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Maxim.
> > > > > >
> > > > > > On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath <
> > > > > > vishnu.viswanath25@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Aljoscha,
> > > > > > >
> > > > > > > Thanks. Yes, the new interface seems to address points 1 and 2 of:
> > > > > > >
> > > > > > > *1) I have a use case where I have to create a custom Evictor
> > > > > > > that will evict elements from the window based on their value
> > > > > > > (e.g., if I have elements of case class Item(id: Int,
> > > > > > > type: String), then evict elements that have type="a"). I
> > > > > > > believe this is not currently possible.*
> > > > > > > *2) This is somewhat related to 1): there should be an option
> > > > > > > to evict elements from anywhere in the window, not only from
> > > > > > > the beginning of the window (e.g., apply the delta function to
> > > > > > > all elements and remove all those that don't pass). I checked
> > > > > > > the code, and the evict method just returns the number of
> > > > > > > elements to be removed, and processTriggerResult just skips
> > > > > > > that many elements from the beginning.*
> > > > > > > *3) Add an option that enables the user to decide whether the
> > > > > > > eviction should happen before or after the apply function.
> > > > > > > Currently it is before the apply function, but I have a use
> > > > > > > case where I need to first apply the function and evict
> > > > > > > afterward.*
> > > > > > >
> > > > > > > I would be interested in contributing to the code base. Please
> > > > > > > let me know the steps.
> > > > > > >
> > > > > > > Thanks and Regards,
> > > > > > > Vishnu Viswanath
> > > > > > >
> > > > > > > On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > > as mentioned in the thread on improving the Windowing API, I
> > > > > > > > also have a design doc just for improving WindowEvictors. I
> > > > > > > > had this in my head for a while but was hesitant to publish,
> > > > > > > > but since people are asking about this, now might be a good
> > > > > > > > time to post it. Here's the doc:
> > > > > > > >
> > > > > > > > https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing
> > > > > > > >
> > > > > > > > Feedback/Suggestions are very welcome! Please let me know what
> > > > > > > > you think.
> > > > > > > >
> > > > > > > > @Vishnu: Are you interested in contributing a solution for
> > > > > > > > this to the Flink code base? I'd be very happy to work with
> > > > > > > > you on this.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Aljoscha
> > > > > > > >
> > > > > > > > P.S. I think it would be best to keep discussions to the ML
> > > > > > > > because comments on the doc will not be visible here for
> > > > > > > > everyone.
> > > > > > > >
