flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Very strange behaviour of groupBy() -> sort() -> first()
Date Wed, 21 Jan 2015 19:38:36 GMT
Chesnay is right.
Right now, it is not possible to do what you want in a straightforward way
because Flink does not support fully sorting a data set (there are several
related issues in JIRA).

A workaround would be to attach a constant value to each tuple, group on
that (all tuples are sent to the same group), sort that group, and apply
the first operator.
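
To illustrate why the original pipeline returned more than three records and
how the constant-key workaround changes that, here is a minimal sketch using
plain Java collections. It only models the per-group semantics of first(n) as
described in this thread; it does not use the Flink API. The names
FirstPerGroupSketch, Row, firstNPerGroup and sample, as well as the sample
values, are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class FirstPerGroupSketch {

    /** A (key, sum) pair, standing in for the aggregated Tuple2<String, Long>. */
    static final class Row {
        final String key;
        final long sum;

        Row(String key, long sum) {
            this.key = key;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return key + "=" + sum;
        }
    }

    /**
     * Models "group, sort each group by sum descending, take the first n":
     * the first n rows are taken from every group separately.
     */
    static <K> List<Row> firstNPerGroup(List<Row> rows, Function<Row, K> keyFn, int n) {
        Map<K, List<Row>> groups = rows.stream()
                .collect(Collectors.groupingBy(keyFn, LinkedHashMap::new, Collectors.toList()));
        List<Row> out = new ArrayList<>();
        for (List<Row> group : groups.values()) {
            group.stream()
                    .sorted(Comparator.comparingLong((Row r) -> r.sum).reversed())
                    .limit(n)
                    .forEach(out::add);
        }
        return out;
    }

    /** Hypothetical result of the SUM aggregation: one row per string key. */
    static List<Row> sample() {
        return List.of(new Row("a", 1L), new Row("b", 100L), new Row("c", 50L),
                new Row("d", 7L), new Row("e", 3L));
    }

    public static void main(String[] args) {
        // Grouping on the string key again: every group holds exactly one row,
        // so "first 3 per group" keeps all five rows.
        System.out.println(firstNPerGroup(sample(), r -> r.key, 3).size()); // prints 5

        // Workaround: group on a constant so that all rows share one group.
        System.out.println(firstNPerGroup(sample(), r -> 0, 3)); // prints [b=100, c=50, d=7]
    }
}
```

In a Flink job the constant would be an extra tuple field added before the
grouping. Since all tuples then end up in a single group, that step is
presumably not processed in parallel, which should be acceptable for a small
aggregated result.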

2015-01-21 20:22 GMT+01:00 Chesnay Schepler <chesnay.schepler@fu-berlin.de>:

> If I remember correctly, first() returns the first n values for every
> group. The Javadocs don't actually make this behaviour very clear.
>
>
> On 21.01.2015 19:18, Felix Neutatz wrote:
>
>> Hi,
>>
>> my use case is the following:
>>
>> I have a Tuple2<String,Long>. I want to group by the String and sum up the
>> Long values accordingly. This works fine with these lines:
>>
>> DataSet<Lineitem> lineitems = getLineitemDataSet(env);
>> lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM,
>> 1);
>>
>> After the aggregation I want to print the 10 groups with the highest sum,
>> like:
>>
>> string1, 100L
>> string2, 50L
>> string3, 1L
>>
>> I tried this:
>>
>> lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM,
>> 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();
>>
>> But instead of 3 records, I get a lot more.
>>
>> Can you see my error?
>>
>> Best regards,
>>
>> Felix
>>
>>
>
