flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: max aggregator doesn't work as expected
Date Mon, 08 Aug 2016 13:52:02 GMT
I have to admit that the difference between the two methods is subtle, and
in my opinion it doesn't make much sense to have the two variants.

- max() returns a tuple/POJO with the max value at the specified position;
the other fields of the tuple/POJO are undefined
- maxBy() returns the tuple/POJO that actually holds the max value at the
specified position (its other fields are retained)
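To make the difference concrete, here is a minimal plain-Java model of what the two aggregations hand back for a single window. It does not use Flink at all: the class `MaxVsMaxBy`, the stand-in `Data` class and the sample values are hypothetical. For the `max` case the model copies the non-aggregated fields from the first element of the window; that is only one possible outcome, since Flink leaves those fields undefined. In both cases the result is a `Data`, not an `Integer`, which matches Flink's windowed `max`/`maxBy` returning a stream of the input type.

```java
import java.util.List;

/**
 * Plain-Java model (no Flink dependency) of max("temp")-style versus
 * maxBy("temp")-style aggregation over one window. Illustration only,
 * not Flink's implementation.
 */
public class MaxVsMaxBy {

    /** Hypothetical stand-in for the Data POJO from the question. */
    static final class Data {
        final int id;
        final long timestamp;
        final int temp;

        Data(int id, long timestamp, int temp) {
            this.id = id;
            this.timestamp = timestamp;
            this.temp = temp;
        }

        @Override
        public String toString() {
            return "Data(id=" + id + ", timestamp=" + timestamp + ", temp=" + temp + ")";
        }
    }

    /**
     * max-style: the result carries the maximum temp, but in this model the
     * other fields come from the first element of the window. Flink gives no
     * guarantee about those other fields.
     */
    static Data max(List<Data> window) {
        Data first = window.get(0);
        int maxTemp = first.temp;
        for (Data d : window) {
            maxTemp = Math.max(maxTemp, d.temp);
        }
        return new Data(first.id, first.timestamp, maxTemp);
    }

    /** maxBy-style: returns the element that actually holds the maximum temp. */
    static Data maxBy(List<Data> window) {
        Data best = window.get(0);
        for (Data d : window) {
            if (d.temp > best.temp) { // strict '>' keeps the earlier element on ties
                best = d;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical window contents with made-up timestamps 0, 1, 2.
        List<Data> window = List.of(
                new Data(124, 0L, 8),
                new Data(125, 1L, 10),
                new Data(126, 2L, 2));
        System.out.println("max:   " + max(window));   // max:   Data(id=124, timestamp=0, temp=10)
        System.out.println("maxBy: " + maxBy(window)); // maxBy: Data(id=125, timestamp=1, temp=10)
    }
}
```

With `maxBy` the id, timestamp and temperature of the result belong to the same element; with `max` only the aggregated temperature is meaningful.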

On Mon, Aug 8, 2016 at 2:55 PM, Claudia Wegmann <c.wegmann@kasasi.de> wrote:

> OK, I found my mistake regarding question 2.). I key by the id value, and
> all the data sets have different id values. So of course all 4 data sets
> are printed. Sorry :)
>
> But question 1.) still remains.
>
> *From:* Claudia Wegmann [mailto:c.wegmann@kasasi.de]
> *Sent:* Monday, August 8, 2016 14:27
> *To:* user@flink.apache.org
> *Subject:* max aggregator doesn't work as expected
>
> Hey,
>
> I have some questions about aggregate functions such as max or min. Take
> the following example:
>
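> import java.util.Date;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // Create a stream with event time, where Data contains an ID, a timestamp
> // and a temperature value.
> // Note: the deprecated Date(year - 1900, month, day, h, m, s) constructor
> // uses 0-based months, so these dates fall on 8 September 2016.
> DataStream<Data> oneStream = env.fromElements(
>         new Data(123, new Date(116, 8, 8, 11, 11, 11), 5),
>         new Data(124, new Date(116, 8, 8, 12, 10, 11), 8),
>         new Data(125, new Date(116, 8, 8, 12, 12, 11), 10),
>         new Data(126, new Date(116, 8, 8, 12, 15, 11), 2))
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Data>() {
>         @Override
>         public long extractAscendingTimestamp(final Data data) {
>             // event time = the element's own timestamp in epoch milliseconds
>             return data.getTimestamp().getTime();
>         }
>     });
>
> // Calculate the max temperature value per hour
> DataStream<Data> maxStream = oneStream
>         .keyBy("id")
>         .timeWindow(Time.hours(1))
>         .max("temp");
> maxStream.print();
>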
> Here are the questions I ran into:
>
> 1.) Why does the resulting stream “maxStream” have to be of type Data?
> From the documentation and the difference from maxBy, I would expect the
> resulting stream to be of type Integer.
>
> 2.) Executing the code as it is above, I would expect the printed result
> to be the data with ID 124 and ID 125. However, the execution prints all 4
> data sets. Did I get this example totally wrong? What would the code need
> to look like to get the expected result?
>
> 3.) Can you point me to a good example of using time windows and
> aggregates? I couldn't find one that answers the questions above.
>
> Thanks for your help and best wishes,
>
> Claudia
>
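Claudia's follow-up traces question 2 to keying by `id`: every element has a different id, so each one ends up alone in its own (key, window) pair and produces its own result. The minimal plain-Java model below makes that visible without Flink. The class `WindowKeyingModel`, the stand-in `Data` class and the `at(...)` helper (timestamps measured from epoch 0 instead of the real 2016 dates) are all hypothetical. It assigns elements to 1-hour tumbling windows aligned to full hours of the epoch and keeps the `maxBy`-style maximum per group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Plain-Java model (no Flink dependency) of a 1-hour tumbling event-time
 * window followed by a maxBy("temp")-style aggregation, run once keyed by id
 * and once with a single constant key. Illustration only, not Flink code.
 */
public class WindowKeyingModel {

    static final long HOUR_MS = 60L * 60L * 1000L;

    /** Hypothetical stand-in for the Data POJO from the question. */
    static final class Data {
        final int id;
        final long timestampMs; // event time in epoch milliseconds
        final int temp;

        Data(int id, long timestampMs, int temp) {
            this.id = id;
            this.timestampMs = timestampMs;
            this.temp = temp;
        }
    }

    /** Start of the epoch-aligned tumbling window containing ts (non-negative ts, no offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /**
     * Groups elements by (key, window start) and keeps, per group, the element
     * with the highest temp (the earlier one wins on ties), i.e. maxBy semantics.
     */
    static List<Data> maxByPerWindow(List<Data> input, Function<Data, Object> keySelector) {
        Map<List<Object>, Data> best = new LinkedHashMap<>();
        for (Data d : input) {
            Object windowId = windowStart(d.timestampMs, HOUR_MS);
            List<Object> group = List.of(keySelector.apply(d), windowId);
            Data current = best.get(group);
            if (current == null || d.temp > current.temp) {
                best.put(group, d);
            }
        }
        return new ArrayList<>(best.values());
    }

    /** Hypothetical helper: hh:mm:ss after epoch 0, in milliseconds. */
    static long at(int hour, int minute, int second) {
        return ((hour * 60L + minute) * 60L + second) * 1000L;
    }

    public static void main(String[] args) {
        List<Data> input = List.of(
                new Data(123, at(11, 11, 11), 5),
                new Data(124, at(12, 10, 11), 8),
                new Data(125, at(12, 12, 11), 10),
                new Data(126, at(12, 15, 11), 2));

        // Keyed by id: every element is alone in its group, so 4 lines (ids 123..126).
        for (Data r : maxByPerWindow(input, x -> x.id)) {
            System.out.println("keyed by id:  id=" + r.id + " temp=" + r.temp);
        }
        // One constant key: one group per hour, so 2 lines (id=123 temp=5, id=125 temp=10).
        for (Data r : maxByPerWindow(input, x -> "all")) {
            System.out.println("constant key: id=" + r.id + " temp=" + r.temp);
        }
    }
}
```

In this model the unkeyed run reports ids 123 and 125, because the windows start at full hours rather than at the first element. In Flink 1.x terms this roughly corresponds to dropping `keyBy("id")` and using `timeWindowAll(Time.hours(1))` followed by `maxBy("temp")`; treat that as a pointer to check against the documentation for your Flink version, not as a tested fix. Non-keyed windows are evaluated by a single task (parallelism 1).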
