pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1731248 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java test/org/apache/pig/test/TestFinish.java
Date Fri, 19 Feb 2016 14:28:11 GMT
Author: xuefu
Date: Fri Feb 19 14:28:11 2016
New Revision: 1731248

URL: http://svn.apache.org/viewvc?rev=1731248&view=rev
Log:
PIG-4281: Fix TestFinish for Spark engine (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/test/org/apache/pig/test/TestFinish.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1731248&r1=1731247&r2=1731248&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Fri Feb 19 14:28:11 2016
@@ -30,8 +30,10 @@ import java.util.Set;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -82,6 +84,7 @@ public class JobGraphBuilder extends Spa
         new PhyPlanSetter(sparkOp.physicalPlan).visit();
         try {
             sparkOperToRDD(sparkOp);
+            finishUDFs(sparkOp.physicalPlan);
         } catch (InterruptedException e) {
             throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
         } catch (JobCreationException e){
@@ -89,6 +92,20 @@ public class JobGraphBuilder extends Spa
         }
     }
 
+    // Calling EvalFunc.finish()
+    private void finishUDFs(PhysicalPlan physicalPlan) throws VisitorException {
+        UDFFinishVisitor finisher = new UDFFinishVisitor(physicalPlan,
+                new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
+                        physicalPlan));
+        try {
+            finisher.visit();
+        } catch (VisitorException e) {
+            int errCode = 2121;
+            String msg = "Error while calling finish method on UDFs.";
+            throw new VisitorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
     private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException {
         List<SparkOperator> predecessors = sparkPlan
                 .getPredecessors(sparkOperator);

Modified: pig/branches/spark/test/org/apache/pig/test/TestFinish.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFinish.java?rev=1731248&r1=1731247&r2=1731248&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFinish.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFinish.java Fri Feb 19 14:28:11 2016
@@ -67,7 +67,12 @@ public class TestFinish {
         @Override
         public void finish() {
             try {
-                FileSystem fs = FileSystem.get(PigMapReduce.sJobConfInternal.get());
+                FileSystem fs = null;
+                if (execType.equalsIgnoreCase("SPARK")) {
+                    fs = FileSystem.get(cluster.getConfiguration());
+                } else {
+                    fs = FileSystem.get(PigMapReduce.sJobConfInternal.get());
+                }
                 fs.create(new Path(expectedFileName));
             } catch (IOException e) {
                 throw new RuntimeException("Unable to create file:" + expectedFileName);
@@ -136,7 +141,7 @@ public class TestFinish {
         String inputFileName = setUp(cluster.getExecType());
         // this file will be created on the cluster if finish() is called
         String expectedFileName = "testFinishInMapMR-finish.txt";
-        pigServer.registerQuery("define MYUDF " + MyEvalFunction.class.getName() + "('MAPREDUCE','"
+        pigServer.registerQuery("define MYUDF " + MyEvalFunction.class.getName() + "('"+cluster.getExecType()+"','"
                 + expectedFileName + "');");
         pigServer.registerQuery("a = load '" + Util.encodeEscape(inputFileName) + "' using "
                 + PigStorage.class.getName() + "(':');");
@@ -155,7 +160,7 @@ public class TestFinish {
         String inputFileName = setUp(cluster.getExecType());
         // this file will be created on the cluster if finish() is called
         String expectedFileName = "testFinishInReduceMR-finish.txt";
-        pigServer.registerQuery("define MYUDF " + MyEvalFunction.class.getName() + "('MAPREDUCE','"
+        pigServer.registerQuery("define MYUDF " + MyEvalFunction.class.getName() + "('"+cluster.getExecType()+"','"
                 + expectedFileName + "');");
         pigServer.registerQuery("a = load '" + Util.encodeEscape(inputFileName) + "' using "
                 + PigStorage.class.getName() + "(':');");



Mime
View raw message