pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1770973 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java test/org/apache/pig/test/TestMultiQuery.java
Date Wed, 23 Nov 2016 14:09:59 GMT
Author: xuefu
Date: Wed Nov 23 14:09:59 2016
New Revision: 1770973

URL: http://svn.apache.org/viewvc?rev=1770973&view=rev
Log:
PIG-4899: The number of records of input file is calculated wrongly in spark mode in multiquery
case (Adam Szita via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1770973&r1=1770972&r2=1770973&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Wed Nov 23 14:09:59 2016
@@ -162,6 +162,10 @@ public class LoadConverter implements RD
         private SparkEngineConf sparkEngineConf;
         private boolean initialized;
 
+        //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing
+        //invalid number of input records, 'skip' flag below indicates first load is finished.
+        private boolean skip;
+
         public ToTupleFunction(SparkEngineConf sparkEngineConf){
                this.sparkEngineConf = sparkEngineConf;
 
@@ -172,9 +176,15 @@ public class LoadConverter implements RD
             if (!initialized) {
                 long partitionId = TaskContext.get().partitionId();
                 PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
+
+                //We're in POSplit and already counted all input records,
+                //in a multiquery case skip will be set to true after the first load is finished:
+                if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName,
counterName).getValue() > 0) {
+                    skip=true;
+                }
                 initialized = true;
             }
-            if (sparkCounters != null && disableCounter == false) {
+            if (sparkCounters != null && disableCounter == false && skip
== false) {
                 sparkCounters.increment(counterGroupName, counterName, 1L);
             }
             return v1._2();

Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1770973&r1=1770972&r2=1770973&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Wed Nov 23 14:09:59 2016
@@ -32,6 +32,8 @@ import org.apache.pig.builtin.mock.Stora
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.InputStats;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -880,6 +882,27 @@ public class TestMultiQuery {
         Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
     }
 
+    @Test
+    public void testMultiQueryJiraPig4899() throws Exception {
+        myPig.setBatchOn();
+
+        myPig.registerQuery("a = load 'passwd' "
+                + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,
gid:int);");
+        myPig.registerQuery("b1 = foreach a generate uname;");
+        myPig.registerQuery("b2 = foreach a generate uid;");
+        myPig.registerQuery("store b1 into 'output1';");
+        myPig.registerQuery("store b2 into 'output2';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            List<InputStats> stats = job.getStatistics().getInputStats();
+            assertEquals(1,stats.size());
+            InputStats stat = stats.get(0);
+            assertEquals("Number of records in passwd file is 14",14,stat.getNumberRecords());
+        }
+    }
+
     // --------------------------------------------------------------------------
     // Helper methods
 



Mime
View raw message