flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Enhance Window Evictor in Flink
Date Wed, 13 Jul 2016 14:09:56 GMT
Hi,
I think the way to go here is to add both an EventTimeEvictor and a
ProcessingTimeEvictor. The problem is that "isEventTime" cannot really be
determined. That's also the reason why there is an EventTimeTrigger and a
ProcessingTimeTrigger. It was just an oversight that the TimeEvictor does
not also have these two versions.

About EvictingWindowOperator, I think you can make the two methods
non-final in WindowOperator, yes.

Cheers,
Aljoscha

On Wed, 13 Jul 2016 at 14:32 Vishnu Viswanath <vishnu.viswanath25@gmail.com>
wrote:

> Hi Aljoscha,
>
> I am thinking of adding a method boolean isEventTime(); in the
> EvictorContext apart from
>
> long getCurrentProcessingTime();
> MetricGroup getMetricGroup();
> long getCurrentWatermark();
>
> This method can be used to make the Evictor not iterate through all the
> elements in TimeEvictor. There will be a few changes in the existing
> behavior of TimeEvictor and DeltaEvictor (I have mentioned this in the
> design doc)
>
> Also, is there any specific reason why the open and close methods in
> WindowOperator are final? Since the EvictorContext will live in the
> EvictingWindowOperator, I need to override open and close in
> EvictingWindowOperator to set the EvictorContext reference to null.
>
> Thanks and Regards,
> Vishnu Viswanath,
>
> On Fri, Jul 8, 2016 at 7:40 PM, Vishnu Viswanath <
> vishnu.viswanath25@gmail.com> wrote:
>
> > My thought process when asking whether we can use the state backend in the
> > window function was: can we add the elements to be evicted to some state,
> > let evictAfter read them from some context, and remove them from the
> > window?
> >
> >
> > On Fri, Jul 8, 2016 at 7:30 PM, Vishnu Viswanath <
> > vishnu.viswanath25@gmail.com> wrote:
> >
> >> Hi Aljoscha,
> >>
> >> Thanks for the explanation, and sorry for the late reply; I was busy with
> >> work.
> >>
> >> I did think about this scenario. In fact, I considered asking this question
> >> in my previous mail, but then I realized that the problem exists whichever
> >> method we choose (the Trigger looking for the pattern or the Window looking
> >> for the pattern).
> >>
> >> I do have a pretty good watermark, but my concern is that it changes based
> >> on the key of these messages (I don't know if that is possible; I haven't
> >> started coding that yet. Maybe you could tell me). Even if it is possible,
> >> some of these watermarks will be long (in days), and I don't want the
> >> trigger to wait that long.
> >>
> >> It looks like it is not easy to have an evictAfter based on the window
> >> function (without introducing coupling), but can the current window apply
> >> function be modified to allow it to change the elements in the window, maybe
> >> using some state backend? (I don't know exactly how the internals of these
> >> work, so this might be a wrong question.)
> >>
> >> Thanks and Regards,
> >> Vishnu Viswanath,
> >>
> >> On Fri, Jul 8, 2016 at 10:20 AM, Aljoscha Krettek <aljoscha@apache.org>
> >> wrote:
> >>
> >>> Hi Vishnu,
> >>> how long would these patterns be? The Trigger would not have to sort the
> >>> elements for every new element but just insert the new element into an
> >>> internal data structure. Only when it sees that the watermark is past a
> >>> certain point would it check whether the pattern matches and actually
> >>> trigger.
> >>>
> >>> A general note regarding order and event time: I think relying on this for
> >>> computation is very tricky unless the watermark is 100% correct or you
> >>> completely discard elements that arrive after the watermark, i.e. elements
> >>> that would break the promise of the watermark that no elements with an
> >>> earlier timestamp will ever arrive. The reason for this is that new
> >>> elements could always arrive that end up between already seen elements. For
> >>> example, let's say we have this sequence of elements when the trigger
> >>> fires:
> >>>
> >>> a-b-a
> >>>
> >>> This is the sequence that you are looking for and you emit some result from
> >>> the WindowFunction. Now, new elements arrive that fall in between the
> >>> elements we already have:
> >>>
> >>> a-d-e-b-f-g-a
> >>>
> >>> This is an updated, sorted view of the actual event-time stream and we
> >>> didn't realize that the stream actually looks like this before. Does this
> >>> still match the original pattern or should we now consider this as
> >>> non-matching? If no, then the earlier successful match for a-b-a was wrong
> >>> and we should never have processed it, but we didn't know at the time. If
> >>> yes, then pattern matching like this can be done in the Trigger by having
> >>> something like pattern slots: you don't have to store all elements in the
> >>> Trigger, you just need to store possible candidates that could match the
> >>> pattern and ignore the other (in-between) elements.
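The "pattern slots" idea can be sketched in plain Java, independent of the Flink Trigger API. This is only an illustration under assumptions: the class name PatternSlots and its methods are hypothetical, elements are modelled as strings, and they are assumed to be fed in event-time order. It implements the "yes" case above, where in-between elements do not break a match.

```java
import java.util.List;

/** Hypothetical helper: tracks progress through a pattern such as a-b-a. */
final class PatternSlots {
    private final List<String> pattern; // e.g. ["a", "b", "a"]
    private int nextSlot = 0;           // index of the next pattern slot to fill

    PatternSlots(List<String> pattern) {
        this.pattern = List.copyOf(pattern);
    }

    /** Feeds one element; returns true once every slot has been filled. */
    boolean offer(String element) {
        if (nextSlot < pattern.size() && pattern.get(nextSlot).equals(element)) {
            nextSlot++; // the element is a candidate for the current slot
        }
        // Any other element is an in-between element and is simply ignored.
        return isComplete();
    }

    boolean isComplete() {
        return nextSlot == pattern.size();
    }

    /** Convenience: does the pattern occur as a subsequence of the elements? */
    static boolean matches(List<String> pattern, List<String> elements) {
        PatternSlots slots = new PatternSlots(pattern);
        for (String e : elements) {
            slots.offer(e);
        }
        return slots.isComplete();
    }
}
```

Only the slot index is kept as state, so memory does not grow with the number of in-between elements.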
> >>>
> >>> Cheers,
> >>> Aljoscha
> >>>
> >>> On Fri, 8 Jul 2016 at 14:10 Vishnu Viswanath <
> >>> vishnu.viswanath25@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi Aljoscha,
> >>> >
> >>> > That is a good idea. Trying to tie it back to the use case: suppose the
> >>> > trigger is looking for a pattern a-b-a. When it sees such a pattern, it
> >>> > will trigger the window, and it knows that the Evictor is now going to
> >>> > evict the element b, so the trigger updates its state to a-a (even
> >>> > before the window and evictor complete) and will be looking for the rest
> >>> > of the pattern, i.e., b-a. But I can think of one problem here:
> >>> >
> >>> >    - the events can arrive out of order, i.e., the trigger might be
> >>> >    seeing a pattern a-a-b while the actual event-time order is a-b-a, so
> >>> >    the trigger will have to sort the elements in the window every time
> >>> >    it sees an element. (I was planning to do this sorting in the window,
> >>> >    which will be less often: only when the trigger fires.)
> >>> >
> >>> > Thanks and Regards,
> >>> > Vishnu Viswanath,
> >>> >
> >>> > On Fri, Jul 8, 2016 at 6:04 AM, Aljoscha Krettek <aljoscha@apache.org>
> >>> > wrote:
> >>> >
> >>> > > Hi,
> >>> > > come to think of it, the right place to put such checks is actually the
> >>> > > Trigger. It would have to be a custom trigger that observes time but also
> >>> > > keeps some internal state machine to decide when it has observed the
> >>> > > right pattern in the window. Then the window function would just have to
> >>> > > do the processing and you have good separation of concerns. Does that
> >>> > > make sense?
> >>> > >
> >>> > > I'm ignoring time and sorting by time for now because we probably need
> >>> > > another design document for that. To me it seems like a bigger thing.
> >>> > >
> >>> > > Cheers,
> >>> > > Aljoscha
> >>> > >
> >>> > > On Thu, 7 Jul 2016 at 23:56 Vishnu Viswanath
> >>> > > <vishnu.viswanath25@gmail.com> wrote:
> >>> > >
> >>> > > > Hi,
> >>> > > >
> >>> > > > Regarding the evictAfter function that evicts based on some decision
> >>> > > > made by the window function: I think it would be nice if we could come
> >>> > > > up with something that is LESS coupled, because I can think of several
> >>> > > > use cases that depend on it.
> >>> > > >
> >>> > > > This matters especially when there are late-arriving messages. Only
> >>> > > > after the window function is applied can we tell what to do with the
> >>> > > > elements in the window. You could apply your business logic there to
> >>> > > > determine whether the window function was able to do what it is
> >>> > > > supposed to do. If yes, evict those elements; otherwise (since the
> >>> > > > elements you are looking for haven't arrived yet), wait and try again
> >>> > > > the next time the trigger fires.
> >>> > > >
> >>> > > > Thanks and Regards,
> >>> > > > Vishnu Viswanath,
> >>> > > >
> >>> > > >
> >>> > > > On Thu, Jul 7, 2016 at 9:19 AM, Radu Tudoran <radu.tudoran@huawei.com>
> >>> > > > wrote:
> >>> > > >
> >>> > > > > Hi,
> >>> > > > >
> >>> > > > > @Aljoscha - I can understand why you are hesitant to introduce
> >>> > > > > "slower" windows such as ones that maintain sorted items, or
> >>> > > > > windows with bindings between the different entities (evictor,
> >>> > > > > trigger, window, apply function). However, I think it's possible
> >>> > > > > just to create more types of windows. The existing ones (time
> >>> > > > > windows, global windows ...) can remain, and we just add some more
> >>> > > > > flavors of windows where more features or more functionality are
> >>> > > > > enabled (e.g., access to each element in the evictor; the
> >>> > > > > possibility to delete or mark elements for eviction in the
> >>> > > > > function...)
> >>> > > > >
> >>> > > > > Regarding the specific case of sorted windows, I think the N log N
> >>> > > > > complexity to sort (the worst case) is very unlikely. In fact you
> >>> > > > > have almost sorted items/arrays. Moreover, if you consider that in
> >>> > > > > iteration X all elements were sorted, then in iteration X+1 you
> >>> > > > > will need to sort just the newly arrived elements (M). I would
> >>> > > > > expect that this number M might be significantly smaller than N
> >>> > > > > (the elements that already exist). Then, using an insertion sort for
> >>> > > > > these new elements, you would have M * N complexity, and if M << N
> >>> > > > > then the complexity is O(N). Alternatively, you can use a binary
> >>> > > > > search to find each insertion point, which reduces the search cost
> >>> > > > > to O(log N) per element. If M is proportional to N, then you can
> >>> > > > > sort M and use a merge step for combining.
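The two strategies described above can be sketched in plain Java. This is an illustration only: SortedBuffer is a hypothetical class, not part of Flink, and elements are reduced to bare timestamps. Note that with an array-backed list the binary search finds the position in O(log N), but the insertion itself still shifts up to N entries.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical buffer that keeps timestamps sorted as elements arrive. */
final class SortedBuffer {
    private final List<Long> timestamps = new ArrayList<>();

    /** Case M << N: insert one new timestamp at its sorted position. */
    void insert(long ts) {
        int pos = Collections.binarySearch(timestamps, Long.valueOf(ts));
        if (pos < 0) {
            pos = -(pos + 1); // binarySearch returns -(insertionPoint) - 1 when absent
        }
        timestamps.add(pos, ts); // shifts the tail of the list
    }

    /** Case M proportional to N: merge an already sorted batch in O(N + M). */
    void mergeSorted(List<Long> sortedBatch) {
        List<Long> merged = new ArrayList<>(timestamps.size() + sortedBatch.size());
        int i = 0;
        int j = 0;
        while (i < timestamps.size() && j < sortedBatch.size()) {
            if (timestamps.get(i) <= sortedBatch.get(j)) {
                merged.add(timestamps.get(i++));
            } else {
                merged.add(sortedBatch.get(j++));
            }
        }
        while (i < timestamps.size()) merged.add(timestamps.get(i++));
        while (j < sortedBatch.size()) merged.add(sortedBatch.get(j++));
        timestamps.clear();
        timestamps.addAll(merged);
    }

    /** Returns a copy of the current sorted contents. */
    List<Long> snapshot() {
        return new ArrayList<>(timestamps);
    }
}
```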
> >>> > > > >
> >>> > > > >
> >>> > > > > Dr. Radu Tudoran
> >>> > > > > Research Engineer - Big Data Expert
> >>> > > > > IT R&D Division
> >>> > > > >
> >>> > > > >
> >>> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >>> > > > > European Research Center
> >>> > > > > Riesstrasse 25, 80992 München
> >>> > > > >
> >>> > > > > E-mail: radu.tudoran@huawei.com
> >>> > > > > Mobile: +49 15209084330
> >>> > > > > Telephone: +49 891588344173
> >>> > > > >
> >>> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >>> > > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> >>> > > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> >>> > > > > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> >>> > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> >>> > > > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> >>> > > > > This e-mail and its attachments contain confidential information from
> >>> > > > > HUAWEI, which is intended only for the person or entity whose address
> >>> > > > > is listed above. Any use of the information contained herein in any
> >>> > > > > way (including, but not limited to, total or partial disclosure,
> >>> > > > > reproduction, or dissemination) by persons other than the intended
> >>> > > > > recipient(s) is prohibited. If you receive this e-mail in error,
> >>> > > > > please notify the sender by phone or email immediately and delete it!
> >>> > > > >
> >>> > > > >
> >>> > > > > -----Original Message-----
> >>> > > > > From: 吕文龙(吕文龙) [mailto:wenlong.lwl@alibaba-inc.com]
> >>> > > > > Sent: Thursday, July 07, 2016 11:59 AM
> >>> > > > > To: dev@flink.apache.org
> >>> > > > > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> >>> > > > >
> >>> > > > > Hi,
> >>> > > > > I think it is necessary to support sorted windows, which avoid
> >>> > > > > scanning all the elements of the window when trying to evict
> >>> > > > > elements. Such a scan may cost many IO operations, such as querying
> >>> > > > > DBs to get elements from state.
> >>> > > > > What's more, when a window aggregation function is invertible, such
> >>> > > > > as sum, which can be updated by adding or removing a single record,
> >>> > > > > window results can be calculated incrementally. In this kind of case,
> >>> > > > > we can dramatically improve the performance of window aggregation if
> >>> > > > > the evictor can trigger an update of the window aggregation state by
> >>> > > > > some mechanism.
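The invertible-aggregate idea can be sketched in plain Java. This is a minimal sketch, assuming a hypothetical IncrementalSumWindow class in which eviction and aggregation share one object; nothing in the thread says such a hook exists in Flink. Each eviction retracts the evicted value from the running sum, so the result never has to be recomputed from all elements.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical window buffer that maintains a running sum across evictions. */
final class IncrementalSumWindow {
    private final Deque<Long> elements = new ArrayDeque<>();
    private long sum = 0;

    /** Adds a value and updates the aggregate in O(1). */
    void add(long value) {
        elements.addLast(value);
        sum += value;
    }

    /** Evicts up to n of the oldest elements, retracting each from the sum. */
    int evictOldest(int n) {
        int evicted = 0;
        while (evicted < n && !elements.isEmpty()) {
            sum -= elements.removeFirst(); // inverse of add
            evicted++;
        }
        return evicted;
    }

    long sum() {
        return sum;
    }

    int size() {
        return elements.size();
    }
}
```

The same approach does not work for non-invertible aggregates such as max, where removing an element may require looking at the remaining ones.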
> >>> > > > >
> >>> > > > > Best Wishes!
> >>> > > > > ---
> >>> > > > > wenlong.lwl
> >>> > > > >
> >>> > > > > -----Original Message-----
> >>> > > > > From: Aljoscha Krettek [mailto:aljoscha@apache.org]
> >>> > > > > Sent: July 7, 2016 17:32
> >>> > > > > To: dev@flink.apache.org
> >>> > > > > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> >>> > > > >
> >>> > > > > Hi,
> >>> > > > > regarding "sorting the window by event time": I also considered
> >>> > > > > this, but in the end I don't think it's necessary. Sorting is rather
> >>> > > > > expensive, and making decisions based on the order of elements can
> >>> > > > > be tricky. An extreme example of why this can be problematic is the
> >>> > > > > case where all elements in the window have the same timestamp. Now,
> >>> > > > > if you decide to evict the first 5 elements based on timestamp
> >>> > > > > order, you basically evict 5 arbitrary elements. I think the better
> >>> > > > > solution for doing time-based eviction is to do one pass over the
> >>> > > > > elements to get an overview of the timestamp distribution, then do
> >>> > > > > a second pass and evict elements based on what was learned in the
> >>> > > > > first pass. This has complexity 2*n compared to the n*log n (plus
> >>> > > > > the work of actually deciding what to evict) of the sort-based
> >>> > > > > strategy.
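The two-pass strategy can be sketched as follows. This is an illustration under assumptions, not the Flink implementation: TwoPassTimeEviction and Timestamped are hypothetical names, and the eviction rule chosen here (drop everything strictly older than the newest timestamp minus a retention span) is just one possible use of what the first pass learns.

```java
import java.util.Iterator;

/** Hypothetical two-pass, time-based eviction over an unsorted buffer. */
final class TwoPassTimeEviction {

    /** Hypothetical element wrapper: a value plus its timestamp. */
    static final class Timestamped<T> {
        final T value;
        final long timestamp;

        Timestamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Evicts elements older than (newest timestamp - keepMillis).
     * Returns the number of evicted elements.
     */
    static <T> int evict(Iterable<Timestamped<T>> elements, long keepMillis) {
        // Pass 1: learn the newest timestamp in the buffer.
        boolean any = false;
        long maxTs = Long.MIN_VALUE;
        for (Timestamped<T> e : elements) {
            maxTs = Math.max(maxTs, e.timestamp);
            any = true;
        }
        if (!any) {
            return 0;
        }

        // Pass 2: remove everything strictly older than the cutoff.
        long cutoff = maxTs - keepMillis;
        int evicted = 0;
        for (Iterator<Timestamped<T>> it = elements.iterator(); it.hasNext();) {
            if (it.next().timestamp < cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }
}
```

Elements with equal timestamps are treated identically, which avoids the arbitrary choice described above for order-based eviction.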
> >>> > > > >
> >>> > > > > I might be wrong, though, and there could be a valid use case not
> >>> > > > > covered by the above idea.
> >>> > > > >
> >>> > > > > regarding Vishnu's other use case of evicting based on some decision
> >>> > > > > in the WindowFunction: could this be solved by doing the check for
> >>> > > > > the pattern in the evictor itself instead of in the window function?
> >>> > > > > I'm very hesitant to introduce a coupling between the different
> >>> > > > > components of the windowing system, i.e. assigner, trigger, evictor
> >>> > > > > and window function. The reason is that using an evictor has a huge
> >>> > > > > performance impact, since the system always has to keep all elements
> >>> > > > > and cannot do incremental aggregation of window results. I therefore
> >>> > > > > don't want to put specific features regarding eviction into the
> >>> > > > > other components.
> >>> > > > >
> >>> > > > > Cheers,
> >>> > > > > Aljoscha
> >>> > > > >
> >>> > > > > On Thu, 7 Jul 2016 at 10:00 Radu Tudoran <radu.tudoran@huawei.com>
> >>> > > > > wrote:
> >>> > > > >
> >>> > > > > > Hi,
> >>> > > > > >
> >>> > > > > > I think the situation Vishnu raised is something that should be
> >>> > > > > > accounted for. It can indeed happen that you want to condition what
> >>> > > > > > you evict from the window on the result of the function to be
> >>> > > > > > applied.
> >>> > > > > >
> >>> > > > > > My 2 cents...
> >>> > > > > > I would suggest adding a list for the elements of the stream where
> >>> > > > > > you can MARK them to be deleted. Alternatively, the iterator can be
> >>> > > > > > extended to have a function Iterator.markForEviction(int). These
> >>> > > > > > can also be made available in the apply function. Moreover, we can
> >>> > > > > > use this to extend the functionality such that you add MARKs either
> >>> > > > > > for eviction after the function has finished triggering or for
> >>> > > > > > eviction in the next iteration.
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > Dr. Radu Tudoran
> >>> > > > > > Research Engineer - Big Data Expert
> >>> > > > > > IT R&D Division
> >>> > > > > >
> >>> > > > > > -----Original Message-----
> >>> > > > > > From: Vishnu Viswanath [mailto:vishnu.viswanath25@gmail.com]
> >>> > > > > > Sent: Thursday, July 07, 2016 1:28 AM
> >>> > > > > > To: Dev
> >>> > > > > > Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
> >>> > > > > >
> >>> > > > > > Thank you Maxim and Aljoscha.
> >>> > > > > >
> >>> > > > > > Yes, the beforeEvict and afterEvict should be able to address point 3.
> >>> > > > > >
> >>> > > > > > I have one more use case in mind (which I might have to do in the
> >>> > > > > > later stages of the POC).
> >>> > > > > > What if `evictAfter` should behave differently based on the window
> >>> > > > > > function?
> >>> > > > > >
> >>> > > > > > For example:
> >>> > > > > > I have a window that got triggered and my evict function is being
> >>> > > > > > called after the apply function. In such cases I should be able to
> >>> > > > > > decide what to evict based on the window function.
> >>> > > > > > E.g., let the window have elements of type `case class Item(id:
> >>> > > > > > String, type: String)` and let the types be `type1` and `type2`.
> >>> > > > > > If the window function is able to find a sequence `type1 type2
> >>> > > > > > type1`, then evict all elements of type `type2`;
> >>> > > > > > or if the window function is able to find a sequence `type2 type2
> >>> > > > > > type1`, then evict all elements of type `type1`; else don't evict
> >>> > > > > > any elements.
> >>> > > > > >
> >>> > > > > > Is this possible? Or at least let the window function choose between
> >>> > > > > > two Evictor functions (one for the success case and one for the
> >>> > > > > > failure case).
> >>> > > > > >
> >>> > > > > > @Maxim:
> >>> > > > > > regarding the sorted window, actually I wanted my elements to be
> >>> > > > > > sorted not for the eviction but while applying the window function
> >>> > > > > > (so I thought this could be done easily). But it would be good to
> >>> > > > > > have the window sorted based on EventTime.
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > Thanks and Regards,
> >>> > > > > > Vishnu Viswanath,
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Wed, Jul 6, 2016 at 3:55 PM, Maxim <mfateev@gmail.com> wrote:
> >>> > > > > >
> >>> > > > > > > Actually, for such an evictor to be useful, the window should be
> >>> > > > > > > sorted by some field, usually event time. What do you think about
> >>> > > > > > > adding a sorted window abstraction?
> >>> > > > > > >
> >>> > > > > > > On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek
> >>> > > > > > > <aljoscha@apache.org> wrote:
> >>> > > > > > >
> >>> > > > > > > > @Maxim: That's perfect, I didn't think about using
> >>> > > > > > > > Iterator.remove() for that. I'll update the doc. What do you
> >>> > > > > > > > think, Vishnu? This should also cover your before/after case
> >>> > > > > > > > nicely.
> >>> > > > > > > >
> >>> > > > > > > > @Vishnu: The steps would be these:
> >>> > > > > > > >  - Converge on a design in this discussion
> >>> > > > > > > >  - Add a Jira issue here:
> >>> > > > > > > >    https://issues.apache.org/jira/browse/FLINK
> >>> > > > > > > >  - Work on the code and create a pull request on GitHub
> >>> > > > > > > >
> >>> > > > > > > > The steps are also outlined here
> >>> > > > > > > > http://flink.apache.org/how-to-contribute.html and here
> >>> > > > > > > > http://flink.apache.org/contribute-code.html.
> >>> > > > > > > > -
> >>> > > > > > > > Aljoscha
> >>> > > > > > > >
> >>> > > > > > > > On Wed, 6 Jul 2016 at 19:45 Maxim <mfateev@gmail.com> wrote:
> >>> > > > > > > >
> >>> > > > > > > > > The new API forces iteration through every element of the
> >>> > > > > > > > > buffer even if only a single value is to be evicted. What about
> >>> > > > > > > > > implementing the Iterator.remove() method for elements? The API
> >>> > > > > > > > > would look like:
> >>> > > > > > > > >
> >>> > > > > > > > > public interface Evictor<T, W extends Window> extends Serializable {
> >>> > > > > > > > >
> >>> > > > > > > > >    /**
> >>> > > > > > > > >     * Optionally evicts elements. Called before windowing function.
> >>> > > > > > > > >     *
> >>> > > > > > > > >     * @param elements The elements currently in the pane. Use
> >>> > > > > > > > >     *                 Iterator.remove to evict.
> >>> > > > > > > > >     * @param size The current number of elements in the pane.
> >>> > > > > > > > >     * @param window The {@link Window}
> >>> > > > > > > > >     */
> >>> > > > > > > > >    void evictBefore(Iterable<T> elements, int size, EvictorContext ctx);
> >>> > > > > > > > >
> >>> > > > > > > > >    /**
> >>> > > > > > > > >     * Optionally evicts elements. Called after windowing function.
> >>> > > > > > > > >     *
> >>> > > > > > > > >     * @param elements The elements currently in the pane. Use
> >>> > > > > > > > >     *                 Iterator.remove to evict.
> >>> > > > > > > > >     * @param size The current number of elements in the pane.
> >>> > > > > > > > >     * @param window The {@link Window}
> >>> > > > > > > > >     */
> >>> > > > > > > > >    void evictAfter(Iterable<T> elements, int size, EvictorContext ctx);
> >>> > > > > > > > > }
> >>> > > > > > > > >
> >>> > > > > > > > > Such an API allows aborting the iteration at any point and evicting
> >>> > > > > > > > > elements in any order.
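How Iterator.remove() enables value-based eviction from anywhere in the pane can be sketched as follows. This is a simplified stand-in, not the proposed Flink interface: SimpleEvictor and PredicateEvictor are hypothetical names, and the Window type parameter and EvictorContext argument are omitted so that the example compiles without Flink on the classpath.

```java
import java.util.Iterator;
import java.util.function.Predicate;

/** Simplified, hypothetical stand-in for the proposed Evictor interface. */
interface SimpleEvictor<T> {
    void evictBefore(Iterable<T> elements, int size);

    void evictAfter(Iterable<T> elements, int size);
}

/** Evicts every element matching a predicate before the window function runs. */
final class PredicateEvictor<T> implements SimpleEvictor<T> {
    private final Predicate<T> shouldEvict;

    PredicateEvictor(Predicate<T> shouldEvict) {
        this.shouldEvict = shouldEvict;
    }

    @Override
    public void evictBefore(Iterable<T> elements, int size) {
        for (Iterator<T> it = elements.iterator(); it.hasNext();) {
            if (shouldEvict.test(it.next())) {
                it.remove(); // evicts the element wherever it sits in the pane
            }
        }
    }

    @Override
    public void evictAfter(Iterable<T> elements, int size) {
        // Nothing to evict after the window function in this sketch.
    }
}
```

An evictor that only needs to remove one element could return from the loop right after the first it.remove(), which is the early abort mentioned above.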
> >>> > > > > > > > >
> >>> > > > > > > > > Thanks,
> >>> > > > > > > > >
> >>> > > > > > > > > Maxim.
> >>> > > > > > > > >
> >>> > > > > > > > > On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath <
> >>> > > > > > > > > vishnu.viswanath25@gmail.com> wrote:
> >>> > > > > > > > > >
> >>> > > > > > > > > > Hi Aljoscha,
> >>> > > > > > > > > >
> >>> > > > > > > > > > Thanks. Yes, the new interface seems to address points 1 and 2 of:
> >>> > > > > > > > > >
> >>> > > > > > > > > > *1) I have a use case where I have to create a custom Evictor
> >>> > > > > > > > > > that will evict elements from the window based on their value
> >>> > > > > > > > > > (e.g., if the elements are of case class Item(id: Int,
> >>> > > > > > > > > > type: String), then evict elements that have type="a"). I
> >>> > > > > > > > > > believe this is not currently possible.*
> >>> > > > > > > > > > *2) This is somewhat related to 1): there should be an option
> >>> > > > > > > > > > to evict elements from anywhere in the window, not only from
> >>> > > > > > > > > > the beginning of the window (e.g., apply the delta function to
> >>> > > > > > > > > > all elements and remove all those that don't pass). I checked
> >>> > > > > > > > > > the code, and the evict method just returns the number of
> >>> > > > > > > > > > elements to be removed, and processTriggerResult just skips
> >>> > > > > > > > > > that many elements from the beginning.*
> >>> > > > > > > > > > *3) Add an option that enables the user to decide whether the
> >>> > > > > > > > > > eviction should happen before the apply function or after the
> >>> > > > > > > > > > apply function. Currently it is before the apply function, but
> >>> > > > > > > > > > I have a use case where I need to first apply the function and
> >>> > > > > > > > > > evict afterward.*
> >>> > > > > > > > > >
> >>> > > > > > > > > > I would be interested in contributing to the code base. Please
> >>> > > > > > > > > > let me know the steps.
> >>> > > > > > > > > >
> >>> > > > > > > > > > Thanks and Regards,
> >>> > > > > > > > > > Vishnu Viswanath
> >>> > > > > > > > > >
> >>> > > > > > > > > > On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek <
> >>> > > > > > > > > > aljoscha@apache.org> wrote:
> >>> > > > > > > > > >
> >>> > > > > > > > > > > Hi,
> >>> > > > > > > > > > > as mentioned in the thread on improving the Windowing API, I
> >>> > > > > > > > > > > also have a design doc just for improving WindowEvictors. I
> >>> > > > > > > > > > > had this in my head for a while but was hesitant to publish
> >>> > > > > > > > > > > it. Since people are asking about this, now might be a good
> >>> > > > > > > > > > > time to post it. Here's the doc:
> >>> > > > > > > > > > >
> >>> > > > > > > > > > > https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing
> >>> > > > > > > > > > >
> >>> > > > > > > > > > > Feedback/Suggestions are very welcome! Please let me know
> >>> > > > > > > > > > > what you think.
> >>> > > > > > > > > > >
> >>> > > > > > > > > > > @Vishnu: Are you interested in contributing a solution for
> >>> > > > > > > > > > > this to the Flink code base? I'd be very happy to work with
> >>> > > > > > > > > > > you on this.
> >>> > > > > > > > > > >
> >>> > > > > > > > > > > Cheers,
> >>> > > > > > > > > > > Aljoscha
> >>> > > > > > > > > > >
> >>> > > > > > > > > > > P.S. I think it would be best to keep discussions to the ML
> >>> > > > > > > > > > > because comments on the doc will not be visible here for
> >>> > > > > > > > > > > everyone.
