flink-user mailing list archives

From Michele Bertoni <michele1.bert...@mail.polimi.it>
Subject Re: sorting groups
Date Wed, 17 Jun 2015 13:34:05 GMT
Got it,
I solved the problem by changing the grouping function:
instead of grouping by r._1 (in the topk case),
I now group using a function that returns r._1.

Thus both group and sort take a function as parameter.
Is there a reason why it works this way? Probably you should make it very clear in the


On 17 Jun 2015, at 08:35, Michele Bertoni <michele1.bertoni@mail.polimi.it> wrote:

Hi Fabian,
My dataset is of this type
RegionType (Long, String, Long, Long, Char, Array[GValue])
Where GValue is a case class implemented by

I have two cases of sorting:
In the first (topk) I have to group by the first field of the regions and sort by a set of
fields of the GValue array.

In the second (topg) I have to sort by the first field of the regions and by a set of fields
of the array, then sort by one field of the array.

For grouping I am using the groupBy function with a function as parameter that creates the
hash of the desired fields, something like
ds.groupby((r:RegionType) =>
  s = new stringBuilder
  grouping.init.foreach((index:int) =>

Then I sort it using (in the topg case, the second)
  r._6(grouping.last ) /*here I am doing some cast; I am writing from my smartphone and I don't
remember all the details, sorry*/ ),Order.ASCENDING)

In the first case instead I group only by r._1 and I have a recursive function that appends
a sortGroup operator to the grouped dataset.

Is there a way to solve this?

I think I don't understand what a keySelector is.

From: Fabian Hueske <fhueske@gmail.com>
Sent: Tuesday, 16 June 2015 23:43:03
To: user@flink.apache.org
Subject: Re: sorting groups


The error is related to the way you specify the grouping and the sorting key.
The API is currently restricted in that you can only use a key selector function
for the sorting key if you also used a key selector function for the grouping key.

In Scala the use of key selector functions is often not very obvious.
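
As an illustration of the restriction described above, here is a minimal sketch in Scala. It is not the poster's actual code: the record type `Rec`, the sample data, and the names `SortGroupSketch` and `smallestPerGroup` are assumptions made up for this example. It only relies on `groupBy`, `sortGroup`, and `first` from the Flink Scala DataSet API, and on `Order` as used in the thread.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

object SortGroupSketch {
  // Hypothetical, simplified record: (id, name, score).
  type Rec = (Long, String, Long)

  // Grouping key AND sorting key are both given as key selector functions.
  def smallestPerGroup(ds: DataSet[Rec]): DataSet[Rec] =
    ds.groupBy((r: Rec) => r._1)                     // key selector for grouping
      .sortGroup((r: Rec) => r._3, Order.ASCENDING)  // key selector for sorting
      .first(1)                                      // first element of each sorted group

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds: DataSet[Rec] = env.fromElements(
      (1L, "a", 30L), (1L, "b", 10L), (2L, "c", 20L), (2L, "d", 5L))

    // Mixing key kinds is what triggers the InvalidProgramException from the thread:
    // field-position grouping combined with a key selector sort key.
    // ds.groupBy(0).sortGroup((r: Rec) => r._3, Order.ASCENDING)

    // Depending on the Flink version, an explicit env.execute() may also be needed.
    smallestPerGroup(ds).print()
  }
}
```

If the sort key can only be expressed as a function (for example, a value computed from an array field), the grouping key has to be written as a function too, even when it is just a single tuple field.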

If you post the groupBy().sortGroup() command and the input type, I can help you getting it

Cheers, Fabian

2015-06-16 23:37 GMT+02:00 Michele Bertoni <michele1.bertoni@mail.polimi.it>:
Hi everybody,
I am trying to sort a grouped dataset, but I am getting this error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Sorting on KeySelector keys only works with KeySelector grouping.
        at org.apache.flink.api.scala.GroupedDataSet.sortGroup(GroupedDataSet.scala:113)
        at it.polimi.genomics.flink.FlinkImplementation.regionOperation.OrderRD$.sort(OrderRD.scala:82)

Can anybody help me understand the error?
I have no idea what it means and Google is not helpful in this case.

