pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1713571 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ test/org/apache/pig/test/ test/org/apache/pig/tez/
Date Tue, 10 Nov 2015 00:03:30 GMT
Author: rohini
Date: Tue Nov 10 00:03:30 2015
New Revision: 1713571

URL: http://svn.apache.org/viewvc?rev=1713571&view=rev
Log:
PIG-4730: [Pig on Tez] Total parallelism estimation does not account load parallelism (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/trunk/test/org/apache/pig/test/Util.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
    pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Nov 10 00:03:30 2015
@@ -69,6 +69,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4730: [Pig on Tez] Total parallelism estimation does not account load parallelism (rohini)
+
 PIG-4689: CSV Writes incorrect header if two CSV files are created in one script (nielsbasjes via daijy)
 
 PIG-4727: Incorrect types table for AVG in docs (nsmith via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Nov 10 00:03:30 2015
@@ -103,7 +103,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -164,7 +163,7 @@ import org.apache.tez.runtime.library.in
  * A visitor to construct DAG out of Tez plan.
  */
 public class TezDagBuilder extends TezOpPlanVisitor {
-    private static final Log log = LogFactory.getLog(TezJobCompiler.class);
+    private static final Log log = LogFactory.getLog(TezDagBuilder.class);
 
     private DAG dag;
     private Map<String, LocalResource> localResources;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Tue Nov 10 00:03:30 2015
@@ -23,7 +23,6 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -111,6 +110,7 @@ public class TezJobCompiler {
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
             tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+            log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
             return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
         } catch (Exception e) {
             int errCode = 2017;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Tue Nov 10 00:03:30 2015
@@ -81,6 +81,7 @@ public class ParallelismSetter extends T
                 // requestedParallelism of Loader vertex is handled in LoaderProcessor
                 // propogate to vertexParallelism
                 tezOp.setVertexParallelism(tezOp.getRequestedParallelism());
+                incrementTotalParallelism(tezOp, tezOp.getRequestedParallelism());
                 return;
             } else {
                 int prevParallelism = -1;

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Tue Nov 10 00:03:30 2015
@@ -1382,19 +1382,21 @@ public class Util {
     }
 
     public static void createLogAppender(String appenderName, Writer writer, Class...clazzes) {
+        WriterAppender writerAppender = new WriterAppender(new PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
+        writerAppender.setName(appenderName);
         for (Class clazz : clazzes) {
             Logger logger = Logger.getLogger(clazz);
-            WriterAppender writerAppender = new WriterAppender(new PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
-            writerAppender.setName(appenderName);
             logger.addAppender(writerAppender);
         }
     }
 
-    public static void removeLogAppender(Class clazz, String appenderName) {
-        Logger logger = Logger.getLogger(clazz);
-        Appender appender = logger.getAppender(appenderName);
-        appender.close();
-        logger.removeAppender(appenderName);
+    public static void removeLogAppender(String appenderName, Class...clazzes) {
+        for (Class clazz : clazzes) {
+            Logger logger = Logger.getLogger(clazz);
+            Appender appender = logger.getAppender(appenderName);
+            appender.close();
+            logger.removeAppender(appenderName);
+        }
     }
 
     public static Path getFirstPartFile(Path path) throws Exception {

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Tue Nov 10 00:03:30 2015
@@ -40,6 +40,7 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -311,6 +312,7 @@ public class TestTezAutoParallelism {
         // Parallelism of C should be increased
         assertTrue(log.contains("Increased requested parallelism of scope-59 to 4"));
         assertEquals(1, StringUtils.countMatches(log, "Increased requested parallelism"));
+        assertTrue(log.contains("Total estimated parallelism is 52"));
     }
 
     @Test
@@ -349,6 +351,7 @@ public class TestTezAutoParallelism {
             // Parallelism of C1 should be increased. C2 will not be increased due to order by
             assertEquals(1, StringUtils.countMatches(log, "Increased requested parallelism"));
             assertTrue(log.contains("Increased requested parallelism of scope-65 to 10"));
+            assertTrue(log.contains("Total estimated parallelism is 19"));
         } finally {
             pigServer.setDefaultParallel(-1);
         }
@@ -359,7 +362,7 @@ public class TestTezAutoParallelism {
         PigServer.resetScope();
         StringWriter writer = new StringWriter();
         // When there is a combiner operation involved user specified parallelism is overriden
-        Util.createLogAppender("testIncreaseIntermediateParallelism", writer, ParallelismSetter.class);
+        Util.createLogAppender("testIncreaseIntermediateParallelism", writer, ParallelismSetter.class, TezJobCompiler.class);
         try {
             pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
             pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
@@ -387,7 +390,7 @@ public class TestTezAutoParallelism {
             }
             return writer.toString();
         } finally {
-            Util.removeLogAppender(ParallelismSetter.class, "testIncreaseIntermediateParallelism");
+            Util.removeLogAppender("testIncreaseIntermediateParallelism", ParallelismSetter.class, TezJobCompiler.class);
             Util.deleteFile(cluster, outputDir);
         }
     }

Modified: pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java Tue Nov 10 00:03:30 2015
@@ -145,8 +145,7 @@ public class TestTezGraceParallelism {
             assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 20"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 100"));
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testDecreaseParallelism");
-            Util.removeLogAppender(ShuffleVertexManager.class, "testDecreaseParallelism");
+            Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
     }
 
@@ -186,8 +185,7 @@ public class TestTezGraceParallelism {
             // There are randomness in which task finishes first, so the auto parallelism could result different result
             assertTrue(Pattern.compile("Reduce auto parallelism for vertex: scope-64 to (\\d+)* from 50").matcher(writer.toString()).find());
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testIncreaseParallelism");
-            Util.removeLogAppender(ShuffleVertexManager.class, "testIncreaseParallelism");
+            Util.removeLogAppender("testIncreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
     }
 
@@ -222,7 +220,7 @@ public class TestTezGraceParallelism {
             assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85"));
             assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 101"));
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testJoinWithDifferentDepth");
+            Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class);
         }
     }
 
@@ -252,7 +250,7 @@ public class TestTezGraceParallelism {
             assertEquals(count, 1000);
             assertFalse(writer.toString().contains("scope-68"));
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testJoinWithDifferentDepth2");
+            Util.removeLogAppender("testJoinWithDifferentDepth2", PigGraceShuffleVertexManager.class);
         }
     }
 
@@ -285,7 +283,7 @@ public class TestTezGraceParallelism {
             assertTrue(writer.toString().contains("time to set parallelism for scope-41"));
             assertTrue(writer.toString().contains("time to set parallelism for scope-54"));
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testJoinWithUnion");
+            Util.removeLogAppender("testJoinWithUnion", PigGraceShuffleVertexManager.class);
         }
     }
 



Mime
View raw message