From: Aljoscha Krettek
Date: Tue, 24 Jan 2017 16:52:53 +0000
Subject: Re: How to get top N elements in a DataSet?
To: user@flink.apache.org
Reply-To: user@flink.apache.org

@Fabian, I think there's a typo in your code. Shouldn't it be:

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen()).setParallelism(1)

i.e. the second MapPartition has to have parallelism=1

On Tue, 24 Jan 2017 at 11:57 Fabian Hueske <fhueske@gmail.com> wrote:

> You are of course right, Gábor.
> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
> 10 elements (note that you need to create deep copies if object reuse is
> enabled [1]).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>
>
> 2017-01-24 11:49 GMT+01:00 Gábor Gévay <ggab90@gmail.com>:
>
> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
>
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).setParallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <ivan.mushketik@gmail.com>:
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields, ids and ratings, and I need to
> >> find the 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and I think there should be a better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition the dataset by rating,
> >> sort locally and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner<Double>() {
> >>   @Override
> >>   public int partition(Double key, int numPartitions) {
> >>     return key.intValue() % numPartitions;
> >>   }
> >> }, 1) // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read the
> >> sorted data from disk and take the N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> >> to disk?
> >>
> >> I tried to use "first(10)" but it seems to give the top 10 items from a
> >> random partition. Is there a way to get the top N elements from every
> >> partition? Then I could locally sort the top values from every partition
> >> and find the top 10 global values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
> >
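
For reference, the heap-based variant mentioned in the thread above can be sketched in plain Java, without any Flink dependency. This is only an illustration under stated assumptions: the class TopNSketch, its Rating type, and the methods topN and globalTopN are hypothetical names that do not appear in the thread, and partitions are simulated as in-memory lists. In a real job the body of topN would presumably live inside a MapPartitionFunction such as the ReturnFirstTen used above, with the second stage running at parallelism 1.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java sketch of two-stage top-N selection. All names here are
// hypothetical; none of them come from Flink or from the thread.
final class TopNSketch {

    // Stand-in for the (id, rating) tuples discussed in the thread.
    static final class Rating {
        final int id;
        final double rating;

        Rating(int id, double rating) {
            this.id = id;
            this.rating = rating;
        }
    }

    // Returns the n highest-rated elements of one partition, best first.
    // A min-heap of at most n elements keeps the weakest retained element
    // at the head, so the partition is never sorted as a whole.
    static List<Rating> topN(Iterable<Rating> partition, int n) {
        List<Rating> result = new ArrayList<>();
        if (n <= 0) {
            return result;
        }
        PriorityQueue<Rating> heap =
            new PriorityQueue<>(n, Comparator.comparingDouble((Rating r) -> r.rating));
        for (Rating r : partition) {
            if (heap.size() < n) {
                // Store a copy, in case the caller reuses input objects.
                heap.add(new Rating(r.id, r.rating));
            } else if (r.rating > heap.peek().rating) {
                heap.poll();
                heap.add(new Rating(r.id, r.rating));
            }
        }
        result.addAll(heap);
        result.sort(Comparator.comparingDouble((Rating r) -> r.rating).reversed());
        return result;
    }

    // Stage 1: top n of every partition. Stage 2: top n of the merged
    // candidates, which corresponds to the step that runs with parallelism 1.
    static List<Rating> globalTopN(List<List<Rating>> partitions, int n) {
        List<Rating> candidates = new ArrayList<>();
        for (List<Rating> partition : partitions) {
            candidates.addAll(topN(partition, n));
        }
        return topN(candidates, n);
    }
}
```

Each partition contributes at most n candidates, so the final single-threaded step only has to look at n times the number of partitions elements. Elements with equal ratings come back in no particular order in this sketch.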