Hi Yiannis,

I think your program is not working correctly. The problem is with the sum() aggregation function.
Right now, Flink's aggregation functions update values in place. That means that all non-key and non-aggregated fields have nondeterministic values.
For example, doing a groupBy(0,1).sum(2) on the two tuples <0,1,2,3> and <0,1,4,5> could result in either <0,1,6,3> or <0,1,6,5>.

I would solve your task as follows:

input.map(e => e._2.toString.split(","))
      .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble)) // (attr1, month, value)
      .groupBy(0, 1).sum(2) // fields 0 and 1 are keys, so no field is left undetermined
      .groupBy(1).reduceGroup(new CustomGroupReduceFunction())

The CustomGroupReduceFunction remembers all fields 0 and 2 and computes the sum of field 2. Once all values have been read, you compute for each remembered field 2 its percentage of the sum of all fields 2 and emit a new tuple.
This only works if the number of records in one group is not too large. Otherwise you might run into OutOfMemoryErrors.
If the optimizer is clever enough, the data is only sorted once.

Let me know if you have any questions.

Cheers, Fabian

2015-02-20 22:01 GMT+01:00 Yiannis Gkoufas <firstname.lastname@example.org>:
Hi there,

I have the following scenario:
My files have 2 attributes and 1 numeric value: (attr1,attr2,val)
For each attr1, I want to compute the percentage of its values relative to the sum of val grouped by attr2.
Currently I am doing it like this:

input.map(e => e._2.toString.split(","))
      .map(e => (e._1,e._2,scala.math.BigDecimal(e._3*1.0/e._4*1.0).toString()))

Is there a more efficient way to calculate this?

Thanks a lot!
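For illustration only, here is a minimal sketch of the logic the CustomGroupReduceFunction described above could contain, written in plain Scala with no Flink dependency so it runs on its own. Assumptions: the names PercentageSketch, percentages and run are hypothetical; records are (attr1, month, value) tuples; the result is the share value / total (the same division as in the original question), not multiplied by 100. In an actual Flink job, the body of percentages would instead live in the reduce method of a GroupReduceFunction and emit each tuple through its Collector rather than returning a list.

```scala
// Hypothetical, Flink-free sketch; names are illustrative only.
object PercentageSketch {

  // Per-group logic of the (hypothetical) CustomGroupReduceFunction.
  // The group is consumed as a single-pass Iterator, so fields 0 and 2 are
  // remembered by buffering the whole group before the total is known.
  def percentages[A, M](group: Iterator[(A, M, Double)]): List[(A, M, Double)] = {
    val remembered = group.toList        // whole group held in memory
    val total = remembered.map(_._3).sum // sum of field 2 over the group
    remembered.map { case (attr1, month, value) =>
      // share of the group total; guard against an all-zero group
      (attr1, month, if (total == 0.0) 0.0 else value / total)
    }
  }

  // Plain-collection stand-in for the dataflow
  //   .groupBy(0, 1).sum(2).groupBy(1).reduceGroup(...)
  // Output order is unspecified.
  def run[A, M](records: Seq[(A, M, Double)]): List[(A, M, Double)] = {
    // groupBy(0, 1).sum(2): one (attr1, month, sum) tuple per key pair
    val summed: List[(A, M, Double)] = records
      .groupBy { case (attr1, month, _) => (attr1, month) }
      .map { case ((attr1, month), rs) => (attr1, month, rs.map(_._3).sum) }
      .toList
    // groupBy(1).reduceGroup(...): run the per-group logic once per month
    summed.groupBy(_._2).values.toList.flatMap(g => percentages(g.iterator))
  }
}
```

Buffering the group with toList is exactly where the memory risk comes from: every record of one month group has to fit in memory at the same time.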