tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-102. Fix shuffle to read the correct comparator class from configuration.
Date Thu, 02 May 2013 23:17:47 GMT
Updated Branches:
  refs/heads/TEZ-1 9f7a21d36 -> 45ace5414


TEZ-102. Fix shuffle to read the correct comparator class from configuration.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/45ace541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/45ace541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/45ace541

Branch: refs/heads/TEZ-1
Commit: 45ace5414295eb1a586adc2aa3153f5dfecbad0f
Parents: 9f7a21d
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu May 2 16:12:49 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu May 2 16:16:34 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/common/TezJobConfig.java   |    5 +-
 .../org/apache/tez/engine/common/ConfigUtils.java  |   41 ++----------
 .../tez/mapreduce/hadoop/DeprecatedKeys.java       |    2 +
 .../hadoop/MultiStageMRConfToTezTranslator.java    |   49 ++++++++++++++-
 .../processor/reduce/ReduceProcessor.java          |    5 +-
 5 files changed, 63 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 72ba68f..e867458 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -263,7 +263,7 @@ public class TezJobConfig {
       "tez.engine.intermediate-output.key.comparator.class";
   public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS = 
       "tez.engine.intermediate-input.key.comparator.class";
-  
+
   public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS = 
       "tez.engine.intermediate-output.key.class";
   public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS = 
@@ -283,6 +283,9 @@ public class TezJobConfig {
       "tez.engine.intermediate-output.compress.codec";
   public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC = 
       "tez.engine.intermediate-input.compress.codec";
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS = 
+      "tez.engine.intermediate-input.key.secondary.comparator.class";
   
   // TODO This should be in DAGConfiguration
   /* config for tracking the local file where all the credentials for the job

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
index e1f496d..f14bb3e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
@@ -19,9 +19,7 @@
 package org.apache.tez.engine.common;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -79,14 +77,10 @@ public class ConfigUtils {
         TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
   }
 
-  // TODO Is it possible to simplify the 3-level lookup (Comparator, Map-key, Job-key)
   public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
     Class<V> retv = (Class<V>) conf.getClass(
         TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, null,
         Object.class);
-    if (retv == null) {
-      retv = getOutputValueClass(conf);
-    }
     return retv;
   }
   
@@ -94,24 +88,13 @@ public class ConfigUtils {
     Class<V> retv = (Class<V>) conf.getClass(
         TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS, null,
         Object.class);
-    if (retv == null) {
-      retv = getOutputValueClass(conf);
-    }
     return retv;
   }
 
-  public static <V> Class<V> getOutputValueClass(Configuration conf) {
-    return (Class<V>) conf.getClass(
-        "mapreduce.job.output.value.class", Text.class, Object.class);
-  }
-
   public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
     Class<K> retv = (Class<K>) conf.getClass(
         TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, null,
         Object.class);
-    if (retv == null) {
-      retv = getOutputKeyClass(conf);
-    }
     return retv;
   }
 
@@ -119,20 +102,9 @@ public class ConfigUtils {
     Class<K> retv = (Class<K>) conf.getClass(
         TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS, null,
         Object.class);
-    if (retv == null) {
-      retv = getOutputKeyClass(conf);
-    }
     return retv;
   }
-  
-  public static <K> Class<K> getOutputKeyClass(Configuration conf) {
-    return 
-        (Class<K>) 
-        conf.getClass(
-            "mapreduce.job.output.key.class", 
-            LongWritable.class, Object.class);
-}
-  
+
   public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
     Class<? extends RawComparator> theClass = conf.getClass(
         TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null,
@@ -155,14 +127,15 @@ public class ConfigUtils {
 
   
   
-  public static <V> RawComparator<V> getOutputValueGroupingComparator(
+  // TODO Fix name
+  public static <V> RawComparator<V> getInputKeySecondaryGroupingComparator(
       Configuration conf) {
-    Class<? extends RawComparator> theClass = 
-        conf.getClass(
-            "mapreduce.job.output.group.comparator.class", 
+    Class<? extends RawComparator> theClass = conf
+        .getClass(
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
             null, RawComparator.class);
     if (theClass == null) {
-      return getIntermediateOutputKeyComparator(conf);
+      return getIntermediateInputKeyComparator(conf);
     }
 
     return ReflectionUtils.newInstance(theClass, conf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 9ac723f..3add31c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -194,6 +194,8 @@ public class DeprecatedKeys {
     
     registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
     
+    registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS);
+    
     registerMRToEngineKeyTranslation(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
 
     registerMRToEngineKeyTranslation(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index 162e225..c444314 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -21,11 +21,17 @@ package org.apache.tez.mapreduce.hadoop;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
 
 public class MultiStageMRConfToTezTranslator {
 
+  private static final Log LOG = LogFactory.getLog(MultiStageMRConfToTezTranslator.class);
+  
   private enum DeprecationReason {
     DEPRECATED_DIRECT_TRANSLATION, DEPRECATED_MULTI_STAGE
   }
@@ -49,10 +55,10 @@ public class MultiStageMRConfToTezTranslator {
     int numEdges = totalStages - 1;
 
     Configuration[] allConfs = extractStageConfs(newConf, numEdges);
-
+    
     for (int i = 0; i < allConfs.length; i++) {
+      setStageKeysFromBaseConf(allConfs[i], srcConf, Integer.toString(i));
       processDirectConversion(allConfs[i]);
-      // XXX How are the number of reducers being set correctly in YARNRUNNER ?
     }
     for (int i = 0; i < allConfs.length - 1; i++) {
       processMultiStageDepreaction(allConfs[i], allConfs[i + 1]);
@@ -146,4 +152,41 @@ public class MultiStageMRConfToTezTranslator {
       }
     }
   }
-}
+
+  /**
+   * Pulls in specific keys from the base configuration, if they are not set at
+   * the stage level. An explicit list of keys is copied over (not all), which
+   * require translation to tez keys.
+   */
+  private static void setStageKeysFromBaseConf(Configuration conf,
+      Configuration baseConf, String stage) {
+    JobConf jobConf = new JobConf(baseConf);
+    // Don't clobber explicit tez config.
+    if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS) == null
+        && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS) == null) {
+      // If this is set, but the comparator is not set, and their types differ -
+      // the job will break.
+      if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) {
+        // Pull tis in from the baseConf
+        conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, jobConf
+            .getMapOutputKeyClass().getName());
+        LOG.info("XXX: Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS
+            + " for stage: " + stage
+            + " based on job level configuration. Value: "
+            + conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS));
+      }
+    }
+    
+    if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS) == null
+        && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS) == null) {
+      if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) {
+        conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf
+            .getMapOutputValueClass().getName());
+        LOG.info("XXX: Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS
+            + " for stage: " + stage
+            + " based on job level configuration. Value: "
+            + conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS));
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index b74f952..9ae07ee 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -129,8 +129,11 @@ implements Processor {
     
     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
     Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
+    LOG.info("Using keyClass: " + keyClass);
+    LOG.info("Using valueClass: " + valueClass);
     RawComparator comparator = 
-        ConfigUtils.getOutputValueGroupingComparator(jobConf);
+        ConfigUtils.getInputKeySecondaryGroupingComparator(jobConf);
+    LOG.info("Using comparator: " + comparator);
 
     reduceInputKeyCounter = 
         reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);


Mime
View raw message