pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1707145 - in /pig/trunk: ./ conf/ src/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ test/org/apache/pig/test/data/GoldenFiles/tez/ test/org...
Date Tue, 06 Oct 2015 21:19:35 GMT
Author: rohini
Date: Tue Oct  6 21:19:35 2015
New Revision: 1707145

URL: http://svn.apache.org/viewvc?rev=1707145&view=rev
Log:
PIG-4691: [Pig on Tez] Support for whitelisting storefuncs for union optimization (rohini)

Added:
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/src/pig-default.properties
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct  6 21:19:35 2015
@@ -49,6 +49,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4691: [Pig on Tez] Support for whitelisting storefuncs for union optimization (rohini)
+
 PIG-3957: Refactor out resetting input key in TezDagBuilder (rohini)
 
 PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Tue Oct  6 21:19:35 2015
@@ -611,6 +611,8 @@ hcat.bin=/usr/local/hcat/bin/hcat
 # output from different vertices into one final output location.
 # If a StoreFunc's OutputCommitter does not work with multiple vertices
 # writing to same location, then you can disable union optimization just
-# for that StoreFunc. Refer PIG-4649
+# for that StoreFunc. Refer PIG-4649. You can also specify a whitelist of StoreFuncs
+# that are known to work with multiple vertices writing to same location instead of a blacklist
 
 #pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
+#pig.tez.opt.union.supported.storefuncs=
\ No newline at end of file

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Tue Oct  6 21:19:35 2015
@@ -60,6 +60,13 @@ public class PigConfiguration {
      * This key is used to enable or disable union optimization in tez. True by default
      */
     public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
+    /**
+     * These keys are used to enable or disable tez union optimization for
+     * specific StoreFuncs so that optimization is only applied to StoreFuncs
+     * that do not hardcode part file names and honor mapreduce.output.basename and
+     * is turned off for those that do not. Refer PIG-4649
+     */
+    public static final String PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS = "pig.tez.opt.union.supported.storefuncs";
     public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = "pig.tez.opt.union.unsupported.storefuncs";
 
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Oct  6 21:19:35 2015
@@ -419,6 +419,12 @@ public class TezLauncher extends Launche
         }
 
         boolean isUnionOpt = conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
+        List<String> supportedStoreFuncs = null;
+        String unionSupported = conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
+        if (unionSupported != null && unionSupported.trim().length() > 0) {
+            supportedStoreFuncs = Arrays
+                    .asList(StringUtils.split(unionSupported.trim()));
+        }
         List<String> unionUnsupportedStoreFuncs = null;
         String unionUnSupported = conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
         if (unionUnSupported != null && unionUnSupported.trim().length() > 0) {
@@ -430,7 +436,9 @@ public class TezLauncher extends Launche
         if (isMultiQuery) {
             // reduces the number of TezOpers in the Tez plan generated
             // by multi-query (multi-store) script.
-            MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan, isUnionOpt, unionUnsupportedStoreFuncs);
+            MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(
+                    tezPlan, isUnionOpt, supportedStoreFuncs,
+                    unionUnsupportedStoreFuncs);
             mqOptimizer.visit();
         }
 
@@ -443,7 +451,7 @@ public class TezLauncher extends Launche
 
         // Use VertexGroup in Tez
         if (isUnionOpt) {
-            UnionOptimizer uo = new UnionOptimizer(tezPlan, unionUnsupportedStoreFuncs);
+            UnionOptimizer uo = new UnionOptimizer(tezPlan, supportedStoreFuncs, unionUnsupportedStoreFuncs);
             uo.visit();
         }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Tue Oct  6 21:19:35 2015
@@ -40,11 +40,15 @@ import org.apache.pig.impl.plan.VisitorE
 public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
 
     private boolean unionOptimizerOn;
+    private List<String> unionSupportedStoreFuncs;
     private List<String> unionUnsupportedStoreFuncs;
 
-    public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn, List<String> unionUnsupportedStoreFuncs) {
+    public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn,
+            List<String> unionSupportedStoreFuncs,
+            List<String> unionUnsupportedStoreFuncs) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.unionOptimizerOn = unionOptimizerOn;
+        this.unionSupportedStoreFuncs = unionSupportedStoreFuncs;;
         this.unionUnsupportedStoreFuncs = unionUnsupportedStoreFuncs;
     }
 
@@ -128,8 +132,10 @@ public class MultiQueryOptimizerTez exte
                 if (getPlan().getSuccessors(successor) != null) {
                     for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
                         if (succSuccessor.isUnion()) {
-                            if (!(unionOptimizerOn
-                                    && UnionOptimizer.isOptimizable(succSuccessor, unionUnsupportedStoreFuncs))) {
+                            if (!(unionOptimizerOn &&
+                                    UnionOptimizer.isOptimizable(succSuccessor,
+                                            unionSupportedStoreFuncs,
+                                            unionUnsupportedStoreFuncs))) {
                                 toMergeSuccessors.add(succSuccessor);
                             }
                         } else if (successors.contains(succSuccessor)) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Tue Oct  6 21:19:35 2015
@@ -26,6 +26,9 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -43,7 +46,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
+import org.apache.pig.builtin.AvroStorage;
+import org.apache.pig.builtin.JsonStorage;
+import org.apache.pig.builtin.OrcStorage;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
@@ -70,26 +79,57 @@ import org.apache.tez.runtime.library.ou
  */
 public class UnionOptimizer extends TezOpPlanVisitor {
 
+    private static final Log LOG = LogFactory.getLog(UnionOptimizer.class);
     private TezOperPlan tezPlan;
+    private static Set<String> builtinSupportedStoreFuncs = new HashSet<String>();
+    private List<String> supportedStoreFuncs;
     private List<String> unsupportedStoreFuncs;
 
-    public UnionOptimizer(TezOperPlan plan, List<String> unsupportedStoreFuncs) {
+    static {
+        builtinSupportedStoreFuncs.add(PigStorage.class.getName());
+        builtinSupportedStoreFuncs.add(JsonStorage.class.getName());
+        builtinSupportedStoreFuncs.add(OrcStorage.class.getName());
+        builtinSupportedStoreFuncs.add(HBaseStorage.class.getName());
+        builtinSupportedStoreFuncs.add(AvroStorage.class.getName());
+        builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage");
+        builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.CSVExcelStorage");
+        builtinSupportedStoreFuncs.add(Storage.class.getName());
+    }
+
+    public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         tezPlan = plan;
+        this.supportedStoreFuncs = supportedStoreFuncs;
         this.unsupportedStoreFuncs = unsupportedStoreFuncs;
     }
 
-    public static boolean isOptimizable(TezOperator tezOp, List<String> unsupportedStoreFuncs)
+    public static boolean isOptimizable(TezOperator tezOp,
+            List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
             throws VisitorException {
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
             return false;
         }
-        if (unsupportedStoreFuncs != null) {
+        if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
             List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
             for (POStoreTez store : stores) {
-                if (unsupportedStoreFuncs.contains(store.getStoreFunc().getClass().getName())) {
+                String name = store.getStoreFunc().getClass().getName();
+                if (unsupportedStoreFuncs != null
+                        && unsupportedStoreFuncs.contains(name)) {
                     return false;
                 }
+                if (supportedStoreFuncs != null
+                        && !supportedStoreFuncs.contains(name)) {
+                    if (!builtinSupportedStoreFuncs.contains(name)) {
+                        LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+                                + " does not contain " + name
+                                + " and so disabling union optimization. There will be some performance degradation. "
+                                + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location,"
+                                + " run pig with -D"
+                                + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+                                + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. Refer PIG-4691");
+                        return false;
+                    }
+                }
             }
         }
         return true;
@@ -101,7 +141,7 @@ public class UnionOptimizer extends TezO
             return;
         }
 
-        if (!isOptimizable(tezOp, unsupportedStoreFuncs)) {
+        if (!isOptimizable(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) {
             return;
         }
 

Modified: pig/trunk/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Tue Oct  6 21:19:35 2015
@@ -59,4 +59,4 @@ pig.output.committer.recovery.support=fa
 pig.stats.output.size.reader=org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader
 pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
 
-pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
\ No newline at end of file
+pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld?rev=1707145&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld Tue Oct  6 21:19:35 2015
@@ -0,0 +1,45 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-18	->	Tez vertex scope-20,
+Tez vertex scope-19	->	Tez vertex scope-20,
+Tez vertex scope-20
+
+Tez vertex scope-18
+# Plan on vertex
+POValueOutputTez - scope-22	->	 [scope-20]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[chararray] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Plan on vertex
+POValueOutputTez - scope-23	->	 [scope-20]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][1] - scope-9
+    |   |
+    |   Cast[chararray] - scope-13
+    |   |
+    |   |---Project[bytearray][0] - scope-12
+    |
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-20
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.test.TestMultiQueryBasic$DummyStoreWithOutputFormat) - scope-17
+|
+|---POShuffledValueInputTez - scope-21	<-	 [scope-18, scope-19]

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Tue Oct  6 21:19:35 2015
@@ -33,9 +33,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
+import org.apache.pig.builtin.OrcStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
@@ -514,12 +516,24 @@ public class TestTezCompiler {
                 "store c into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
-        String oldConfigValue = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
+        String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
+        String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
         // Plan should not have union optimization applied
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName());
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b;" +
+                "store c into 'file:///tmp/output' using " + DummyStoreWithOutputFormat.class.getName() + "();";
+        // Plan should not have union optimization applied
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
         // Restore the value
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, oldConfigValue);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, oldSupported);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, oldUnSupported);
     }
 
     @Test



Mime
View raw message