flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claudia Wegmann <c.wegm...@kasasi.de>
Subject max aggregator dosen't work as expected
Date Mon, 08 Aug 2016 12:26:52 GMT
Hey,

I have some questions to aggregate functions such as max or min. Take the following example:

//create Stream with event time where Data contains an ID, a timestamp and a temperature value

DataStream<Data> oneStream = env.fromElements(

        new Data(123, new Date(116, 8,8,11,11,11), 5),

        new Data(124, new Date(116, 8,8,12,10,11), 8),

        new Data(125, new Date(116, 8,8,12,12,11), 10),

        new Data(126, new Date(116, 8,8,12,15,11), 2))

.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Data>() {
    @Override
    public long extractAscendingTimestamp(final Data data) {
        return data.getTimestamp().getTime();
    }
});



//calluclate max value of temperature per hour

DataStream<Data> maxStream = WatermarkStream

        .keyBy("id")

        .timeWindow(Time.hours(1))

        .max("temp");
maxStream.print();




Here are the questions I ran into:

1.)    Why does the resulting stream "maxStream" have to be of type Data? From the documentation
and the difference to maxBy I would expect the resulting stream to be of type Integer?


2.)    Executing the code as it is above, I would expect the printed result be the data with
ID 124 and ID 125. However, the execution prints all 4 data sets. Did I totally get this example
wrong? How would the code need  to look, to get the expected result?


3.)    Can you point me to a good example for using time windows and aggregates? I couldn't
find one that explains the above questions.

Thanks for your help and best wishes,
Claudia



Mime
View raw message