flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Best strategy for calculating percentage
Date Fri, 20 Feb 2015 22:18:40 GMT
Hi Yiannis,

I think your program is not working correctly. The problem is with the
sum() aggregation function.
Right now, Flink's aggregation functions update values in place. This
means that all fields that are neither keys nor aggregated have
nondeterministic values. For example, doing a groupBy(0,1).sum(2) on the two
Tuples <0,1,2,3> and <0,1,4,5> could result in either <0,1,6,3> or <0,1,6,5>.
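To make the order dependence concrete, here is a small plain-Scala model of an in-place sum. It is a simplification under an assumption (the first record of a group acts as the accumulator), not Flink's actual implementation; the object and method names are hypothetical.

```scala
// Simplified model of an in-place sum, NOT Flink's actual implementation:
// the first record of a group is used as the accumulator, field 2 is summed
// into it, and every other field keeps whatever that first record contained.
object InPlaceSumModel {
  type Rec = (Int, Int, Int, Int)

  def sumField2(group: Seq[Rec]): Rec =
    group.reduceLeft { (acc, next) =>
      // only field 2 is updated; fields 0 and 1 (keys) and field 3 come from acc
      acc.copy(_3 = acc._3 + next._3)
    }
}
```

Feeding the group as <0,1,2,3>, <0,1,4,5> gives <0,1,6,3>, while the reverse arrival order gives <0,1,6,5>. Since the arrival order within a group is not fixed in a distributed run, field 3 of the result is effectively arbitrary.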

I would solve your task as follows:

input.map(e => e._2.toString.split(","))
  .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble)) // we duplicate the value later
  .groupBy(1) // group on attr2 (the month)
  .reduceGroup(new CustomGroupReduceFunction())

The CustomGroupReduceFunction remembers fields 0 and 2 of every record in the
group and computes the sum of field 2. Once all values have been read, you
compute, for each remembered field 2, its percentage of the sum of all fields
2 and emit a new tuple.
This only works if the number of records in one group is not too large;
otherwise you might run into OutOfMemoryErrors.
If the optimizer is clever enough, the data is only sorted once.

Let me know if you have any questions.

Cheers, Fabian

2015-02-20 22:01 GMT+01:00 Yiannis Gkoufas <johngouf85@gmail.com>:

> Hi there,
> I have the following scenario:
> My files have 2 attributes and 1 numeric value:
> (attr1,attr2,val)
> I want to compute the percentage that each attr1's value contributes to
> the sum of val grouped on attr2.
> Currently I am doing it like this:
> input.map(e => e._2.toString.split(","))
>   .map(e=> (e(0),Utils.getMonthFromDate(e(1).toLong),e(3).toDouble,e(3).toDouble))
>   .groupBy(0,1)
>   .sum(2)
>   .groupBy(1)
>   .sum(3)
>   .map(e => (e._1,e._2,scala.math.BigDecimal(e._3*1.0/e._4*1.0).toString()))
> Is there a more efficient way to calculate this?
> Thanks a lot!
