pig-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1737174 - /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Date: Thu, 31 Mar 2016 02:50:34 GMT
Author: xuefu
Date: Thu Mar 31 02:50:34 2016
New Revision: 1737174

URL: http://svn.apache.org/viewvc?rev=1737174&view=rev
Log:
PIG-4848: pig.noSplitCombination=true should always be set internally for a merge join (Xianda via Xuefu)
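
For context, the scenario this change targets is a Pig Latin merge join running on the Spark backend, where a load feeding the join compiles to a POLoad with a POMergeJoin successor. The sketch below is illustrative only; the paths, aliases and schemas are hypothetical, and a merge join assumes both inputs are already sorted on the join key. With this change, the job configuration for such a load gets pig.noSplitCombination set to true internally before the RDD is created.

    -- hypothetical example: both inputs pre-sorted on id
    A = LOAD 'input/left_sorted'  AS (id:int, v:chararray);
    B = LOAD 'input/right_sorted' AS (id:int, w:chararray);
    -- the loads of A and B become POLoad operators with a POMergeJoin successor
    C = JOIN A BY id, B BY id USING 'merge';
    STORE C INTO 'output/joined';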

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1737174&r1=1737173&r2=1737174&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Thu Mar 31 02:50:34 2016
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
@@ -92,6 +93,12 @@ public class LoadConverter implements RD
         jobConf.set("mapreduce.input.fileinputformat.inputdir",
                 op.getLFile().getFileName());
 
+        // internally set pig.noSplitCombination as true ONLY for
+        // the POLoad operator which has POMergeJoin successor.
+        if (hasMergeJoinSuccessor(op)) {
+            jobConf.set("pig.noSplitCombination", "true");
+        }
+
         RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD(
                 jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
 
@@ -218,4 +225,20 @@ public class LoadConverter implements RD
         return jobConf;
     }
 
+    private static boolean hasMergeJoinSuccessor(PhysicalOperator op) {
+        List<PhysicalOperator> successors = op.getParentPlan().getSuccessors(op);
+        if (successors == null ) {
+            return false;
+        }
+        for (PhysicalOperator successor : successors){
+            if (successor instanceof POMergeJoin){
+                return true;
+            }
+            if (hasMergeJoinSuccessor(successor)){
+                return true;
+            }
+        }
+        return false;
+    }
+
 }
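
Note on the scope of the change: rather than disabling split combination for every load in a Spark job, the new hasMergeJoinSuccessor() walk sets pig.noSplitCombination only for loads whose output eventually reaches a POMergeJoin, so other loads in the same script can presumably still benefit from combining small splits.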


