Hi Fabian,

thanks a lot for the clarification, will give it a shot and will let you know how it goes!

BR

On 20 February 2015 at 22:18, Fabian Hueske <fhueske@gmail.com> wrote:
Hi Yiannis,

I think your program is not working correctly. The problem is with the sum() aggregation function.
Right now, Flink's aggregation functions update values in place. This means that all fields that are neither key nor aggregation fields have nondeterministic values.
For example, doing a groupBy(0,1).sum(2) on the two tuples <0,1,2,3> and <0,1,4,5> could result in either <0,1,6,3> or <0,1,6,5>.
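The effect can be reproduced with a toy model in plain Scala (no Flink involved). It assumes the semantics described above: the first record that arrives in a group is reused and only the aggregated field is overwritten. The helper `inPlaceSum` is hypothetical and is not Flink's actual implementation.

```scala
// Toy model of an in-place sum(2) over groupBy(0, 1).
// Hypothetical helper, not Flink code: the first record of the group is
// reused and only field 2 (the third tuple element) is updated.
object InPlaceSumDemo {
  type Rec = (Int, Int, Int, Int)

  // Folds a group into its first record, replacing only the aggregated field.
  def inPlaceSum(group: Seq[Rec]): Rec =
    group.tail.foldLeft(group.head) { (acc, r) => acc.copy(_3 = acc._3 + r._3) }

  def main(args: Array[String]): Unit = {
    val a: Rec = (0, 1, 2, 3)
    val b: Rec = (0, 1, 4, 5)
    // In a distributed run the order of records inside a group is not fixed,
    // so either of these results can occur; only field 3 differs.
    println(inPlaceSum(Seq(a, b))) // (0,1,6,3)
    println(inPlaceSum(Seq(b, a))) // (0,1,6,5)
  }
}
```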

I would solve your task as follows:

import org.apache.flink.api.scala._

input.map(e => e._2.toString.split(","))
  .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble)) // no duplicated value field; the group reduce computes the total itself
  .groupBy(0, 1)
  .sum(2)
  .groupBy(1)
  .reduceGroup(new CustomGroupReduceFunction())

The CustomGroupReduceFunction remembers fields 0 and 2 of every record in the group and computes the sum of field 2. Once all records have been read, you compute, for each remembered field 2, its percentage of that sum and emit a new tuple.
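A minimal sketch of that per-group logic, written in plain Scala so it runs without Flink. It assumes the records have the layout `(attr1: String, month: Int, value: Double)` produced by the map above; the names `MonthShare` and `reduceMonth` are hypothetical. The commented wiring into `reduceGroup` assumes Flink's Scala API variant that takes an `(Iterator[T], Collector[R]) => Unit` function.

```scala
// Sketch of the logic behind the (hypothetical) CustomGroupReduceFunction.
// Assumed record layout: (attr1, month, value) after groupBy(0, 1).sum(2).
object MonthShare {
  type Rec = (String, Int, Double)

  // Receives all records of ONE month group, buffers them, sums field 2,
  // then emits each value as a fraction of that month's total.
  // A total of 0.0 is not handled (the shares would be NaN or infinite).
  def reduceMonth(records: Iterator[Rec]): Seq[(String, Int, Double)] = {
    val buffered = records.toVector // the whole group is held in memory
    val total = buffered.map(_._3).sum
    buffered.map { case (attr1, month, value) => (attr1, month, value / total) }
  }

  // Possible wiring in Flink's Scala DataSet API (not compiled here):
  //   .groupBy(1).reduceGroup {
  //     (in: Iterator[MonthShare.Rec], out: Collector[(String, Int, Double)]) =>
  //       MonthShare.reduceMonth(in).foreach(out.collect)
  //   }

  def main(args: Array[String]): Unit = {
    val oneMonth = Iterator(("a", 2, 2.0), ("b", 2, 6.0))
    reduceMonth(oneMonth).foreach(println) // (a,2,0.25) then (b,2,0.75)
  }
}
```

Buffering with `toVector` is what makes the memory caveat below apply: a month with very many distinct attr1 values has to fit on one worker.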

This only works if the number of records in one group is not too large, because the whole group is held in memory. Otherwise you might run into OutOfMemoryErrors.
If the optimizer is clever enough, the data is sorted only once.

Let me know if you have any questions.

Cheers, Fabian


2015-02-20 22:01 GMT+01:00 Yiannis Gkoufas <johngouf85@gmail.com>:
Hi there,

I have the following scenario:
My files have 2 attributes and 1 numeric value:
(attr1,attr2,val)
For each attr2 group, I want to compute the value of each attr1 as a percentage of the sum of val over that group.
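Written as a formula, with $v(a_1, a_2)$ denoting the sum of val over all records with attr1 $= a_1$ and attr2 $= a_2$ (attr2 is the month in the code), the requested value is the fraction below; multiply by 100 for a percentage. For instance, two attr1 values with sums 2 and 6 in the same month give 0.25 and 0.75.

$$
p(a_1, a_2) = \frac{v(a_1, a_2)}{\sum_{a'} v(a', a_2)}
$$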
Currently I am doing it like this:

input.map(e => e._2.toString.split(","))
  .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble, e(3).toDouble))
  .groupBy(0, 1)
  .sum(2)
  .groupBy(1)
  .sum(3)
  .map(e => (e._1, e._2, scala.math.BigDecimal(e._3 * 1.0 / e._4 * 1.0).toString()))
Is there a more efficient way to calculate this?
Thanks a lot!