flink-dev mailing list archives

From Naveen Tirupattur <ntirupat...@maprtech.com>
Subject How to merge messages from all partitions
Date Sat, 10 Dec 2016 02:46:00 GMT

I am trying to group messages by message name and timestamp and then aggregate the message values. My window function looks like this:

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.java.tuple.Tuple3;

    .fold(new Tuple3<String, Long, Double>("", 0L, 0.0),
          new FoldFunction<Metric, Tuple3<String, Long, Double>>() {
      @Override
      public Tuple3<String, Long, Double> fold(Tuple3<String, Long, Double> arg0,
                                               Metric arg1) throws Exception {
        // Add the new value to the running total carried in the accumulator (arg0.f2).
        double sum = arg0.f2 + arg1.getValue();
        return new Tuple3<String, Long, Double>(
            arg1.getMetricName(), arg1.getTimeStamp(), sum);
      }
    })
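A fold step has to add each incoming value to the total carried in the accumulator it receives. Writing `count =+ value` (which Java parses as `count = +value`, a plain assignment) or restarting from `0.0` on every call keeps only the latest value. The plain-Java sketch below has no Flink dependency, and its class and method names are illustrative only. It contrasts the two behaviours over a list of values:

```java
import java.util.List;

public class FoldAccumulationDemo {
    // Mirrors an accumulation step that restarts at 0.0 and uses "=+":
    // the result is just +value, so earlier values are discarded.
    static double buggyStep(double acc, double value) {
        double count = 0.0;
        count =+ value; // parsed as count = (+value), not count += value
        return count;
    }

    // Correct step: add the new value to the running total passed in.
    static double fixedStep(double acc, double value) {
        return acc + value;
    }

    static double foldBuggy(List<Double> values) {
        double acc = 0.0;
        for (double v : values) acc = buggyStep(acc, v);
        return acc;
    }

    static double foldFixed(List<Double> values) {
        double acc = 0.0;
        for (double v : values) acc = fixedStep(acc, v);
        return acc;
    }

    public static void main(String[] args) {
        List<Double> values = List.of(1.0, 2.0, 3.0);
        System.out.println(foldBuggy(values)); // 3.0: only the last value survives
        System.out.println(foldFixed(values)); // 6.0: the actual sum
    }
}
```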

My intention is to aggregate all messages of a particular type that occurred at a timestamp t across all partitions in order to calculate a running mean. Some aggregation is happening, but intermediate values are printed separately for each partition. How do I get a single aggregated value across all partitions? Kindly help.

P.S. I am consuming messages from a Kafka topic with 10 partitions.
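The intended result can be sketched without Flink. Every record, whichever Kafka partition it came from, is routed by its key (metric name plus timestamp) to a single accumulator that holds a sum and a count, and the mean is derived from those two values. In Flink, `keyBy` on those fields sends all records with the same key to the same parallel operator instance. The code below is plain Java that only simulates this merge. It is not Flink API code, and the `Metric` and `MeanAcc` classes, the `name@timestamp` key format, and the sample data are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedMeanSketch {
    // Hypothetical stand-in for the poster's Metric type.
    static final class Metric {
        final String name;
        final long timeStamp;
        final double value;

        Metric(String name, long timeStamp, double value) {
            this.name = name;
            this.timeStamp = timeStamp;
            this.value = value;
        }
    }

    // Running sum and count for one key; the mean is sum / count.
    static final class MeanAcc {
        double sum;
        long count;

        void add(double v) {
            sum += v;
            count++;
        }

        double mean() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Merges records from every partition into one accumulator per (name, timestamp) key.
    static Map<String, MeanAcc> aggregate(List<List<Metric>> partitions) {
        Map<String, MeanAcc> byKey = new LinkedHashMap<>();
        for (List<Metric> partition : partitions) {
            for (Metric m : partition) {
                String key = m.name + "@" + m.timeStamp;
                byKey.computeIfAbsent(key, k -> new MeanAcc()).add(m.value);
            }
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Two simulated partitions that both carry values for the key "cpu@1000".
        List<List<Metric>> partitions = List.of(
            List.of(new Metric("cpu", 1000L, 2.0), new Metric("mem", 1000L, 5.0)),
            List.of(new Metric("cpu", 1000L, 4.0)));

        // One line per key, not one per partition.
        aggregate(partitions).forEach((k, acc) -> System.out.println(k + " mean=" + acc.mean()));
    }
}
```

Keeping both a sum and a count in the accumulator, rather than the sum alone, is what allows a mean to be produced once all of a key's records have been merged.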
