pig-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1728303 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/operator/ src/org/apache/pig/backend/hadoop/executionengine/spark/plan/ src/org/apache/pig/t...
Date: Wed, 03 Feb 2016 12:50:40 GMT
Author: xuefu
Date: Wed Feb  3 12:50:40 2016
New Revision: 1728303

URL: http://svn.apache.org/viewvc?rev=1728303&view=rev
Log:
PIG-4783: Refactor SparkLauncher for spark engine (Liyun via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
    pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1728303&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Wed Feb  3 12:50:40 2016
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+
+import com.google.common.collect.Lists;
+
+public class JobGraphBuilder extends SparkOpPlanVisitor {
+
+    private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
+
+    private Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap = null;
+    private SparkPigStats sparkStats = null;
+    private JavaSparkContext sparkContext = null;
+    private JobMetricsListener jobMetricsListener = null;
+    private String jobGroupID = null;
+    private Set<Integer> seenJobIDs = new HashSet<Integer>();
+    private SparkOperPlan sparkPlan = null;
+    private Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+    private Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+
+    public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap, SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener jobMetricsListener, String jobGroupID) {
+        super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
+        this.sparkPlan = plan;
+        this.convertMap = convertMap;
+        this.sparkStats = sparkStats;
+        this.sparkContext = sparkContext;
+        this.jobMetricsListener = jobMetricsListener;
+        this.jobGroupID = jobGroupID;
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        new PhyPlanSetter(sparkOp.physicalPlan).visit();
+        try {
+            sparkOperToRDD(sparkOp);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
+        } catch (JobCreationException e){
+            throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
+        }
+    }
+
+    private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException {
+        List<SparkOperator> predecessors = sparkPlan
+                .getPredecessors(sparkOperator);
+        Set<OperatorKey> predecessorOfPreviousSparkOp = new HashSet<OperatorKey>();
+        if (predecessors != null) {
+            for (SparkOperator pred : predecessors) {
+                predecessorOfPreviousSparkOp.add(pred.getOperatorKey());
+            }
+        }
+
+        boolean isFail = false;
+        Exception exception = null;
+        if (sparkOperator instanceof NativeSparkOperator) {
+            ((NativeSparkOperator) sparkOperator).runJob();
+        } else {
+            List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
+
+            //One SparkOperator may have multiple leaves(POStores) after multiquery feature is enabled
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("sparkOperator.physicalPlan have " + sparkOperator.physicalPlan.getLeaves().size() + " leaves");
+            }
+            for (PhysicalOperator leafPO : leafPOs) {
+                try {
+                    physicalToRDD(sparkOperator, sparkOperator.physicalPlan, leafPO,
+                            predecessorOfPreviousSparkOp);
+                    sparkOpRdds.put(sparkOperator.getOperatorKey(),
+                            physicalOpRdds.get(leafPO.getOperatorKey()));
+                } catch (Exception e) {
+                    LOG.error("throw exception in sparkOperToRDD: ", e);
+                    exception = e;
+                    isFail = true;
+                }
+            }
+
+
+            List<POStore> poStores = PlanHelper.getPhysicalOperators(
+                    sparkOperator.physicalPlan, POStore.class);
+            Collections.sort(poStores);
+            if (poStores.size() > 0) {
+                int i = 0;
+                if (!isFail) {
+                    List<Integer> jobIDs = getJobIDs(seenJobIDs);
+                    for (POStore poStore : poStores) {
+                        SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator,
+                                jobMetricsListener, sparkContext, sparkStats);
+                    }
+                } else {
+                    for (POStore poStore : poStores) {
+                        String failJobID = sparkOperator.name().concat("_fail");
+                        SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkOperator, sparkStats, exception);
+                    }
+                }
+            }
+        }
+    }
+
+    private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan plan,
+                               PhysicalOperator physicalOperator,
+                               Set<OperatorKey> predsFromPreviousSparkOper)
+            throws IOException {
+        RDD<Tuple> nextRDD = null;
+        List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
+                .getPredecessors(physicalOperator);
+        if (predecessorsOfCurrentPhysicalOp != null && predecessorsOfCurrentPhysicalOp.size() > 1) {
+            Collections.sort(predecessorsOfCurrentPhysicalOp);
+        }
+
+        Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
+        addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
+        if (predecessorsOfCurrentPhysicalOp != null) {
+            for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
+                physicalToRDD(sparkOperator, plan, predecessor, predsFromPreviousSparkOper);
+                operatorKeysOfAllPreds.add(predecessor.getOperatorKey());
+            }
+
+        } else {
+            if (predsFromPreviousSparkOper != null
+                    && predsFromPreviousSparkOper.size() > 0) {
+                for (OperatorKey predFromPreviousSparkOper : predsFromPreviousSparkOper) {
+                    operatorKeysOfAllPreds.add(predFromPreviousSparkOper);
+                }
+            }
+        }
+
+
+        if (physicalOperator instanceof POSplit) {
+            List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
+            for (PhysicalPlan successPlan : successorPlans) {
+                List<PhysicalOperator> leavesOfSuccessPlan = successPlan.getLeaves();
+                if (leavesOfSuccessPlan.size() != 1) {
+                    LOG.error("the size of leaves of SuccessPlan should be 1");
+                    break;
+                }
+                PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0);
+                physicalToRDD(sparkOperator, successPlan, leafOfSuccessPlan, operatorKeysOfAllPreds);
+            }
+        } else {
+            RDDConverter converter = convertMap.get(physicalOperator.getClass());
+            if (converter == null) {
+                throw new IllegalArgumentException(
+                        "Pig on Spark does not support Physical Operator: " + physicalOperator);
+            }
+
+            LOG.info("Converting operator "
+                    + physicalOperator.getClass().getSimpleName() + " "
+                    + physicalOperator);
+            List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds);
+            nextRDD = converter.convert(allPredRDDs, physicalOperator);
+
+            if (nextRDD == null) {
+                throw new IllegalArgumentException(
+                        "RDD should not be null after PhysicalOperator: "
+                                + physicalOperator);
+            }
+
+            physicalOpRdds.put(physicalOperator.getOperatorKey(), nextRDD);
+        }
+    }
+
+    //get all rdds of predecessors sorted by the OperatorKey
+    private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds) {
+        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+        List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
+        Collections.sort(operatorKeyOfAllPreds);
+        for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
+            predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred));
+        }
+        return predecessorRDDs;
+    }
+
+    //deal special cases containing operators with multiple predecessors when multiquery is enabled to get the predecessors of specified
+    // physicalOp in previous SparkOp(see PIG-4675)
+    private void addPredsFromPrevoiousSparkOp(SparkOperator sparkOperator, PhysicalOperator physicalOperator, Set<OperatorKey> operatorKeysOfPredecessors) {
+        // the relationship is stored in sparkOperator.getMultiQueryOptimizeConnectionItem()
+        List<OperatorKey> predOperatorKeys = sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey());
+        if (predOperatorKeys != null) {
+            for (OperatorKey predOperator : predOperatorKeys) {
+                LOG.debug(String.format("add predecessor(OperatorKey:%s) for OperatorKey:%s", predOperator, physicalOperator.getOperatorKey()));
+                operatorKeysOfPredecessors.add(predOperator);
+            }
+        }
+    }
+
+    /**
+     * In Spark, currently only async actions return job id. There is no async
+     * equivalent of actions like saveAsNewAPIHadoopFile()
+     * <p/>
+     * The only other way to get a job id is to register a "job group ID" with
+     * the spark context and request all job ids corresponding to that job group
+     * via getJobIdsForGroup.
+     * <p/>
+     * However getJobIdsForGroup does not guarantee the order of the elements in
+     * it's result.
+     * <p/>
+     * This method simply returns the previously unseen job ids.
+     *
+     * @param seenJobIDs job ids in the job group that are already seen
+     * @return Spark job ids not seen before
+     */
+    private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+        Set<Integer> groupjobIDs = new HashSet<Integer>(
+                Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
+                        .getJobIdsForGroup(jobGroupID))));
+        groupjobIDs.removeAll(seenJobIDs);
+        List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
+        if (unseenJobIDs.size() == 0) {
+            throw new RuntimeException("Expected at least one unseen jobID "
+                    + " in this call to getJobIdsForGroup, but got "
+                    + unseenJobIDs.size());
+        }
+        seenJobIDs.addAll(unseenJobIDs);
+        return unseenJobIDs;
+    }
+}
\ No newline at end of file
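
For readers unfamiliar with the job-group trick described in the getJobIDs() javadoc above, the
following standalone sketch (not part of r1728303; the class name, group id and sample action are
illustrative assumptions) shows the Spark API sequence it relies on: register a job group on the
context, run a synchronous action, then ask the status tracker for that group's job ids.

    // Illustrative sketch only: the job-group mechanism JobGraphBuilder.getJobIDs() builds on.
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class JobGroupSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("job-group-sketch").setMaster("local"));
            String jobGroupID = "sketch-group";                    // hypothetical group id
            // Register the group; every job triggered afterwards belongs to it.
            sc.setJobGroup(jobGroupID, "jobs for the sketch", false);
            // A synchronous action such as count() does not return a job id ...
            sc.parallelize(Arrays.asList(1, 2, 3)).count();
            // ... so the ids are recovered from the status tracker. The returned array is
            // unordered, which is why getJobIDs() above keeps a set of already-seen ids.
            int[] ids = sc.statusTracker().getJobIdsForGroup(jobGroupID);
            System.out.println("job ids in group: " + Arrays.toString(ids));
            sc.stop();
        }
    }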

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Feb  3 12:50:40 2016
@@ -23,10 +23,8 @@ import java.io.PrintStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +32,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -47,7 +44,6 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -91,7 +87,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
@@ -107,21 +102,17 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
-import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
 import org.apache.spark.scheduler.StatsReportListener;
 
@@ -157,7 +148,7 @@ public class SparkLauncher extends Launc
             explain(sparkplan, System.out, "text", true);
         SparkPigStats sparkStats = (SparkPigStats) pigContext
                 .getExecutionEngine().instantiatePigStats();
-        sparkStats.initialize(pigContext, sparkplan);
+        sparkStats.initialize(pigContext, sparkplan, jobConf);
         PigStats.start(sparkStats);
 
         startSparkIfNeeded(pigContext);
@@ -214,13 +205,24 @@ public class SparkLauncher extends Launc
         convertMap.put(POReduceBySpark.class, new ReduceByConverter());
         convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
 
-        sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
+        uploadUDFJars(sparkplan);
+        new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID).visit();
         cleanUpSparkJob();
         sparkStats.finish();
 
         return sparkStats;
     }
 
+    private void uploadUDFJars(SparkOperPlan sparkplan) throws IOException {
+        UDFJarsFinder udfJarsFinder = new UDFJarsFinder(sparkplan, pigContext);
+        udfJarsFinder.visit();
+        Set<String> udfJars = udfJarsFinder.getUdfJars();
+        for (String udfJar : udfJars) {
+            File jarFile = new File(udfJar);
+            addJarToSparkJobWorkingDirectory(jarFile, jarFile.getName());
+        }
+    }
+
     private void optimize(PigContext pc, SparkOperPlan plan) throws IOException {
 
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
@@ -273,37 +275,6 @@ public class SparkLauncher extends Launc
         }
     }
 
-    /**
-     * In Spark, currently only async actions return job id. There is no async
-     * equivalent of actions like saveAsNewAPIHadoopFile()
-     * <p/>
-     * The only other way to get a job id is to register a "job group ID" with
-     * the spark context and request all job ids corresponding to that job group
-     * via getJobIdsForGroup.
-     * <p/>
-     * However getJobIdsForGroup does not guarantee the order of the elements in
-     * it's result.
-     * <p/>
-     * This method simply returns the previously unseen job ids.
-     *
-     * @param seenJobIDs job ids in the job group that are already seen
-     * @return Spark job ids not seen before
-     */
-    private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
-        Set<Integer> groupjobIDs = new HashSet<Integer>(
-                Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
-                        .getJobIdsForGroup(jobGroupID))));
-        groupjobIDs.removeAll(seenJobIDs);
-        List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
-        if (unseenJobIDs.size() == 0) {
-            throw new RuntimeException("Expected at least one unseen jobID "
-                    + " in this call to getJobIdsForGroup, but got "
-                    + unseenJobIDs.size());
-        }
-        seenJobIDs.addAll(unseenJobIDs);
-        return unseenJobIDs;
-    }
-
     private void cleanUpSparkJob() {
         LOG.info("clean up Spark Job");
         boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
@@ -442,6 +413,7 @@ public class SparkLauncher extends Launc
         SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
                 pigContext);
         sparkCompiler.compile();
+        sparkCompiler.connectSoftLink();
         SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
 
         // optimize key - value handling in package
@@ -539,201 +511,6 @@ public class SparkLauncher extends Launc
         }
     }
 
-    private void sparkPlanToRDD(SparkOperPlan sparkPlan,
-                                Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
-                                SparkPigStats sparkStats, JobConf jobConf)
-            throws IOException, InterruptedException {
-        Set<Integer> seenJobIDs = new HashSet<Integer>();
-        if (sparkPlan == null) {
-            throw new RuntimeException("SparkPlan is null.");
-        }
-
-        List<SparkOperator> leaves = sparkPlan.getLeaves();
-        Collections.sort(leaves);
-        Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Converting " + leaves.size() + " Spark Operators to RDDs");
-        }
-
-        for (SparkOperator leaf : leaves) {
-            new PhyPlanSetter(leaf.physicalPlan).visit();
-            Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
-            sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
-                    physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
-                    jobConf);
-        }
-    }
-
-    private void addUDFJarsToSparkJobWorkingDirectory(SparkOperator leaf) throws IOException {
-
-        for (String udf : leaf.UDFs) {
-            Class clazz = pigContext.getClassForAlias(udf);
-            if (clazz != null) {
-                String jar = JarManager.findContainingJar(clazz);
-                if (jar != null) {
-                    File jarFile = new File(jar);
-                    addJarToSparkJobWorkingDirectory(jarFile, jarFile.getName());
-                }
-            }
-        }
-    }
-
-    private void sparkOperToRDD(SparkOperPlan sparkPlan,
-                                SparkOperator sparkOperator,
-                                Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
-                                Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
-                                Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
-                                Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf conf)
-            throws IOException, InterruptedException {
-        addUDFJarsToSparkJobWorkingDirectory(sparkOperator);
-        List<SparkOperator> predecessors = sparkPlan
-                .getPredecessors(sparkOperator);
-        Set<OperatorKey> predecessorOfPreviousSparkOp = new HashSet<OperatorKey>();
-        if (predecessors != null) {
-            for (SparkOperator pred : predecessors) {
-                if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
-                    sparkOperToRDD(sparkPlan, pred, sparkOpRdds,
-                            physicalOpRdds, convertMap, seenJobIDs, sparkStats,
-                            conf);
-                }
-                predecessorOfPreviousSparkOp.add(pred.getOperatorKey());
-            }
-        }
-
-        if (sparkOperator instanceof NativeSparkOperator) {
-            ((NativeSparkOperator) sparkOperator).runJob();
-            return;
-        }
-        List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
-        boolean isFail = false;
-        Exception exception = null;
-        //One SparkOperator may have multiple leaves(POStores) after multiquery feature is enabled
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("sparkOperator.physicalPlan have " + sparkOperator.physicalPlan.getLeaves().size() + " leaves");
-        }
-        for (PhysicalOperator leafPO : leafPOs) {
-            try {
-                physicalToRDD(sparkOperator, sparkOperator.physicalPlan, leafPO, physicalOpRdds,
-                        predecessorOfPreviousSparkOp, convertMap);
-                sparkOpRdds.put(sparkOperator.getOperatorKey(),
-                        physicalOpRdds.get(leafPO.getOperatorKey()));
-            } catch (Exception e) {
-                LOG.error("throw exception in sparkOperToRDD: ", e);
-                exception = e;
-                isFail = true;
-            }
-        }
-
-        List<POStore> poStores = PlanHelper.getPhysicalOperators(
-                sparkOperator.physicalPlan, POStore.class);
-        Collections.sort(poStores);
-        if (poStores.size() > 0) {
-            int i = 0;
-            if (!isFail) {
-                List<Integer> jobIDs = getJobIDs(seenJobIDs);
-                for (POStore poStore : poStores) {
-                    SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator,
-                            jobMetricsListener, sparkContext, sparkStats, conf);
-                }
-            } else {
-                for (POStore poStore : poStores) {
-                    String failJobID = sparkOperator.name().concat("_fail");
-                    SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkOperator, sparkStats,
-                            conf, exception);
-                }
-            }
-        }
-    }
-
-    private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan plan,
-                               PhysicalOperator physicalOperator,
-                               Map<OperatorKey, RDD<Tuple>> rdds,
-                               Set<OperatorKey> predsFromPreviousSparkOper,
-                               Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap)
-            throws IOException {
-        RDD<Tuple> nextRDD = null;
-        List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
-                .getPredecessors(physicalOperator);
-        if (predecessorsOfCurrentPhysicalOp != null && predecessorsOfCurrentPhysicalOp.size() > 1) {
-            Collections.sort(predecessorsOfCurrentPhysicalOp);
-        }
-
-        Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
-        addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
-        if (predecessorsOfCurrentPhysicalOp != null) {
-            for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
-                physicalToRDD(sparkOperator, plan, predecessor, rdds, predsFromPreviousSparkOper,
-                        convertMap);
-                operatorKeysOfAllPreds.add(predecessor.getOperatorKey());
-            }
-
-        } else {
-            if (predsFromPreviousSparkOper != null
-                    && predsFromPreviousSparkOper.size() > 0) {
-                for (OperatorKey predFromPreviousSparkOper : predsFromPreviousSparkOper) {
-                    operatorKeysOfAllPreds.add(predFromPreviousSparkOper);
-                }
-            }
-        }
-
-
-        if (physicalOperator instanceof POSplit) {
-            List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
-            for (PhysicalPlan successPlan : successorPlans) {
-                List<PhysicalOperator> leavesOfSuccessPlan = successPlan.getLeaves();
-                if (leavesOfSuccessPlan.size() != 1) {
-                    LOG.error("the size of leaves of SuccessPlan should be 1");
-                    break;
-                }
-                PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0);
-                physicalToRDD(sparkOperator, successPlan, leafOfSuccessPlan, rdds, operatorKeysOfAllPreds, convertMap);
-            }
-        } else {
-            RDDConverter converter = convertMap.get(physicalOperator.getClass());
-            if (converter == null) {
-                throw new IllegalArgumentException(
-                        "Pig on Spark does not support Physical Operator: " + physicalOperator);
-            }
-
-            LOG.info("Converting operator "
-                    + physicalOperator.getClass().getSimpleName() + " "
-                    + physicalOperator);
-            List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds, rdds);
-            nextRDD = converter.convert(allPredRDDs, physicalOperator);
-
-            if (nextRDD == null) {
-                throw new IllegalArgumentException(
-                        "RDD should not be null after PhysicalOperator: "
-                                + physicalOperator);
-            }
-
-            rdds.put(physicalOperator.getOperatorKey(), nextRDD);
-        }
-    }
-
-    //get all rdds of predecessors sorted by the OperatorKey
-    private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds, Map<OperatorKey, RDD<Tuple>> rdds) {
-        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
-        List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
-        Collections.sort(operatorKeyOfAllPreds);
-        for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
-            predecessorRDDs.add(rdds.get(operatorKeyOfAllPred));
-        }
-        return predecessorRDDs;
-    }
-
-    //deal special cases containing operators with multiple predecessors when multiquery is enabled to get the predecessors of specified
-    // physicalOp in previous SparkOp(see PIG-4675)
-    private void addPredsFromPrevoiousSparkOp(SparkOperator sparkOperator, PhysicalOperator physicalOperator, Set<OperatorKey> operatorKeysOfPredecessors) {
-        // the relationship is stored in sparkOperator.getMultiQueryOptimizeConnectionItem()
-        List<OperatorKey> predOperatorKeys = sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey());
-        if (predOperatorKeys != null) {
-            for (OperatorKey predOperator : predOperatorKeys) {
-                LOG.debug(String.format("add predecessor(OperatorKey:%s) for OperatorKey:%s", predOperator, physicalOperator.getOperatorKey()));
-                operatorKeysOfPredecessors.add(predOperator);
-            }
-        }
-    }
 
     @Override
     public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java?rev=1728303&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java Wed Feb  3 12:50:40 2016
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
+
+//find udf jars which will be downloaded with spark job on every nodes
+public class UDFJarsFinder extends SparkOpPlanVisitor {
+    private PigContext pigContext = null;
+    private Set<String> udfJars = new HashSet();
+
+    public UDFJarsFinder(SparkOperPlan plan, PigContext pigContext) {
+        super(plan, new DependencyOrderWalker(plan));
+        this.pigContext = pigContext;
+    }
+
+    public void visitSparkOp(SparkOperator sparkOp)
+            throws VisitorException {
+        for (String udf : sparkOp.UDFs) {
+            try {
+                Class clazz = this.pigContext.getClassForAlias(udf);
+                if (clazz != null) {
+                    String jar = JarManager.findContainingJar(clazz);
+                    if (jar != null) {
+                        this.udfJars.add(jar);
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("pigContext.getClassForAlias(udf) fail, ", e);
+            }
+        }
+    }
+
+    public Set<String> getUdfJars() {
+        return this.udfJars;
+    }
+}
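
As a usage note, the new visitor is driven from the uploadUDFJars() method added to SparkLauncher
above. A minimal sketch of that call sequence (assuming a compiled SparkOperPlan named sparkPlan
and the current PigContext are already in scope; the println is illustrative only):

    // Mirrors SparkLauncher.uploadUDFJars() from this commit.
    UDFJarsFinder udfJarsFinder = new UDFJarsFinder(sparkPlan, pigContext);
    udfJarsFinder.visit();                       // walks every SparkOperator in dependency order
    for (String udfJar : udfJarsFinder.getUdfJars()) {
        // Each entry is the jar containing a UDF class used by the plan; SparkLauncher
        // copies these jars into the Spark job working directory so workers can load them.
        java.io.File jarFile = new java.io.File(udfJar);
        System.out.println("UDF jar to ship: " + jarFile.getName());
    }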

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java Wed Feb  3 12:50:40 2016
@@ -59,7 +59,8 @@ public class NativeSparkOperator extends
         } catch (SecurityException se) {   //java.lang.reflect.InvocationTargetException
             if (secMan.getExitInvoked()) {
                 if (secMan.getExitCode() != 0) {
-                    throw new JobCreationException("Native job returned with non-zero return code");
+                    JobCreationException e = new JobCreationException("Native job returned with non-zero return code");
+                    SparkStatsUtil.addFailedNativeJobStats(PigStats.get(), this, e);
                 } else {
                     SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
                 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Wed Feb  3 12:50:40 2016
@@ -476,7 +476,8 @@ public class SparkCompiler extends PhyPl
 		try {
 			addToPlan(op);
             curSparkOp.markLimit();
-		} catch (Exception e) {
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
 			int errCode = 2034;
 			String msg = "Error compiling operator "
 					+ op.getClass().getSimpleName();

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Feb  3 12:50:40 2016
@@ -47,18 +47,18 @@ public class SparkJobStats extends JobSt
     private int jobId;
     private Map<String, Long> stats = Maps.newLinkedHashMap();
 
-    protected SparkJobStats(int jobId, PigStats.JobGraph plan) {
-        this(String.valueOf(jobId), plan);
+    protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+        this(String.valueOf(jobId), plan, conf);
         this.jobId = jobId;
     }
 
-    protected SparkJobStats(String jobId, PigStats.JobGraph plan) {
+    protected SparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
         super(jobId, plan);
+        setConf(conf);
     }
 
     public void addOutputInfo(POStore poStore, boolean success,
-                              JobMetricsListener jobMetricsListener,
-                              Configuration conf) {
+                              JobMetricsListener jobMetricsListener) {
         if (!poStore.isTmpStore()) {
             long bytes = getOutputSize(poStore, conf);
             long recordsCount = SparkStatsUtil.getStoreSparkCounterValue(poStore);
@@ -72,8 +72,7 @@ public class SparkJobStats extends JobSt
     }
 
     public void addInputStats(POLoad po, boolean success,
-                              boolean singleInput,
-                              Configuration conf){
+                              boolean singleInput) {
 
         long recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
         long bytesRead = -1;

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Feb  3 12:50:40 2016
@@ -53,26 +53,28 @@ public class SparkPigStats extends PigSt
 
     private SparkScriptState sparkScriptState;
 
+    private Configuration conf;
+
     public SparkPigStats() {
         jobPlan = new JobGraph();
         this.sparkScriptState = (SparkScriptState) ScriptState.get();
     }
 
-    public void initialize(PigContext pigContext, SparkOperPlan sparkPlan){
+    public void initialize(PigContext pigContext, SparkOperPlan sparkPlan, Configuration conf) {
         super.start();
         this.pigContext = pigContext;
+        this.conf = conf;
         sparkScriptState.setScriptInfo(sparkPlan);
     }
 
     public void addJobStats(POStore poStore, SparkOperator sparkOperator, int jobId,
                             JobMetricsListener jobMetricsListener,
-                            JavaSparkContext sparkContext,
-                            Configuration conf) {
+                            JavaSparkContext sparkContext) {
         boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
         jobStats.collectStats(jobMetricsListener);
-        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
         addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
@@ -82,13 +84,12 @@ public class SparkPigStats extends PigSt
     public void addFailJobStats(POStore poStore, SparkOperator sparkOperator, String jobId,
                                 JobMetricsListener jobMetricsListener,
                                 JavaSparkContext sparkContext,
-                                Configuration conf,
                                 Exception e) {
         boolean isSuccess = false;
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
         jobStats.collectStats(jobMetricsListener);
-        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
         addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
@@ -97,12 +98,12 @@ public class SparkPigStats extends PigSt
         }
     }
 
-    public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e){
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+    public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) {
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
-        jobSparkOperatorMap.put(jobStats,sparkOperator);
+        jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
-        if( e != null ){
+        if (e != null) {
             jobStats.setBackendException(e);
         }
     }
@@ -202,7 +203,7 @@ public class SparkPigStats extends PigSt
             List<POLoad> poLoads = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLoad.class);
             for (POLoad load : poLoads) {
                 if (!load.isTmpLoad()) {
-                    jobStats.addInputStats(load, isSuccess, (poLoads.size() == 1), conf);
+                    jobStats.addInputStats(load, isSuccess, (poLoads.size() == 1));
                 }
             }
         } catch (VisitorException ve) {

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Feb  3 12:50:40 2016
@@ -41,8 +41,7 @@ public class SparkStatsUtil {
                                           POStore poStore, SparkOperator sparkOperator,
                                           JobMetricsListener jobMetricsListener,
                                           JavaSparkContext sparkContext,
-                                          SparkPigStats sparkPigStats,
-                                          JobConf jobConf)
+                                          SparkPigStats sparkPigStats)
             throws InterruptedException {
         // Even though we are not making any async calls to spark,
         // the SparkStatusTracker can still return RUNNING status
@@ -53,18 +52,18 @@ public class SparkStatsUtil {
         // To workaround this, we will wait for this job to "finish".
         jobMetricsListener.waitForJobToEnd(jobID);
         sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
-                sparkContext, jobConf);
+                sparkContext);
         jobMetricsListener.cleanup(jobID);
     }
 
     public static void addFailJobStats(String jobID,
                                        POStore poStore, SparkOperator sparkOperator,
                                        SparkPigStats sparkPigStats,
-                                       JobConf jobConf, Exception e) {
+                                       Exception e) {
         JobMetricsListener jobMetricsListener = null;
         JavaSparkContext sparkContext = null;
         sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
-                sparkContext, jobConf, e);
+                sparkContext, e);
     }
 
     public static String getStoreSparkCounterName(POStore store) {

Modified: pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java Wed Feb  3 12:50:40 2016
@@ -29,6 +29,7 @@ import java.util.List;
 
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
@@ -210,6 +211,13 @@ public class TestNativeMapReduce  {
         } catch (JobCreationException e) {
             // Running in Tez mode throw exception
             assertTrue(e.getCause() instanceof FileAlreadyExistsException);
+        } catch (ExecException e) {
+            // Running in spark mode throw exception
+            if (e.getCause() instanceof RuntimeException) {
+                RuntimeException re = (RuntimeException) e.getCause();
+                JobCreationException jce = (JobCreationException) re.getCause();
+                assertTrue(jce.getCause() instanceof FileAlreadyExistsException);
+            }
         }
         finally{
             // We have to manually delete intermediate mapreduce files



