hadoop-pig-dev mailing list archives

From "Pradeep Kamath (JIRA)" <j...@apache.org>
Subject [jira] Commented: (PIG-845) PERFORMANCE: Merge Join
Date Mon, 10 Aug 2009 23:39:14 GMT

    [ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12741621#action_12741621
] 

Pradeep Kamath commented on PIG-845:
------------------------------------

Review comments:
1) In LogicalPlanTester.java, why is the following change required?
{noformat}
@@ -198,7 +198,7 @@
     private LogicalPlan buildPlan(String query, ClassLoader cldr) {
 
         LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
-        PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
         try {
             pigContext.connect();
         } catch (ExecException e1) {
{noformat}

Typically when PigContext is constructed in Map-reduce mode, the properties should correspond
to the cluster configuration. So the above initialization seems odd because the Properties
object is an empty object in the constructor call above.

2) In PigMapBase.java:

public static final String END_OF_INP_IN_MAP = "pig.stream.in.map";

can change to

public static final String END_OF_INP_IN_MAP = "pig.blocking.operator.in.map"; and this should
be put as a public static member of JobControlCompiler.

In JobControlCompiler.java,

jobConf.set("pig.stream.in.map", "true");  should change to use the above public static String.


3) Remove the following comment in QueryParser.jjt (line 302):
{code}
    * Join parser. Currently can only handle skewed joins.        
{code}

4) In QueryParser.jjt the joinPlans passed to the LOJoin constructor is not a LinkedMultiMap,
but in LogToPhyTranslationVisitor the join plans are put in a LinkedMultiMap. If order is
important, shouldn't QueryParser.jjt also change?

5) Some comments in LogToPhyTranslationVisitor about the different lists and maps would help :)

6) In validateMergeJoin(), the code only considers direct successors and predecessors of
LOJoin. It should check the entire plan and ensure that the predecessors of LOJoin, all the way
up to the LOLoad, are only LOForEach and LOFilter. Strictly, we should not allow LOForEach since
it could change the sort order or the position of the join keys and hence invalidate the index,
but we need it so that the Foreach introduced by the TypeCastInserter (when there is a schema
for either of the inputs) remains. You should note in the documentation that only Foreachs and
Filters that preserve sort order and join key position are allowed as predecessors to merge
join, and check the same in validateMergeJoin(). It is better to use a whitelist of allowed
operators than a blacklist of disallowed ones, since the blacklist would need to be updated
any time a new operator comes along. The exception source here is not really a bug but a user
input error, since merge join does not support other ops.

Again for the successor, all successors from mergejoin down to map leaf should be checked
to ensure stream is absent (really there should be no restriction on stream being present
after the join - if there is an issue currently with this, it is fine to not allow stream
but eventually it would be good to not have any restriction on what follows the merge join).
You can just use a visitor to check presence of stream in the plan - this should be done after
complete LogToPhyTranslation is done - in visit() so that the whole plan can be looked at.
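
To illustrate the whitelist check on predecessors suggested in (6), here is a minimal sketch. It assumes a simplified stand-in for the logical plan: the Op class, its kind/preds fields and MergeJoinValidator are hypothetical names for illustration, not Pig's LogicalOperator API. It walks every predecessor chain from the join up to the loads and reports the first operator that is not on the whitelist.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a logical operator; not Pig's real class.
final class Op {
    final String kind;                         // e.g. "LOLoad", "LOFilter"
    final List<Op> preds = new ArrayList<>();  // direct predecessors

    Op(String kind, Op... preds) {
        this.kind = kind;
        this.preds.addAll(List.of(preds));
    }
}

final class MergeJoinValidator {
    // Whitelist: only these may sit between a load and the merge join.
    private static final Set<String> ALLOWED = Set.of("LOForEach", "LOFilter");

    // Returns the kind of the first disallowed operator found above the
    // join, or null if every path up to a load uses only allowed operators.
    static String firstDisallowedPredecessor(Op join) {
        Deque<Op> pending = new ArrayDeque<>(join.preds);
        while (!pending.isEmpty()) {
            Op op = pending.pop();
            if (op.kind.equals("LOLoad")) {
                continue; // reached the root of this branch
            }
            if (!ALLOWED.contains(op.kind)) {
                return op.kind;
            }
            pending.addAll(op.preds);
        }
        return null;
    }
}
```

With a whitelist, a newly added operator type is rejected by default until someone decides it is safe. The sketch does not verify that an allowed Foreach actually preserves sort order and key position; that would need a separate check.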

7) Is MRStreamHandler.java now replaced by /org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
?

8) Some of the MRCompilerExceptions do not follow the Error handling spec - errcode, errMsg, Src

9) Should assert() statements in MRCompiler be replaced with Exceptions, since assertions are
disabled by default in Java?
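
A small sketch of the point in (9): the JVM skips assert statements unless it is started with -ea, so a check written as an assertion silently disappears in a normal run. PlanCheck and requireSingleLeaf are hypothetical names, and IllegalStateException is only a placeholder; in MRCompiler the exception would presumably be an MRCompilerException carrying the error code, message and source required by the error handling spec.

```java
import java.util.List;

final class PlanCheck {
    // Hypothetical helper: returns the only leaf, failing loudly otherwise.
    static <T> T requireSingleLeaf(List<T> leaves) {
        // 'assert leaves.size() == 1;' would do nothing unless the JVM
        // runs with -ea, so the condition is checked explicitly instead.
        if (leaves.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one leaf but found " + leaves.size());
        }
        return leaves.get(0);
    }
}
```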

10) In MRCompiler.java I wonder if you should change
{code}
                    rightMapPlan.disconnect(rightLoader, loadSucc);
                    rightMapPlan.remove(loadSucc);
{code}
to
{code}
                    rightMapPlan.trimBelow(rightLoader);
{code}
We really want to remove all operators in rightMapPlan other than the loader.

11) We should note in the documentation that merge join only works for data sorted in ascending
order. (The MRCompiler code assumes this; we should have a sort check if possible. See the
performance comment below.)

12) It would be good to add a couple of unit tests with a few operators after merge join to
ensure merge join operates well with successors in the plan.

13) In POMergeJoin.java, comments about foreach should be cleaned up since foreach is no longer
used. For example:
{code}
//variable which denotes whether we are returning tuples from the foreach operator
{code}

The following code can be factored out into a function since it is repeated twice:
{code}
               case POStatus.STATUS_EOP:          // Current file has ended. Need to open next file by reading next index entry.
                    String prevFile = rightLoader.getLFile().getFileName();
                    while(true){                        // But next file may be same as previous one, because index may contain multiple entries for same file.
                        Tuple idxEntry = index.poll();
                        if(null == idxEntry)          // Index is finished too. Right stream is finished. No more tuples.
                            return res;
                        else{
                            if(prevFile.equals((String)idxEntry.get(idxEntry.size()-2)))
                                continue;
                            else{
                                initRightLoader(idxEntry);
                                res = rightLoader.getNext(dummyTuple);
                                rightPipelineRoot.attachInput((Tuple)res.result);
                                return this.getNextRightInp();
                            }   
                        }   
                    }   

{code}
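
One possible shape for the factored-out function, as a self-contained sketch rather than a patch against POMergeJoin: IndexScan and nextEntryForDifferentFile are hypothetical names, and an index entry is modelled here as a plain List<Object> whose second-to-last field is the file name, mirroring idxEntry.get(idxEntry.size()-2) in the snippet above. The helper isolates the "skip entries for the file we just finished" loop; the caller would still do the initRightLoader()/getNext() steps on the entry it gets back.

```java
import java.util.List;
import java.util.Queue;

final class IndexScan {
    // Polls the index until it finds an entry that points at a file other
    // than prevFile. Returns null when the index is exhausted.
    static List<Object> nextEntryForDifferentFile(Queue<List<Object>> index,
                                                  String prevFile) {
        List<Object> entry;
        while ((entry = index.poll()) != null) {
            // Assumption: the file name is the second-to-last field.
            String file = (String) entry.get(entry.size() - 2);
            if (!prevFile.equals(file)) {
                return entry;
            }
        }
        return null; // index finished: no more tuples on the right side
    }
}
```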

A couple of things to try and check impact on performance:
1) Introduce checks for sortedness of inputs to merge join
2) Increase sample size from 1 per map to say 10 per map
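
For the sortedness check in 1), a streaming test along these lines might be enough to measure the overhead. This is only a sketch: SortCheck and isAscending are hypothetical names, and keys are assumed to be Comparable rather than Pig tuples. It costs one comparison per key and keeps only the previous key in memory.

```java
import java.util.Iterator;

final class SortCheck {
    // Returns true if the keys arrive in ascending (non-decreasing) order.
    static <K extends Comparable<? super K>> boolean isAscending(Iterator<K> keys) {
        if (!keys.hasNext()) {
            return true; // empty input is trivially sorted
        }
        K prev = keys.next();
        while (keys.hasNext()) {
            K cur = keys.next();
            if (prev.compareTo(cur) > 0) {
                return false; // found a key smaller than its predecessor
            }
            prev = cur;
        }
        return true;
    }
}
```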

> PERFORMANCE: Merge Join
> -----------------------
>
>                 Key: PIG-845
>                 URL: https://issues.apache.org/jira/browse/PIG-845
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Olga Natkovich
>            Assignee: Ashutosh Chauhan
>         Attachments: merge-join-1.patch, merge-join-for-review.patch
>
>
> This join would work if the data for both tables is sorted on the join key.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

