flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Multiple keys in reduceGroup ?
Date Thu, 22 Oct 2015 10:31:14 GMT
Hi!

You are checking for equality/inequality with "!=". Can you check with
"equals()" instead?
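For illustration, here is a minimal sketch of the consistency check from the quoted code rewritten with equals(). It assumes the key is an object type (for example a boxed Long or a composite key) with a proper equals() implementation. `checkSingleKey` and its `keyOf` parameter are hypothetical names, not part of Flink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class KeyCheck {

    // Hypothetical helper: throws if any value's key is not equal (per equals()) to the first key.
    static <T, K> void checkSingleKey(List<T> values, Function<T, K> keyOf) {
        if (values.isEmpty()) {
            return; // an empty group has nothing to compare
        }
        final K first = keyOf.apply(values.get(0));
        for (int i = 1; i < values.size(); i++) {
            final K current = keyOf.apply(values.get(i));
            // Compare by value (equals), not by reference (!=); Objects.equals also handles null
            if (!Objects.equals(first, current)) {
                throw new IllegalStateException(first + " != " + current);
            }
        }
    }

    public static void main(String[] args) {
        // Two distinct String objects with the same content, standing in for deserialized keys
        List<String> group = Arrays.asList(new String("42"), new String("42"));
        checkSingleKey(group, s -> s); // no exception: the keys are equal
        System.out.println("ok");
    }
}
```

Objects.equals is used rather than calling first.equals(current) directly so that a null key does not cause a NullPointerException.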

The key objects will most certainly be different objects in each record (as
they are deserialized individually), but they should be equal.
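To see why "!=" can fail even when two keys hold the same value, the sketch below uses plain Java serialization as a stand-in for per-record deserialization. This is only an analogy: Flink uses its own serializers, not java.io serialization. `PrimaryKey` and `roundTrip` are hypothetical names; each round trip produces a new object, so the two copies are different references that are still equal.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeserializedKeys {

    // Hypothetical key class: equality is defined by its field, not by object identity.
    static final class PrimaryKey implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;

        PrimaryKey(long id) { this.id = id; }

        @Override
        public boolean equals(Object o) {
            return o instanceof PrimaryKey && ((PrimaryKey) o).id == id;
        }

        @Override
        public int hashCode() { return Long.hashCode(id); }
    }

    // Serializes and deserializes an object; readObject creates a brand-new instance.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PrimaryKey original = new PrimaryKey(42L);
        PrimaryKey a = roundTrip(original); // "record 1" key
        PrimaryKey b = roundTrip(original); // "record 2" key
        System.out.println(a == b);      // false: two different objects
        System.out.println(a.equals(b)); // true: same key value
    }
}
```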

Stephan


On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
wrote:

> Hello,
>
> While trying to understand why my code was giving strange results, I ended
> up adding “useless” sanity checks and came across what looks to me like a
> bug. I group my dataset by a key, but my reduceGroup function receives
> values with different keys.
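For reference, grouping is defined by key equality rather than object identity. The plain-Java analogy below (not how Flink implements grouping) uses Collectors.groupingBy, which relies on equals()/hashCode(): keys that are different objects but equal values fall into the same group. The `new String(...)` calls only force distinct instances; `groupByKey` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingByEquality {

    // Groups values by key; Collectors.groupingBy compares keys with equals()/hashCode()
    static Map<String, List<String>> groupByKey(List<String> keys) {
        return keys.stream().collect(Collectors.groupingBy(k -> k));
    }

    public static void main(String[] args) {
        // Distinct String instances standing in for individually deserialized keys
        List<String> keys = Arrays.asList(new String("A"), new String("B"), new String("A"));
        Map<String, List<String>> groups = groupByKey(keys);
        System.out.println(groups.size());          // 2
        System.out.println(groups.get("A").size()); // 2
    }
}
```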
>
>
>
> My code follows this pattern (a mix of Java and pseudo-code, with
> placeholders for my actual key selectors):
>
>
>
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.api.common.functions.GroupReduceFunction;
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> inputDataSet                                      // DataSet<InputRecord>
>     .joinWithTiny(referencesDataSet)              // DataSet<Reference>
>     .where(secondaryKeySelector)                  // placeholder: KeySelector on the InputRecord secondary key
>     .equalTo(referenceKeySelector)                // placeholder: KeySelector on the Reference key
>     .groupBy(new KeySelector<Tuple2<InputRecord, Reference>, Long>() {
>         @Override
>         public Long getKey(Tuple2<InputRecord, Reference> value) {
>             return value.f0.getPrimaryKey();      // group by the input record's primary key
>         }
>     })
>     .sortGroup(dateKeySelector, Order.ASCENDING)  // placeholder: KeySelector on the record date
>     .reduceGroup(new GroupReduceFunction<Tuple2<InputRecord, Reference>, OutputRecord>() {
>         @Override
>         public void reduce(Iterable<Tuple2<InputRecord, Reference>> values,
>                            Collector<OutputRecord> out) throws Exception {
>             // Issue: not all values share the same key
>             final List<Tuple2<InputRecord, Reference>> listValues = new ArrayList<>();
>             for (final Tuple2<InputRecord, Reference> value : values) {
>                 listValues.add(value);
>             }
>
>             final long primkey = listValues.get(0).f0.getPrimaryKey();
>             for (int i = 1; i < listValues.size(); i++) {
>                 if (listValues.get(i).f0.getPrimaryKey() != primkey) {
>                     // -> This exception is thrown!
>                     throw new IllegalStateException(primkey + " != "
>                             + listValues.get(i).f0.getPrimaryKey());
>                 }
>             }
>         }
>     });
>
>
>
> I am using the current 0.10 snapshot. The issue appears both in unit tests
> running in local cluster mode and in YARN mode (although it works fine when
> I test with very few elements).
>
>
>
> The sortGroup is not the cause of the problem: I get the same error
> without it.
>
>
>
> Have I misunderstood the grouping concept, or is this really a serious bug?
>
>
>
> Best regards,
>
> Arnaud
