flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Case of possible join optimization
Date Tue, 08 Sep 2015 08:45:12 GMT
Hi Flavio!

No, Flink does not join on the keys first and fetch the full values
afterwards. That is very often inefficient, because it effectively results
in two joins, one of which is typically about as expensive as the original
join.

One can do a "semi-join reduction" when the join filters out many values
(many elements from one side do not find a match on the other side). If the
join does not filter, this does not help either.
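
A minimal sketch of the semi-join reduction idea, using plain Java
collections rather than the Flink API. The class and method names
(SemiJoinReduction, reduceByKeys, join, entry) are hypothetical and chosen
only for illustration. The big side is first filtered by the key set of the
small side, and only the surviving records take part in the actual join:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustration only: hypothetical names, plain collections instead of Flink DataSets.
public class SemiJoinReduction {

    // Small helper to build a (key, value) pair.
    public static Map.Entry<String, String> entry(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Step 1: keep only those big-side records whose key occurs on the small side.
    // Only the keys of the small side are needed for this step, not its values.
    public static List<Map.Entry<String, String>> reduceByKeys(
            List<Map.Entry<String, String>> small,
            List<Map.Entry<String, String>> big) {
        Set<String> keys = new HashSet<>();
        for (Map.Entry<String, String> s : small) {
            keys.add(s.getKey());
        }
        List<Map.Entry<String, String>> reduced = new ArrayList<>();
        for (Map.Entry<String, String> b : big) {
            if (keys.contains(b.getKey())) {
                reduced.add(b);
            }
        }
        return reduced;
    }

    // Step 2: the actual join (a simple hash join built on the small side).
    public static List<String> join(
            List<Map.Entry<String, String>> small,
            List<Map.Entry<String, String>> big) {
        Map<String, List<String>> table = new HashMap<>();
        for (Map.Entry<String, String> s : small) {
            table.computeIfAbsent(s.getKey(), k -> new ArrayList<>()).add(s.getValue());
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> b : big) {
            for (String v : table.getOrDefault(b.getKey(), Collections.<String>emptyList())) {
                out.add(b.getKey() + "|" + v + "|" + b.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> small =
                Arrays.asList(entry("a", "s1"), entry("b", "s2"));
        List<Map.Entry<String, String>> big =
                Arrays.asList(entry("a", "x"), entry("c", "y"), entry("d", "z"), entry("b", "w"));

        List<Map.Entry<String, String>> reduced = reduceByKeys(small, big);
        // prints "2 of 4 big-side records survive the key filter"
        System.out.println(reduced.size() + " of " + big.size()
                + " big-side records survive the key filter");
        // prints "[a|s1|x, b|s2|w]"
        System.out.println(join(small, reduced));
    }
}
```

The sketch does two passes over the data. In a distributed setting each pass
is a join of its own, so the reduction only pays off when the key filter
removes a large share of the big side.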

Your code is a bit of a surprise, especially because in your solution that
worked, the first statement does nothing:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();

This builds a join, but then takes the second input of the join (the
bigDataset data set). Because the result of the join is never
actually used, the join is never executed. The second statement hence is
effectively a join of attrToExpand with the full bigDataset:

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset


I am curious why this executed when the original did not.

BTW: If the Lists are so long that they do not fit into a hash table memory
partition, you can try to use a JoinHint to force a sort-merge join. It
may be slower, but it typically works with even less memory.
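
To illustrate why a sort-merge join gets by with less memory, here is a
minimal sketch in plain Java (not Flink's implementation; the class and
method names are hypothetical). Both inputs are sorted by key and then
merged with two cursors, so no hash table over a whole input is needed. In
the DataSet API the hint is, as far as I know, passed as the second argument
of join(), for example join(other, JoinHint.REPARTITION_SORT_MERGE).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Illustration only: an in-memory sort-merge join over (key, value) pairs.
public class SortMergeJoinSketch {

    // Small helper to build a (key, value) pair.
    public static Map.Entry<String, String> entry(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    public static List<String> sortMergeJoin(
            List<Map.Entry<String, String>> left,
            List<Map.Entry<String, String>> right) {
        // Sort copies of both inputs by key. A real engine can spill these
        // sorted runs to disk; here everything stays in memory for brevity.
        List<Map.Entry<String, String>> l = new ArrayList<>(left);
        List<Map.Entry<String, String>> r = new ArrayList<>(right);
        Comparator<Map.Entry<String, String>> byKey = Map.Entry.comparingByKey();
        l.sort(byKey);
        r.sort(byKey);

        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.size() && j < r.size()) {
            int cmp = l.get(i).getKey().compareTo(r.get(j).getKey());
            if (cmp < 0) {
                i++;            // left key has no partner yet, advance left
            } else if (cmp > 0) {
                j++;            // right key has no partner yet, advance right
            } else {
                // Equal keys: pair every left record of this key with every
                // right record of this key, replaying the right-side run.
                String key = l.get(i).getKey();
                int runStart = j;
                while (i < l.size() && l.get(i).getKey().equals(key)) {
                    j = runStart;
                    while (j < r.size() && r.get(j).getKey().equals(key)) {
                        out.add(key + "|" + l.get(i).getValue() + "|" + r.get(j).getValue());
                        j++;
                    }
                    i++;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> left =
                Arrays.asList(entry("b", "L2"), entry("a", "L1"), entry("b", "L3"));
        List<Map.Entry<String, String>> right =
                Arrays.asList(entry("b", "R1"), entry("c", "R2"), entry("a", "R3"));
        // prints "[a|L1|R3, b|L2|R1, b|L3|R1]"
        System.out.println(sortMergeJoin(left, right));
    }
}
```

During the merge only the records of the current key need to be held at
once, which is why this strategy copes with inputs that are too large for a
hash table partition.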


On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

> Hi to all,
> I have a case where I don't understand why Flink is not able to optimize
> the join between 2 datasets.
> My initial code was basically this:
> DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;//5.257.207
> elements
> DataSet<Tuple2<String,List<MyObject>>> attrToExpand = ...;//65.000
> DataSet<Tuple2<String, IndexAttributeToExpand>> tmp =
> attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
> This job wasn't able to complete on my local machine (from Eclipse)
> because Flink was giving me the following error:
> Hash join exceeded maximum number of recursions, without reducing
> partitions enough to be memory resident. Probably cause: Too many duplicate
> keys.
> This was because in attrToExpand the List<MyObject> could be quite big.
> Indeed, changing that code to the following makes everything work like a
> charm:
> DataSet<Tuple2<String, List<ThriftObj>>> subset =
> attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
> attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
> Isn't it possible for Flink to optimize my initial code into the
> second? I was convinced that Flink performed a join only on the keys
> before also pulling the other elements of the Tuples into memory. Am I
> wrong?
> Best,
> Flavio
