flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Case of possible join optimization
Date Tue, 08 Sep 2015 09:10:20 GMT
Obviously when trying to simplify my code I didn't substitute correctly the
variable of the join..it should be:

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset
=
      attrToExpand.join(*subset*
).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

Do you think that a JoinHint to create a sort-merge join is equivalent to
my solution?

On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <sewen@apache.org> wrote:

> Hi Flavio!
>
> No, Flink does not join keys before full values. That is very often very
> inefficient, as it results effectively in two joins where one is typically
> about as expensive as the original join.
>
> One can do "semi-join-reduction", in case the join filters out many values
> (many elements from one side do not find a match in the other side). If the
> join does not filter, this does not help either.
>
> Your code is a bit of a surprise. Especially, because in you solution that
> worked, the first statement does nothing:
>
> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>
> attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>
>
> This builds a join, but then takes the second input of the join (the
> bigDataset data set). Because the result of the join is never
> actually used, it is never executed. The second statement hence is
> effectively
>
> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset
=
>
> attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>
>
> Curious why this executed when the original did not.
>
> BTW: If the Lists are very long so they do not fit into a hashtable memory
> partition, you can try to use a JoinHint to create a sort-merge join. It
> may become slower, but typically works with even less memory.
>
>
> Greetings,
> Stephan
>
>
> On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> Hi to all,
>>
>> I have a case where I don't understand why flink is not able to optimize
>> the join between 2 datasets.
>>
>> My initial code was basically this:
>>
>> DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;//5.257.207
>> elements
>> DataSet<Tuple2<String,List<MyObject>>> attrToExpand =
>> ...;//65.000 elements
>>
>> DataSet<Tuple2<String, IndexAttributeToExpand>> tmp =
>>
>> attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>
>> This job wasn't able to complete on my local machine (from Eclipse)
>> because Flink was giving me the following error:
>>
>> Hash join exceeded maximum number of recursions, without reducing
>> partitions enough to be memory resident. Probably cause: Too many duplicate
>> keys.
>>
>> This was because in attrToExpand the List<MyObject> could be quite big.
>> Indeed, changing that code to the following make everything work like a
>> charm:
>>
>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>
>> attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>>
>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>>
atomSubset =
>>
>> attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>
>>
>> Isn't something impossible for Flink to optimize my initial code into the
>> second? I was convinced that Flink was performing a join only on the keys
>> before grabbing also the other elements of the Tuples into memory..am I
>> wrong?
>>
>> Best,
>> Flavio
>>
>
>

Mime
View raw message