pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject svn commit: r1796232 - in /pig/branches/spark: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/backend/hadoop/executionengine/spark/pl...
Date Fri, 26 May 2017 02:01:14 GMT
Author: zly
Date: Fri May 26 02:01:13 2017
New Revision: 1796232

URL: http://svn.apache.org/viewvc?rev=1796232&view=rev
Log:
PIG-5215: Merge changes from review board to spark branch (Liyun)

Modified:
    pig/branches/spark/build.xml
    pig/branches/spark/src/org/apache/pig/PigConfiguration.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
    pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
    pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
    pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
    pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java
    pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
    pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
    pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
    pig/branches/spark/test/org/apache/pig/test/TestPigServer.java
    pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java
    pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
    pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java
    pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld

Modified: pig/branches/spark/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Fri May 26 02:01:13 2017
@@ -908,9 +908,6 @@
             <sysproperty key="test.exec.type" value="${test.exec.type}" />
             <sysproperty key="ssh.gateway" value="${ssh.gateway}" />
             <sysproperty key="hod.server" value="${hod.server}" />
-            <sysproperty key="build.classes" value="${build.classes}" />
-            <sysproperty key="test.build.classes" value="${test.build.classes}" />
-            <sysproperty key="ivy.lib.dir" value="${ivy.lib.dir}" />
             <sysproperty key="java.io.tmpdir" value="${junit.tmp.dir}" />
             <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
             <jvmarg line="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=128M ${debugArgs}
-Djava.library.path=${hadoop.root}\bin"/>

Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Fri May 26 02:01:13 2017
@@ -513,6 +513,8 @@ public class PigConfiguration {
 
     public static final String PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT = "true";
 
+    public static final String PIG_PRINT_EXEC_PLAN = "pig.print.exec.plan";
+
     // Deprecated settings of Pig 0.13
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
Fri May 26 02:01:13 2017
@@ -241,7 +241,7 @@ public class JobGraphBuilder extends Spa
             throws IOException {
         RDD<Tuple> nextRDD = null;
         List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = getPredecessors(plan,
physicalOperator);
-        Set<OperatorKey> operatorKeysOfAllPreds = new LinkedHashSet<OperatorKey>();
+        LinkedHashSet<OperatorKey> operatorKeysOfAllPreds = new LinkedHashSet<OperatorKey>();
         addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
         if (predecessorsOfCurrentPhysicalOp != null) {
             for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
@@ -260,14 +260,13 @@ public class JobGraphBuilder extends Spa
 
         if (physicalOperator instanceof POSplit) {
             List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
-            for (PhysicalPlan succcessorPlan : successorPlans) {
-                List<PhysicalOperator> leavesOfSuccessPlan = succcessorPlan.getLeaves();
+            for (PhysicalPlan successorPlan : successorPlans) {
+                List<PhysicalOperator> leavesOfSuccessPlan = successorPlan.getLeaves();
                 if (leavesOfSuccessPlan.size() != 1) {
-                    LOG.error("the size of leaves of successorPlan should be 1");
                     throw new RuntimeException("the size of the leaves of successorPlan should
be 1");
                 }
                 PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0);
-                physicalToRDD(sparkOperator, succcessorPlan, leafOfSuccessPlan, operatorKeysOfAllPreds);
+                physicalToRDD(sparkOperator, successorPlan, leafOfSuccessPlan, operatorKeysOfAllPreds);
             }
         } else {
             RDDConverter converter = convertMap.get(physicalOperator.getClass());
@@ -328,12 +327,8 @@ public class JobGraphBuilder extends Spa
     }
 
     //get all rdds of predecessors sorted by the OperatorKey
-    private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds)
{
+    private List<RDD<Tuple>> sortPredecessorRDDs(LinkedHashSet <OperatorKey>
operatorKeysOfAllPreds) {
         List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
-//        List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
-//        Collections.sort(operatorKeyOfAllPreds);
-        //We need not sort operatorKeyOfAllPreds any more because operatorKeyOfAllPreds is
LinkedHashSet
-        //which provides the order of insertion, before we insert element which is sorted
by OperatorKey
         for (OperatorKey operatorKeyOfAllPred : operatorKeysOfAllPreds) {
             predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred));
         }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Fri May 26 02:01:13 2017
@@ -167,7 +167,9 @@ public class SparkLauncher extends Launc
         this.pigContext = pigContext;
         initialize(physicalPlan);
         SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
-        LOG.info(sparkplan);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(sparkplan);
+        }
         SparkPigStats sparkStats = (SparkPigStats) pigContext
                 .getExecutionEngine().instantiatePigStats();
         sparkStats.initialize(pigContext, sparkplan, jobConf);
@@ -221,8 +223,13 @@ public class SparkLauncher extends Launc
         convertMap.put(POBroadcastSpark.class, new BroadcastConverter(sparkContext));
         convertMap.put(POSampleSortSpark.class, new SparkSampleSortConverter());
         convertMap.put(POPoissonSampleSpark.class, new PoissonSampleConverter());
-
+        //Print SPARK plan before launching if needed
+        Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+        if (conf.getBoolean(PigConfiguration.PIG_PRINT_EXEC_PLAN, false)) {
+            LOG.info(sparkplan);
+        }
         uploadResources(sparkplan);
+
         new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener,
jobGroupID, jobConf, pigContext).visit();
         cleanUpSparkJob(sparkStats);
         sparkStats.finish();
@@ -239,13 +246,13 @@ public class SparkLauncher extends Launc
         addJarsToSparkJob(sparkPlan);
     }
 
-    private void optimize(PigContext pc, SparkOperPlan plan) throws IOException {
+    private void optimize(SparkOperPlan plan, PigContext pigContext) throws IOException {
 
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+        Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
 
         // Should be the first optimizer as it introduces new operators to the plan.
         boolean noCombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false);
-        if (!pc.inIllustrator && !noCombiner)  {
+        if (!pigContext.inIllustrator && !noCombiner)  {
             CombinerOptimizer combinerOptimizer = new CombinerOptimizer(plan);
             combinerOptimizer.visit();
             if (LOG.isDebugEnabled()) {
@@ -255,7 +262,7 @@ public class SparkLauncher extends Launc
         }
 
         boolean noSecondaryKey = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY,
false);
-        if (!pc.inIllustrator && !noSecondaryKey) {
+        if (!pigContext.inIllustrator && !noSecondaryKey) {
             SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan);
             skOptimizer.visit();
         }
@@ -400,6 +407,7 @@ public class SparkLauncher extends Launc
                         File tmpFile = new File(tmpFolder, fileName);
                         Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
                         FileSystem fs = tmpFilePath.getFileSystem(jobConf);
+                        //TODO:PIG-5241 Specify the hdfs path directly to spark and avoid
the unnecessary download and upload in SparkLauncher.java
                         fs.copyToLocalFile(src, tmpFilePath);
                         tmpFile.deleteOnExit();
                         LOG.info(String.format("CacheFile:%s", fileName));
@@ -476,7 +484,7 @@ public class SparkLauncher extends Launc
                             localFile.getAbsolutePath()));
                 }
                 Files.copy(Paths.get(new Path(resourcePath.getAbsolutePath()).toString()),
-                        Paths.get(localFile.getAbsolutePath()));
+                    Paths.get(localFile.getAbsolutePath()));
             }
         } else {
             if(resourceType == ResourceType.JAR){
@@ -517,7 +525,7 @@ public class SparkLauncher extends Launc
                 sparkPlan);
         pkgAnnotator.visit();
 
-        optimize(pigContext, sparkPlan);
+        optimize(sparkPlan, pigContext);
         return sparkPlan;
     }
 
@@ -641,7 +649,7 @@ public class SparkLauncher extends Launc
             printer.visit();
         } else if (format.equals("dot")) {
             ps.println("#--------------------------------------------------");
-            ps.println("# Spark Plan                                  ");
+            ps.println("# Spark Plan");
             ps.println("#--------------------------------------------------");
 
             DotSparkPrinter printer = new DotSparkPrinter(sparkPlan, ps);
@@ -700,7 +708,7 @@ public class SparkLauncher extends Launc
         PigMapReduce.sJobConfInternal.set(jobConf);
         String parallelism = pigContext.getProperties().getProperty("spark.default.parallelism");
         if (parallelism != null) {
-            SparkPigContext.get().setPigDefaultParallelism(Integer.parseInt(parallelism));
+            SparkPigContext.get().setDefaultParallelism(Integer.parseInt(parallelism));
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
Fri May 26 02:01:13 2017
@@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHa
 public class SparkPigContext {
 
     private static SparkPigContext context =  null;
-    private static ThreadLocal<Integer> PIG_DEFAULT_PARALLELISM = null;
+    private static ThreadLocal<Integer> defaultParallelism = null;
     private static ConcurrentHashMap<String, Broadcast<List<Tuple>>> broadcastedVars
= new ConcurrentHashMap() ;
 
     public static SparkPigContext get(){
@@ -40,15 +40,15 @@ public class SparkPigContext {
         }
         return context;
     }
-    public static int getPigDefaultParallelism() {
-        return PIG_DEFAULT_PARALLELISM.get();
+    public static int getDefaultParallelism() {
+        return defaultParallelism.get();
     }
 
 
     public static int getParallelism(List<RDD<Tuple>> predecessors,
                                      PhysicalOperator physicalOperator) {
-        if (PIG_DEFAULT_PARALLELISM != null) {
-           return getPigDefaultParallelism();
+        if (defaultParallelism != null) {
+           return getDefaultParallelism();
         }
 
         int parallelism = physicalOperator.getRequestedParallelism();
@@ -69,8 +69,8 @@ public class SparkPigContext {
         return parallelism;
     }
 
-    public static void setPigDefaultParallelism(int pigDefaultParallelism) {
-        PIG_DEFAULT_PARALLELISM.set(pigDefaultParallelism);
+    public static void setDefaultParallelism(int defaultParallelism) {
+        SparkPigContext.defaultParallelism.set(defaultParallelism);
     }
 
      public static ConcurrentHashMap<String, Broadcast<List<Tuple>>> getBroadcastedVars()
{

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
Fri May 26 02:01:13 2017
@@ -323,7 +323,7 @@ public class GlobalRearrangeConverter im
                             try {
                                 Tuple tuple = tf.newTuple(3);
                                 tuple.set(0, index);
-                                tuple.set(2, next);
+                                tuple.set(1, next);
                                 return tuple;
                             } catch (ExecException e) {
                                 throw new RuntimeException(e);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
Fri May 26 02:01:13 2017
@@ -89,7 +89,7 @@ public class PackageConverter implements
                             // we want the value and index only
                             Tuple next = bagIterator.next();
                             NullableTuple nullableTuple = new NullableTuple(
-                                    (Tuple) next.get(2));
+                                    (Tuple) next.get(1));
                             nullableTuple.setIndex(((Number) next.get(0))
                                     .byteValue());
                             if (LOG.isDebugEnabled())

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
Fri May 26 02:01:13 2017
@@ -269,9 +269,9 @@ public class SparkOperator extends Opera
         }
     }
 
-	public boolean isSkewedJoin() {
-		return (skewedJoinPartitionFile != null);
-	}
+    public boolean isSkewedJoin() {
+        return (skewedJoinPartitionFile != null);
+    }
 
     public void setRequestedParallelism(int requestedParallelism) {
         this.requestedParallelism = requestedParallelism;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
Fri May 26 02:01:13 2017
@@ -41,7 +41,7 @@ public class SparkPrinter extends SparkO
         super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
         mStream = ps;
         mStream.println("#--------------------------------------------------");
-        mStream.println("# Spark Plan                                  ");
+        mStream.println("# Spark Plan");
         mStream.println("#--------------------------------------------------");
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
Fri May 26 02:01:13 2017
@@ -53,10 +53,9 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 public class SecondaryKeyOptimizerUtil {
     private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
-    private static boolean isSparkMode;
 
     public SecondaryKeyOptimizerUtil() {
 
@@ -679,8 +678,4 @@ public class SecondaryKeyOptimizerUtil {
         }
         return false;
     }
-
-    public static void setIsSparkMode(boolean isSparkMode) {
-        SecondaryKeyOptimizerUtil.isSparkMode = isSparkMode;
-    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Fri May 26 02:01:13 2017
@@ -208,7 +208,7 @@ public class UDFContext {
     }
 
     /*
-     called by SparkEngineConf#writeObject
+     * Internal pig use
      */
     public String serialize() {
         try {

Modified: pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
(original)
+++ pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
Fri May 26 02:01:13 2017
@@ -63,6 +63,7 @@ public class TestLocationInPhysicalPlan
         if (Util.getLocalTestMode().toString().equals("TEZ_LOCAL")) {
             Assert.assertEquals("A[1,4],A[3,4],B[2,4]", jStats.getAliasLocation());
         } else if (Util.getLocalTestMode().toString().equals("SPARK_LOCAL")) {
+            //TODO PIG-5239:Investigate why there are duplicated A[3,4]
             Assert.assertEquals("A[1,4],A[3,4],B[2,4],A[3,4]", jStats.getAliasLocation());
         } else {
             Assert.assertEquals("M: A[1,4],A[3,4],B[2,4] C: A[3,4],B[2,4] R: A[3,4]", jStats.getAliasLocation());

Modified: pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCombiner.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCombiner.java Fri May 26 02:01:13 2017
@@ -116,17 +116,17 @@ public class TestCombiner {
         inputLines.add("a,c,1");
         String inputFileName = loadWithTestLoadFunc("A", pig, inputLines);
 
-        pig.registerQuery("B = group A by ($0, $1);");
-        pig.registerQuery("C = foreach B generate flatten(group), COUNT($1);");
+        pig.registerQuery("B = foreach A generate $0 as (c0:chararray), $1 as (c1:chararray),
$2 as (c2:int);");
+        pig.registerQuery("C = group B by ($0, $1);");
+        pig.registerQuery("D = foreach C generate flatten(group), COUNT($1) as int;");
         // Since the input has no schema, using Util.getTuplesFromConstantTupleStrings fails
assert.
-        List<String> resultTuples = new ArrayList<>();
-        resultTuples.add("(a,b,2)");
-        resultTuples.add("(a,c,1)");
-        Iterator<Tuple> resultIterator = pig.openIterator("C");
-        Tuple tuple = resultIterator.next();
-        assertTrue(resultTuples.contains(tuple.toString()));
-        tuple = resultIterator.next();
-        assertTrue(resultTuples.contains(tuple.toString()));
+        List<Tuple> resultTuples = Util.getTuplesFromConstantTupleStrings(
+            new String[]{
+                "('a','b',2)",
+                "('a','c',1)",
+            });
+        Iterator<Tuple> resultIterator = pig.openIterator("D");
+        Util.checkQueryOutputsAfterSort(resultIterator, resultTuples);
 
         return inputFileName;
     }

Modified: pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java Fri May 26 02:01:13
2017
@@ -566,9 +566,9 @@ public class TestCubeOperator {
     public void testIllustrate() throws Exception {
 	// test for illustrate
         Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", !Util.getLocalTestMode().toString().startsWith("TEZ"));
-        Assume.assumeTrue("illustrate does not work in tez (PIG-4621)", !Util.getLocalTestMode().toString().startsWith("SPARK"));
-	String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long);
"
-	        + "b = cube a by cube(a1,b1);";
+        Assume.assumeTrue("illustrate does not work in spark (PIG-4621)", !Util.getLocalTestMode().toString().startsWith("SPARK"));
+        String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long);
"
+            + "b = cube a by cube(a1,b1);";
 
         Util.registerMultiLineQuery(pigServer, query);
         Map<Operator, DataBag> examples = pigServer.getExamples("b");

Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Fri May 26 02:01:13
2017
@@ -88,6 +88,7 @@ public class TestEmptyInputDir {
 
             //Spark doesn't create an empty result file part-*, only a _SUCCESS file if input
dir was empty
             Assume.assumeTrue("Skip this test for Spark. See PIG-5140", !Util.isSparkExecType(cluster.getExecType()));
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri May 26 02:01:13 2017
@@ -906,6 +906,15 @@ public class TestGrunt {
 
     @Test
     public void testKeepGoigFailed() throws Throwable {
+        // in mr mode, the output file 'baz' will be automatically deleted if the mr job
fails
+        // when "cat baz;" is executed, it throws "Encountered IOException. Directory baz
does not exist"
+        // in GruntParser#processCat() and variable "caught" is true
+        // in spark mode, the output file 'baz' will not be automatically deleted even the
job fails(see SPARK-7953)
+        // when "cat baz;" is executed, it does not throw exception and the variable "caught"
is false
+        // TODO: Enable this for Spark when SPARK-7953 is resolved
+        Assume.assumeTrue(
+            "Skip this test for Spark until SPARK-7953 is resolved!",
+            !Util.isSparkExecType(cluster.getExecType()));
         PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext context = server.getPigContext();
         Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "passwd");
@@ -927,23 +936,14 @@ public class TestGrunt {
         InputStreamReader reader = new InputStreamReader(cmd);
 
         Grunt grunt = new Grunt(new BufferedReader(reader), context);
-
         boolean caught = false;
-        // in mr mode, the output file 'baz' will be automatically deleted if the mr job
fails
-        // when "cat baz;" is executed, it throws "Encountered IOException. Directory baz
does not exist"
-        // in GruntParser#processCat() and variable "caught" is true
-        // in spark mode, the output file 'baz' will not be automatically deleted even the
job fails(see SPARK-7953)
-        // when "cat baz;" is executed, it does not throw exception and the variable "caught"
is false
-        // TODO: Enable this for Spark when SPARK-7953 is resolved
-        Assume.assumeTrue(
-                "Skip this test for Spark until SPARK-7953 is resolved!",
-                !Util.isSparkExecType(cluster.getExecType()));
         try {
             grunt.exec();
         } catch (Exception e) {
             caught = true;
             assertTrue(e.getMessage().contains("baz does not exist"));
         }
+        assertTrue(caught);
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Fri May 26 02:01:13 2017
@@ -210,17 +210,13 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
-            if (execType.toString().startsWith("tez")) {
-                assertEquals(1, stats.getNumberJobs());
-                assertEquals(stats.getJobGraph().size(), 1);
-            } else if (execType.toString().startsWith("spark")) {
-                // In spark mode,the number of spark job is calculated by the number of POStore.
-                // 1 POStore generates 1 spark job.
-                assertEquals(1, stats.getNumberJobs());
-                assertEquals(stats.getJobGraph().size(), 1);
-            } else {
+            if (execType.equals("mapreduce")) {
                 assertEquals(2, stats.getNumberJobs());
                 assertEquals(stats.getJobGraph().size(), 2);
+            } else {
+                // Tez and Spark
+                assertEquals(1, stats.getNumberJobs());
+                assertEquals(stats.getJobGraph().size(), 1);
             }
 
             Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
@@ -460,18 +456,13 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = null;
-            if (execType.toUpperCase().equals((new SparkExecType()).name())) {
-                args = new String[]{"-no_multiquery", "-x", execType, PIG_FILE};
-
-            } else {
-                args = new String[]{"-x", execType, PIG_FILE};
-            }
-            PigStats stats =  PigRunner.run(args, new TestNotificationListener(execType));
+          String[] args = null;
+          args = new String[]{"-x", execType, PIG_FILE};
+          PigStats stats =  PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
             if (Util.isMapredExecType(cluster.getExecType())) {
                 assertEquals(3, stats.getJobGraph().size());
-            } if (Util.isSparkExecType(cluster.getExecType())) {
+            } else if (Util.isSparkExecType(cluster.getExecType())) {
                 // One for each store and 3 for join.
                 assertEquals(4, stats.getJobGraph().size());
             } else {
@@ -514,11 +505,14 @@ public class TestPigRunner {
             });
             assertEquals(5, inputStats.get(0).getNumberRecords());
             assertEquals(3, inputStats.get(1).getNumberRecords());
-            // For mapreduce, since hdfs bytes read includes replicated tables bytes read is wrong
             // Since Tez has only one load per job its values are correct
-            // By pass the check for spark due to PIG-4788
-            if (!Util.isMapredExecType(cluster.getExecType()) && !Util.isSparkExecType(cluster.getExecType())) {
+            // the result of inputStats in spark mode is also correct
+            if (!Util.isMapredExecType(cluster.getExecType())) {
                 assertEquals(30, inputStats.get(0).getBytes());
+            }
+
+            //TODO PIG-5240: Fix TestPigRunner#simpleMultiQueryTest3 in spark mode for wrong inputStats
+            if (!Util.isMapredExecType(cluster.getExecType()) && !Util.isSparkExecType(cluster.getExecType())) {
                 assertEquals(18, inputStats.get(1).getBytes());
             }
         } finally {
@@ -545,17 +539,15 @@ public class TestPigRunner {
             Iterator<JobStats> iter = stats.getJobGraph().iterator();
             while (iter.hasNext()) {
                 JobStats js=iter.next();
-                if (execType.equals("tez")) {
-                    assertEquals(js.getState().name(), "FAILED");
-                } else if (execType.equals("spark")) {
-                    assertEquals(js.getState().name(), "FAILED");
-                } else {
-                    if(js.getState().name().equals("FAILED")) {
-                        List<Operator> ops=stats.getJobGraph().getSuccessors(js);
-                        for(Operator op : ops ) {
-                            assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+                if (execType.equals("mapreduce")) {
+                    if (js.getState().name().equals("FAILED")) {
+                        List<Operator> ops = stats.getJobGraph().getSuccessors(js);
+                        for (Operator op : ops) {
+                            assertEquals(((JobStats) op).getState().toString(), "UNKNOWN");
                         }
                     }
+                } else {
+                    assertEquals(js.getState().name(), "FAILED");
                 }
             }
         } finally {

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigServer.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigServer.java Fri May 26 02:01:13 2017
@@ -62,6 +62,7 @@ import org.apache.pig.impl.util.Properti
 import org.apache.pig.impl.util.Utils;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -548,9 +549,7 @@ public class TestPigServer {
         // TODO: Explain XML output is not supported in non-MR mode. Remove the
         // following condition once it's implemented in Tez.
         String execType = cluster.getExecType().toString().toLowerCase();
-        if (!execType.equals(ExecType.MAPREDUCE.name().toLowerCase()) && !execType.equals(MiniGenericCluster.EXECTYPE_SPARK)) {
-            return;
-        }
+        Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()) || Util.isSparkExecType(cluster.getExecType()));
         PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag
{ itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ;
         pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ;

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java Fri May 26 02:01:13
2017
@@ -263,6 +263,8 @@ public class TestPigServerLocal {
             // 3 = 1 (registerQuery) + 2 (SortConverter, PigRecordReader)
             // 1 (registerQuery)
             _testSkipParseInRegisterForBatch(true, 3, 1);
+
+            _testParseBatchWithScripting(3, 1);
         } else {
             // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
             // InputSizeReducerEstimator, getSplits->RandomSampleLoader,

Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Fri May 26 02:01:13
2017
@@ -36,6 +36,7 @@ import org.apache.pig.impl.io.FileLocali
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.VisitorException;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -512,9 +513,8 @@ public abstract class TestSecondarySort
     @Test
     // Once custom partitioner is used, we cannot use secondary key optimizer, see PIG-3827
     public void testCustomPartitionerWithSort() throws Exception {
-        if( Util.isSparkExecType(cluster.getExecType())){
-            return;
-        }
+        Assume.assumeFalse("Skip this test for Spark", Util.isSparkExecType(cluster.getExecType()));
+
         File tmpFile1 = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
         ps1.println("1\t2\t3");

Modified: pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java Fri May 26 02:01:13 2017
@@ -26,6 +26,7 @@ import org.apache.commons.lang.ArrayUtil
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.pig.ExecType;
@@ -79,8 +80,15 @@ public abstract class YarnMiniCluster ex
             m_mr.init(m_dfs_conf);
             m_mr.start();
             m_mr_conf = m_mr.getConfig();
-            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-                    System.getProperty("java.class.path"));
+            File libDir = new File(System.getProperty("ivy.lib.dir", "build/ivy/lib/Pig"));
+            File classesDir = new File(System.getProperty("build.classes", "build/classes"));
+            File testClassesDir = new File(System.getProperty("test.build.classes", "test/build/classes"));
+            String classpath = libDir.getAbsolutePath() + "/*"
+                + File.pathSeparator + classesDir.getAbsolutePath()
+                + File.pathSeparator + testClassesDir.getAbsolutePath();
+            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, classpath);
+            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
+            m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
 
             Configuration mapred_site = new Configuration(false);
             Configuration yarn_site = new Configuration(false);

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld?rev=1796232&r1=1796231&r2=1796232&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
(original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
Fri May 26 02:01:13 2017
@@ -1,5 +1,5 @@
 #--------------------------------------------------
-# Spark Plan                                  
+# Spark Plan
 #--------------------------------------------------
 
 Spark node scope-18



Mime
View raw message