flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
Date Tue, 18 Oct 2016 08:34:36 GMT
Hi,
yes, I think it's fine if we keep it in the same package as the Evictor.
StreamRecord is more of an internal class that should not really be user
facing, that's my motivation for replacing it.
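
A minimal sketch of what such a user-facing class might look like, together
with a time-based eviction that removes arbitrary (out-of-order) elements via
Iterator.remove(). The names TimestampedValueSketch and evictOlderThan, and
the exact shape of TimestampedValue, are assumptions for illustration only,
not the final Flink API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TimestampedValueSketch {

    /** Hypothetical (value, timestamp) holder that does not expose StreamRecord. */
    public static final class TimestampedValue<T> {
        private final T value;
        private final long timestamp;
        private final boolean hasTimestamp;

        /** Element without a timestamp. */
        public TimestampedValue(T value) {
            this.value = value;
            this.timestamp = Long.MIN_VALUE;
            this.hasTimestamp = false;
        }

        /** Element with an explicit timestamp. */
        public TimestampedValue(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = true;
        }

        public T getValue() { return value; }
        public long getTimestamp() { return timestamp; }
        public boolean hasTimestamp() { return hasTimestamp; }
    }

    /**
     * Hypothetical helper: removes every element whose timestamp is at least
     * keepMillis older than the maximum timestamp in the buffer. Elements
     * without a timestamp are left alone.
     */
    public static <T> void evictOlderThan(Iterable<TimestampedValue<T>> elements, long keepMillis) {
        boolean anyTimestamp = false;
        long max = Long.MIN_VALUE;
        // Elements may arrive out of order, so scan all of them for the maximum.
        for (TimestampedValue<T> e : elements) {
            if (e.hasTimestamp()) {
                anyTimestamp = true;
                max = Math.max(max, e.getTimestamp());
            }
        }
        if (!anyTimestamp) {
            return; // no time information, skip the eviction
        }
        long cutoff = max - keepMillis;
        for (Iterator<TimestampedValue<T>> it = elements.iterator(); it.hasNext();) {
            TimestampedValue<T> e = it.next();
            if (e.hasTimestamp() && e.getTimestamp() <= cutoff) {
                it.remove(); // removal from any position in the buffer
            }
        }
    }

    public static void main(String[] args) {
        List<TimestampedValue<String>> buffer = new ArrayList<>();
        buffer.add(new TimestampedValue<>("a", 1000L));
        buffer.add(new TimestampedValue<>("b", 100L));
        buffer.add(new TimestampedValue<>("c", 1200L));
        buffer.add(new TimestampedValue<>("d", 150L));
        buffer.add(new TimestampedValue<>("e", 1500L));
        evictOlderThan(buffer, 1000L);
        for (TimestampedValue<String> e : buffer) {
            System.out.println(e.getValue());
        }
    }
}
```

With the values in main, the second and fourth elements are the ones older
than the cutoff, so they are removed while the others stay in the buffer.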

Cheers,
Aljoscha

On Mon, 17 Oct 2016 at 19:23 Vishnu Viswanath <vishnu.viswanath25@gmail.com>
wrote:

> Hi Aljoscha,
>
> Thanks for the response.
>
> I did think about creating a new class similar to TimestampedValue as you
> suggested, but that class looked almost the same as the current
> StreamRecord<T> class (both have a timestamp field and a value field).
>
> Do you think it is fine to have another class for holding a
> (timestamp, value) tuple?
>
> Regards,
> Vishnu
>
> On Mon, Oct 17, 2016 at 4:19 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
> > Hi Vishnu,
> > what you suggested is spot on! Please go forward with it like this.
> >
> > One small suggestion would be to change Tuple2<Long, T> to something like
> > TimestampedValue<T> to not rely on tuples because they can be confusing
> > for people who write Scala code because they are not Scala tuples. That's
> > not strictly necessary, though, you can spin it however you like.
> >
> > Cheers,
> > Aljoscha
> >
> > On Fri, 7 Oct 2016 at 18:46 Vishnu Viswanath <vishnu.viswanath25@gmail.com>
> > wrote:
> >
> > > Hi Radu,
> > >
> > > Yes, we can remove arbitrary elements using iterator.remove().
> > >
> > > Regards,
> > > Vishnu
> > >
> > > On Fri, Oct 7, 2016 at 2:57 AM, Radu Tudoran <radu.tudoran@huawei.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I must apologize: I missed some of the email exchanges on this thread,
> > > > so my remark/question might already have been settled.
> > > >
> > > > Does the interface you propose also allow removing elements out of
> > > > order? E.g., assuming I have elements 1,2,3,4,5 in the window buffer,
> > > > would I be able to evict 2 and 4?
> > > > We discussed this some email exchanges ago, but as I said I am not
> > > > sure if this functionality is captured in this interface. Basically,
> > > > will the typical remove() method from Iterators be available?
> > > >
> > > > Best regards,
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Vishnu Viswanath [mailto:vishnu.viswanath25@gmail.com]
> > > > Sent: Friday, October 07, 2016 8:29 AM
> > > > To: Dev
> > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
> > > >
> > > > Hi Aljoscha,
> > > >
> > > > To pass the time information to the Evictor and at the same time not
> > > > expose the StreamRecord, I suppose we can change the signature of
> > > > evictBefore and evictAfter to take Iterable<Tuple2<Long, T>> instead
> > > > of Iterable<StreamRecord<T>>:
> > > >
> > > > void evictBefore(Iterable<Tuple2<Long, T>> elements, int size,
> > > >     W window, EvictorContext evictorContext);
> > > >
> > > > The fire() method of EvictingWindowOperator can transform the
> > > > Iterable<StreamRecord<IN>> to FluentIterable<Tuple2<Long, IN>> and
> > > > pass it on to the evictor (where f0 will be the timestamp and f1 will
> > > > be the value). That way the TimeEvictor will work for EventTime or
> > > > IngestionTime as long as the timestamp is set in the StreamRecord. In
> > > > case the timestamp is not set, TimeEvictor can detect this by checking
> > > > Tuple2.f0 (which will be Long.MIN_VALUE) and skip the eviction.
> > > >
> > > > If you think this is fine, I will make the changes and also edit the
> > > > FLIP.
> > > >
> > > > Regards,
> > > > Vishnu
> > > >
> > > >
> > > > On Wed, Oct 5, 2016 at 9:49 PM, Vishnu Viswanath <
> > > > vishnu.viswanath25@gmail.com> wrote:
> > > >
> > > > > Thank you Aljoscha,
> > > > >
> > > > > Yes, I agree we don't need ProcessingTimeEvictor.
> > > > > I will change the current TimeEvictors to use EventTimeEvictor as
> > > > > suggested.
> > > > >
> > > > > I will also figure out a way to pass the timestamp to the Evictor
> > > > > interface so that we can avoid exposing StreamRecords.
> > > > >
> > > > > Regards,
> > > > > Vishnu
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Sep 20, 2016 at 4:33 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Hi,
> > > > >> now you again see what I mentioned a while back: eviction based on
> > > > >> processing time is not really well defined. I think we can completely
> > > > >> get rid of "processing time eviction" because it can be replaced by
> > > > >> something like this:
> > > > >>
> > > > >> DataStream input = ...
> > > > >> // this will assign the current processing time as timestamp
> > > > >> DataStream withTimestamps =
> > > > >>   input.assignTimestampsAndWatermarks(new IngestionTimeExtractor())
> > > > >> withTimestamps
> > > > >>   .keyBy(...)
> > > > >>   .window(...)
> > > > >>   .evictor(new EventTimeEvictor())
> > > > >>   .apply(...)
> > > > >>
> > > > >> With this, we would just have to find a good way of passing the
> > > > >> timestamps in the Evictor interface and a good way of implementing
> > > > >> the EvictingWindowOperator.
> > > > >>
> > > > >> Cheers,
> > > > >> Aljoscha
> > > > >>
> > > > >>
> > > > >> On Sun, 18 Sep 2016 at 18:14 Vishnu Viswanath <
> > > > >> vishnu.viswanath25@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Hi Aljoscha,
> > > > >> >
> > > > >> > A)
> > > > >> > I tried the approach where we set the ProcessingTime explicitly by
> > > > >> > converting DataStream<T> input to DataStream<Tuple2<Long, T>> using
> > > > >> > a map function, and below are my observations:
> > > > >> > 1. All the current code which uses TimeEvictor (which will by
> > > > >> > default be changed to ProcessingTimeEvictor) will be forced to
> > > > >> > implement a mapping function to agree with the new method signature.
> > > > >> > 2. Even after applying the above mapping function, the timestamp
> > > > >> > field of the StreamRecord will not be changed. This might be
> > > > >> > confusing since we now have two timestamps for the record: one set
> > > > >> > by the mapping function, the other in the StreamRecord.
> > > > >> > 3. Having a stream of Tuple2<Long, T> makes it confusing to do the
> > > > >> > keyBy, and the WindowFunction now has to process Tuple2<Long, T>
> > > > >> > instead of T.
> > > > >> > 4. Users might get confused about how to set the ProcessingTime,
> > > > >> > since ProcessingTime is the time at which the records are processed
> > > > >> > and users might expect that to be a responsibility of Flink.
> > > > >> >
> > > > >> > Ideally, ProcessingTime should be the time at which a StreamRecord
> > > > >> > is processed. And if a record is processed multiple times, e.g.,
> > > > >> > when an element was not evicted from the window and hence is
> > > > >> > processed again during the next trigger, the ProcessingTime should
> > > > >> > be the time at which the record was seen/processed the first time.
> > > > >> > If my understanding of ProcessingTime is correct, I am thinking I
> > > > >> > can iterate through the records and set the current timestamp as
> > > > >> > the ProcessingTime if absent (before doing the eviction).
> > > > >> >
> > > > >> > Something like:
> > > > >> > for (StreamRecord<Object> element : elements) {
> > > > >> >   if (!element.hasTimestamp()) {
> > > > >> >     element.setTimestamp(System.currentTimeMillis());
> > > > >> >   }
> > > > >> > }
> > > > >> >
> > > > >> > B) Regarding not exposing StreamRecord<IN> in the Evictor: if the
> > > > >> > Evictor is given Iterable<IN>, then we cannot retrieve the time
> > > > >> > information of the records in the EventTimeEvictor to do the
> > > > >> > eviction (but I do see that StreamRecord is marked with @Internal).
> > > > >> >
> > > > >> > C) Regarding modifying the WindowOperator class to take the type
> > > > >> > parameter <S extends AppendingState<IN, ACC>> so that we can remove
> > > > >> > the duplicate code from EvictingWindowOperator: I would prefer to
> > > > >> > separate it from this FLIP and create a JIRA for it. What do you
> > > > >> > say?
> > > > >> >
> > > > >> > Please let me know your thoughts.
> > > > >> >
> > > > >> > Regards,
> > > > >> > Vishnu
> > > > >> >
> > > > >> > On Sun, Jul 31, 2016 at 12:07 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi,
> > > > >> > > regarding a), b) and c): The WindowOperator can be extended to
> > > > >> > > have this signature:
> > > > >> > > public class WindowOperator<K, IN, ACC, OUT, W extends Window,
> > > > >> > >     S extends AppendingState<IN, ACC>>
> > > > >> > >
> > > > >> > > that way the shape of state is generic and EvictingWindowOperator
> > > > >> > > can use ListState<IN> there.
> > > > >> > >
> > > > >> > > regarding 2.: Yes, we can either take the current processing
> > > > >> > > time/event time or the max timestamp of elements in the window as
> > > > >> > > the benchmark against which we compare.
> > > > >> > >
> > > > >> > > About ProcessingTimeEvictor: the proposal was to make the
> > > > >> > > timestamp explicit in the type of elements. Otherwise, how would
> > > > >> > > you access the processing time of each element? (As I said, the
> > > > >> > > timestamp field in StreamRecord does not usually contain a
> > > > >> > > processing-time timestamp and I would like to remove the
> > > > >> > > StreamRecord from the type of the Iterable that is passed to the
> > > > >> > > evictor to avoid code duplication in EvictingWindowOperator.)
> > > > >> > > I'm open for suggestions there since I didn't come up with a
> > > > >> > > better solution yet. :-)
> > > > >> > >
> > > > >> > > Cheers,
> > > > >> > > Aljoscha
> > > > >> > >
> > > > >> > >
> > > > >> > >
> > > > >> > > On Sat, 30 Jul 2016 at 05:56 Vishnu Viswanath <
> > > > >> > > vishnu.viswanath25@gmail.com>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Hi Aljoscha,
> > > > >> > > >
> > > > >> > > > 1. Regarding the Evictor interface taking Iterable<IN> instead
> > > > >> > > > of StreamRecord -
> > > > >> > > >
> > > > >> > > >  a) I am not quite sure I understood what you meant by *"It
> > > > >> > > > could be a very thin subclass of WindowOperator"* - Currently,
> > > > >> > > > most of the code duplication in EvictingWindowOperator is due to
> > > > >> > > > the windowStateDescriptor (ListState instead of AppendingState
> > > > >> > > > compared to WindowOperator). Is this correct?
> > > > >> > > >
> > > > >> > > >  b) Do you hope to keep using AppendingState instead of
> > > > >> > > > ListState to avoid the duplicate code (e.g., processWatermark(),
> > > > >> > > > trigger(), etc.)? If we use AppendingState, the get() method
> > > > >> > > > returns a state of the OUT type ACC, which cannot be passed to
> > > > >> > > > the Evictor. So I am assuming we will have to keep using
> > > > >> > > > ListState here.
> > > > >> > > >
> > > > >> > > >  c) My not so good idea was to use FluentIterable to convert
> > > > >> > > > the Iterable<StreamRecord<IN>> to Iterable<IN> and pass it on to
> > > > >> > > > the Evictor and window function. The Evictor can remove elements
> > > > >> > > > from the Iterable (even the window function can remove
> > > > >> > > > elements). Then clear the state and add the elements (after
> > > > >> > > > removal) back to the state. But in that case, I need to
> > > > >> > > > reconstruct StreamRecord<IN> from IN. Doing so, we will lose the
> > > > >> > > > timestamp information that might have been previously set on the
> > > > >> > > > original StreamRecord<IN>. Is there any other way to recreate
> > > > >> > > > the StreamRecord?
> > > > >> > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > 2. Regarding ProcessingTimeEvictor -
> > > > >> > > >
> > > > >> > > > A TimeEvictor has to evict elements from the window which are
> > > > >> > > > older than a given period from the element with the maximum
> > > > >> > > > timestamp in the window. When considering ProcessingTimestamp
> > > > >> > > > (even if it was explicitly set), shouldn't the timestamps
> > > > >> > > > associated with records be strictly increasing, i.e., newer
> > > > >> > > > elements should have a higher timestamp than earlier elements?
> > > > >> > > > So to get the max timestamp we could just get the last element.
> > > > >> > > > When using EventTimeEvictor, the elements might have arrived out
> > > > >> > > > of order, hence we can't just take the timestamp of the last
> > > > >> > > > element as the maximum timestamp, but must check each and every
> > > > >> > > > element in the window.
> > > > >> > > >
> > > > >> > > > We should have two versions of TimeEvictors - EventTime and
> > > > >> > > > ProcessingTime - but does ProcessingTimeEvictor need to take a
> > > > >> > > > Tuple2<Long, T>, since we are anyway going to get the max
> > > > >> > > > timestamp by looking at the last element in the window?
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > > Vishnu
> > > > >> > > >
> > > > >> > > > On Fri, Jul 29, 2016 at 6:22 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > > >> > > > wrote:
> > > > >> > > >
> > > > >> > > > > About processing time and timestamps:
> > > > >> > > > >
> > > > >> > > > > The timestamp is either set in the source or in an
> > > > >> > > > > in-between TimestampAssigner that can be used with
> > > > >> > > > > DataStream.assignTimestampsAndWatermarks(). However, the
> > > > >> > > > > timestamp in the element is normally not a "processing-time
> > > > >> > > > > timestamp". I think it might make sense to split the
> > > > >> > > > > functionality for the evictors into two parts: one that
> > > > >> > > > > implicitly sets a timestamp and one that uses these
> > > > >> > > > > timestamps. It could look like this:
> > > > >> > > > >
> > > > >> > > > > DataStream<T> input = ...
> > > > >> > > > > // this makes the current processing time explicit in the
> > > > >> > > > > // tuples:
> > > > >> > > > > DataStream<Tuple2<Long, T>> withTimestamps =
> > > > >> > > > >   input.map(new ReifyProcessingTime<T>());
> > > > >> > > > > withTimestamps
> > > > >> > > > >   .keyBy(...)
> > > > >> > > > >   .window(...)
> > > > >> > > > >   .evictor(new ProcessingTimeEvictor<T>())
> > > > >> > > > >   .apply(...)
> > > > >> > > > >
> > > > >> > > > > where ProcessingTimeEvictor looks like this:
> > > > >> > > > >
> > > > >> > > > > class ProcessingTimeEvictor<T> extends Evictor<Tuple2<Long, T>> {
> > > > >> > > > >   void evictBefore(Iterable<Tuple2<Long, T>>, ...);
> > > > >> > > > >   void evictAfter ...
> > > > >> > > > > }
> > > > >> > > > >
> > > > >> > > > > This would make everything that is happening explicit in the
> > > > >> > > > > type signatures and explicit for the user.
> > > > >> > > > >
> > > > >> > > > > Cheers,
> > > > >> > > > > Aljoscha
> > > > >> > > > >
> > > > >> > > > > On Thu, 28 Jul 2016 at 18:32 Aljoscha Krettek <
> > > > >> aljoscha@apache.org>
> > > > >> > > > wrote:
> > > > >> > > > >
> > > > >> > > > > > Hi,
> > > > >> > > > > > in fact, changing it to Iterable<IN> would simplify things
> > > > >> > > > > > because then we would not have to duplicate code for the
> > > > >> > > > > > EvictingWindowOperator any more. It could be a very thin
> > > > >> > > > > > subclass of WindowOperator.
> > > > >> > > > > >
> > > > >> > > > > > Cheers,
> > > > >> > > > > > Aljoscha
> > > > >> > > > > >
> > > > >> > > > > > On Wed, 27 Jul 2016 at 03:56 Vishnu Viswanath <
> > > > >> > > > > > vishnu.viswanath25@gmail.com> wrote:
> > > > >> > > > > >
> > > > >> > > > > >> Hi Aljoscha,
> > > > >> > > > > >>
> > > > >> > > > > >> Regarding your concern - to not expose the StreamRecord in
> > > > >> > > > > >> the Evictor, were you able to find any alternative?
> > > > >> > > > > >>
> > > > >> > > > > >> I tried to make the methods take Iterable<IN> input similar
> > > > >> > > > > >> to the WindowFunction, but that didn't work since we have
> > > > >> > > > > >> to clear the state and add the elements back to the state
> > > > >> > > > > >> (to fix the bug mentioned in the previous mail).
> > > > >> > > > > >>
> > > > >> > > > > >> If you think the interface that accepts
> > > > >> > > > > >> Iterable<StreamRecord<T>> elements is good enough, I have
> > > > >> > > > > >> the changes ready.
> > > > >> > > > > >>
> > > > >> > > > > >> Thanks,
> > > > >> > > > > >> Vishnu
> > > > >> > > > > >>
> > > > >> > > > > >> On Mon, Jul 25, 2016 at 7:48 AM, Aljoscha Krettek <
> > > > >> > > > aljoscha@apache.org>
> > > > >> > > > > >> wrote:
> > > > >> > > > > >>
> > > > >> > > > > >> > Hi,
> > > > >> > > > > >> > the elements are currently not being removed from the
> > > > >> > > > > >> > buffers. That's a bug that we could fix while adding the
> > > > >> > > > > >> > new Evictor interface.
> > > > >> > > > > >> >
> > > > >> > > > > >> > Cheers,
> > > > >> > > > > >> > Aljoscha
> > > > >> > > > > >> >
> > > > >> > > > > >> > On Mon, 25 Jul 2016 at 13:00 Radu Tudoran <
> > > > >> > > radu.tudoran@huawei.com>
> > > > >> > > > > >> wrote:
> > > > >> > > > > >> >
> > > > >> > > > > >> > > Hi Aljoscha,
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > Can you point us to the way it is handled now? Is there
> > > > >> > > > > >> > > anything else for the removing of elements other than
> > > > >> > > > > >> > > the skip in EvictingWindowOperator? Is there something
> > > > >> > > > > >> > > as it was before version 1.x where you had an explicit
> > > > >> > > > > >> > > remove from window buffers?
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > Dr. Radu Tudoran
> > > > >> > > > > >> > > Research Engineer - Big Data Expert
> > > > >> > > > > >> > > IT R&D Division
> > > > >> > > > > >> > >
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > >> > > > > >> > > European Research Center
> > > > >> > > > > >> > > Riesstrasse 25, 80992 München
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > E-mail: radu.tudoran@huawei.com
> > > > >> > > > > >> > > Mobile: +49 15209084330
> > > > >> > > > > >> > > Telephone: +49 891588344173
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > >> > > > > >> > > Hansaallee 205, 40549 Düsseldorf, Germany,
> > > > www.huawei.com
> > > > >> > > > > >> > > Registered Office: Düsseldorf, Register Court
> > > Düsseldorf,
> > > > >> HRB
> > > > >> > > > 56063,
> > > > >> > > > > >> > > Managing Director: Bo PENG, Wanzhou MENG, Lifang
> CHEN
> > > > >> > > > > >> > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht
> > > > Düsseldorf,
> > > > >> HRB
> > > > >> > > > > 56063,
> > > > >> > > > > >> > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > > > >> > > > > >> > > This e-mail and its attachments contain
> confidential
> > > > >> > information
> > > > >> > > > > from
> > > > >> > > > > >> > > HUAWEI, which is intended only for the person or
> > entity
> > > > >> whose
> > > > >> > > > > address
> > > > >> > > > > >> is
> > > > >> > > > > >> > > listed above. Any use of the information contained
> > > herein
> > > > >> in
> > > > >> > any
> > > > >> > > > way
> > > > >> > > > > >> > > (including, but not limited to, total or partial
> > > > >> disclosure,
> > > > >> > > > > >> > reproduction,
> > > > >> > > > > >> > > or dissemination) by persons other than the
> intended
> > > > >> > > recipient(s)
> > > > >> > > > is
> > > > >> > > > > >> > > prohibited. If you receive this e-mail in error,
> > please
> > > > >> notify
> > > > >> > > the
> > > > >> > > > > >> sender
> > > > >> > > > > >> > > by phone or email immediately and delete it!
> > > > >> > > > > >> > >
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > -----Original Message-----
> > > > >> > > > > >> > > From: Aljoscha Krettek [mailto:aljoscha@apache.org]
> > > > >> > > > > >> > > Sent: Monday, July 25, 2016 11:45 AM
> > > > >> > > > > >> > > To: dev@flink.apache.org
> > > > >> > > > > >> > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > Hi,
> > > > >> > > > > >> > > I think there is not yet a clear specification for how
> > > > >> > > > > >> > > the actual removal of elements from the buffer will
> > > > >> > > > > >> > > work. I think naively one can do:
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > Iterable<E> currentElements = state.get()
> > > > >> > > > > >> > > // this will remove some stuff from there, or mark for
> > > > >> > > > > >> > > // removal
> > > > >> > > > > >> > > evictor.evict(currentElements);
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > state.clear()
> > > > >> > > > > >> > > // the Iterable does not loop over the removed/marked
> > > > >> > > > > >> > > // elements
> > > > >> > > > > >> > > for (E element : currentElements) {
> > > > >> > > > > >> > >   state.add(element)
> > > > >> > > > > >> > > }
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > This is very costly but the only way I see of doing this
> > > > >> > > > > >> > > right now with every state backend.
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > Cheers,
> > > > >> > > > > >> > > Aljoscha
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > On Mon, 25 Jul 2016 at 09:46 Radu Tudoran <
> > > > >> > > > radu.tudoran@huawei.com>
> > > > >> > > > > >> > wrote:
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > > Hi,
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > > Thanks for the clarification. Can someone point to
> > > > >> > > > > >> > > > where the events are removed from buffers? I am trying
> > > > >> > > > > >> > > > to understand the new logic of handling the eviction
> > > > >> > > > > >> > > > in this new API. Thanks
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > > -----Original Message-----
> > > > >> > > > > >> > > > From: Vishnu Viswanath [mailto:vishnu.viswanath25@gmail.com]
> > > > >> > > > > >> > > > Sent: Saturday, July 23, 2016 3:04 AM
> > > > >> > > > > >> > > > To: Dev
> > > > >> > > > > >> > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > > Hi Radu,
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > > - Yes, we can remove elements from the iterator.
> > > > >> > > > > >> > > > - Right now the EvictingWindowOperator just skips the
> > > > >> > > > > >> > > > elements from the Iterable before passing to the
> > > > >> > > > > >> > > > window function (yes, this has to be changed in the
> > > > >> > > > > >> > > > new API).
> > > > >> > > > > >> > > > - Regarding the last question on how elements are
> > > > >> > > > > >> > > > being removed from the window buffer: I am not sure
> > > > >> > > > > >> > > > how it is working right now, but when trying out the
> > > > >> > > > > >> > > > new API that I am working on, I did find a bug where
> > > > >> > > > > >> > > > the evicted elements are not actually removed from the
> > > > >> > > > > >> > > > State. I have added a fix for that. (You can see a
> > > > >> > > > > >> > > > mail regarding that in this mail chain.)
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > > Thanks,
> > > > >> > > > > >> > > > Vishnu
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > > On Fri, Jul 22, 2016 at 1:03 PM, Radu Tudoran <
> > > > >> > > > > >> radu.tudoran@huawei.com
> > > > >> > > > > >> > >
> > > > >> > > > > >> > > > wrote:
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > > > > Hi,
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > Overall I believe that the interfaces and the
> > > > >> > > > > >> > > > > proposal are good. I have the following question
> > > > >> > > > > >> > > > > though: can you delete elements via the iterator
> > > > >> > > > > >> > > > > (Iterable<StreamRecord<T>> elements)?
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > I tried to look over the code where the eviction
> > > > >> > > > > >> > > > > happens (I did not do this since version 0.10...
> > > > >> > > > > >> > > > > looks very different now :) )... the only reference
> > > > >> > > > > >> > > > > I found was the EvictingWindowOperator, which at
> > > > >> > > > > >> > > > > fireOrContinue has a "skip" based on the number of
> > > > >> > > > > >> > > > > elements returned from the evictor... and these are
> > > > >> > > > > >> > > > > not put in the collection to be given to the user
> > > > >> > > > > >> > > > > function to be applied. I think these will also need
> > > > >> > > > > >> > > > > to be changed to adjust to the "any operator from
> > > > >> > > > > >> > > > > anywhere in the window buffer".
> > > > >> > > > > >> > > > > Also - as we are on this topic - can someone explain
> > > > >> > > > > >> > > > > how these elements that are not considered anymore
> > > > >> > > > > >> > > > > for the user function are actually deleted from the
> > > > >> > > > > >> > > > > window buffer? I did not manage to find this... some
> > > > >> > > > > >> > > > > reference to classes/code where this happens would
> > > > >> > > > > >> > > > > be useful.
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > -----Original Message-----
> > > > >> > > > > >> > > > > From: Vishnu Viswanath [mailto:vishnu.viswanath25@gmail.com]
> > > > >> > > > > >> > > > > Sent: Friday, July 22, 2016 12:43 PM
> > > > >> > > > > >> > > > > To: Dev
> > > > >> > > > > >> > > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > Hi,
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > I have created a FLIP page for this enhancement
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > Thanks,
> > > > >> > > > > >> > > > > Vishnu
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > On Thu, Jul 21, 2016 at 6:53 AM, Vishnu
> > Viswanath <
> > > > >> > > > > >> > > > > vishnu.viswanath25@gmail.com> wrote:
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > > Thanks Aljoscha.
> > > > >> > > > > >> > > > > >
> > > > >> > > > > >> > > > > > On Thu, Jul 21, 2016 at 4:46 AM, Aljoscha
> > > Krettek <
> > > > >> > > > > >> > > aljoscha@apache.org
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > > > > wrote:
> > > > >> > > > > >> > > > > >
> > > > >> > > > > >> > > > > >> Hi,
> > > > >> > > > > >> > > > > >> this, in fact, seems to be a bug. There should be
> > > > >> > > > > >> > > > > >> something like
> > > > >> > > > > >> > > > > >> windowState.clear();
> > > > >> > > > > >> > > > > >> for (IN element : projectedContents) {
> > > > >> > > > > >> > > > > >>   windowState.add(element);
> > > > >> > > > > >> > > > > >> }
> > > > >> > > > > >> > > > > >>
> > > > >> > > > > >> > > > > >> after passing the elements to the window function.
> > > > >> > > > > >> > > > > >>
> > > > >> > > > > >> > > > > >> This is very inefficient but the only way I see of
> > > > >> > > > > >> > > > > >> doing it right now.
> > > > >> > > > > >> > > > > >>
> > > > >> > > > > >> > > > > >> Cheers,
> > > > >> > > > > >> > > > > >> Aljoscha
> > > > >> > > > > >> > > > > >>
> > > > >> > > > > >> > > > > >>
> > > > >> > > > > >> > > > > >> On Thu, 21 Jul 2016 at 01:32 Vishnu
> Viswanath
> > <
> > > > >> > > > > >> > > > > >> vishnu.viswanath25@gmail.com>
> > > > >> > > > > >> > > > > >> wrote:
> > > > >> > > > > >> > > > > >>
> > > > >> > > > > >> > > > > >> > Hi,
> > > > >> > > > > >> > > > > >> >
> > > > >> > > > > >> > > > > >> > When we use RocksDB as state backend, how
> > does
> > > > the
> > > > >> > > > backend
> > > > >> > > > > >> state
> > > > >> > > > > >> > > get
> > > > >> > > > > >> > > > > >> > updated after some elements are evicted
> from
> > > the
> > > > >> > > window?
> > > > >> > > > > >> > > > > >> > I don't see any update call being made to
> > > remove
> > > > >> the
> > > > >> > > > > element
> > > > >> > > > > >> > from
> > > > >> > > > > >> > > > the
> > > > >> > > > > >> > > > > >> state
> > > > >> > > > > >> > > > > >> > stored in RocksDB.
> > > > >> > > > > >> > > > > >> >
> > > > >> > > > > >> > > > > >> > It looks like the RocksDBListState is only
> > > > having
> > > > >> > get()
> > > > >> > > > and
> > > > >> > > > > >> > add()
> > > > >> > > > > >> > > > > >> methods
> > > > >> > > > > >> > > > > >> > since it is an AppendingState, but that
> > causes
> > > > the
> > > > >> > > > evicted
> > > > >> > > > > >> > > elements
> > > > >> > > > > >> > > > to
> > > > >> > > > > >> > > > > >> come
> > > > >> > > > > >> > > > > >> > back when the trigger is fired next time.
> > (It
> > > > >> works
> > > > >> > > fine
> > > > >> > > > > >> when I
> > > > >> > > > > >> > > use
> > > > >> > > > > >> > > > > >> > MemoryStateBackend)
> > > > >> > > > > >> > > > > >> >
> > > > >> > > > > >> > > > > >> > Is this expected behavior or am I missing
> > > > > >> something?
> > > > >> > > > > >> > > > > >> >
> > > > >> > > > > >> > > > > >> > Thanks,
> > > > >> > > > > >> > > > > >> > Vishnu
> > > > >> > > > > >> > > > > >> >
> > > > >> > > > > >> > > > > >> > On Mon, Jul 18, 2016 at 7:15 AM, Vishnu
> > > > Viswanath
> > > > >> <
> > > > >> > > > > >> > > > > >> > vishnu.viswanath25@gmail.com> wrote:
> > > > >> > > > > >> > > > > >> >
> > > > >> > > > > >> > > > > >> > > Hi Aljoscha,
> > > > >> > > > > >> > > > > >> > >
> > > > >> > > > > >> > > > > >> > > Thanks! Yes, I have the create page
> option
> > > now
> > > > >> in
> > > > >> > > wiki.
> > > > >> > > > > >> > > > > >> > >
> > > > >> > > > > >> > > > > >> > > Regards,
> > > > >> > > > > >> > > > > >> > > Vishnu Viswanath,
> > > > >> > > > > >> > > > > >> > >
> > > > >> > > > > >> > > > > >> > > On Mon, Jul 18, 2016 at 6:34 AM,
> Aljoscha
> > > > >> Krettek <
> > > > >> > > > > >> > > > > >> aljoscha@apache.org>
> > > > >> > > > > >> > > > > >> > > wrote:
> > > > >> > > > > >> > > > > >> > >
> > > > >> > > > > >> > > > > >> > >> @Radu, addition of more window types
> and
> > > > >> sorting
> > > > >> > > > should
> > > > >> > > > > be
> > > > >> > > > > >> > part
> > > > >> > > > > >> > > > of
> > > > >> > > > > >> > > > > >> > another
> > > > >> > > > > >> > > > > >> > >> design proposal. This is interesting
> > stuff
> > > > but
> > > > >> I
> > > > >> > > think
> > > > >> > > > > we
> > > > >> > > > > >> > > should
> > > > >> > > > > >> > > > > keep
> > > > >> > > > > >> > > > > >> > >> issues separated because things can get
> > > > >> > complicated
> > > > >> > > > very
> > > > >> > > > > >> > > quickly.
> > > > >> > > > > >> > > > > >> > >>
> > > > >> > > > > >> > > > > >> > >> On Mon, 18 Jul 2016 at 12:32 Aljoscha
> > > > Krettek <
> > > > >> > > > > >> > > > aljoscha@apache.org
> > > > >> > > > > >> > > > > >
> > > > >> > > > > >> > > > > >> > >> wrote:
> > > > >> > > > > >> > > > > >> > >>
> > > > >> > > > > >> > > > > >> > >> > Hi,
> > > > >> > > > > >> > > > > >> > >> > about TimeEvictor, yes, I think there
> > > > should
> > > > >> be
> > > > >> > > > > specific
> > > > >> > > > > >> > > > evictors
> > > > >> > > > > >> > > > > >> for
> > > > >> > > > > >> > > > > >> > >> > processing time and event time. Also,
> > the
> > > > >> > current
> > > > >> > > > time
> > > > >> > > > > >> > should
> > > > >> > > > > >> > > > be
> > > > >> > > > > >> > > > > >> > >> > retrievable from the EvictorContext.
> > > > >> > > > > >> > > > > >> > >> >
> > > > >> > > > > >> > > > > >> > >> > For the wiki you will need
> permissions.
> > > > This
> > > > >> was
> > > > >> > > > > >> recently
> > > > >> > > > > >> > > > changed
> > > > >> > > > > >> > > > > >> > >> because
> > > > >> > > > > >> > > > > >> > >> > there was too much spam. I gave you
> > > > >> permission
> > > > >> > to
> > > > >> > > > add
> > > > >> > > > > >> > pages.
> > > > >> > > > > >> > > > Can
> > > > >> > > > > >> > > > > >> you
> > > > >> > > > > >> > > > > >> > >> please
> > > > >> > > > > >> > > > > >> > >> > try and check if it works?
> > > > >> > > > > >> > > > > >> > >> >
> > > > >> > > > > >> > > > > >> > >> > Cheers,
> > > > >> > > > > >> > > > > >> > >> > Aljoscha
> > > > >> > > > > >> > > > > >> > >> >
> > > > >> > > > > >> > > > > >> > >> > On Fri, 15 Jul 2016 at 13:28 Vishnu
> > > > >> Viswanath <
> > > > >> > > > > >> > > > > >> > >> > vishnu.viswanath25@gmail.com> wrote:
> > > > >> > > > > >> > > > > >> > >> >
> > > > >> > > > > >> > > > > >> > >> >> Hi all,
> > > > >> > > > > >> > > > > >> > >> >>
> > > > >> > > > > >> > > > > >> > >> >> How do we create a FLIP page, is
> there
> > > any
> > > > >> > > > permission
> > > > >> > > > > >> > setup
> > > > >> > > > > >> > > > > >> > required? I
> > > > >> > > > > >> > > > > >> > >> >> don't see any "Create" page(after
> > > logging
> > > > >> in)
> > > > >> > > > option
> > > > >> > > > > in
> > > > >> > > > > >> > the
> > > > >> > > > > >> > > > > >> header as
> > > > >> > > > > >> > > > > >> > >> >> mentioned in
> > > > >> > > > > >> > > > > >> > >> >>
> > > > >> > > > > >> > > > > >> > >> >>
> > > > >> > > > > >> > > > > >> > >>
> > > > >> > > > > >> > > > > >> >
> > > > >> > > > > >> > > > > >>
> > > > >> > > > > >> > > > >
> > > > >> > > > > >> > > >
> > > > >> > > > > >> > >
> > > > >> > > > > >> >
> > > > >> > > > > >>
> > > > >> > > > >
> > > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/
> > > > >> > > Flink+Improvement+Proposals
> > > > >> > > > > >> > > > > >> > >> >>
> > > > >> > > > > >> > > > > >> > >> >>
> > > > >> > > > > >> > > > > >> > >> >> Thanks,
> > > > >> > > > > >> > > > > >> > >> >> Vishnu
> > > > >> > > > > >> > > > > >> > >> >>
> > > > >> > > > > >> > > > > >> > >> >> On Wed, Jul 13, 2016 at 10:22 PM,
> > Vishnu
> > > > >> > > Viswanath
> > > > >> > > > <
> > > > >> > > > > >> > > > > >> > >> >> vishnu.viswanath25@gmail.com>
> wrote:
> > > > >> > > > > >> > > > > >> > >> >>
> > > > >> > > > > >> > > > > >> > >> >> > Hi Aljoscha,
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> > I agree, the user will know
> exactly
> > > that
> > > > >> they
> > > > >> > > are
> > > > >> > > > > >> > creating
> > > > >> > > > > >> > > > an
> > > > >> > > > > >> > > > > >> > >> EventTime
> > > > >> > > > > >> > > > > >> > >> >> > based evictor or ProcessingTime
> > based
> > > > >> evictor
> > > > >> > > > > >> looking at
> > > > >> > > > > >> > > the
> > > > >> > > > > >> > > > > >> code.
> > > > >> > > > > >> > > > > >> > >> >> > So do you think it will be ok to
> > have
> > > > >> > multiple
> > > > >> > > > > >> versions
> > > > >> > > > > >> > of
> > > > >> > > > > >> > > > > >> > >> TimeEvictor
> > > > >> > > > > >> > > > > >> > >> >> > (one for event time and one for
> > > > processing
> > > > >> > > time)
> > > > >> > > > > and
> > > > >> > > > > >> > also
> > > > >> > > > > >> > > a
> > > > > >> > > > > >> > > > > >> > >> DeltaEvictor
> > > > >> > > > > >> > > > > >> > >> >> > (again 2 versions- for event time
> > and
> > > > >> > > processing
> > > > >> > > > > >> time) ?
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> > Please note that the existing
> > behavior
> > > > of
> > > > >> > > > > >> > > > > >> TimeEvictor/DeltaEvictor
> > > > >> > > > > >> > > > > >> > >> does
> > > > >> > > > > >> > > > > >> > >> >> > not consider if it is EventTime or
> > > > >> > > ProcessingTime
> > > > >> > > > > >> > > > > >> > >> >> > e.g., in TimeEvictor the current
> > time
> > > is
> > > > >> > > > considered
> > > > >> > > > > >> as
> > > > >> > > > > >> > the
> > > > >> > > > > >> > > > > >> > timestamp
> > > > >> > > > > >> > > > > >> > >> of
> > > > >> > > > > >> > > > > >> > >> >> > the last element in the window
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > > >> > > > > >> > > > > >> > >> >> > long currentTime = Iterables.getLast(elements).getTimestamp();
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> > not the highest timestamp of all
> > > > elements
> > > > >> > > > > >> > > > > >> > >> >> > what I am trying to achieve is
> > > something
> > > > >> > like:
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > > >> > > > > >> > > > > >> > >> >> > long currentTime;
> > > > > >> > > > > >> > > > > >> > >> >> > if (ctx.isEventTime()) {
> > > > > >> > > > > >> > > > > >> > >> >> >     currentTime = getMaxTimestamp(elements);
> > > > > >> > > > > >> > > > > >> > >> >> > } else {
> > > > > >> > > > > >> > > > > >> > >> >> >     currentTime = Iterables.getLast(elements).getTimestamp();
> > > > > >> > > > > >> > > > > >> > >> >> > }
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> > Similarly, in DeltaEvictor the
> > > > >> > *`lastElement`*
> > > > >> > > is
> > > > >> > > > > >> > > > > >> > >> >> > *`Iterables.getLast(elements);`*
> > and I
> > > > am
> > > > >> > > > thinking
> > > > >> > > > > we
> > > > >> > > > > >> > > should
> > > > >> > > > > >> > > > > >> > consider
> > > > >> > > > > >> > > > > >> > >> >> the
> > > > >> > > > > >> > > > > >> > >> >> > element with max timestamp as the
> > last
> > > > >> > element
> > > > >> > > > > >> instead
> > > > >> > > > > >> > of
> > > > >> > > > > >> > > > just
> > > > >> > > > > >> > > > > >> > >> getting
> > > > >> > > > > >> > > > > >> > >> >> the
> > > > >> > > > > >> > > > > >> > >> >> > last inserted element as
> > > *`lastElement`*
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> > Do you think it is the right thing
> > to
> > > do
> > > > >> or
> > > > >> > > leave
> > > > >> > > > > the
> > > > >> > > > > >> > > > behavior
> > > > >> > > > > >> > > > > >> > >> Evictors
> > > > >> > > > > >> > > > > >> > >> >> as
> > > > > >> > > > > >> > > > > >> > >> >> > is, w.r.t. choosing the last
> > > element?
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> > Thanks,
> > > > >> > > > > >> > > > > >> > >> >> > Vishnu
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> > On Wed, Jul 13, 2016 at 11:07 AM,
> > > > Aljoscha
> > > > >> > > > Krettek
> > > > >> > > > > <
> > > > >> > > > > >> > > > > >> > >> aljoscha@apache.org
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> > wrote:
> > > > >> > > > > >> > > > > >> > >> >> >
> > > > >> > > > > >> > > > > >> > >> >> >> I still think it should be
> explicit
> > > in
> > > > >> the
> > > > >> > > > class.
> > > > >> > > > > >> For
> > > > >> > > > > >> > > > > example,
> > > > >> > > > > >> > > > > >> if
> > > > >> > > > > >> > > > > >> > >> you
> > > > >> > > > > >> > > > > >> > >> >> have
> > > > >> > > > > >> > > > > >> > >> >> >> this code:
> > > > >> > > > > >> > > > > >> > >> >> >>
> > > > >> > > > > >> > > > > >> > >> >> >> input
> > > > >> > > > > >> > > > > >> > >> >> >>   .keyBy()
> > > > >> > > > > >> > > > > >> > >> >> >>   .window()
> > > > > >> > > > > >> > > > > >> > >> >> >>   .trigger(EventTimeTrigger.create())
> > > > >> > > > > >> > > > > >> > >> >> >>   .evictor(TimeTrigger.create())
> > > > >> > > > > >> > > > > >> > >> >> >>
> > > > >> > > > > >> > > > > >> > >> >> >> the time behavior of the trigger
> is
> > > > >> > explicitly
> > > > >> > > > > >> > specified
> > > > >> > > > > >> > > > > while
> > > > >> > > > > >> > > > > >> the
> > > > >> > > > > >> > > > > >> > >> >> evictor
> > > > >> > > > > >> > > > > >> > >> >> >> would dynamically adapt based on
> > > > internal
> > > > >> > > > workings
> > > > >> > > > > >> that
> > > > >> > > > > >> > > the
> > > > >> > > > > >> > > > > >> user
> > > > >> > > > > >> > > > > >> > >> might
> > > > >> > > > > >> > > > > >> > >> >> not
> > > > >> > > > > >> > > > > >> > >> >> >> be aware of. Having the behavior
> > > > >> explicit at
> > > > >> > > the
> > > > >> > > > > >> call
> > > > >> > > > > >> > > site
> > > > >> > > > > >> > > > is
> > > > >> > > > > >> > > > > >> very
> > > > >> > > > > >> > > > > >> > >> >> >> important, in my opinion.
> > > > >> > > > > >> > > > > >> > >> >> >>
> > > > >> > > > > >> > > > > >> > >> >> >> On Wed, 13 Jul 2016 at 16:28
> Vishnu
> > > > >> > Viswanath
> > > > >> > > <
> > > > >> > > > > >> > > > > >> > >> >> >> vishnu.viswanath25@gmail.com>
> > > > >> > > > > >> > > > > >> > >> >> >> wrote:
> > > > >> > > > > >> > > > > >> > >> >> >>
> > > > >> > > > > >> > > > > >> > >> >> >> > Hi,
> > > > >> > > > > >> > > > > >> > >> >> >> >
> > > > >> > > > > >> > > > > >> > >> >> >> > I was hoping to use the
> > isEventTime
> > > > >> method
> > > > >> > > in
> > > > >> > > > > the
> > > > >> > > > > >> > > > > >> WindowAssigner
> > > > >> > > > > >> > > > > >> > >> to
> > > > >> > > > > >> > > > > >> > >> >> set
> > > > >> > > > > >> > > > > >> > >> >> >> > that information in the
> > > > EvictorContext.
> > > > > >> > > > > >> > > > > >> > >> >> >> > What do you think?
> > > > >> > > > > >> > > > > >> > >> >> >> >
> > > > >> > > > > >> > > > > >> > >> >> >> > Thanks and Regards,
> > > > >> > > > > >> > > > > >> > >> >> >> > Vishnu Viswanath,
> > > > >> > > > > >> > > > > >> > >> >> >> >
> > > > >> > > > > >> > > > > >> > >> >> >> > On Wed, Jul 13, 2016 at 10:09
> AM,
> > > > >> Aljoscha
> > > > >> > > > > >> Krettek <
> > > > >> > > > > >> > > > > >> > >> >> aljoscha@apache.org
> > > > >> > > > > >> > > > > >> > >> >> >> >
> > > > >> > > > > >> > > > > >> > >> >> >> > wrote:
> > > > >> > > > > >> > > > > >> > >> >> >> >
> > > > >> > > > > >> > > > > >> > >> >> >> > > Hi,
> > > > >> > > > > >> > > > > >> > >> >> >> > > I think the way to go here is
> > to
> > > > add
> > > > >> > both
> > > > >> > > an
> > > > >> > > > > >> > > > > >> EventTimeEvictor
> > > > >> > > > > >> > > > > >> > >> and a
> > > > >> > > > > >> > > > > >> > >> >> >> > > ProcessingTimeEvictor. The
> > > problem
> > > > is
> > > > >> > that
> > > > >> > > > > >> > > > "isEventTime"
> > > > >> > > > > >> > > > > >> > cannot
> > > > >> > > > > >> > > > > >> > >> >> >> really be
> > > > >> > > > > >> > > > > >> > >> >> >> > > determined. That's also the
> > > reason
> > > > >> why
> > > > >> > > there
> > > > >> > > > > is
> > > > >> > > > > >> an
> > > > >> > > > > >> > > > > >> > >> EventTimeTrigger
> > > > >> > > > > >> > > > > >> > >> >> >> and a
> > > > >> > > > > >> > > > > >> > >> >> >> > > ProcessingTimeTrigger. It was
> > > just
> > > > an
> > > > >> > > > > oversight
> > > > >> > > > > >> > that
> > > > >> > > > > >> > > > the
> > > > >> > > > > >> > > > > >> > >> >> TimeEvictor
> > > > >> > > > > >> > > > > >> > >> >> >> does
> > > > >> > > > > >> > > > > >> > >> >> >> > > not also have these two
> > versions.
> > > > >> > > > > >> > > > > >> > >> >> >> > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > About
> EvictingWindowOperator, I
> > > > think
> > > > >> > you
> > > > >> > > > can
> > > > >> > > > > >> make
> > > > >> > > > > >> > > the
> > > > >> > > > > >> > > > > two
> > > > >> > > > > >> > > > > >> > >> methods
> > > > >> > > > > >> > > > > >> > >> >> >> > > non-final in WindowOperator,
> > yes.
> > > > >> > > > > >> > > > > >> > >> >> >> > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > Cheers,
> > > > >> > > > > >> > > > > >> > >> >> >> > > Aljoscha
> > > > >> > > > > >> > > > > >> > >> >> >> > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > On Wed, 13 Jul 2016 at 14:32
> > > Vishnu
> > > > >> > > > Viswanath
> > > > >> > > > > <
> > > > >> > > > > >> > > > > >> > >> >> >> > > vishnu.viswanath25@gmail.com
> >
> > > > >> > > > > >> > > > > >> > >> >> >> > > wrote:
> > > > >> > > > > >> > > > > >> > >> >> >> > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > > Hi Aljoscha,
> > > > >> > > > > >> > > > > >> > >> >> >> > > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > > I am thinking of adding a
> > > method
> > > > >> > boolean
> > > > >> > > > > >> > > > isEventTime();
> > > > >> > > > > >> > > > > >> in
> > > > >> > > > > >> > > > > >> > the
> > > > >> > > > > >> > > > > >> > >> >> >> > > > EvictorContext apart from
> > > > >> > > > > >> > > > > >> > >> >> >> > > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > > long
> > > getCurrentProcessingTime();
> > > > >> > > > > >> > > > > >> > >> >> >> > > > MetricGroup
> getMetricGroup();
> > > > >> > > > > >> > > > > >> > >> >> >> > > > long getCurrentWatermark();
> > > > >> > > > > >> > > > > >> > >> >> >> > > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > > This method can be used to
> > make
> > > > the
> > > > >> > > > Evictor
> > > > >> > > > > >> not
> > > > >> > > > > >> > > > iterate
> > > > >> > > > > >> > > > > >> > >> through
> > > > >> > > > > >> > > > > >> > >> >> all
> > > > >> > > > > >> > > > > >> > >> >> >> the
> > > > >> > > > > >> > > > > >> > >> >> >> > > > elements in TimeEvictor.
> > There
> > > > will
> > > > >> > be a
> > > > >> > > > few
> > > > >> > > > > >> > > changes
> > > > >> > > > > >> > > > in
> > > > >> > > > > >> > > > > >> the
> > > > >> > > > > >> > > > > >> > >> >> existing
> > > > >> > > > > >> > > > > >> > >> >> >> > > > behavior of TimeEvictor and
> > > > >> > DeltaEvictor
> > > > >> > > > (I
> > > > >> > > > > >> have
> > > > >> > > > > >> > > > > >> mentioned
> > > > >> > > > > >> > > > > >> > >> this
> > > > >> > > > > >> > > > > >> > >> >> in
> > > > >> > > > > >> > > > > >> > >> >> >> the
> > > > >> > > > > >> > > > > >> > >> >> >> > > > design doc)
> > > > >> > > > > >> > > > > >> > >> >> >> > > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > > Also, is there any specific
> > > > reason
> > > > >> why
> > > > >> > > the
> > > > >> > > > > >> open
> > > > >> > > > > >> > and
> > > > >> > > > > >> > > > > close
> > > > >> > > > > >> > > > > >> > >> method
> > > > >> > > > > >> > > > > >> > >> >> in
> > > > >> > > > > >> > > > > >> > >> >> >> > > > WindowEvictor is made
> final?
> > > > Since
> > > > >> the
> > > > >> > > > > >> > > EvictorContext
> > > > >> > > > > >> > > > > >> will
> > > > >> > > > > >> > > > > >> > be
> > > > >> > > > > >> > > > > >> > >> in
> > > > >> > > > > >> > > > > >> > >> >> the
> > > > >> > > > > >> > > > > >> > >> >> >> > > > EvictingWindowOperator, I
> > need
> > > to
> > > > >> > > override
> > > > >> > > > > the
> > > > >> > > > > >> > open
> > > > >> > > > > >> > > > and
> > > > >> > > > > >> > > > > >> > close
> > > > >> > > > > >> > > > > >> > >> in
> > > > > >> > > > > >> > > > > >> > >> >> >> > > > EvictingWindowOperator to
> make
> > > the
> > > > >> > > > reference
> > > > >> > > > > of
> > > > >> > > > > >> > > > > >> > EvictorContext
> > > > >> > > > > >> > > > > >> > >> >> null.
> > > > >> > > > > >> > > > > >> > >> >> >> > > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > > Thanks and Regards,
> > > > >> > > > > >> > > > > >> > >> >> >> > > > Vishnu Viswanath,
> > > > >> > > > > >> > > > > >> > >> >> >> > > >
> > > > >> > > > > >> > > > > >> > >> >> >> > > > On Fri, Ju
