tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] git commit: TEZ-191. Add helper APIs to convert a MR task configuration to Tez Vertex, Input, Output configuration. (sseth)
Date Mon, 10 Jun 2013 04:34:10 GMT
TEZ-191. Add helper APIs to convert a MR task configuration to Tez
Vertex, Input, Output configuration. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/81f43ef9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/81f43ef9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/81f43ef9

Branch: refs/heads/master
Commit: 81f43ef93d1350ad7d4d94e71b722862b19602c2
Parents: 3780729
Author: Siddharth Seth <sseth@apache.org>
Authored: Sun Jun 9 21:29:56 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Sun Jun 9 21:29:56 2013 -0700

----------------------------------------------------------------------
 .../hadoop/MultiStageMRConfToTezTranslator.java | 148 ++++++++++++++---
 .../hadoop/TestConfigTranslationMRToTez.java    | 161 +++++++++++++++++++
 2 files changed, 291 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/81f43ef9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index 54648f5..e43bfa7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -23,11 +23,15 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
 
+import com.google.common.base.Preconditions;
+
 public class MultiStageMRConfToTezTranslator {
 
   private static final Log LOG = LogFactory.getLog(MultiStageMRConfToTezTranslator.class);
@@ -42,6 +46,16 @@ public class MultiStageMRConfToTezTranslator {
   // as well to verify functionality.
   //
 
+  /**
+   * Converts a single completely configured MR* job to something that can be
+   * understood by the Tez MR runtime.
+   * 
+   * @param srcConf
+   *          the configuration for the entire MR* job. Configs for the first
+   *          and last stage are expected to be set at root level. Configs for
+   *          intermediate stages will be prefixed with the stage number.
+   * @return A translated MR* config with keys translated over to Tez.
+   */
   // TODO Set the cause properly.
   public static Configuration convertMRToLinearTez(Configuration srcConf) {
     Configuration newConf = new Configuration(srcConf);
@@ -61,12 +75,12 @@ public class MultiStageMRConfToTezTranslator {
       processDirectConversion(allConfs[i]);
     }
     for (int i = 0; i < allConfs.length - 1; i++) {
-      processMultiStageDepreaction(allConfs[i], allConfs[i + 1]);
+      translateMultiStageWithSuccessor(allConfs[i], allConfs[i + 1]);
 
     }
     // Unset unnecessary keys in the last stage. Will end up being called for
     // single stage as well which should be harmless.
-    processMultiStageDepreaction(allConfs[allConfs.length - 1], null);
+    translateMultiStageWithSuccessor(allConfs[allConfs.length - 1], null);
 
     for (int i = 0; i < allConfs.length; i++) {
       String vertexName;
@@ -85,6 +99,78 @@ public class MultiStageMRConfToTezTranslator {
     return newConf;
   }
 
+  
+
+  /**
+   * Translates MR keys to Tez for the provided vertex conf. The conversion is
+   * done in place.
+   * <p>
+   * This method should be called for each stage config of the MR* job, in
+   * chain order, passing null as predecessorConf for the first vertex.
+   * <p>
+   * Since there's no separation of input / output params at the moment, the
+   * config generated by this can be set as the configuration for Tez
+   * Input/Output and Processor.
+   *
+   * @param conf
+   *          Configuration for the vertex being configured.
+   * @param predecessorConf
+   *          Previous vertex's conf, already translated by this method, or null.
+   */
+  @LimitedPrivate("Hive, Pig")
+  @Unstable
+  public static void translateVertexConfToTez(Configuration conf,
+      Configuration predecessorConf) {
+    convertVertexConfToTez(conf, predecessorConf);
+  }
+
+  /**
+   * Given a source and destination vertex, returns the config which should be
+   * used for the Output on this edge. The configs must be configured with Tez
+   * keys - or run through {@link #translateVertexConfToTez}.
+   *
+   * @param srcVertex
+   *          The tez configuration for the source vertex.
+   * @param destVertex
+   *          The tez configuration for the destination vertex.
+   * @return srcVertex itself (not a copy); both arguments must be non-null.
+   */
+  @LimitedPrivate("Hive, Pig")
+  @Unstable
+  public static Configuration getOutputConfOnEdge(Configuration srcVertex,
+      Configuration destVertex) {
+    Preconditions.checkNotNull(srcVertex, "srcVertex cannot be null for an edge");
+    Preconditions.checkNotNull(destVertex, "destVertex cannot be null for an edge");
+    return srcVertex;
+  }
+
+  /**
+   * Given a source and destination vertex, returns the config which should be
+   * used for the Input on this edge. The configs must be configured with Tez
+   * keys - or run through {@link #translateVertexConfToTez}.
+   *
+   * @param srcVertex
+   *          The tez configuration for the source vertex.
+   * @param destVertex
+   *          The tez configuration for the destination vertex.
+   * @return destVertex itself (not a copy); both arguments must be non-null.
+   */
+  @LimitedPrivate("Hive, Pig")
+  @Unstable
+  public static Configuration getInputConfOnEdge(Configuration srcVertex,
+      Configuration destVertex) {
+    Preconditions.checkNotNull(srcVertex, "srcVertex cannot be null for an edge");
+    Preconditions.checkNotNull(destVertex, "destVertex cannot be null for an edge");
+    return destVertex;
+  }
+  /** Translates one vertex conf to Tez in place; predecessorConf may be null. */
+  private static void convertVertexConfToTez(Configuration vertexConf,
+      Configuration predecessorConf) {
+    setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown");
+    processDirectConversion(vertexConf);
+    translateMultiStageWithPredecessor(vertexConf, predecessorConf);
+  }
+
   /**
    * Constructs a list containing individual configuration for each stage of the
    * linear MR job, including the first map and last reduce if applicable.
@@ -129,25 +215,55 @@ public class MultiStageMRConfToTezTranslator {
     }
   }
 
-  private static void processMultiStageDepreaction(Configuration srcConf,
-      Configuration destConf) {
+  /**
+   * Takes as parameters configurations for the vertex and its predecessor
+   * (already translated to Tez, or null). Modifies the vertex conf in place.
+   */
+  private static void translateMultiStageWithPredecessor(
+      Configuration vertexConf, Configuration predecessorConf) {
+    Preconditions.checkNotNull(vertexConf,
+        "Configuration for vertex being translated cannot be null");
+    for (Entry<String, Map<MultiStageKeys, String>> dep : DeprecatedKeys
+        .getMultiStageParamMap().entrySet()) {
+      // Output side: move the MR key on this vertex to its Tez OUTPUT key.
+      String value = vertexConf.get(dep.getKey());
+      if (value != null) {
+        vertexConf.unset(dep.getKey());
+        vertexConf.set(dep.getValue().get(MultiStageKeys.OUTPUT), value,
+            DeprecationReason.DEPRECATED_MULTI_STAGE.name());
+      }
+      // Input side: inherit the predecessor's OUTPUT value, if any.
+      String predecessorValue = predecessorConf == null ? null
+          : predecessorConf.get(dep.getValue().get(MultiStageKeys.OUTPUT));
+      if (predecessorValue != null) {
+        vertexConf.set(dep.getValue().get(MultiStageKeys.INPUT),
+            predecessorValue, DeprecationReason.DEPRECATED_MULTI_STAGE.name());
+      }
+    }
+  }
 
+  /**
+   * Takes as parameters configurations for the vertex and its successor (null
+   * for the last stage, whose MR keys are just removed). Modifies in place.
+   */
+  private static void translateMultiStageWithSuccessor(Configuration srcVertexConf,
+      Configuration destVertexConf) {
     // All MR keys which need such translation are specified at src - hence,
     // this is ok.
     // No key exists in which the map is inferring something based on the reduce
     // value.
     for (Entry<String, Map<MultiStageKeys, String>> dep : DeprecatedKeys
         .getMultiStageParamMap().entrySet()) {
-      if (srcConf.get(dep.getKey()) != null) {
-        if (destConf != null) {
-          String value = srcConf.get(dep.getKey());
-          srcConf.unset(dep.getKey());
-          srcConf.set(dep.getValue().get(MultiStageKeys.OUTPUT), value,
+      if (srcVertexConf.get(dep.getKey()) != null) {
+        if (destVertexConf != null) {
+          String value = srcVertexConf.get(dep.getKey());
+          srcVertexConf.unset(dep.getKey());
+          srcVertexConf.set(dep.getValue().get(MultiStageKeys.OUTPUT), value,
               DeprecationReason.DEPRECATED_MULTI_STAGE.name());
-          destConf.set(dep.getValue().get(MultiStageKeys.INPUT), value,
+          destVertexConf.set(dep.getValue().get(MultiStageKeys.INPUT), value,
               DeprecationReason.DEPRECATED_MULTI_STAGE.name());
         } else { // Last stage. Just remove the key reference.
-          srcConf.unset(dep.getKey());
+          srcVertexConf.unset(dep.getKey());
         }
       }
     }
@@ -162,13 +278,11 @@ public class MultiStageMRConfToTezTranslator {
       Configuration baseConf, String stage) {
     JobConf jobConf = new JobConf(baseConf);
     // Don't clobber explicit tez config.
-    if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS) == null
-        && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS)
-        == null) {
+    if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS) == null) {
       // If this is set, but the comparator is not set, and their types differ -
       // the job will break.
       if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) {
-        // Pull tis in from the baseConf
+        // Pull this in from the baseConf
         conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, jobConf
             .getMapOutputKeyClass().getName());
         if (LOG.isDebugEnabled()) {
@@ -180,9 +294,7 @@ public class MultiStageMRConfToTezTranslator {
       }
     }
 
-    if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS) == null
-        && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS)
-        == null) {
+    if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS) == null) {
       if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) {
         conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf
             .getMapOutputValueClass().getName());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/81f43ef9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
new file mode 100644
index 0000000..b75f01e
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.junit.Test;
+
+public class TestConfigTranslationMRToTez {
+
+  @Test
+  // Tests derived keys - i.e. the actual key is not set, but the value is
+  // derived from a fallback key.
+  public void testComplexKeys() {
+
+    JobConf confVertex1 = new JobConf();
+    JobConf confVertex2 = new JobConf();
+    // Each vertex declares a different map output key class...
+    confVertex1.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, IntWritable.class.getName());
+    confVertex2.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, ByteWritable.class.getName());
+    // ...and neither sets a key or grouping comparator explicitly.
+    confVertex1.unset(MRJobConfig.KEY_COMPARATOR);
+    confVertex1.unset(MRJobConfig.GROUP_COMPARATOR_CLASS);
+    confVertex2.unset(MRJobConfig.KEY_COMPARATOR);
+    confVertex2.unset(MRJobConfig.GROUP_COMPARATOR_CLASS);
+    // v1 is translated first (no predecessor), then v2 with v1 as predecessor.
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex1, null);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex2,
+        confVertex1);
+    // Comparators derive from v1's key class on both sides of the v1->v2 edge.
+    assertEquals(IntWritable.Comparator.class.getName(), ConfigUtils
+        .getIntermediateOutputKeyComparator(confVertex1).getClass().getName());
+    assertEquals(IntWritable.Comparator.class.getName(), ConfigUtils
+        .getIntermediateInputKeyComparator(confVertex2).getClass().getName());
+  }
+
+  @Test
+  public void testMultiStageConversion() {
+    JobConf confVertex1 = new JobConf();
+    confVertex1.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    confVertex1.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    confVertex1.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+
+    JobConf confVertex2 = new JobConf();
+    confVertex2.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        LongWritable.class.getName());
+    confVertex2.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        LongWritable.class.getName());
+    confVertex2.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
+
+    JobConf confVertex3 = new JobConf();
+    confVertex3.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        ByteWritable.class.getName());
+    confVertex3.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        ByteWritable.class.getName());
+    confVertex3.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+
+    JobConf confVertex4 = new JobConf();
+    confVertex4.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class.getName());
+    confVertex4.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, Text.class.getName());
+    // Translate the chain v1 -> v2 -> v3 -> v4; v1 has no predecessor.
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex1, null);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex2,
+        confVertex1);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex3,
+        confVertex2);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex4,
+        confVertex3);
+
+    // Verify input params for first vertex.
+    assertFalse(ConfigUtils.isIntermediateInputCompressed(confVertex1));
+    assertNull(ConfigUtils.getIntermediateInputKeyClass(confVertex1));
+    assertNull(ConfigUtils.getIntermediateInputValueClass(confVertex1));
+
+    // Verify edge between v1 and v2
+    assertEquals(IntWritable.class.getName(), ConfigUtils
+        .getIntermediateOutputKeyClass(confVertex1).getName());
+    assertEquals(IntWritable.class.getName(), ConfigUtils
+        .getIntermediateOutputValueClass(confVertex1).getName());
+    assertEquals(IntWritable.class.getName(), ConfigUtils
+        .getIntermediateInputKeyClass(confVertex2).getName());
+    assertEquals(IntWritable.class.getName(), ConfigUtils
+        .getIntermediateInputValueClass(confVertex2).getName());
+    assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1));
+    assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex2));
+
+    // Verify edge between v2 and v3
+    assertEquals(LongWritable.class.getName(), ConfigUtils
+        .getIntermediateOutputKeyClass(confVertex2).getName());
+    assertEquals(LongWritable.class.getName(), ConfigUtils
+        .getIntermediateOutputValueClass(confVertex2).getName());
+    assertEquals(LongWritable.class.getName(), ConfigUtils
+        .getIntermediateInputKeyClass(confVertex3).getName());
+    assertEquals(LongWritable.class.getName(), ConfigUtils
+        .getIntermediateInputValueClass(confVertex3).getName());
+    assertFalse(ConfigUtils.shouldCompressIntermediateOutput(confVertex2));
+    assertFalse(ConfigUtils.isIntermediateInputCompressed(confVertex3));
+
+    // Verify edge between v3 and v4
+    assertEquals(ByteWritable.class.getName(), ConfigUtils
+        .getIntermediateOutputKeyClass(confVertex3).getName());
+    assertEquals(ByteWritable.class.getName(), ConfigUtils
+        .getIntermediateOutputValueClass(confVertex3).getName());
+    assertEquals(ByteWritable.class.getName(), ConfigUtils
+        .getIntermediateInputKeyClass(confVertex4).getName());
+    assertEquals(ByteWritable.class.getName(), ConfigUtils
+        .getIntermediateInputValueClass(confVertex4).getName());
+    assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex3));
+    assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex4));
+
+    // Verify output params for last vertex.
+    assertFalse(ConfigUtils.shouldCompressIntermediateOutput(confVertex4));
+
+    // Verify the per-edge Output / Input configuration helpers.
+    Configuration edge1OutputConf = MultiStageMRConfToTezTranslator
+        .getOutputConfOnEdge(confVertex1, confVertex2);
+    Configuration edge1InputConf = MultiStageMRConfToTezTranslator
+        .getInputConfOnEdge(confVertex1, confVertex2);
+    // The Output conf on the v1->v2 edge carries v1's output-side settings.
+    assertEquals(IntWritable.class.getName(), ConfigUtils
+        .getIntermediateOutputKeyClass(edge1OutputConf).getName());
+    assertEquals(IntWritable.class.getName(), ConfigUtils
+        .getIntermediateOutputValueClass(edge1OutputConf).getName());
+    assertTrue(ConfigUtils.shouldCompressIntermediateOutput(edge1OutputConf));
+    // The Input conf on the same edge carries v2's input-side settings.
+    assertEquals(IntWritable.class.getName(), ConfigUtils
+        .getIntermediateInputKeyClass(edge1InputConf).getName());
+    assertEquals(IntWritable.class.getName(), ConfigUtils
+        .getIntermediateInputValueClass(edge1InputConf).getName());
+    assertTrue(ConfigUtils.isIntermediateInputCompressed(edge1InputConf));
+
+  }
+}


Mime
View raw message