pig-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1720826 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkLauncher.java optimizer/MultiQueryOptimizerSpark.java plan/SparkOperator.java
Date: Fri, 18 Dec 2015 17:03:16 GMT
Author: xuefu
Date: Fri Dec 18 17:03:16 2015
New Revision: 1720826

URL: http://svn.apache.org/viewvc?rev=1720826&view=rev
Log:
PIG-4675: Operators with multiple predecessors fail under multiquery optimization (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1720826&r1=1720825&r2=1720826&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Dec 18 17:03:16 2015
@@ -569,7 +569,7 @@ public class SparkLauncher extends Launc
         addUDFJarsToSparkJobWorkingDirectory(sparkOperator);
         List<SparkOperator> predecessors = sparkPlan
                 .getPredecessors(sparkOperator);
-        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+        Set<OperatorKey> predecessorOfPreviousSparkOp = new HashSet<OperatorKey>();
         if (predecessors != null) {
             for (SparkOperator pred : predecessors) {
                 if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
@@ -577,7 +577,7 @@ public class SparkLauncher extends Launc
                             physicalOpRdds, convertMap, seenJobIDs, sparkStats,
                             conf);
                 }
-                predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
+                predecessorOfPreviousSparkOp.add(pred.getOperatorKey());
             }
         }
 
@@ -594,8 +594,8 @@ public class SparkLauncher extends Launc
         }
         for (PhysicalOperator leafPO : leafPOs) {
             try {
-                physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
-                        predecessorRDDs, convertMap);
+                physicalToRDD(sparkOperator, sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+                        predecessorOfPreviousSparkOp, convertMap);
                 sparkOpRdds.put(sparkOperator.getOperatorKey(),
                         physicalOpRdds.get(leafPO.getOperatorKey()));
             } catch (Exception e) {
@@ -626,34 +626,38 @@ public class SparkLauncher extends Launc
         }
     }
 
-    private void physicalToRDD(PhysicalPlan plan,
+    private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan plan,
                                PhysicalOperator physicalOperator,
                                Map<OperatorKey, RDD<Tuple>> rdds,
-                               List<RDD<Tuple>> rddsFromPredeSparkOper,
+                               Set<OperatorKey> predsFromPreviousSparkOper,
                                Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap)
             throws IOException {
         RDD<Tuple> nextRDD = null;
-        List<PhysicalOperator> predecessors = plan
+        List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
                 .getPredecessors(physicalOperator);
-        if (predecessors != null && predecessors.size() > 1) {
-            Collections.sort(predecessors);
+        if (predecessorsOfCurrentPhysicalOp != null && predecessorsOfCurrentPhysicalOp.size() > 1) {
+            Collections.sort(predecessorsOfCurrentPhysicalOp);
         }
 
-        List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
-        if (predecessors != null) {
-            for (PhysicalOperator predecessor : predecessors) {
-                physicalToRDD(plan, predecessor, rdds, rddsFromPredeSparkOper,
+        Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
+        addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
+        if (predecessorsOfCurrentPhysicalOp != null) {
+            for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
+                physicalToRDD(sparkOperator, plan, predecessor, rdds, predsFromPreviousSparkOper,
                         convertMap);
-                predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
+                operatorKeysOfAllPreds.add(predecessor.getOperatorKey());
             }
 
         } else {
-            if (rddsFromPredeSparkOper != null
-                    && rddsFromPredeSparkOper.size() > 0) {
-                predecessorRdds.addAll(rddsFromPredeSparkOper);
+            if (predsFromPreviousSparkOper != null
+                    && predsFromPreviousSparkOper.size() > 0) {
+                for (OperatorKey predFromPreviousSparkOper : predsFromPreviousSparkOper) {
+                    operatorKeysOfAllPreds.add(predFromPreviousSparkOper);
+                }
             }
         }
 
+
         if (physicalOperator instanceof POSplit) {
             List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
             for (PhysicalPlan successPlan : successorPlans) {
@@ -663,7 +667,7 @@ public class SparkLauncher extends Launc
                     break;
                 }
                 PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0);
-                physicalToRDD(successPlan, leafOfSuccessPlan, rdds, predecessorRdds, convertMap);
+                physicalToRDD(sparkOperator, successPlan, leafOfSuccessPlan, rdds, operatorKeysOfAllPreds, convertMap);
             }
         } else {
             RDDConverter converter = convertMap.get(physicalOperator.getClass());
@@ -675,7 +679,8 @@ public class SparkLauncher extends Launc
             LOG.info("Converting operator "
                     + physicalOperator.getClass().getSimpleName() + " "
                     + physicalOperator);
-            nextRDD = converter.convert(predecessorRdds, physicalOperator);
+            List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds, rdds);
+            nextRDD = converter.convert(allPredRDDs, physicalOperator);
 
             if (nextRDD == null) {
                 throw new IllegalArgumentException(
@@ -687,6 +692,30 @@ public class SparkLauncher extends Launc
         }
     }
 
+    // Get the RDDs of all predecessors, sorted by OperatorKey.
+    private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds, Map<OperatorKey, RDD<Tuple>> rdds) {
+        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+        List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
+        Collections.sort(operatorKeyOfAllPreds);
+        for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
+            predecessorRDDs.add(rdds.get(operatorKeyOfAllPred));
+        }
+        return predecessorRDDs;
+    }
+
+    // Deal with the special case, when multiquery optimization is enabled, of operators with
+    // multiple predecessors: collect the predecessors of the given physicalOp that live in a
+    // previous SparkOp (see PIG-4675).
+    private void addPredsFromPrevoiousSparkOp(SparkOperator sparkOperator, PhysicalOperator physicalOperator, Set<OperatorKey> operatorKeysOfPredecessors) {
+        // the relationship is stored in sparkOperator.getMultiQueryOptimizeConnectionItem()
+        List<OperatorKey> predOperatorKeys = sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey());
+        if (predOperatorKeys != null) {
+            for (OperatorKey predOperator : predOperatorKeys) {
+                LOG.debug(String.format("add predecessor(OperatorKey:%s) for OperatorKey:%s",
predOperator, physicalOperator.getOperatorKey()));
+                operatorKeysOfPredecessors.add(predOperator);
+            }
+        }
+    }
+
     @Override
     public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
                         String format, boolean verbose) throws IOException {
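
The launcher change above replaces the List<RDD<Tuple>> that was threaded through physicalToRDD with a Set<OperatorKey>, resolved to RDDs only at conversion time and sorted by key so that converters taking several inputs always see their predecessors in a stable order. Below is a minimal, self-contained sketch of that resolve-and-sort step; the Key class is a hypothetical stand-in for org.apache.pig.impl.plan.OperatorKey, assumed here only to be Comparable, as the Collections.sort call in sortPredecessorRDDs implies.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class SortByKeyDemo {
        // Hypothetical stand-in for OperatorKey; assumed Comparable only.
        static final class Key implements Comparable<Key> {
            final long id;
            Key(long id) { this.id = id; }
            public int compareTo(Key o) { return Long.compare(id, o.id); }
            public String toString() { return "scope-" + id; }
        }

        // Mirrors sortPredecessorRDDs: keys arrive in a Set with no defined
        // order, so sort them before looking up each value; the resulting
        // list order is then deterministic across runs.
        static <V> List<V> valuesSortedByKey(Set<Key> keys, Map<Key, V> byKey) {
            List<Key> ordered = new ArrayList<Key>(keys);
            Collections.sort(ordered);
            List<V> values = new ArrayList<V>();
            for (Key k : ordered) {
                values.add(byKey.get(k));
            }
            return values;
        }

        public static void main(String[] args) {
            Map<Key, String> rdds = new HashMap<Key, String>();
            rdds.put(new Key(12), "rdd-of-scope-12");
            rdds.put(new Key(7), "rdd-of-scope-7");
            Set<Key> predKeys = new HashSet<Key>(rdds.keySet());
            // Prints [rdd-of-scope-7, rdd-of-scope-12] whatever the Set order.
            System.out.println(valuesSortedByKey(predKeys, rdds));
        }
    }

The stable ordering matters, presumably because converters with several inputs consume the predecessor RDD list positionally.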

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java?rev=1720826&r1=1720825&r2=1720826&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java Fri Dec 18 17:03:16 2015
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -38,6 +40,9 @@ import org.apache.pig.impl.plan.VisitorE
  * MultiQueryOptimizer for spark
  */
 public class MultiQueryOptimizerSpark extends SparkOpPlanVisitor {
+
+    private static final Log LOG = LogFactory.getLog(MultiQueryOptimizerSpark.class);
+
     private String scope;
     private NodeIdGenerator nig;
 
@@ -104,18 +109,27 @@ public class MultiQueryOptimizerSpark ex
                 POStore poStore = null;
                 if (firstNodeLeaf != null && firstNodeLeaf instanceof POStore) {
                     poStore = (POStore) firstNodeLeaf;
+                    PhysicalOperator predOfPoStore = sparkOp.physicalPlan.getPredecessors(poStore).get(0);
                     sparkOp.physicalPlan.remove(poStore); // remove  unnecessary store
-                    POSplit split = getSplit();
+                    POSplit poSplit = createSplit();
                     ArrayList<SparkOperator> spliteesCopy = new ArrayList
                             <SparkOperator>(splittees);
                     for (SparkOperator splitee : spliteesCopy) {
-                        List<PhysicalOperator> firstNodeRoots = splitee.physicalPlan.getRoots();
-                        for (int i = 0; i < firstNodeRoots.size(); i++) {
-                            if (firstNodeRoots.get(i) instanceof POLoad) {
-                                POLoad poLoad = (POLoad) firstNodeRoots.get(i);
+                        List<PhysicalOperator> rootsOfSplitee = splitee.physicalPlan.getRoots();
+                        for (int i = 0; i < rootsOfSplitee.size(); i++) {
+                            if (rootsOfSplitee.get(i) instanceof POLoad) {
+                                POLoad poLoad = (POLoad) rootsOfSplitee.get(i);
                                 if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
+                                    List<PhysicalOperator> successorsOfPoLoad = splitee.physicalPlan.getSuccessors(poLoad);
+                                    List<PhysicalOperator> successorofPoLoadsCopy = new ArrayList<PhysicalOperator>(successorsOfPoLoad);
                                     splitee.physicalPlan.remove(poLoad);  // remove  unnecessary load
-                                    split.addPlan(splitee.physicalPlan);
+                                    for (PhysicalOperator successorOfPoLoad : successorofPoLoadsCopy) {
+                                        // store the to->from relationship in SparkOperator#multiQueryOptimizeConnectionMap
+                                        sparkOp.addMultiQueryOptimizeConnectionItem(successorOfPoLoad.getOperatorKey(), predOfPoStore.getOperatorKey());
+                                        LOG.debug(String.format("add multiQueryOptimize connection item: to:%s, from:%s for %s",
+                                                successorOfPoLoad.toString(), predOfPoStore.getOperatorKey().toString(), splitee.getOperatorKey()));
+                                    }
+                                    poSplit.addPlan(splitee.physicalPlan);
                                     addSubPlanPropertiesToParent(sparkOp, splitee);
                                     removeSplittee(getPlan(), sparkOp, splitee);
                                 }
@@ -123,7 +137,7 @@ public class MultiQueryOptimizerSpark ex
                         }
                     }
 
-                    sparkOp.physicalPlan.addAsLeaf(split);
+                    sparkOp.physicalPlan.addAsLeaf(poSplit);
                 }
             }
         } catch (PlanException e) {
@@ -145,7 +159,7 @@ public class MultiQueryOptimizerSpark ex
         getPlan().remove(splittee);
     }
 
-    private POSplit getSplit() {
+    private POSplit createSplit() {
         return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
     }
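
On the optimizer side, when a splittee's temporary POLoad is removed, every successor of that load is linked back to the predecessor of the removed POStore via SparkOperator#addMultiQueryOptimizeConnectionItem. A self-contained sketch of that bookkeeping follows, with a plain HashMap standing in for org.apache.pig.impl.util.MultiMap and string keys standing in for OperatorKeys; all names here are hypothetical.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ConnectionMapDemo {
        static final Map<String, List<String>> connections =
                new HashMap<String, List<String>>();

        // Mirrors addMultiQueryOptimizeConnectionItem(to, from): when the
        // optimizer deletes a tmp POStore/POLoad pair while merging a
        // splittee, each successor of the deleted load ("to") is linked to
        // the predecessor of the deleted store ("from").
        static void add(String to, String from) {
            List<String> froms = connections.get(to);
            if (froms == null) {
                froms = new ArrayList<String>();
                connections.put(to, froms);
            }
            froms.add(from);
        }

        public static void main(String[] args) {
            add("scope-12", "scope-7"); // successor of POLoad <- predecessor of POStore
            add("scope-13", "scope-7"); // a second branch reading the same tmp file
            System.out.println(connections); // {scope-12=[scope-7], scope-13=[scope-7]}
        }
    }

One "from" key can serve several "to" keys, as happens when several branches of the merged plan read the same temporary file.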
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1720826&r1=1720825&r2=1720826&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Fri Dec 18 17:03:16 2015
@@ -28,6 +28,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
 
 /**
  * An operator model for a Spark job. Acts as a host to the plans that will
@@ -81,12 +82,14 @@ public class SparkOperator extends Opera
 
 	private List<String> crossKeys = null;
 
-	public SparkOperator(OperatorKey k) {
-		super(k);
-		physicalPlan = new PhysicalPlan();
-		UDFs = new HashSet<String>();
-		scalars = new HashSet<PhysicalOperator>();
-	}
+    private MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionMap = new MultiMap<OperatorKey, OperatorKey>();
+
+    public SparkOperator(OperatorKey k) {
+        super(k);
+        physicalPlan = new PhysicalPlan();
+        UDFs = new HashSet<String>();
+        scalars = new HashSet<PhysicalOperator>();
+    }
 
 	@Override
 	public boolean supportsMultipleInputs() {
@@ -264,4 +267,14 @@ public class SparkOperator extends Opera
     public void setRequestedParallelismByReference(SparkOperator oper) {
         this.requestedParallelism = oper.requestedParallelism;
     }
+
+    // When the multiquery optimizer is enabled, in some cases the predecessor (from) of a
+    // physicalOp (to) will be the leaf physicalOperator of a previous sparkOperator.
+    // See PIG-4675 for details.
+    public void addMultiQueryOptimizeConnectionItem(OperatorKey to, OperatorKey from) {
+        multiQueryOptimizeConnectionMap.put(to, from);
+    }
+
+    public MultiMap<OperatorKey, OperatorKey> getMultiQueryOptimizeConnectionItem() {
+        return multiQueryOptimizeConnectionMap;
+    }
 }
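
This commit exercises only put(to, from) and get(to) on the new map, with get returning a List (or null when nothing was recorded), as the SparkLauncher hunk above shows. A usage sketch under just those assumptions; the operator keys are invented for illustration.

    import java.util.List;

    import org.apache.pig.impl.plan.OperatorKey;
    import org.apache.pig.impl.util.MultiMap;

    public class ConnectionItemSketch {
        public static void main(String[] args) {
            MultiMap<OperatorKey, OperatorKey> map =
                    new MultiMap<OperatorKey, OperatorKey>();
            OperatorKey to = new OperatorKey("scope", 12L);  // successor of the removed POLoad
            OperatorKey from = new OperatorKey("scope", 7L); // predecessor of the removed POStore
            map.put(to, from);

            // The reverse lookup done by addPredsFromPrevoiousSparkOp: null
            // means the operator gained no predecessors from multiquery merging.
            List<OperatorKey> preds = map.get(to);
            if (preds != null) {
                System.out.println("extra predecessors of " + to + ": " + preds);
            }
        }
    }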


