pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1706921 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java test/org/apache/pig/test/TestLimitVariable.java
Date Mon, 05 Oct 2015 20:55:51 GMT
Author: rohini
Date: Mon Oct  5 20:55:51 2015
New Revision: 1706921

URL: http://svn.apache.org/viewvc?rev=1706921&view=rev
Log:
PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    pig/trunk/test/org/apache/pig/test/TestLimitVariable.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1706921&r1=1706920&r2=1706921&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct  5 20:55:51 2015
@@ -49,6 +49,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
+
 PIG-4635: NPE while running pig script in tez mode (daijy)
 
 PIG-4683: Nested order is broken after PIG-3591 in some cases (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1706921&r1=1706920&r2=1706921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Mon Oct  5 20:55:51 2015
@@ -34,6 +34,7 @@ import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -197,6 +198,18 @@ public class PigProcessor extends Abstra
 
             runPipeline(leaf);
 
+            if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))
+                    && !execPlan.endOfAllInput) {
+                // If there is a stream in the pipeline or if this map job belongs to merge-join we could
+                // potentially have more to process - so lets
+                // set the flag stating that all map input has been sent
+                // already and then lets run the pipeline one more time
+                // This will result in nothing happening in the case
+                // where there is no stream or it is not a merge-join in the pipeline
+                execPlan.endOfAllInput = true;
+                runPipeline(leaf);
+            }
+
             // Calling EvalFunc.finish()
             UDFFinishVisitor finisher = new UDFFinishVisitor(execPlan,
                     new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(

Modified: pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLimitVariable.java?rev=1706921&r1=1706920&r2=1706921&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLimitVariable.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLimitVariable.java Mon Oct  5 20:55:51 2015
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
@@ -64,20 +65,24 @@ public class TestLimitVariable {
 
     @Test
     public void testLimitVariable1() throws IOException {
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "" + true);
         String query =
-            "a = load '" + inputFile.getName() + "';" +
+            "a = load '" + inputFile.getName() + "' as (f1:int, f2:int);" +
             "b = group a all;" +
             "c = foreach b generate COUNT(a) as sum;" +
             "d = order a by $0 DESC;" +
-            "e = limit d c.sum/2;" // return top half of the tuples
+            "e = limit d c.sum/2;" + // return top half of the tuples
+            "f = group e all;" +
+            "g = foreach f generate AVG(e.$0), SUM(e.$1);"
             ;
 
         Util.registerMultiLineQuery(pigServer, query);
-        Iterator<Tuple> it = pigServer.openIterator("e");
+        Iterator<Tuple> it = pigServer.openIterator("g");
 
         List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] {
-                "(6,15)", "(5,10)", "(4,11)" });
+                "(5.0,36)"});
         Util.checkQueryOutputs(it, expectedRes);
+        pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_EXEC_MAP_PARTAGG);
     }
 
     @Test



Mime
View raw message