flink-dev mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: Very strange behaviour of groupBy() -> sort() -> first()
Date Thu, 22 Jan 2015 10:11:34 GMT
BTW, I also think that global sorting is an important feature that is
definitely missing in Flink (FLINK-598).
Enabling local sorting for data sinks is one step along the way and can be
solved rather easily (FLINK-1105).

If you would like to contribute to making sorting possible, I would be very
happy to guide you ;-)

Cheers, Fabian

2015-01-21 22:33 GMT+01:00 Fabian Hueske <fhueske@apache.org>:

> This should directly go into the API, IMO.
> As I said, there are several open JIRAs for this issue.
>
> 2015-01-21 22:29 GMT+01:00 Felix Neutatz <neutatz@googlemail.com>:
>
>> Thanks, @Fabian, your workaround works :)
>>
>> But I think this feature is really missing. Shall we add this
>> functionality natively or via the proposed lib package?
>>
>> 2015-01-21 20:38 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
>>
>> > Chesnay is right.
>> > Right now, it is not possible to do what you want in a straightforward
>> > way because Flink does not support fully sorting a data set (there are
>> > several related issues in JIRA).
>> >
>> > A workaround would be to attach a constant value to each tuple, group on
>> > that (all tuples are sent to the same group), sort that group, and apply
>> > the first operator.
>> >
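The workaround above can be modeled in plain Java to show why it yields a global top n. This is a sketch of the semantics only, not Flink code: `Keyed`, `attachConstant` and `topN` are hypothetical names, and the constant 0 is an arbitrary choice. In Flink the same idea would presumably be a `map` that emits a `Tuple3` of (0, key, sum), followed by `groupBy(0).sortGroup(2, Order.DESCENDING).first(3)`. Because every record shares the constant key, all records fall into one group, so the sort and the limit are presumably handled by a single task; that is fine for a small aggregated result but does not scale the way a real global sort would.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConstantKeyTopN {

    // Hypothetical stand-in for a Tuple3<Integer, String, Long>:
    // field 0 is the constant key added by the workaround.
    record Keyed(int constant, String key, long sum) {}

    // Step 1: attach the same constant (0, an arbitrary choice) to every record.
    static List<Keyed> attachConstant(Map<String, Long> sums) {
        List<Keyed> out = new ArrayList<>();
        sums.forEach((k, v) -> out.add(new Keyed(0, k, v)));
        return out;
    }

    // Steps 2-4: group on the constant (all records land in one group),
    // sort that group by sum in descending order, and keep the first n.
    static List<Keyed> topN(List<Keyed> records, int n) {
        Map<Integer, List<Keyed>> groups = new LinkedHashMap<>();
        for (Keyed r : records) {
            groups.computeIfAbsent(r.constant(), c -> new ArrayList<>()).add(r);
        }
        List<Keyed> out = new ArrayList<>();
        for (List<Keyed> group : groups.values()) {
            group.sort((x, y) -> Long.compare(y.sum(), x.sum())); // descending
            out.addAll(group.subList(0, Math.min(n, group.size())));
        }
        return out;
    }

    public static void main(String[] args) {
        // Already-aggregated sums per key (made-up sample values).
        Map<String, Long> sums = new LinkedHashMap<>();
        sums.put("string3", 1L);
        sums.put("string1", 100L);
        sums.put("string4", 7L);
        sums.put("string2", 50L);
        for (Keyed r : topN(attachConstant(sums), 3)) {
            System.out.println(r.key() + ", " + r.sum() + "L");
        }
    }
}
```

Running `main` prints `string1, 100L`, `string2, 50L` and `string4, 7L`: only three records, because the limit now applies to the single group that holds the whole data set.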
>> > 2015-01-21 20:22 GMT+01:00 Chesnay Schepler <chesnay.schepler@fu-berlin.de>:
>> >
>> > > If I remember correctly, first() returns the first n values for every
>> > > group. The Javadocs actually don't make this behaviour very clear.
>> > >
>> > >
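The per-group behaviour of first() explains the result in the original question quoted below. The following plain-Java model is a sketch, not Flink code: `Pair`, `sumByKey` and `firstNPerGroup` are hypothetical helpers that mimic `groupBy(0).aggregate(Aggregations.SUM, 1)` and `groupBy(0).sortGroup(1, Order.DESCENDING).first(n)`, assuming first(n) limits each group as described above. After the aggregation there is exactly one record per key, so grouping on field 0 again produces one-element groups, and first(3) keeps every record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupedFirstModel {

    // Hypothetical stand-in for Flink's Tuple2<String, Long> (f0 = key, f1 = value).
    record Pair(String key, long value) {}

    // Models groupBy(0).aggregate(Aggregations.SUM, 1): one record per distinct key.
    static List<Pair> sumByKey(List<Pair> input) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (Pair p : input) {
            sums.merge(p.key(), p.value(), Long::sum);
        }
        List<Pair> out = new ArrayList<>();
        sums.forEach((k, v) -> out.add(new Pair(k, v)));
        return out;
    }

    // Models groupBy(0).sortGroup(1, Order.DESCENDING).first(n):
    // n limits the size of EACH group, not the size of the whole result.
    static List<Pair> firstNPerGroup(List<Pair> input, int n) {
        Map<String, List<Pair>> groups = new LinkedHashMap<>();
        for (Pair p : input) {
            groups.computeIfAbsent(p.key(), k -> new ArrayList<>()).add(p);
        }
        List<Pair> out = new ArrayList<>();
        for (List<Pair> group : groups.values()) {
            group.sort((x, y) -> Long.compare(y.value(), x.value())); // descending
            out.addAll(group.subList(0, Math.min(n, group.size())));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample input with five distinct keys.
        List<Pair> lineitems = List.of(
                new Pair("a", 60), new Pair("a", 40), new Pair("b", 50),
                new Pair("c", 1), new Pair("d", 7), new Pair("e", 3));
        List<Pair> sums = sumByKey(lineitems); // one record per key
        // Every key is now unique, so regrouping on the key yields
        // one-element groups and first(3) keeps every record.
        List<Pair> result = firstNPerGroup(sums, 3);
        System.out.println(result.size()); // prints 5, not 3
    }
}
```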
>> > > On 21.01.2015 19:18, Felix Neutatz wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> my use case is the following:
>> > >>
>> > >> I have a Tuple2<String,Long>. I want to group by the String and sum up
>> > >> the Long values accordingly. This works fine with these lines:
>> > >>
>> > >> DataSet<Lineitem> lineitems = getLineitemDataSet(env);
>> > >> lineitems.project(new int[]{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1);
>> > >>
>> > >> After the aggregation I want to print the 10 groups with the highest
>> > >> sum, like:
>> > >>
>> > >> string1, 100L
>> > >> string2, 50L
>> > >> string3, 1L
>> > >>
>> > >> I tried that:
>> > >>
>> > >> lineitems.project(new int[]{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1)
>> > >>     .groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();
>> > >>
>> > >> But instead of 3 records, I get a lot more.
>> > >>
>> > >> Can you see my error?
>> > >>
>> > >> Best regards,
>> > >>
>> > >> Felix
>> > >>
>> > >>
>> > >
>> >
>>
>
>
