pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1604284 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/
Date Fri, 20 Jun 2014 21:56:19 GMT
Author: rohini
Date: Fri Jun 20 21:56:18 2014
New Revision: 1604284

URL: http://svn.apache.org/r1604284
Log:
PIG-3975: Multiple Scalar reference calls leading to missing records (knoguchi via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1604284&r1=1604283&r2=1604284&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jun 20 21:56:18 2014
@@ -211,6 +211,8 @@ PIG-3882: Multiquery off mode execution 
  
 BUG FIXES
 
+PIG-3975: Multiple Scalar reference calls leading to missing records (knoguchi via rohini)
+
 PIG-4017: NPE thrown from JobControlCompiler.shipToHdfs (cheolsoo)
 
 PIG-3997: Issue on Pig docs: Testing and Diagnostics (zjffdu via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1604284&r1=1604283&r2=1604284&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Jun 20 21:56:18 2014
@@ -234,39 +234,44 @@ public class MRCompiler extends PhyPlanV
             ConfigurationUtil.toConfiguration(pigContext.getProperties());
         boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
 
-        Map<FileSpec, MapReduceOper> seen = new HashMap<FileSpec, MapReduceOper>();
+        Set<FileSpec> seen = new HashSet<FileSpec>();
 
-        for(MapReduceOper mrOp: mrOpList) {
-            for(PhysicalOperator scalar: mrOp.scalars) {
-                MapReduceOper mro = phyToMROpMap.get(scalar);
+        for(MapReduceOper mro_scalar_consumer: mrOpList) {
+            for(PhysicalOperator scalar: mro_scalar_consumer.scalars) {
+                MapReduceOper mro_scalar_producer = phyToMROpMap.get(scalar);
                 if (scalar instanceof POStore) {
                     FileSpec oldSpec = ((POStore)scalar).getSFile();
-                    MapReduceOper mro2 = seen.get(oldSpec);
-                    boolean hasSeen = false;
-                    if (mro2 != null) {
-                        hasSeen = true;
-                        mro = mro2;
+                    if( seen.contains(oldSpec) ) {
+                      continue;
                     }
-                    if (!hasSeen
-                            && combinable
-                            && (mro.reducePlan.isEmpty() ? hasTooManyInputFiles(mro, conf)
-                                    : (mro.requestedParallelism >= fileConcatenationThreshold))) {
-                        PhysicalPlan pl = mro.reducePlan.isEmpty() ? mro.mapPlan : mro.reducePlan;
+                    seen.add(oldSpec);
+                    if ( combinable
+                         && (mro_scalar_producer.reducePlan.isEmpty() ?
+                              hasTooManyInputFiles(mro_scalar_producer, conf)
+                              : (mro_scalar_producer.requestedParallelism >= fileConcatenationThreshold))) {
+                        PhysicalPlan pl = mro_scalar_producer.reducePlan.isEmpty() ?
+                                            mro_scalar_producer.mapPlan : mro_scalar_producer.reducePlan;
                         FileSpec newSpec = getTempFileSpec();
 
                         // replace oldSpec in mro with newSpec
                         new FindStoreNameVisitor(pl, newSpec, oldSpec).visit();
+                        seen.add(newSpec);
 
                         POStore newSto = getStore();
                         newSto.setSFile(oldSpec);
-                        if (MRPlan.getPredecessors(mrOp)!=null &&
-                                MRPlan.getPredecessors(mrOp).contains(mro))
-                            MRPlan.disconnect(mro, mrOp);
-                        MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto);
-                        MRPlan.connect(catMROp, mrOp);
-                        seen.put(oldSpec, catMROp);
-                    } else {
-                        if (!hasSeen) seen.put(oldSpec, mro);
+                        MapReduceOper catMROp = getConcatenateJob(newSpec, mro_scalar_producer, newSto);
+                        MRPlan.connect(mro_scalar_producer, catMROp);
+
+                        // Need to add it to the PhysicalPlan and phyToMROpMap
+                        // so that softlink can be created
+                        phyToMROpMap.put(newSto, catMROp);
+                        plan.add(newSto);
+
+                        for (PhysicalOperator succ :
+                                plan.getSoftLinkSuccessors(scalar).toArray(new PhysicalOperator[0])) {
+                            plan.createSoftLink(newSto, succ);
+                            plan.removeSoftLink(scalar, succ);
+                        }
                     }
                 }
             }
@@ -335,8 +340,6 @@ public class MRCompiler extends PhyPlanV
             compile(op);
         }
 
-        connectSoftLink();
-
         return MRPlan;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1604284&r1=1604283&r2=1604284&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Jun 20 21:56:18 2014
@@ -631,6 +631,7 @@ public class MapReduceLauncher extends L
         MRCompiler comp = new MRCompiler(php, pc);
         comp.compile();
         comp.aggregateScalarsFiles();
+        comp.connectSoftLink();
         MROperPlan plan = comp.getMRPlan();
 
         //display the warning message(s) from the MRCompiler

Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=1604284&r1=1604283&r2=1604284&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Fri Jun 20 21:56:18 2014
@@ -18,6 +18,7 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.util.Iterator;
@@ -28,6 +29,8 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -432,4 +435,79 @@ public class TestFRJoin2 {
             pigServer.openIterator("D");
         }
     }
+
+    // pig-3975 test scalar alias with file concatenation referenced
+    // by multiple mapreduce jobs
+    @Test
+    public void testSoftLinkDependencyWithMultipleScalarReferences()
+                  throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+                                      cluster.getProperties());
+
+        pigServer.setBatchOn();
+        pigServer.getPigContext().getProperties().setProperty(
+                  MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
+        pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "false");
+        String query = "A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"
+                       + "B = group A by x parallel " + FILE_MERGE_THRESHOLD + ";"
+                       + "C = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"
+                       + "D = FOREACH C generate B.$0;"
+                       + "STORE D into '/tmp/output1';"
+                       + "E = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"
+                       + "F = FOREACH E generate B.$0;"
+                       + "STORE F into '/tmp/output2';";
+        MROperPlan mrplan = Util.buildMRPlanWithOptimizer(Util.buildPp(pigServer, query),pigServer.getPigContext());
+        assertEquals("Unexpected number of mapreduce job. Missing concat job?",
+                     4, mrplan.size() );
+
+        // look for concat job
+        MapReduceOper concatMRop = null;
+        for(MapReduceOper mrOp: mrplan) {
+            //concatjob == map-plan load-store && reudce-plan empty
+            if( mrOp.mapPlan.size() == 2 && mrOp.reducePlan.isEmpty() ) {
+                concatMRop = mrOp;
+                break;
+            }
+        }
+
+        if( concatMRop == null ) {
+            fail("Cannot find concat job.");
+        }
+        // 2 mr job reads from the concat job result [B.$0] so there
+        // should be 2 mr jobs as successors of the concat job
+        assertEquals("Missing dependency for concatjob",
+                     2, mrplan.getSuccessors(concatMRop).size());
+
+    }
+
+    // Extra scalar reference should not cause concat job to be created
+    @Test
+    public void testSoftLinkDoesNotCreateUnnecessaryConcatJob()
+                  throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+                                      cluster.getProperties());
+
+        pigServer.setBatchOn();
+        pigServer.getPigContext().getProperties().setProperty(
+                  MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
+        pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "false");
+        String query = "A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"
+                       + "B = group A all;"
+                       + "C = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"
+                       + "D = group C by x;"
+                       + "E = group D all;"
+                       + "F = FOREACH E generate B.$0;"
+                       + "Z = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"
+                       + "Y = FOREACH E generate F.$0;"
+                       + "STORE Y into '/tmp/output2';";
+        MROperPlan mrplan = Util.buildMRPlanWithOptimizer(Util.buildPp(pigServer, query),pigServer.getPigContext());
+
+        // look for concat job
+        for(MapReduceOper mrOp: mrplan) {
+            //concatjob == map-plan load-store && reudce-plan empty
+            if( mrOp.mapPlan.size() == 2 && mrOp.reducePlan.isEmpty() ) {
+                fail("Somehow concatjob was created even though there is no large or multiple inputs.");
+            }
+        }
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1604284&r1=1604283&r2=1604284&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Fri Jun 20 21:56:18 2014
@@ -853,6 +853,8 @@ public class Util {
     public static MROperPlan buildMRPlan(PhysicalPlan pp, PigContext pc) throws Exception{
         MRCompiler comp = new MRCompiler(pp, pc);
         comp.compile();
+        comp.aggregateScalarsFiles();
+        comp.connectSoftLink();
         return comp.getMRPlan();
     }
 



Mime
View raw message