pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1725329 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/impl/util/ src/org/apache/pig/tools/pigstats/tez/ test/e2e/pig/tests/
Date Mon, 18 Jan 2016 19:47:48 GMT
Author: rohini
Date: Mon Jan 18 19:47:48 2016
New Revision: 1725329

URL: http://svn.apache.org/viewvc?rev=1725329&view=rev
Log:
PIG-4587: Applying isFirstReduceOfKey for Skewed left outer join skips records (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
    pig/trunk/test/e2e/pig/tests/nightly.conf

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 18 19:47:48 2016
@@ -81,6 +81,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4587: Applying isFirstReduceOfKey for Skewed left outer join skips records (rohini)
+
 PIG-4782: OutOfMemoryError: GC overhead limit exceeded with POPartialAgg (rohini)
 
 PIG-4737: Check and fix clone implementation for all classes extending PhysicalOperator (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Jan 18 19:47:48 2016
@@ -1945,8 +1945,13 @@ public class MRCompiler extends PhyPlanV
                 ep.add(prj);
                 eps.add(ep);
                 if (!inner[i]) {
-                    // Add an empty bag for outer join
-                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
+                    // Add an empty bag for outer join.
+                    if (i == 0) {
+                        // For right outer, add IsFirstReduceOfKey UDF as well
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
+                    } else {
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKey.class.getName());
+                    }
                 }
                 flat.add(true);
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Mon Jan 18 19:47:48 2016
@@ -1682,7 +1682,7 @@ public class TezCompiler extends PhyPlan
             List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
             List<Boolean> flat = new ArrayList<Boolean>();
 
-            boolean containsOuter = false;
+            boolean containsRightOuter = false;
             // Add corresponding POProjects
             for (int i=0; i < 2; i++) {
                 ep = new PhysicalPlan();
@@ -1693,9 +1693,13 @@ public class TezCompiler extends PhyPlan
                 ep.add(prj);
                 eps.add(ep);
                 if (!inner[i]) {
-                    // Add an empty bag for outer join
-                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
-                    containsOuter = true;
+                    // Add an empty bag for outer join. For right outer, add IsFirstReduceOfKeyTez
UDF as well
+                    if (i == 0) {
+                        containsRightOuter = true;
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
+                    } else {
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKeyTez.class.getName());
+                    }
                 }
                 flat.add(true);
             }
@@ -1714,7 +1718,7 @@ public class TezCompiler extends PhyPlan
 
             POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
             for (int i = 0; i <= 2; i++) {
-                if (i != 2 || containsOuter) {
+                if (i != 2 || containsRightOuter) {
                     // We need to send sample to left relation partitioner vertex, right
relation load vertex,
                     // and join vertex (IsFirstReduceOfKey in join vertex need sample file
as well)
                     joinJobs[i].setSampleOperator(sampleJobPair.first);

Modified: pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java Mon Jan 18 19:47:48 2016
@@ -42,43 +42,43 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 
-/* 
+/*
  * A class to add util functions that gets used by LogToPhyTranslator and MRCompiler
- * 
+ *
  */
 public class CompilerUtils {
 
     public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema,
-            boolean skewedJoin, String isFirstReduceOfKeyClassName) throws PlanException
{
+            boolean skewedRightOuterJoin, String isFirstReduceOfKeyClassName) throws PlanException
{
         // we currently have POProject[bag] as the only operator in the plan
         // If the bag is an empty bag, we should replace
         // it with a bag with one tuple with null fields so that when we flatten
         // we do not drop records (flatten will drop records if the bag is left
-        // as an empty bag) and actually project nulls for the fields in 
+        // as an empty bag) and actually project nulls for the fields in
         // the empty bag
-        
+
         // So we need to get to the following state:
         // POProject[Bag]
-        //         \     
-        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
-        //                        \      |    POProject[Bag]             
+        //         \
+        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
+        //                        \      |    POProject[Bag]
         //                         \     |    /
         //                          POBinCond
-        // Further, if it is skewed join, only the first reduce of the key
+        // Further, if it is skewed right outer join, only the first reduce of the key
         // will generate tuple with null fields (See PIG-4377)
-        // 
+        //
         // POProject[key]              POProject[Bag]
         //         \                      /
         //      IsFirstReduceOfKey  POUserFunc["IsEmpty()"]
         //                   \        /
         //                    \      /
-        //                       AND  Const[Bag](bag with null fields)   
-        //                        \      |    POProject[Bag]             
+        //                       AND  Const[Bag](bag with null fields)
+        //                        \      |    POProject[Bag]
         //                         \     |    /
         //                          POBinCond
         POProject relationProject = (POProject) fePlan.getRoots().get(0);
         try {
-            
+
             // condition of the bincond
             POProject relationProjectForIsEmpty = relationProject.clone();
             fePlan.add(relationProjectForIsEmpty);
@@ -92,7 +92,7 @@ public class CompilerUtils {
             fePlan.connect(relationProjectForIsEmpty, isEmpty);
 
             ExpressionOperator cond;
-            if (skewedJoin) {
+            if (skewedRightOuterJoin) {
                 POProject projectForKey = new POProject(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                 projectForKey.setColumn(0);
                 projectForKey.setOverloaded(false);
@@ -119,7 +119,7 @@ public class CompilerUtils {
             } else {
                 cond = isEmpty;
             }
-            
+
             // lhs of bincond (const bag with null fields)
             ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
@@ -136,7 +136,7 @@ public class CompilerUtils {
             ce.setResultType(DataType.BAG);
             //this operator doesn't have any predecessors
             fePlan.add(ce);
-            
+
             //rhs of bincond is the original project
             // let's set up the bincond now
             POBinCond bincond = new POBinCond(new OperatorKey(scope,
@@ -154,7 +154,7 @@ public class CompilerUtils {
         } catch (Exception e) {
             throw new PlanException("Error setting up outerjoin", e);
         }
-    	
+
     }
 
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Mon Jan 18 19:47:48 2016
@@ -71,14 +71,14 @@ public class TezDAGStats extends JobStat
     public static final String PIG_COUNTER_GROUP = org.apache.pig.PigCounters.class.getName();
 
     public static final String SUCCESS_HEADER = String.format("VertexId Parallelism TotalTasks"
-            + " %1$14s %2$14s %3$14s %4$16s %5$14s %6$16s"
+            + " %1$14s %2$20s %3$14s %4$14s %5$16s %6$14s %7$16s"
             + " Alias\tFeature\tOutputs",
-            "InputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead",
"HdfsBytesWritten");
+            "InputRecords", "ReduceInputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten",
"HdfsBytesRead", "HdfsBytesWritten");
 
     public static final String FAILURE_HEADER = String.format("VertexId  State Parallelism
TotalTasks"
-            + " %1$14s %2$14s %3$14s %4$16s %5$14s %6$16s"
+            + " %1$14s %2$20s %3$14s %4$14s %5$16s %6$14s %7$16s"
             + " Alias\tFeature\tOutputs",
-            "InputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead",
"HdfsBytesWritten");
+            "InputRecords", "ReduceInputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten",
"HdfsBytesRead", "HdfsBytesWritten");
 
     private Map<String, TezVertexStats> tezVertexStatsMap;
 

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Mon Jan 18 19:47:48 2016
@@ -73,6 +73,7 @@ public class TezVertexStats extends JobS
 
     private int numTasks = 0;
     private long numInputRecords = 0;
+    private long numReduceInputRecords = 0;
     private long numOutputRecords = 0;
     private long fileBytesRead = 0;
     private long fileBytesWritten = 0;
@@ -111,6 +112,7 @@ public class TezVertexStats extends JobS
         sb.append(String.format("%9s ", parallelism));
         sb.append(String.format("%10s ", numTasks));
         sb.append(String.format("%14s ", numInputRecords));
+        sb.append(String.format("%20s ", numReduceInputRecords));
         sb.append(String.format("%14s ", numOutputRecords));
         sb.append(String.format("%14s ", fileBytesRead));
         sb.append(String.format("%16s ", fileBytesWritten));
@@ -213,6 +215,19 @@ public class TezVertexStats extends JobS
     }
 
     public void addInputStatistics() {
+
+        long inputRecords = -1;
+        Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
+        if (taskCounters != null) {
+            if (taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
+                inputRecords = taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
+                numInputRecords = inputRecords;
+            }
+            if (taskCounters.get(TaskCounter.REDUCE_INPUT_RECORDS.name()) != null) {
+                numReduceInputRecords = taskCounters.get(TaskCounter.REDUCE_INPUT_RECORDS.name());
+            }
+        }
+
         if (loads == null) {
             return;
         }
@@ -233,17 +248,12 @@ public class TezVertexStats extends JobS
                     if (n != null) records = n;
                 }
                 if (records == -1) {
-                    Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
-                    if (taskCounters != null
-                            && taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name())
!= null) {
-                        records = taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
-                    }
+                    records = inputRecords;
                 }
                 if (isSuccessful() && records == -1) {
                     // Tez removes 0 value counters for efficiency.
                     records = 0;
                 }
-                numInputRecords = records;
                 if (counters.get(FS_COUNTER_GROUP) != null &&
                         counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ)
!= null) {
                     hdfsBytesRead = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ);
@@ -257,6 +267,16 @@ public class TezVertexStats extends JobS
     }
 
     public void addOutputStatistics() {
+
+        long outputRecords = -1;
+
+        Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
+        if (taskCounters != null
+                && taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+            outputRecords = taskCounters.get(TaskCounter.OUTPUT_RECORDS.name());
+            numOutputRecords = outputRecords;
+        }
+
         if (stores == null) {
             return;
         }
@@ -279,19 +299,12 @@ public class TezVertexStats extends JobS
                     if (n != null) records = n;
                 }
                 if (records == -1) {
-                    Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
-                    if (taskCounters != null
-                            && taskCounters.get(TaskCounter.OUTPUT_RECORDS.name())
!= null) {
-                        records = taskCounters.get(TaskCounter.OUTPUT_RECORDS.name());
-                    }
+                    records = outputRecords;
                 }
                 if (isSuccessful() && records == -1) {
                     // Tez removes 0 value counters for efficiency.
                     records = 0;
                 }
-                if (records != -1) {
-                    numOutputRecords += records;
-                }
             }
             /* TODO: Need to check FILE_BYTES_WRITTEN for local mode */
             if (!sto.isMultiStore() && counters.get(FS_COUNTER_GROUP)!= null &&

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Jan 18 19:47:48 2016
@@ -3125,6 +3125,37 @@ c = foreach b generate flatten($0);
 store c into ':OUTPATH:';\,
 
                         },
+                        # left outer join with fixed memory
+                        {
+                        'num' => 13,
+                        'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using
PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k'
using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name ;
+store e into ':OUTPATH:';\,
+                        },
+                        # full outer join with fixed memory
+                        {
+                        'num' => 14,
+                        'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using
PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k'
using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name ;
+store e into ':OUTPATH:';\,
+
+                        },
                 ]
 
             },



Mime
View raw message