pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1723975 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Date Mon, 11 Jan 2016 02:54:54 GMT
Author: rohini
Date: Mon Jan 11 02:54:54 2016
New Revision: 1723975

URL: http://svn.apache.org/viewvc?rev=1723975&view=rev
Log:
PIG-4775: Better default values for shuffle bytes per reducer (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 02:54:54 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4775: Better default values for shuffle bytes per reducer (rohini)
+
 PIG-4753: Pigmix should have option to delete outputs after completing the tests (mitdesai via rohini)
 
 PIG-4744: Honor tez.staging-dir setting in tez-site.xml (rohini via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jan 11 02:54:54 2016
@@ -165,6 +165,9 @@ import org.apache.tez.runtime.library.in
 public class TezDagBuilder extends TezOpPlanVisitor {
     private static final Log log = LogFactory.getLog(TezDagBuilder.class);
 
+    private static long SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT = 384 * 1024 * 1024L;
+    private static long SHUFFLE_BYTES_PER_REDUCER_DEFAULT = 256 * 1024 * 1024L;
+
     private DAG dag;
     private Map<String, LocalResource> localResources;
     private PigContext pc;
@@ -705,17 +708,25 @@ public class TezDagBuilder extends TezOp
                         vmPluginName = ShuffleVertexManager.class.getName();
                     }
                     vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-                    if (stores.size() <= 0) {
-                        // Intermediate reduce. Set the bytes per reducer to be block size.
-                        vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                        intermediateTaskInputSize);
-                    } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
-                                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
-                        vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                        InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+                    // For Intermediate reduce, set the bytes per reducer to be block size.
+                    long bytesPerReducer = intermediateTaskInputSize;
+                    // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
+                    // If not as default use 384MB for group bys and 256 MB for joins. Not using
+                    // default 1G as that value was suited for mapreduce logic where numReducers=(map input size/bytesPerReducer).
+                    // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
+                    // as map input sizes are mostly always high compared to map output.
+                    if (stores.size() > 0) {
+                        if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+                            bytesPerReducer = vmPluginConf.getLong(
+                                            InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                            InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+                        } else if (tezOp.isGroupBy()) {
+                            bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT;
+                        } else {
+                            bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
+                        }
                     }
+                    vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
                     log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
                 }
             }



Mime
View raw message