flink-user mailing list archives

From Yiannis Gkoufas <johngou...@gmail.com>
Subject Re: Best strategy for calculating percentage
Date Sat, 21 Feb 2015 02:13:31 GMT
Hi Fabian,

Thanks a lot for the clarification. I will give it a shot and let you
know how it goes!

BR

On 20 February 2015 at 22:18, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Yiannis,
>
> I think your program is not working correctly. The problem is with the
> sum() aggregation function.
> Right now, Flink's aggregation functions update values in place. That
> means that all fields that are neither grouping keys nor aggregated fields
> have nondeterministic values.
> For example, doing a groupBy(0,1).sum(2) on the two tuples <0,1,2,3> and
> <0,1,4,5> could result in either <0,1,6,3> or <0,1,6,5>.
>
> I would solve your task as follows:
>
> input.map(e => e._2.toString.split(","))
>   // we duplicate the value later
>   .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble))
>   .groupBy(0, 1)
>   .sum(2)
>   .groupBy(1)
>   .reduceGroup(new CustomGroupReduceFunction())
>
> The CustomGroupReduceFunction buffers fields 0 and 2 of every record in the
> group and computes the sum of field 2. Once all records have been read, it
> computes, for each buffered record, the ratio of its field 2 to that sum and
> emits a new tuple.
>
> This only works if the number of records in one group is not too
> large. Otherwise you might run into an OutOfMemoryError.
> If the optimizer is clever enough, the data is only sorted once.
>
> Let me know if you have any questions.
>
> Cheers, Fabian
>
>
> 2015-02-20 22:01 GMT+01:00 Yiannis Gkoufas <johngouf85@gmail.com>:
>
>> Hi there,
>>
>> I have the following scenario:
>> My files have 2 attributes and 1 numeric value:
>> (attr1,attr2,val)
>> For each attr1, I want to compute its val as a percentage of the total
>> val of its attr2 group.
>> Currently I am doing it like this:
>>
>> input.map(e => e._2.toString.split(","))
>>   .map(e=> (e(0),Utils.getMonthFromDate(e(1).toLong),e(3).toDouble,e(3).toDouble))
>>   .groupBy(0,1)
>>   .sum(2)
>>   .groupBy(1)
>>   .sum(3)
>>   .map(e => (e._1,e._2,scala.math.BigDecimal(e._3*1.0/e._4*1.0).toString()))
>>
>> Is there a more efficient way to calculate this?
>>
>> Thanks a lot!
>>
>>
>>
>
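
The group-reduce step Fabian describes can be sketched in plain Scala, without
any Flink dependency. This is only an illustration of the logic that a
CustomGroupReduceFunction might contain, not the code from the thread: the
object name PercentagePerGroup, the function name percentages, and the Int
type for the month field are assumptions made for this example.

```scala
object PercentagePerGroup {

  // Hypothetical stand-in for the body of CustomGroupReduceFunction.
  // Input: all (attr1, month, summedValue) records of ONE month group.
  // Assumption: the month is an Int; the thread does not state its type.
  def percentages(group: Iterator[(String, Int, Double)]): Seq[(String, Int, Double)] = {
    // The whole group is buffered because the total must be known before
    // any ratio can be emitted. This is why very large groups can exhaust memory.
    val buffered = group.toVector
    val total = buffered.map(_._3).sum
    // Emit one tuple per buffered record with its share of the group total.
    buffered.map { case (attr1, month, value) => (attr1, month, value / total) }
  }

  def main(args: Array[String]): Unit = {
    // Records as they would look after groupBy(0,1).sum(2).
    val records = Seq(("a", 1, 30.0), ("b", 1, 10.0), ("a", 2, 5.0))
    // Plain Scala collections groupBy, used here only to simulate groupBy(1).
    val result = records.groupBy(_._2).values.flatMap(g => percentages(g.iterator))
    result.foreach(println)
  }
}
```

The function returns a fraction between 0 and 1; multiply by 100 if a
percentage is wanted. A group whose total is 0.0 would produce NaN, so a real
implementation would likely need to handle that case explicitly.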
