@Fabian, I think there's a typo in your code. Shouldn't it be:
dataset // assuming some partitioning that can be reused to avoid a shuffle
.sortPartition(1, Order.DESCENDING)
.mapPartition(new ReturnFirstTen())
.sortPartition(1, Order.DESCENDING)
.mapPartition(new ReturnFirstTen()).setParallelism(1)
i.e., the second MapPartition has to run with parallelism=1.
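
For reference, here is a minimal plain-Java sketch of the two-stage idea, using the bounded heap Fabian suggests below instead of a full sort. It is not Flink code: Rated, topN and globalTopN are hypothetical names made up for illustration, and the partitions are simulated as plain lists. Inside a real MapPartitionFunction the topN loop would run over the partition's values, and with object reuse enabled each element would have to be copied before it goes into the heap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TopNSketch {

    /** Hypothetical stand-in for the (id, rating) tuples discussed in the thread. */
    static final class Rated {
        final long id;
        final double rating;

        Rated(long id, double rating) {
            this.id = id;
            this.rating = rating;
        }
    }

    /** Returns the n highest-rated elements, best first, using a min-heap bounded to n entries. */
    static List<Rated> topN(Iterable<Rated> values, int n) {
        List<Rated> result = new ArrayList<>();
        if (n <= 0) {
            return result;
        }
        // Min-heap on rating: the head is always the weakest of the current candidates.
        PriorityQueue<Rated> heap =
                new PriorityQueue<>(n, Comparator.comparingDouble((Rated r) -> r.rating));
        for (Rated r : values) {
            if (heap.size() < n) {
                heap.add(r);
            } else if (r.rating > heap.peek().rating) {
                heap.poll(); // evict the weakest candidate
                heap.add(r);
            }
        }
        result.addAll(heap);
        result.sort(Comparator.comparingDouble((Rated r) -> r.rating).reversed());
        return result;
    }

    /** Stage 1: top n per partition. Stage 2: top n over the union of those candidates. */
    static List<Rated> globalTopN(List<List<Rated>> partitions, int n) {
        List<Rated> candidates = new ArrayList<>();
        for (List<Rated> partition : partitions) {
            candidates.addAll(topN(partition, n));
        }
        return topN(candidates, n);
    }
}
```

The second stage only ever sees at most n elements per partition, which is why running it with parallelism 1 is cheap. The heap keeps memory at O(n) per partition and avoids sorting whole partitions.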
On Tue, 24 Jan 2017 at 11:57 Fabian Hueske <fhueske@gmail.com> wrote:
> You are of course right Gabor.
> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
> 10 elements (note that you need to create deep copies if object reuse is
> enabled [1]).
> Best, Fabian
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
> 2017-01-24 11:49 GMT+01:00 Gábor Gévay <ggab90@gmail.com>:
> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a shuffle
> > .sortPartition(1, Order.DESCENDING)
> > .mapPartition(new ReturnFirstTen())
> > .sortPartition(1, Order.DESCENDING).setParallelism(1)
> > .mapPartition(new ReturnFirstTen())
> > Best, Fabian
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <ivan.mushketik@gmail.com>:
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields, ids and ratings, and I need to
> >> find the 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and there should be a better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition the dataset by rating,
> >> sort locally, and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner<Double>() {
> >> @Override
> >> public int partition(Double key, int numPartitions) {
> >> return key.intValue() % numPartitions;
> >> }
> >> }, 1) // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read the
> >> sorted data from disk and take the N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> >> to disk?
> >>
> >> I tried to use "first(10)" but it seems to give the top 10 items from a
> >> random partition. Is there a way to get the top N elements from every
> >> partition? Then I could locally sort the top values from every partition
> >> and find the top 10 global values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
