Subject: Re: Sort tuple dataset
From: Stephan Ewen
To: user@flink.apache.org
Date: Mon, 16 Mar 2015 08:58:53 +0100

I think that depends on your use case. If you want to work on the entire dataset as a whole anyway, you can assign a dummy key (like 0) to all elements, group by that key, and sort the group on the actual value.

What exactly is your use case? Does the above solution work there?
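
A minimal sketch of the dummy-key idea, assuming the Java DataSet API as it looks in 0.8.1. The class name DummyKeySort and the helper sortDescending are made up for illustration and are not part of Flink:

```java
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

class DummyKeySort {

    // Hypothetical helper: puts every element into one group (key 0)
    // and sorts that single group on the actual value.
    static DataSet<Integer> sortDescending(DataSet<Integer> input) {
        return input
            // Attach the same dummy key to every element.
            .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> map(Integer value) {
                    return new Tuple2<Integer, Integer>(0, value);
                }
            })
            .groupBy(0)                      // one group for the whole data set
            .sortGroup(1, Order.DESCENDING)  // sort inside that group on the value
            // The grouping only takes effect once a group function is applied.
            .reduceGroup(new GroupReduceFunction<Tuple2<Integer, Integer>, Integer>() {
                @Override
                public void reduce(Iterable<Tuple2<Integer, Integer>> values,
                                   Collector<Integer> out) {
                    for (Tuple2<Integer, Integer> t : values) {
                        out.collect(t.f1); // emit the values in sorted order
                    }
                }
            });
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        sortDescending(env.fromElements(2, 1, 5, 3, 4, 5)).print();
        env.execute(); // print() followed by execute(), as in the original snippet
    }
}
```

Because all elements share one key, the sort runs in a single group and is not parallel, so this only makes sense once the data is small enough.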

On Sun, Mar 15, 2015 at 5:39 PM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
After building Flink 0.9-SNAPSHOT from source, DataSet.sortPartition is indeed working as expected.

This is fine, but it raises the question: how do I go about sorting in 0.8.1?

On Sun, Mar 15, 2015 at 5:05 PM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
That's the thing: there is no DataSet.sortPartition method in 0.8.1. Looking through the git history shows that sortPartition was added on the 20th of February, so I think that's 0.9-SNAPSHOT?

On Sun, Mar 15, 2015 at 4:51 PM, Stephan Ewen <sewen@apache.org> wrote:
Hi!

I think sort partition is the right thing if you have only one partition (which makes sense if you want a total order). It is then no longer a parallel operation, so use it only after the data size has been reduced (filters / aggregations).

What about "data.sortPartition(0, Order.ASCENDING).setParallelism(1)"?

Does that work for you?

Greetings,
Stephan


On Sun, Mar 15, 2015 at 4:47 PM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
Thanks for your answer. I guess I'm a bit infected by writing too much Crunch code, and I also suspected that getDataSet() was the wrong thing to do :-)

However, I was expecting DataSet.sortPartition to do the sorting, but this method is missing in 0.8.1?

Do you have a minimal example? I was looking through the tests but most of them use sortPartition.

Cheers,
-Kristoffer



On Sun, Mar 15, 2015 at 4:22 PM, Stephan Ewen <sewen@apache.org> wrote:
Hi Kristoffer!

There are a few issues with that code:

1) Grouping and then calling "sort group" sorts within each group. In your case, you group on the entire element, so each group has one value: the element itself. Sorting inside the group does not make any difference, and there is no order across groups.

2) This code never groups and sorts. The calls to "groupBy(0).sortGroup(0, Order.DESCENDING)" do not group and sort by themselves; they set up a grouping to be used with a reduce or aggregate function. The "getDataSet()" call gets you the original data set, which is the original input.
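
Point 1 can be illustrated without a cluster. The following is a plain-Java model of "group by key, then sort inside each group"; it is not Flink code, and the class and method names are made up for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class GroupSortModel {

    // Plain-Java stand-in (not Flink API): group the values by a key,
    // then sort each group in descending order. The order in which the
    // groups themselves appear carries no meaning.
    public static Map<Integer, List<Integer>> groupThenSortDescending(
            List<Integer> values, Function<Integer, Integer> keyOf) {
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        for (Integer v : values) {
            groups.computeIfAbsent(keyOf.apply(v), k -> new ArrayList<>()).add(v);
        }
        for (List<Integer> group : groups.values()) {
            group.sort(Collections.reverseOrder());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(2, 1, 5, 3, 4, 5);

        // Key = the element itself: every group holds one distinct value,
        // so sorting inside the groups changes nothing.
        System.out.println(groupThenSortDescending(input, v -> v));
        // prints {2=[2], 1=[1], 5=[5, 5], 3=[3], 4=[4]}

        // Key = a constant: one group holds everything and comes out sorted.
        System.out.println(groupThenSortDescending(input, v -> 0));
        // prints {0=[5, 5, 4, 3, 2, 1]}
    }
}
```

The first call mirrors groupBy(0) on a Tuple1; the second shows what changes when every element shares the same key.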

To see an illustration of this, get the program plan (env.getExecutionPlan()). You can render it using the html file "tools/planVisualizer.html".
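
A rough sketch of dumping the plan for the program in question, assuming the 0.8.1 Java API where print() only adds a sink and nothing runs until execute(). The class name PlanDump is made up, and fromElements stands in for the Guava list used in the original:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

class PlanDump {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(2, 1, 5, 3, 4, 5)
            .map(new MapFunction<Integer, Tuple1<Integer>>() {
                @Override
                public Tuple1<Integer> map(Integer value) {
                    return new Tuple1<Integer>(value);
                }
            })
            .groupBy(0)
            .sortGroup(0, Order.DESCENDING)
            .getDataSet()   // hands back the data set from before the grouping
            .print();       // adds a sink; nothing has run yet

        // Instead of env.execute(), print the plan as JSON.
        System.out.println(env.getExecutionPlan());
    }
}
```

The JSON written to standard output is what gets pasted into the visualizer page.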

Greetings,
Stephan

On Sun, Mar 15, 2015 at 3:29 PM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
Hi

This is silly, but I can't understand why the following code doesn't sort the collection of integers. It seems to be a reasonable thing to do from an API perspective?

Cheers,
-Kristoffer

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.fromCollection(Lists.newArrayList(2,1,5,3,4,5)).map(new MapFunction<Integer, Tuple1<Integer>>() {
      @Override
      public Tuple1<Integer> map(Integer value) throws Exception {
        return new Tuple1<Integer>(value);
      }
    }).groupBy(0).sortGroup(0, Order.DESCENDING).getDataSet().print();
    env.execute();






