flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Naveen Tirupattur <ntirupat...@maprtech.com>
Subject How to merge messages from all partitions
Date Sat, 10 Dec 2016 02:46:00 GMT
HI,

I am trying to group messages by message name, timestamp and then perform aggregation on message
value. My window function looks like below

metrics.keyBy("metricName")
    .keyBy("timeStamp")
    .timeWindow(Time.seconds(30))
    .trigger(ProcessingTimeTrigger.create())
    .fold(new Tuple3<String,Long,Double>("",0L,0.0), new FoldFunction<Metric, Tuple3<String,Long,Double>>()
{

      @Override
      public Tuple3<String, Long, Double> fold(Tuple3<String, Long, Double> arg0,
Metric arg1) throws Exception {
        double count = 0.0;
        long timeStamp = arg1.getTimeStamp();
        String metricName = arg1.getMetricName();
        count =+ arg1.getValue();
        return new Tuple3<String,Long,Double>(metricName,timeStamp,count);
      }
    }).print();

My intention is to aggregate all the messages of a particular type that occurred at a timestamp
t across all partitions to calculate running mean. I see that some aggregation is happening
but for each partition the intermediate values are being printed. My question is how do I
get one aggregated value across all partitions? Kindly help.

P.S I am getting messages from Kafka with 10 partitions.

Thanks,
Naveen
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message