hadoop-pig-dev mailing list archives

From "Ashutosh Chauhan (JIRA)" <j...@apache.org>
Subject [jira] Commented: (PIG-845) PERFORMANCE: Merge Join
Date Wed, 12 Aug 2009 06:07:14 GMT

    [ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12742231#action_12742231 ]

Ashutosh Chauhan commented on PIG-845:

Hi Pradeep,

Thanks for the review. Please find my comments inline.

1) In LogicalPlanTester.java, why is the following change required?
Typically when PigContext is constructed in Map-reduce mode, the properties should correspond
to the cluster configuration. So the above initialization seems odd because the Properties
object is an empty object in the constructor call above.

>> This is required because in local mode merge join gets rewritten as a regular join.
So if the exec type were local, the plan I get in MRCompiler would correspond to a regular
join plan, against which we can't test the merge join plan. The Properties object has no bearing here,
because LogicalPlanTester is used only for testing logical plans. Further, I think all our
tests should use MapReduce as the exec type, because we want to test correctness in MapReduce.

2) In PigMapBase.java:
public static final String END_OF_INP_IN_MAP = "pig.stream.in.map";
can change to
public static final String END_OF_INP_IN_MAP = "pig.blocking.operator.in.map"; and this should
be put as a public static member of JobControlCompiler.
In JobControlCompiler.java,
jobConf.set("pig.stream.in.map", "true"); should change to use the above public static String.
>> Will update this in the next patch.
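A minimal sketch of the suggested change, assuming the constant lives on JobControlCompiler as the reviewer proposes. To stay self-contained it uses `java.util.Properties` in place of Hadoop's `JobConf`; the class names `JobControlCompilerSketch` and `PigMapBaseSketch` and both helper methods are hypothetical. The point is simply that the writer and the reader of the flag share one constant instead of repeating the string literal.

```java
import java.util.Properties;

// Simplified stand-in for JobControlCompiler; java.util.Properties replaces Hadoop's JobConf.
class JobControlCompilerSketch {
    // One shared definition of the key, so the writer and the reader cannot drift apart.
    public static final String END_OF_INP_IN_MAP = "pig.blocking.operator.in.map";

    // Hypothetical helper: records that the map plan contains a blocking operator.
    static void markBlockingOperatorInMap(Properties jobConf) {
        jobConf.setProperty(END_OF_INP_IN_MAP, "true");
    }
}

// Simplified stand-in for the map-side code that reads the flag.
class PigMapBaseSketch {
    // Hypothetical reader: returns true only if the flag was set to "true".
    static boolean hasBlockingOperatorInMap(Properties jobConf) {
        return Boolean.parseBoolean(
                jobConf.getProperty(JobControlCompilerSketch.END_OF_INP_IN_MAP, "false"));
    }
}
```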

3) Remove the following comment in QueryParser.jjt (line 302):
* Join parser. Currently can only handle skewed joins.
>> Will be removed in next patch.

4) In QueryParser.jjt the joinPlans passed to the LOJoin constructor is not a LinkedMultiMap,
but in LogToPhyTranslationVisitor the join plans are put in a LinkedMultiMap. If order is
important, shouldn't QueryParser.jjt also change?
>> Good catch. Order is indeed important. Will fix this in the next patch.
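Why insertion order matters can be shown with a plain `LinkedHashMap` of lists standing in for Pig's LinkedMultiMap (an assumption for illustration; the class `JoinPlanOrder` and its methods are hypothetical). Iteration follows the order in which inputs were added, so each input's key plans stay paired with its position in the JOIN statement; a plain `HashMap` makes no such guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: LinkedHashMap<String, List<String>> as a stand-in for a LinkedMultiMap.
class JoinPlanOrder {
    // Each entry is {inputAlias, keyPlan}; keys are grouped per input while the
    // order of first appearance of each input is preserved.
    static Map<String, List<String>> collectJoinPlans(String[][] inputsAndKeys) {
        Map<String, List<String>> plans = new LinkedHashMap<>();
        for (String[] entry : inputsAndKeys) {
            plans.computeIfAbsent(entry[0], k -> new ArrayList<>()).add(entry[1]);
        }
        return plans;
    }

    // Inputs in declaration order: the first element is the first declared input.
    static List<String> inputOrder(Map<String, List<String>> plans) {
        return new ArrayList<>(plans.keySet());
    }
}
```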

5) Some comments in LogToPhyTranslationVisitor about the different lists and maps would help.
>> Those lists and maps were there earlier too; I didn't introduce anything new, I just
moved them around :) But I agree that section needs better documentation; it also took me
a while to get my head around it. Will include a comment about the purpose of each in the next patch.

6) In validateMergeJoin(), the code only considers direct successors and predecessors of
LOJoin. It should check the entire plan and ensure that the predecessors of LOJoin, all the way
up to the LOLoad, are only LOForEach and LOFilter. Strictly, we should not allow LOForEach, since
it could change the sort order or the position of the join keys and hence invalidate the index, but we
need it so that the Foreach introduced by the TypeCastInserter when there is a schema for either of
the inputs remains. You should note in the documentation that only order- and join-key-position-preserving
Foreachs and Filters are allowed as predecessors to merge join, and check the same
in validateMergeJoin(). It is better to use a whitelist of allowed operators than a blacklist
of disallowed ones, since a blacklist would need to be updated any time a new operator
comes along. The exception source here is not really a bug but a user input error, since merge
join really does not support other ops.

Again, for the successors, all operators from the merge join down to the map leaf should be checked
to ensure stream is absent. (Really there should be no restriction on stream being present
after the join; if there is currently an issue with this, it is fine to not allow stream,
but eventually it would be good to have no restriction on what follows the merge join.)
You can just use a visitor to check for the presence of stream in the plan. This should be done after
the complete LogToPhyTranslation is done, in visit(), so that the whole plan can be looked at.

>> Agreed. I fixed the bug for streaming, so now there is no restriction on what follows
merge join. For predecessors, I included a new function which walks all the way up to make sure
the operators preceding merge join are only ones from the whitelist: LOLoad, LOForEach
and LOFilter.
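The whitelist walk described in item 6 can be sketched over a toy graph, assuming operators are identified by string ids with a kind and a predecessor list (`MergeJoinValidator`, `OpKind` and `predecessorsAllowed` are hypothetical names, not the patch's code). Every path from the join back to its loads is visited once, and any operator kind not on the whitelist fails validation, so new operator types are rejected by default. Note that this only checks operator types; it cannot tell whether a given Foreach preserves sort order and key position.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: operator kinds plus a predecessor map stand in for Pig's LogicalPlan.
class MergeJoinValidator {
    enum OpKind { LOAD, FOREACH, FILTER, JOIN, GROUP, STREAM, UNION }

    // Whitelist: anything not listed here is rejected.
    static final Set<OpKind> ALLOWED_PREDECESSORS =
            EnumSet.of(OpKind.LOAD, OpKind.FOREACH, OpKind.FILTER);

    // kinds: operator id -> kind; preds: operator id -> predecessor ids.
    // Returns true only if every operator between the join and its loads is whitelisted.
    static boolean predecessorsAllowed(String joinId,
                                       Map<String, OpKind> kinds,
                                       Map<String, List<String>> preds) {
        Deque<String> toVisit = new ArrayDeque<>(preds.getOrDefault(joinId, Collections.emptyList()));
        Set<String> seen = new HashSet<>();
        while (!toVisit.isEmpty()) {
            String op = toVisit.pop();
            if (!seen.add(op)) continue;               // already checked via another path
            OpKind kind = kinds.get(op);
            if (!ALLOWED_PREDECESSORS.contains(kind)) return false;
            if (kind != OpKind.LOAD) {                 // keep walking up until a load is reached
                toVisit.addAll(preds.getOrDefault(op, Collections.emptyList()));
            }
        }
        return true;
    }
}
```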

7) Is MRStreamHandler.java now replaced by /org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?
>> Yes.

8) Some of the MRCompilerExceptions do not follow the error handling spec (errCode, errMsg, Src).
>> Will update them.

9) Should the assert statements in MRCompiler be replaced with exceptions, since assertions are
disabled by default in Java?
>> Will update them.
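Items 8 and 9 together suggest replacing assertions with explicit checks that throw an exception carrying an error code, a message and a source. The sketch below assumes a hypothetical `CompilerCheckException` with its own `Source` enum and a placeholder error code; Pig's real exception classes and error codes may differ. Unlike `assert`, the check runs even when the JVM is started without `-ea`.

```java
// Hypothetical exception carrying the fields the error handling spec asks for.
// Pig's actual exception hierarchy and constants may differ.
class CompilerCheckException extends Exception {
    enum Source { BUG, INPUT }   // hypothetical error-source categories

    final int errCode;
    final Source errSrc;

    CompilerCheckException(String errMsg, int errCode, Source errSrc) {
        super(errMsg);
        this.errCode = errCode;
        this.errSrc = errSrc;
    }
}

class CompilerChecks {
    // Explicit check instead of `assert predecessorCount == 1;`, which is skipped by default.
    static void requireSinglePredecessor(int predecessorCount) throws CompilerCheckException {
        if (predecessorCount != 1) {
            throw new CompilerCheckException(
                    "Expected exactly one predecessor, found " + predecessorCount,
                    9999,                                // placeholder error code
                    CompilerCheckException.Source.BUG);
        }
    }
}
```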

10) In MRCompiler.java I wonder if you should change
rightMapPlan.disconnect(rightLoader, loadSucc);
We really want to remove all operators in rightMapPlan other than the loader.
>> Didn't know about this function. This indeed is the one which is needed here.
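The difference item 10 points at, dropping one edge versus removing every operator below the loader, can be illustrated on a toy plan graph. This is a sketch only: `ToyPlan`, `disconnect` and `trimBelow` are hypothetical names, and the plan method the reviewer suggested is not reproduced here. Disconnecting a single edge leaves the loader's former successors in the plan as an orphaned sub-graph, whereas trimming leaves just the loader.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy plan graph: operator id -> list of successor ids. Not Pig's plan API.
class ToyPlan {
    final Map<String, List<String>> successors = new LinkedHashMap<>();

    void connect(String from, String to) {
        successors.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        successors.computeIfAbsent(to, k -> new ArrayList<>());
    }

    // Removes only the edge; `to` and everything below it stay in the plan.
    void disconnect(String from, String to) {
        successors.getOrDefault(from, new ArrayList<>()).remove(to);
    }

    // Removes every operator reachable from `op`, keeping `op` itself with no successors.
    void trimBelow(String op) {
        List<String> succs = new ArrayList<>(successors.getOrDefault(op, new ArrayList<>()));
        for (String s : succs) {
            trimBelow(s);
            successors.remove(s);
        }
        successors.put(op, new ArrayList<>());
    }
}
```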

11) We should note in the documentation that merge join only works for data sorted in ascending
order. (The MRCompiler code assumes this; we should have a sortedness check if possible. See the performance
comment below.)
>> Will include in comments.

12) It would be good to add a couple of unit tests with a few operators after merge join to
ensure merge join operates well with successors in the plan.
>> There was already one with load-load-join-join-union-filter. Will include one
more, load-load-join-group-filter, which introduces an MR boundary after the merge join.

13) In POMergeJoin.java, comments about foreach should be cleaned up since foreach is no longer
>> Will update it.

The following code can be factored out into a function since it's repeated twice:
>> If you look closely, it's not an exact repetition, so it can't be factored out.

A couple of things to try and check impact on performance:
1) Introduce checks for sortedness of inputs to merge join
>> I introduced these checks and benchmarked them, and there was no noticeable difference
in CPU times, so I am including them. POMergeJoin now checks data sortedness and fails
if it finds the data isn't sorted.
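A sortedness check of the kind described can be sketched as an iterator wrapper that fails fast when a key is smaller than the one before it. This is not POMergeJoin's actual code; `AscendingCheckIterator` is a hypothetical name, and the sketch assumes non-null, mutually comparable keys. Equal consecutive keys are accepted, since sorted input may contain duplicate join keys, and the cost is one comparison per tuple.

```java
import java.util.Iterator;

// Wraps a key stream and throws if keys are not in ascending order.
// Assumes non-null keys; a null key would cause a NullPointerException in compareTo.
class AscendingCheckIterator<K extends Comparable<K>> implements Iterator<K> {
    private final Iterator<K> source;
    private K previous;   // last key returned; null before the first call to next()

    AscendingCheckIterator(Iterator<K> source) {
        this.source = source;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public K next() {
        K current = source.next();
        // Equal keys are fine; only a strictly smaller key breaks ascending order.
        if (previous != null && current.compareTo(previous) < 0) {
            throw new IllegalStateException(
                    "Input not sorted in ascending order: " + current + " after " + previous);
        }
        previous = current;
        return current;
    }
}
```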

2) Increase sample size from 1 per map to say 10 per map
>> This is a classic case of dense vs. sparse index trade-offs. A dense index is beneficial
when there are lots of distinct keys, but it takes longer to build. On the other hand, if there
are lots of rows for the same key, it wouldn't buy us much; in fact, it may hurt, as we
would spend more time on index construction. Moreover, a better index would essentially only
help us cut down the read times of the right side. In my experiments I found that read times
are negligible compared to actually producing joined tuples and writing them out to DFS
(on the order of tens of seconds for a task lasting a couple of hours). So this needs to be thought
through more carefully and benchmarked. For now I am sampling one tuple per map block.
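With one sample per block, the index on the right-hand input is sparse: a lookup only narrows the search to a block, which is then scanned forward. The sketch below, with hypothetical names and integer keys, shows one way such a lookup could choose the starting block. Because a run of duplicate keys can spill across a block boundary, it starts from the last block whose first key is strictly smaller than the target, or block 0 if there is none. A denser index shrinks the forward scan but costs more to build, which is the trade-off discussed above.

```java
import java.util.List;

// Sparse index sketch: firstKeys.get(i) is the first key of block i, ascending.
class SparseIndex {
    // Returns the block from which a forward scan for `key` should start.
    static int startBlock(List<Integer> firstKeys, int key) {
        int lo = 0, hi = firstKeys.size() - 1, answer = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (firstKeys.get(mid) < key) {
                answer = mid;      // this block starts before `key`; look further right
                lo = mid + 1;
            } else {
                hi = mid - 1;      // `key` may already appear in an earlier block
            }
        }
        return answer;
    }
}
```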

> -----------------------
>                 Key: PIG-845
>                 URL: https://issues.apache.org/jira/browse/PIG-845
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Olga Natkovich
>            Assignee: Ashutosh Chauhan
>         Attachments: merge-join-1.patch, merge-join-for-review.patch
> This join would work if the data for both tables is sorted on the join key.

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.
