flink-user mailing list archives

From Vishnu Viswanath <vishnu.viswanat...@gmail.com>
Subject Re: Behavior of SlidingProessingTimeWindow with CountTrigger
Date Mon, 14 Mar 2016 17:21:45 GMT
Hi Aljoscha,

Thank you for the explanation and the link on IBM InfoSphere. That explains
why I am seeing (a,3) and (b,3) in my example.

Yes, the name Evictor is confusing.

Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Mon, Mar 14, 2016 at 11:24 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> sure, the evictors are a bit confusing (especially the fact that they are
> called evictors). They should more correctly be called “Keepers”. The process
> is the following:
>
> 1. Trigger Fires
> 2. Evictor decides what elements to keep, so a CountEvictor.of(3) says,
> keep only three elements, all others are evicted
> 3. Elements that remain after evictor are used for processing
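
The three steps above can be sketched in plain Scala without any Flink
dependency. This is only an illustrative model, not Flink's implementation:
evict, process and run are hypothetical helpers standing in for
CountEvictor.of(3), sum(1) and CountTrigger.of(5), and the sketch assumes a
single key in a single window and an evictor that keeps the newest elements.

```scala
object EvictorSketch {
  type Element = (String, Int)

  // Hypothetical stand-in for CountEvictor.of(keep):
  // keep only the newest `keep` elements (assumption), drop the rest.
  def evict(buffer: Vector[Element], keep: Int): Vector[Element] =
    buffer.takeRight(keep)

  // Hypothetical stand-in for sum(1): add up the second tuple field.
  def process(elements: Vector[Element]): Element =
    (elements.head._1, elements.map(_._2).sum)

  // Feed the elements of one key into one window and emit a result
  // every time the count trigger fires.
  def run(input: Seq[Element], triggerCount: Int, keep: Int): Vector[Element] = {
    var buffer = Vector.empty[Element]
    var sinceLastFire = 0
    var out = Vector.empty[Element]
    for (e <- input) {
      buffer = buffer :+ e
      sinceLastFire += 1
      if (sinceLastFire >= triggerCount) {   // 1. trigger fires
        val kept = evict(buffer, keep)       // 2. evictor decides what to keep
        out = out :+ process(kept)           // 3. only the kept elements are processed
        sinceLastFire = 0
        // The buffer is left untouched here; what Flink does with the
        // evicted elements afterwards is not modelled in this sketch.
      }
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val as = Seq.fill(5)(("a", 1))
    println(run(as, triggerCount = 5, keep = 3)) // prints Vector((a,3))
  }
}
```

With five "a" elements the trigger fires once, the evictor keeps three of
them, and the sum is 3, which matches the (a,3) reported in the thread.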
>
> We mostly have Evictors for legacy reasons nowadays since the original
> window implementation was based on ideas in IBM InfoSphere streams. See
> this part of their documentation for some explanation:
> https://www.ibm.com/support/knowledgecenter/#!/SSCRJU_4.0.0/com.ibm.streams.dev.doc/doc/windowhandling.html
>
> - aljoscha
> > On 14 Mar 2016, at 17:04, Vishnu Viswanath <vishnu.viswanath25@gmail.com>
> wrote:
> >
> > Hi Aljoscha,
> >
> > Wow, great illustration.
> >
> > That was a very clear explanation. Yes, I did enter the elements fast for
> > case b, and I was seeing more of case a.
> > Also, sometimes I have seen a window getting triggered when I enter 1 or
> > 2 elements. I believe that is an extension of case a, w.r.t. window 2.
> >
> > Also, can you explain the case when using an Evictor?
> > e.g.,
> >
> >
> > val counts = socTextStream.flatMap{_.split("\\s")}
> >   .map { (_, 1) }
> >   .keyBy(0)
> >   .window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
> >   .trigger(CountTrigger.of(5))
> >   .evictor(CountEvictor.of(3))
> >   .sum(1).setParallelism(4);
> >
> > counts.print()
> > sev.execute()
> >
> > for the input
> >
> > a
> > a
> > a
> > a
> > a
> > b
> > b
> > b
> > b
> > b
> >
> > I got the output as
> >
> > 1> (a,3)
> > 1> (b,3)
> > 2> (b,3)
> >
> > My assumption was that when the trigger fires, the processing is done on
> > all the items in the window, and then 3 items are evicted from the window,
> > which can also be part of the next processing of that window. But here it
> > looks like the sum is calculated only on the items that were evicted from
> > the window.
> >
> > Could you please explain what is going on here?
> >
> >
> >
> > Thanks and Regards,
> > Vishnu Viswanath,
> > www.vishnuviswanath.com
> >
> > On Mon, Mar 14, 2016 at 5:27 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
> > Hi,
> > I created a visualization to help explain the situation:
> > http://s21.postimg.org/dofhcw52f/window_example.png
> > The SlidingProcessingTimeWindows assigner assigns elements to windows based
> > on the current processing time. The CountTrigger only fires if a window
> > contains 5 elements (or more). In your test the windows for a, c and e fell
> > into case b because you probably entered the letters very fast. For elements
> > b and d we have case a: the elements were far enough apart, or you happened
> > to enter them right on a window boundary, such that only one window contains
> > all of them. The other windows don’t contain enough elements to reach 5. In
> > my drawing window 1 contains 5 elements while window 2 only contains 3 of
> > those elements.
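
The two cases can be reproduced with a small plain-Scala sketch that needs no
Flink dependency. It is an assumption-laden model rather than Flink's code:
windowStarts and windowsReaching are hypothetical helpers, window starts are
assumed to be aligned to multiples of the slide, and the timestamps are made
up for illustration.

```scala
object SlidingWindowSketch {
  // Start times (ms) of every sliding window [start, start + size)
  // that contains `timestamp`, newest window first.
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = timestamp - (timestamp % slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ > timestamp - size)
      .toList
  }

  // How many windows collect at least `triggerCount` of the given elements,
  // i.e. how many windows a count trigger could fire for.
  def windowsReaching(timestamps: Seq[Long], size: Long, slide: Long,
                      triggerCount: Int): Int =
    timestamps
      .flatMap(windowStarts(_, size, slide))
      .groupBy(identity)
      .count { case (_, hits) => hits.size >= triggerCount }

  def main(args: Array[String]): Unit = {
    val size = 20000L   // Time.seconds(20)
    val slide = 10000L  // Time.seconds(10)

    // Case b: five elements typed quickly, all inside the same slide interval.
    val fast = (21 to 25).map(_ * 1000L)
    println(windowsReaching(fast, size, slide, 5))     // prints 2

    // Case a: five elements that straddle a window boundary at 30 s.
    val straddle = (27 to 31).map(_ * 1000L)
    println(windowsReaching(straddle, size, slide, 5)) // prints 1
  }
}
```

In the second run the window starting at 20 s holds all five elements, while
the windows starting at 10 s and 30 s hold only three and two, so only one
window reaches the count of 5.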
> >
> > I hope this helps.
> >
> > Cheers,
> > Aljoscha
> >> On 12 Mar 2016, at 19:19, Vishnu Viswanath <
> vishnu.viswanath25@gmail.com> wrote:
> >>
> >> Hi All,
> >>
> >>
> >> I have the below code
> >>
> >>
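> >> import org.apache.flink.streaming.api.scala._
> >> import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
> >> import org.apache.flink.streaming.api.windowing.time.Time
> >> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
> >>
> >> val sev = StreamExecutionEnvironment.getExecutionEnvironment
> >> val socTextStream = sev.socketTextStream("localhost",4444)
> >>
> >> val counts = socTextStream.flatMap{_.split("\\s")}
> >>   .map { (_, 1) }
> >>   .keyBy(0)
> >>   .window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
> >>   .trigger(CountTrigger.of(5))
> >>   .sum(1)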
> >>
> >> counts.print()
> >> sev.execute()
> >>
> >> I am sending messages to the port 4444 using nc -lk 4444
> >> This is my sample input
> >>
> >> a
> >> a
> >> a
> >> a
> >> a
> >> b
> >> b
> >> b
> >> b
> >> b
> >> c
> >> c
> >> c
> >> c
> >> c
> >> d
> >> d
> >> d
> >> d
> >> d
> >> e
> >> e
> >> e
> >> e
> >> e
> >>
> >> I am sending 5 of each letter since I have a CountTrigger of 5. I was
> >> expecting that for every 5 characters the code would print a count of 5,
> >> i.e., (a,5) (b,5) etc. But the output I am getting is a little confusing.
> >> Output:
> >>
> >> 1> (a,5)
> >> 1> (a,5)
> >> 1> (b,5)
> >> 2> (c,5)
> >> 2> (c,5)
> >> 1> (d,5)
> >> 1> (e,5)
> >> 1> (e,5)
> >>
> >> As you can see, for some characters the count is printed twice (a, c, e)
> >> and for some characters it is printed only once (b, d). I am not able to
> >> figure out what is going on. I think it may have something to do with the
> >> SlidingProcessingTimeWindows but I am not sure.
> >> Can someone explain to me what is going on?
> >>
> >>
> >> Thanks and Regards,
> >> Vishnu Viswanath
> >> www.vishnuviswanath.com
> >>
> >
> >
>
