Subject: Re: Best strategy for calculating percentage
From: Fabian Hueske <fhueske@gmail.com>
To: user@flink.apache.org
Date: Fri, 20 Feb 2015 23:18:40 +0100

Hi Yiannis,

I think your program is not working correctly. The problem is with the sum() aggregation function. Right now, Flink's aggregation functions update values in place. That means that all non-key, non-aggregated fields have nondeterministic values. For example, a groupBy(0,1).sum(2) on the two tuples (0,1,2,3) and (0,1,4,5) could result in either (0,1,6,3) or (0,1,6,5).

I would solve your task as follows:

input.map(e => e._2.toString.split(","))
  .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble)) // no need to duplicate the value
  .groupBy(0,1)
  .sum(2)
  .groupBy(1)
  .groupReduce(new CustomGroupReduceFunction())

The CustomGroupReduceFunction remembers all values of fields 0 and 2 and computes the sum over field 2. Once all values have been read, it computes, for each remembered field-2 value, its percentage of the group's field-2 sum and emits a new tuple.

This only works if the number of records in one group is not too large; otherwise you might face OutOfMemoryErrors. If the optimizer is clever enough, the data is only sorted once.

Let me know if you have any questions.
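The per-group logic described above can be sketched in plain Scala (without the Flink GroupReduceFunction interface, for brevity; the tuple layout and names are illustrative assumptions, not code from this thread): buffer one group's records, total field 2, then emit each record with its share of the total.

```scala
// Plain-Scala sketch of what a CustomGroupReduceFunction would do for one
// group; in Flink this logic would live inside reduce(values, out).
object PercentageSketch {
  // One group's records: (attr1, month, summedVal); month is the grouping key.
  def percentages(group: Seq[(String, Int, Double)]): Seq[(String, Int, Double)] = {
    // First pass: total of field 2 over the whole (buffered) group.
    val total = group.map(_._3).sum
    // Second pass over the buffered records: emit each value's share in percent.
    group.map { case (attr1, month, v) => (attr1, month, v / total * 100.0) }
  }

  def main(args: Array[String]): Unit = {
    val group = Seq(("a", 1, 10.0), ("b", 1, 40.0))
    percentages(group).foreach(println)
    // (a,1,20.0) and (b,1,80.0): 10 and 40 out of a total of 50
  }
}
```

Buffering the whole group is what makes the approach memory-bound, which is why the caveat about large groups applies.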
Cheers, Fabian

2015-02-20 22:01 GMT+01:00 Yiannis Gkoufas:

> Hi there,
>
> I have the following scenario:
> My files have 2 attributes and 1 numeric value:
> (attr1,attr2,val)
> I want to generate the percentage of values of each of attr1 on the sum of
> val grouped on attr2.
> Currently I am doing it like this:
>
> input.map(e => e._2.toString.split(","))
>   .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble, e(3).toDouble))
>   .groupBy(0,1)
>   .sum(2)
>   .groupBy(1)
>   .sum(3)
>   .map(e => (e._1, e._2, scala.math.BigDecimal(e._3*1.0/e._4*1.0).toString()))
>
> Is there a more efficient way to calculate this?
>
> Thanks a lot!