flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Case of possible join optimization
Date Tue, 08 Sep 2015 09:28:26 GMT
Ah... Fortunately it seems to do what I need :)
It efficiently filters the bigDataset, retaining only the needed elements
and making the join feasible with little memory :)
So that's a bug? What would be the right way to achieve that behaviour
with Flink?

On Tue, Sep 8, 2015 at 11:22 AM, Stephan Ewen <sewen@apache.org> wrote:

> The problem is the "getInput2()" call. It takes the input to the join, not
> the result of the join. That way, the first join never happens.
>
> On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>
>> Obviously, when trying to simplify my code I didn't substitute the
>> variable of the join correctly. It should be:
>>
>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>     attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>
>> Do you think that a JoinHint to create a sort-merge join is equivalent to
>> my solution?
>>
>>
>> On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi Flavio!
>>>
>>> No, Flink does not join keys before full values. That is very often
>>> inefficient, as it effectively results in two joins, one of which is
>>> typically about as expensive as the original join.
>>>
>>> One can do "semi-join-reduction", in case the join filters out many
>>> values (many elements from one side do not find a match in the other side).
>>> If the join does not filter, this does not help either.
>>>
>>> Your code is a bit of a surprise, especially because in your solution
>>> that worked, the first statement does nothing:
>>>
>>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>>     attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>>>
>>>
>>> This builds a join, but then takes the second input of the join (the
>>> bigDataset data set). Because the result of the join is never
>>> actually used, it is never executed. The second statement hence is
>>> effectively
>>>
>>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>>     attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>>
>>>
>>> Curious why this executed when the original did not.
>>>
>>> BTW: If the Lists are very long so they do not fit into a hashtable
>>> memory partition, you can try to use a JoinHint to create a sort-merge
>>> join. It may become slower, but typically works with even less memory.
>>>
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>>
>>>> I have a case where I don't understand why Flink is not able to
>>>> optimize the join between 2 datasets.
>>>>
>>>> My initial code was basically this:
>>>>
>>>> DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...; // 5.257.207 elements
>>>> DataSet<Tuple2<String, List<MyObject>>> attrToExpand = ...; // 65.000 elements
>>>>
>>>> DataSet<Tuple2<String, IndexAttributeToExpand>> tmp =
>>>>     attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>>>
>>>> This job wasn't able to complete on my local machine (from Eclipse)
>>>> because Flink was giving me the following error:
>>>>
>>>> Hash join exceeded maximum number of recursions, without reducing
>>>> partitions enough to be memory resident. Probably cause: Too many duplicate
>>>> keys.
>>>>
>>>> This was because in attrToExpand the List<MyObject> could be quite big.
>>>> Indeed, changing that code to the following makes everything work like a
>>>> charm:
>>>>
>>>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>>>     attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>>>>
>>>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>>>     attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>>>
>>>>
>>>> Isn't it possible for Flink to optimize my initial code into
>>>> the second? I was convinced that Flink performed the join on the keys
>>>> only, before also loading the other elements of the Tuples into memory.
>>>> Am I wrong?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>>
>>
>>
>
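
The "semi-join-reduction" mentioned in the thread can be illustrated outside of Flink. What follows is only a minimal plain-Java sketch, not Flink code and not what either poster ran: the class name SemiJoinReductionSketch, the methods reduceBigSide and join, and the use of String values in place of MyObject and ThriftObj are all hypothetical, and it assumes each key occurs at most once per side. It first shrinks the big side using only the keys of the small side, then joins against the reduced data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of semi-join reduction on in-memory maps.
class SemiJoinReductionSketch {

    // Step 1 (semi-join): keep only the entries of the big side whose key
    // also occurs on the small side. Only the small side's keys are needed.
    public static Map<String, List<String>> reduceBigSide(
            Map<String, List<String>> big, Set<String> smallKeys) {
        Map<String, List<String>> reduced = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : big.entrySet()) {
            if (smallKeys.contains(e.getKey())) {
                reduced.put(e.getKey(), e.getValue());
            }
        }
        return reduced;
    }

    // Step 2: the actual join, now against the reduced big side.
    // Each match is rendered as "key|smallValues|bigValues".
    public static List<String> join(
            Map<String, List<String>> small, Map<String, List<String>> reducedBig) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : small.entrySet()) {
            List<String> match = reducedBig.get(e.getKey());
            if (match != null) {
                result.add(e.getKey() + "|" + e.getValue() + "|" + match);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> small = new LinkedHashMap<>();
        small.put("a", Arrays.asList("x1"));
        small.put("c", Arrays.asList("x3"));

        Map<String, List<String>> big = new LinkedHashMap<>();
        big.put("a", Arrays.asList("t1", "t2"));
        big.put("b", Arrays.asList("t3"));
        big.put("d", Arrays.asList("t4"));

        Map<String, List<String>> reduced = reduceBigSide(big, small.keySet());
        System.out.println("reduced big side: " + reduced);
        System.out.println("join result: " + join(small, reduced));
    }
}
```

As noted in the thread, this only pays off when the join filters out many elements: here two of the three big-side entries are dropped before the join. If nearly every key finds a match, the reduction step is just an extra pass over the data.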
