flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Multiple keys in reduceGroup ?
Date Thu, 22 Oct 2015 10:36:13 GMT
If not, could you provide us with the program and test data to reproduce
the error?

Cheers,
Till

On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> but he’s comparing it to a primitive long, so shouldn’t the Long key be
> unboxed and the comparison still be valid?
>
> My question is: did you enable object-reuse mode on the
> ExecutionEnvironment?
>
> Cheers,
> Aljoscha
> > On 22 Oct 2015, at 12:31, Stephan Ewen <sewen@apache.org> wrote:
> >
> > Hi!
> >
> > You are checking for equality / inequality with "!=" - can you check
> > with "equals()"?
> >
> > The key objects will most certainly be different in each record (as they
> > are deserialized individually), but they should be equal.
> >
> > Stephan
> >
> >
> > On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr> wrote:
> > Hello,
> >
> > While trying to understand why my code was giving strange results, I ended
> > up adding "useless" checks to it and found what seems to me to be a bug:
> > I group my dataset by a key, but in the reduceGroup function I am passed
> > values with different keys.
> >
> > My code has the following pattern (a mix of Java and pseudo-code in []):
> >
> > inputDataSet [of InputRecord]
> >     .joinWithTiny(referencesDataSet [of Reference])
> >     .where([InputRecord SecondaryKeySelector]).equalTo([Reference KeySelector])
> >     .groupBy([PrimaryKeySelector : Tuple2<InputRecord, Reference> -> value.f0.getPrimaryKey()])
> >     .sortGroup([DateKeySelector], Order.ASCENDING)
> >     .reduceGroup(new GroupReduceFunction<Tuple2<InputRecord, Reference>, OutputRecord>() {
> >         @Override
> >         public void reduce(Iterable<Tuple2<InputRecord, Reference>> values, Collector<OutputRecord> out) throws Exception {
> >             // Issue: not all values share the same key
> >             final List<Tuple2<InputRecord, Reference>> listValues = new ArrayList<Tuple2<InputRecord, Reference>>();
> >             for (final Tuple2<InputRecord, Reference> value : values) {
> >                 listValues.add(value);
> >             }
> >
> >             final long primkey = listValues.get(0).f0.getPrimaryKey();
> >             for (int i = 1; i < listValues.size(); i++) {
> >                 if (listValues.get(i).f0.getPrimaryKey() != primkey) {
> >                     // => This exception is thrown!
> >                     throw new IllegalStateException(primkey + " != " + listValues.get(i).f0.getPrimaryKey());
> >                 }
> >             }
> >         }
> >     });
> >
> > I use the current 0.10 snapshot. The issue appears in local cluster mode
> > unit tests as well as in YARN mode (however, it works fine when I test it
> > with very few elements).
> >
> > The sortGroup is not the cause of the problem, as I get the same error
> > without it.
> >
> > Have I misunderstood the grouping concept, or is it really an awful bug?
> >
> > Best regards,
> >
> > Arnaud
> >
> > The integrity of this message cannot be guaranteed on the Internet. The
> > company that sent this message cannot therefore be held liable for its
> > content nor attachments. Any unauthorized use or dissemination is
> > prohibited. If you are not the intended recipient of this message, then
> > please delete it and notify the sender.
> >
>
>
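The exchange between Stephan and Aljoscha about "!=" versus "equals()" can be illustrated with a small, self-contained sketch. It assumes, purely for illustration, that getPrimaryKey() returns a boxed Long; the thread does not say whether it returns Long or long. The class and method names below (LongKeyComparison, freshKey, and so on) are hypothetical and are not part of Flink or of Arnaud's program.

```java
public class LongKeyComparison {

    // Creates a distinct Long object for the given value, standing in for a
    // key that was deserialized on its own. new Long(...) is deprecated, but
    // unlike Long.valueOf it is guaranteed to return a fresh instance.
    @SuppressWarnings({"deprecation", "removal"})
    static Long freshKey(long value) {
        return new Long(value);
    }

    // Mirrors the check in the original post: one side is a primitive long,
    // so the boxed side is unboxed and the comparison is by value.
    static boolean differsFromPrimitive(Long boxedKey, long primitiveKey) {
        return boxedKey != primitiveKey;
    }

    // Both sides boxed: != compares object references, not values.
    static boolean differsByReference(Long a, Long b) {
        return a != b;
    }

    // Both sides boxed: equals() compares the wrapped values.
    static boolean differsByEquals(Long a, Long b) {
        return !a.equals(b);
    }

    public static void main(String[] args) {
        Long first = freshKey(123456789L);
        Long second = freshKey(123456789L);
        final long primkey = first; // unboxed, as in "final long primkey = ..."

        System.out.println(differsFromPrimitive(second, primkey)); // false
        System.out.println(differsByReference(first, second));     // true
        System.out.println(differsByEquals(first, second));        // false
    }
}
```

Under that assumption, the check in the original post compares values because primkey is declared as a primitive long, which is Aljoscha's point. Stephan's advice would apply if both sides of the comparison were boxed.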

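Aljoscha's question about object-reuse mode matters because the reduce function buffers every value in a list before inspecting it. The toy sketch below shows why holding on to references is unsafe once a producer recycles its objects. It does not model Flink's runtime: the iterator here hands out one shared instance, and Record, reusing, bufferReferences and bufferCopies are hypothetical names invented for the example. As far as I know, the actual switch in Flink is enableObjectReuse() on the ExecutionConfig.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ObjectReuseSketch {

    // Hypothetical mutable record, standing in for InputRecord.
    static final class Record {
        long key;
        Record(long key) { this.key = key; }
        Record copy() { return new Record(key); }
    }

    // Toy producer that returns the SAME Record instance on every call to
    // next(), overwriting its field each time. Not Flink's implementation.
    static Iterable<Record> reusing(final long... keys) {
        return new Iterable<Record>() {
            @Override
            public Iterator<Record> iterator() {
                return new Iterator<Record>() {
                    private final Record shared = new Record(0L);
                    private int next = 0;

                    @Override
                    public boolean hasNext() { return next < keys.length; }

                    @Override
                    public Record next() {
                        if (!hasNext()) { throw new NoSuchElementException(); }
                        shared.key = keys[next++];
                        return shared;
                    }
                };
            }
        };
    }

    // Unsafe under reuse: stores references to the recycled instance.
    static List<Record> bufferReferences(Iterable<Record> values) {
        List<Record> out = new ArrayList<Record>();
        for (Record r : values) { out.add(r); }
        return out;
    }

    // Safe under reuse: stores an independent copy of each value.
    static List<Record> bufferCopies(Iterable<Record> values) {
        List<Record> out = new ArrayList<Record>();
        for (Record r : values) { out.add(r.copy()); }
        return out;
    }

    public static void main(String[] args) {
        List<Record> refs = bufferReferences(reusing(1L, 2L, 3L));
        List<Record> copies = bufferCopies(reusing(1L, 2L, 3L));
        System.out.println(refs.get(0).key);   // 3: every entry is the shared object
        System.out.println(copies.get(0).key); // 1: the copy kept its own value
    }
}
```

In this toy every buffered reference ends up showing the last key, which is not the symptom reported above. It only illustrates the general hazard: if instances are recycled, values read back from a buffer may no longer match what the iterator originally returned.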