flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Behaviour of CountWindowAll
Date Mon, 14 Dec 2015 09:47:10 GMT
Hi Nirmalya,
when using count windows, the window triggers every time "slide-size" elements have been
received. Since slide-size is 1 in your example, it emits a new max for every element
received. Once it has accumulated 4 elements, it starts evicting one (the oldest) element for
every new element that arrives, before computing the max.
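To make the trigger/evict behaviour described above concrete, here is a minimal plain-Scala model of it. It is a sketch under stated assumptions, not Flink code and not Flink's implementation: `CountWindowSketch`, `slidingCountMax` and `fullWindowsMax` are hypothetical names. The model buffers elements, evicts the oldest once more than `size` are held, and emits a max every `slide` arrivals, even before the window is full. For contrast, `fullWindowsMax` uses Scala's own `sliding` to compute a max only over complete windows, which is closer to what the question expected.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model of the count-window behaviour described above.
// It is NOT Flink's implementation; all names here are illustrative only.
object CountWindowSketch {
  type Reading = (String, Double)

  // The seven sample rows from the question, reduced to (sensorUUID, ambientTemperature).
  val sample: List[Reading] = List(
    ("probe-f076c2b0", 29.37),
    ("probe-dccefede", 27.46),
    ("probe-f29f9662", 22.35),
    ("probe-5dac1d9f", 15.98),
    ("probe-6c75cfbe", 30.02),
    ("probe-4d78b545", 25.92),
    ("probe-400c5cdf", 27.18)
  )

  // Emits the max of the buffered elements every `slide` arrivals; once more than
  // `size` elements are buffered, the oldest is evicted before the max is computed.
  def slidingCountMax(input: Seq[Reading], size: Int, slide: Int): List[Reading] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    val buffer = mutable.Queue.empty[Reading]
    val out = mutable.ListBuffer.empty[Reading]
    var sinceLastFire = 0
    for (r <- input) {
      buffer.enqueue(r)
      if (buffer.size > size) buffer.dequeue() // keep only the last `size` elements
      sinceLastFire += 1
      if (sinceLastFire == slide) {            // fires even if the window is not yet full
        out += buffer.maxBy(_._2)
        sinceLastFire = 0
      }
    }
    out.toList
  }

  // Contrast: a max computed only over complete windows of exactly `size` elements.
  def fullWindowsMax(input: Seq[Reading], size: Int, slide: Int): List[Reading] =
    input.sliding(size, slide).filter(_.size == size).map(_.maxBy(_._2)).toList

  def main(args: Array[String]): Unit = {
    println("Trigger on every element (as described above):")
    slidingCountMax(sample, size = 4, slide = 1).foreach(println)
    println("Full windows only:")
    fullWindowsMax(sample, size = 4, slide = 1).foreach(println)
  }
}
```

Fed the seven sample readings from the question, `slidingCountMax(sample, 4, 1)` yields four `(probe-f076c2b0,29.37)` results followed by three `(probe-6c75cfbe,30.02)`, which matches the first seven lines of the posted output. The full-windows-only variant yields just four results, one per complete window of 4 readings.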

Cheers,
Aljoscha
> On 14 Dec 2015, at 02:55, Nirmalya Sengupta <sengupta.nirmalya@gmail.com> wrote:
> 
> Hello Fabian (and others),
> 
> Sorry to bring up the same flogged topic of countWindowAll() again, but I just want to be sure
> that I understand it correctly.
> 
> For a dataset like the following (partial):
> 
> -----------------------------------------
> 
> probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
> probe-dccefede,199,749.25,78.6057,1448028160,27.46
> probe-f29f9662,199,821.81,81.7831,1448028160,22.35
> probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
> probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
> probe-4d78b545,204,778.42,78.412,1448028160,25.92
> probe-400c5cdf,204,711.65,73.585,1448028160,27.18
> ...........
> -----------------------------------------
> 
> The following code:
> 
> -----------------------------------------
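> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala._
>
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> env.setParallelism(1)
>
> // readIncomingReadings is the poster's own CSV-reading helper (not shown)
> val readings =
>   readIncomingReadings(env, "./sampleIOTTiny.csv")
>     .map(e => (e.sensorUUID, e.ambientTemperature))
>     .countWindowAll(4, 1) // window size 4, slide 1
>     .maxBy(1)             // tuple field 1: ambientTemperature
>
> readings.print()
>
> // print() only adds a sink; execute() actually runs the job
> env.execute()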
> -------------------------------------------
> 
> produces this (partial):
> 
> ------------------------------------------
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-400c5cdf,27.18)
> ......
> ------------------------------------------
> 
> I am trying to justify the first three lines of the output. When I call countWindowAll(4,1),
> am I not telling Flink to 'wait until you have received at least the first 4 readings before
> calculating the maximum'? It appears that Flink is calculating max() for every incoming tuple it
> adds to the window. What is the correct and complete interpretation of the computation, then?
> 
> -- N
> 
> -- 
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is where they
> should be. Now put the foundation under them."

