pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1732161 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/convert...
Date Wed, 24 Feb 2016 14:19:18 GMT
Author: xuefu
Date: Wed Feb 24 14:19:17 2016
New Revision: 1732161

URL: http://svn.apache.org/viewvc?rev=1732161&view=rev
Log:
PIG-4807: Fix test cases of TestEvalPipelineLocal test suite (Prateek via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1732161&r1=1732160&r2=1732161&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
Wed Feb 24 14:19:17 2016
@@ -342,6 +342,10 @@ public class POSort extends PhysicalOper
         mSortFunc = sortFunc;
     }
 
+    public Comparator<Tuple> getMComparator() {
+        return mComparator;
+    }
+
     public List<Boolean> getMAscCols() {
         return mAscCols;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1732161&r1=1732160&r2=1732161&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Wed Feb 24 14:19:17 2016
@@ -102,6 +102,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.impl.PigContext;
@@ -545,9 +546,18 @@ public class SparkLauncher extends Launc
             SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
             printer.setVerbose(verbose);
             printer.visit();
+        } else if (format.equals("dot")) {
+            ps.println("#--------------------------------------------------");
+            ps.println("# Spark Plan                                  ");
+            ps.println("#--------------------------------------------------");
+
+            DotSparkPrinter printer = new DotSparkPrinter(sparkPlan, ps);
+            printer.setVerbose(verbose);
+            printer.dump();
+            ps.println("");
         } else { // TODO: add support for other file format
             throw new IOException(
-                    "Non-text output of explain is not supported.");
+                    "Non-text and non-dot output of explain is not supported.");
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1732161&r1=1732160&r2=1732161&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Wed Feb 24 14:19:17 2016
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
@@ -82,6 +83,9 @@ public class LoadConverter implements RD
         // to create a new conf for a new RDD.
         JobConf jobConf = SparkUtil.newJobConf(pigContext);
         configureLoader(physicalPlan, op, jobConf);
+        // need to serialize the configuration loaded in jobConf
+        // to make sure we can access the right config later
+        UDFContext.getUDFContext().serialize(jobConf);
 
         // Set the input directory for input formats that are backed by a
         // filesystem. (Does not apply to HBase, for example).

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1732161&r1=1732160&r2=1732161&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
Wed Feb 24 14:19:17 2016
@@ -54,7 +54,7 @@ public class SortConverter implements RD
                 SparkUtil.getManifest(Object.class));
 
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
-                sortOperator.new SortComparator(), true);
+                sortOperator.getMComparator(), true);
         JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);
 
         return mapped.rdd();

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java?rev=1732161&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
(added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
Wed Feb 24 14:19:17 2016
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.PrintStream;
+import java.util.LinkedList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DotPlanDumper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.DotPOPrinter;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+/**
+ * This class can print Spark plan in the DOT format. It uses
+ * clusters to illustrate nesting. If "verbose" is off, it will skip
+ * any nesting in the associated physical plans.
+ */
+public class DotSparkPrinter extends DotPlanDumper<SparkOperator, SparkOperPlan,
+        DotSparkPrinter.InnerOperator,
+        DotSparkPrinter.InnerPlan> {
+
+    static int counter = 0;
+    boolean isVerboseNesting = true;
+
+    public DotSparkPrinter(SparkOperPlan plan, PrintStream ps) {
+        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>(),
+                new HashSet<Operator>());
+    }
+
+    private DotSparkPrinter(SparkOperPlan plan, PrintStream ps, boolean isSubGraph,
+                         Set<Operator> subgraphs,
+                         Set<Operator> multiInputSubgraphs,
+                         Set<Operator> multiOutputSubgraphs) {
+        super(plan, ps, isSubGraph, subgraphs,
+                multiInputSubgraphs, multiOutputSubgraphs);
+    }
+
+    @Override
+    public void setVerbose(boolean verbose) {
+        // leave the parents verbose set to true
+        isVerboseNesting = verbose;
+    }
+
+    @Override
+    protected DotPlanDumper makeDumper(InnerPlan plan, PrintStream ps) {
+        return new InnerPrinter(plan, ps, mSubgraphs, mMultiInputSubgraphs,
+                mMultiOutputSubgraphs);
+    }
+
+    @Override
+    protected String getName(SparkOperator op) {
+        String name = op.name();
+        // Cut off the part of the name specifying scope.
+        String delimiter = " - ";
+        String[] temp;
+        temp = name.split(delimiter);
+        return temp[0];
+    }
+
+    @Override
+    protected Collection<InnerPlan> getNestedPlans(SparkOperator op) {
+        Collection<InnerPlan> plans = new LinkedList<InnerPlan>();
+        plans.add(new InnerPlan(op.physicalPlan));
+        return plans;
+    }
+
+    @Override
+    protected String[] getAttributes(SparkOperator op) {
+        String[] attributes = new String[3];
+        attributes[0] = "label=\""+getName(op)+"\"";
+        attributes[1] = "style=\"filled\"";
+        attributes[2] = "fillcolor=\"#EEEEEE\"";
+        return attributes;
+    }
+
+
+    /**
+     * Helper class to represent the relationship of inner operators
+     */
+    public static class InnerOperator extends Operator<PlanVisitor> {
+
+        private static final long serialVersionUID = 1L;
+        String name;
+        PhysicalPlan plan;
+        int code;
+
+        public InnerOperator(PhysicalPlan plan, String name) {
+            super(new OperatorKey());
+            this.name = name;
+            this.plan = plan;
+            this.code = counter++;
+        }
+
+        @Override public void visit(PlanVisitor v) {}
+        @Override public boolean supportsMultipleInputs() {return false;}
+        @Override public boolean supportsMultipleOutputs() {return false;}
+        @Override public String name() {return name;}
+        public PhysicalPlan getPlan() {return plan;}
+        @Override public int hashCode() {return code;}
+    }
+
+    /**
+     * Each spark operator will have an inner plan of inner
+     * operators. The inner operators contain the physical plan of the
+     * execution phase.
+     */
+    public static class InnerPlan extends OperatorPlan<InnerOperator> {
+
+        private static final long serialVersionUID = 1L;
+
+        public InnerPlan(PhysicalPlan plan) {
+            InnerOperator sparkInnerOp = new InnerOperator(plan, "spark");
+            this.add(sparkInnerOp);
+        }
+    }
+
+    private class InnerPrinter extends DotPlanDumper<InnerOperator, InnerPlan,
+            PhysicalOperator, PhysicalPlan> {
+
+        public InnerPrinter(InnerPlan plan, PrintStream ps,
+                            Set<Operator> subgraphs,
+                            Set<Operator> multiInputSubgraphs,
+                            Set<Operator> multiOutputSubgraphs) {
+            super(plan, ps, true, subgraphs, multiInputSubgraphs,
+                    multiOutputSubgraphs);
+        }
+
+        @Override
+        protected String[] getAttributes(InnerOperator op) {
+            String[] attributes = new String[3];
+            attributes[0] = "label=\""+super.getName(op)+"\"";
+            attributes[1] = "style=\"filled\"";
+            attributes[2] = "fillcolor=\"white\"";
+            return attributes;
+        }
+
+        @Override
+        protected Collection<PhysicalPlan> getNestedPlans(InnerOperator op) {
+            Collection<PhysicalPlan> l = new LinkedList<PhysicalPlan>();
+            l.add(op.getPlan());
+            return l;
+        }
+
+        @Override
+        protected DotPOPrinter makeDumper(PhysicalPlan plan, PrintStream ps) {
+            DotPOPrinter printer = new DotPOPrinter(plan, ps, true,
+                    mSubgraphs,
+                    mMultiInputSubgraphs,
+                    mMultiOutputSubgraphs);
+            printer.setVerbose(isVerboseNesting);
+            return printer;
+        }
+    }
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1732161&r1=1732160&r2=1732161&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Wed Feb 24 14:19:17
2016
@@ -1114,6 +1114,8 @@ public class TestEvalPipelineLocal {
     
     @Test
     public void testSetLocationCalledInFE() throws Exception {
+        // Need to reset it when running multiple testcases
+        UDFContext.getUDFContext().addJobConf(null);
         File f1 = createFile(new String[]{"a","b"});
         pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext())
                 + "' using " + SetLocationTestLoadFunc.class.getName()



Mime
View raw message