tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] git commit: TEZ-1272. Change YARNRunner to make use of EdgeConfigurations. (sseth)
Date Tue, 15 Jul 2014 00:08:37 GMT
TEZ-1272. Change YARNRunner to make use of EdgeConfigurations. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/97805278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/97805278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/97805278

Branch: refs/heads/master
Commit: 97805278ed9b7e9f8a20521574b22515d1c553e7
Parents: 650151a
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Jul 14 17:08:17 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Jul 14 17:08:17 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   6 +-
 .../org/apache/tez/common/TezJobConfig.java     |  66 +---
 .../tez/mapreduce/examples/MRRSleepJob.java     |  30 +-
 .../mapreduce/examples/OrderedWordCount.java    |  19 +-
 .../apache/tez/mapreduce/client/YARNRunner.java |  40 +--
 .../tez/mapreduce/hadoop/DeprecatedKeys.java    |  78 +----
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  80 ++++-
 .../hadoop/MultiStageMRConfToTezTranslator.java | 307 +------------------
 .../hadoop/MultiStageMRConfigUtil.java          | 150 +--------
 .../org/apache/tez/mapreduce/input/MRInput.java |   3 +-
 .../apache/tez/mapreduce/output/MROutput.java   |   4 +-
 .../hadoop/TestConfigTranslationMRToTez.java    | 110 +------
 .../mapreduce/hadoop/TestDeprecatedKeys.java    |  16 +-
 .../processor/map/TestMapProcessor.java         |  20 +-
 .../processor/reduce/TestReduceProcessor.java   |  40 +--
 .../tez/runtime/library/common/ConfigUtils.java |  22 +-
 .../conf/OnFileSortedOutputConfiguration.java   |  10 +-
 .../OnFileUnorderedKVOutputConfiguration.java   |   8 +-
 ...orderedPartitionedKVOutputConfiguration.java |   8 +-
 .../OrderedPartitionedKVEdgeConfiguration.java  |   3 +-
 .../conf/ShuffledMergedInputConfiguration.java  |  42 ++-
 .../ShuffledUnorderedKVInputConfiguration.java  |   8 +-
 .../library/input/ShuffledMergedInput.java      |  12 +-
 .../library/input/ShuffledUnorderedKVInput.java |   8 +-
 .../library/output/OnFileSortedOutput.java      |  10 +-
 .../library/output/OnFileUnorderedKVOutput.java |   8 +-
 .../OnFileUnorderedPartitionedKVOutput.java     |   8 +-
 .../TestUnorderedPartitionedKVWriter.java       |   8 +-
 .../TestOnFileSortedOutputConfiguration.java    |  29 +-
 ...estOnFileUnorderedKVOutputConfiguration.java |  19 +-
 .../TestOnFileUnorderedPartitionedKVOutput.java |  23 +-
 ...stOrderedPartitionedKVEdgeConfiguration.java |  38 ++-
 .../TestShuffledMergedInputConfiguration.java   |  23 +-
 ...stShuffledUnorderedKVInputConfiguration.java |  19 +-
 ...UnorderedPartitionedKVEdgeConfiguration.java |  12 +-
 ...orderedUnpartitionedKVEdgeConfiguration.java |  12 +-
 .../output/TestOnFileUnorderedKVOutput.java     |   5 +-
 .../src/test/resources/tez-site.xml             |   6 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  13 +-
 39 files changed, 371 insertions(+), 952 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea0447f..d32ff1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,8 +4,6 @@ Apache Tez Change Log
 Release 0.5.0-incubating: Unreleased
 
 INCOMPATIBLE CHANGES
-  TEZ-1213. Fix parameter naming in TezJobConfig.
-    - Details at https://issues.apache.org/jira/browse/TEZ-1213?focusedCommentId=14039381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14039381
   TEZ-960. VertexManagerPluginContext::getTotalAVailableResource() changed to
   VertexManagerPluginContext::getTotalAvailableResource()
   TEZ-1025. Rename tez.am.max.task.attempts to tez.am.task.max.failed.attempts 
@@ -17,6 +15,10 @@ INCOMPATIBLE CHANGES
   config
   TEZ-692. Unify job submission in either TezClient or TezSession
   TEZ-1130. Replace confusing names on Vertex API
+  TEZ-1213. Fix parameter naming in TezJobConfig.
+    - Details at https://issues.apache.org/jira/browse/TEZ-1213?focusedCommentId=14039381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14039381
+  TEZ-1272. Change YARNRunner to use EdgeConfigs. 
+    - Removes separation of runtime configs into input/ouput configs. Also refactors public methods used for this conversion.
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index dc8aca7..b36e789 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -257,51 +257,20 @@ public class TezJobConfig {
   public static final String TEZ_RUNTIME_INTERNAL_SORTER_CLASS = TEZ_RUNTIME_PREFIX +
       "internal.sorter.class";
 
-  public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS =
-      TEZ_RUNTIME_PREFIX +
-          "intermediate-output.key.comparator.class";
-  public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS =
-      TEZ_RUNTIME_PREFIX +
-          "intermediate-input.key.comparator.class";
-
-  public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS = TEZ_RUNTIME_PREFIX +
-      "intermediate-output.key.class";
-  public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS = TEZ_RUNTIME_PREFIX +
-      "intermediate-input.key.class";
+  public static final String TEZ_RUNTIME_KEY_COMPARATOR_CLASS =
+      TEZ_RUNTIME_PREFIX + "key.comparator.class";
 
-  public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS = TEZ_RUNTIME_PREFIX +
-      "intermediate-output.value.class";
-  public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS = TEZ_RUNTIME_PREFIX +
-      "intermediate-input.value.class";
+  public static final String TEZ_RUNTIME_KEY_CLASS = TEZ_RUNTIME_PREFIX + "key.class";
 
+  public static final String TEZ_RUNTIME_VALUE_CLASS = TEZ_RUNTIME_PREFIX + "value.class";
 
-  /**
-   * Whether intermediate output should be compressed or not
-   */
-  public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS = TEZ_RUNTIME_PREFIX +
-      "intermediate-output.should-compress";
-  /**
-   * Whether intermediate input is compressed
-   */
-  public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED = TEZ_RUNTIME_PREFIX +
-      "intermediate-input.is-compressed";
-  /**
-   * The coded to be used if compressing intermediate output. Only applicable if
-   * tez.runtime.intermediate-output.should-compress is enabled.
-   */
-  public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC = TEZ_RUNTIME_PREFIX +
-      "intermediate-output.compress.codec";
-  /**
-   * The coded to be used when reading intermediate compressed input. Only
-   * applicable if tez.runtime.intermediate-input.is-compressed is enabled.
-   */
-  public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC = TEZ_RUNTIME_PREFIX +
-      "intermediate-input.compress.codec";
+  public static final String TEZ_RUNTIME_COMPRESS = TEZ_RUNTIME_PREFIX + "compress";
 
+  public static final String TEZ_RUNTIME_COMPRESS_CODEC = TEZ_RUNTIME_PREFIX + "compress.codec";
 
-  public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS =
-      TEZ_RUNTIME_PREFIX +
-          "intermediate-input.key.secondary.comparator.class";
+  // TODO Move this key to MapReduce
+  public static final String TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS =
+      TEZ_RUNTIME_PREFIX + "key.secondary.comparator.class";
 
   public static final String TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED =
       TEZ_RUNTIME_PREFIX +
@@ -430,17 +399,12 @@ public class TezJobConfig {
     tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
     tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
     tezRuntimeKeys.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC);
-    tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_KEY_COMPARATOR_CLASS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_KEY_CLASS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_VALUE_CLASS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_COMPRESS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_COMPRESS_CODEC);
+    tezRuntimeKeys.add(TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
     tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_MAX_SIZE);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index fa76bca..01c6d51 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -75,7 +75,6 @@ import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
@@ -440,8 +439,7 @@ public class MRRSleepJob extends Configured implements Tool {
           NullOutputFormat.class.getName());
     }
 
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
-        null);
+    MRHelpers.translateVertexConfToTez(mapStageConf);
 
     Configuration[] intermediateReduceStageConfs = null;
     if (iReduceStagesCount > 0
@@ -461,14 +459,8 @@ public class MRRSleepJob extends Configured implements Tool {
         iReduceStageConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
             MRRSleepJobPartitioner.class.getName());
 
-        if (i == 1) {
-          MultiStageMRConfToTezTranslator.translateVertexConfToTez(
-              iReduceStageConf, mapStageConf);
-        }
-        else {
-          MultiStageMRConfToTezTranslator.translateVertexConfToTez(
-              iReduceStageConf, intermediateReduceStageConfs[i-2]);
-        }
+
+        MRHelpers.translateVertexConfToTez(iReduceStageConf);
         intermediateReduceStageConfs[i-1] = iReduceStageConf;
       }
     }
@@ -487,14 +479,7 @@ public class MRRSleepJob extends Configured implements Tool {
       finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
           NullOutputFormat.class.getName());
 
-      if (iReduceStagesCount > 0
-          && numIReducer > 0) {
-        MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
-            intermediateReduceStageConfs[iReduceStagesCount-1]);
-      } else {
-        MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
-            mapStageConf);
-      }
+      MRHelpers.translateVertexConfToTez(finalReduceConf);
     }
 
     MRHelpers.doJobClientMagic(mapStageConf);
@@ -634,13 +619,14 @@ public class MRRSleepJob extends Configured implements Tool {
     partitionerConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR, MRRSleepJobPartitioner.class.getName());
     OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration
         .newBuilder(IntWritable.class.getName(), IntWritable.class.getName()).configureOutput(
-            MRPartitioner.class.getName(), partitionerConf).done().build();
+            MRPartitioner.class.getName(), partitionerConf).done().configureInput().useLegacyInput()
+        .done().build();
 
     for (int i = 0; i < vertices.size(); ++i) {
       dag.addVertex(vertices.get(i));
       if (i != 0) {
-        dag.addEdge(new Edge(vertices.get(i-1),
-            vertices.get(i), edgeConf.createDefaultEdgeProperty()));
+        dag.addEdge(
+            new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 6e4f22c..9ed6294 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -71,7 +71,6 @@ import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
@@ -168,8 +167,7 @@ public class OrderedWordCount extends Configured implements Tool {
       mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks());
     }
 
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
-        null);
+    MRHelpers.translateVertexConfToTez(mapStageConf);
 
     Configuration iReduceStageConf = new JobConf(conf);
     iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2); // TODO NEWTEZ - NOT NEEDED NOW???
@@ -181,8 +179,7 @@ public class OrderedWordCount extends Configured implements Tool {
         Text.class.getName());
     iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
 
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(iReduceStageConf,
-        mapStageConf);
+    MRHelpers.translateVertexConfToTez(iReduceStageConf);
 
     Configuration finalReduceConf = new JobConf(conf);
     finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
@@ -193,8 +190,7 @@ public class OrderedWordCount extends Configured implements Tool {
     finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
     finalReduceConf.setBoolean("mapred.mapper.new-api", true);
 
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
-        iReduceStageConf);
+    MRHelpers.translateVertexConfToTez(finalReduceConf);
 
     MRHelpers.doJobClientMagic(mapStageConf);
     MRHelpers.doJobClientMagic(iReduceStageConf);
@@ -245,14 +241,15 @@ public class OrderedWordCount extends Configured implements Tool {
 
     OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration
         .newBuilder(IntWritable.class.getName(), Text.class.getName()).configureOutput(
-            HashPartitioner.class.getName(), null).done().build();
+            HashPartitioner.class.getName(),
+            null).done().configureInput().useLegacyInput().done().build();
 
     DAG dag = new DAG("OrderedWordCount" + dagIndex);
     for (int i = 0; i < vertices.size(); ++i) {
       dag.addVertex(vertices.get(i));
       if (i != 0) {
-        dag.addEdge(new Edge(vertices.get(i - 1),
-            vertices.get(i), edgeConf.createDefaultEdgeProperty()));
+        dag.addEdge(
+            new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty()));
       }
     }
     return dag;
@@ -272,7 +269,7 @@ public class OrderedWordCount extends Configured implements Tool {
     Configuration conf = getConf();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 
-    boolean generateSplitsInClient = false;
+    boolean generateSplitsInClient;
 
     SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 90f3055..536049a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -78,14 +78,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -100,10 +95,10 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfiguration;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -475,14 +470,17 @@ public class YARNRunner implements ClientProtocol {
     for (int i = 0; i < vertices.length; i++) {
       dag.addVertex(vertices[i]);
       if (i > 0) {
-        EdgeProperty edgeProperty = new EdgeProperty(
-            DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor(OnFileSortedOutput.class.getName()),
-            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
-
-        Edge edge = null;
-        edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);
+        // Set edge conf based on Input conf (compression etc properties for MapReduce are
+        // w.r.t Outputs - MAP_OUTPUT_COMPRESS for example)
+        OrderedPartitionedKVEdgeConfiguration edgeConf =
+            OrderedPartitionedKVEdgeConfiguration.newBuilder(stageConfs[i - 1].get(
+                TezJobConfig.TEZ_RUNTIME_KEY_CLASS),
+                stageConfs[i - 1].get(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS))
+                .configureOutput(
+                    MRPartitioner.class.getName(), stageConfs[i - 1]).done()
+                .configureInput().useLegacyInput().done()
+                .setFromConfiguration(stageConfs[i - 1]).build();
+        Edge edge = new Edge(vertices[i-1], vertices[i], edgeConf.createDefaultEdgeProperty());
         dag.addEdge(edge);
       }
 
@@ -535,15 +533,11 @@ public class YARNRunner implements ClientProtocol {
     JobConf jobConf = new JobConf(new TezConfiguration(conf));
 
     // Extract individual raw MR configs.
-    Configuration[] stageConfs = MultiStageMRConfToTezTranslator
-        .getStageConfs(jobConf);
+    Configuration[] stageConfs = MultiStageMRConfToTezTranslator.getStageConfs(jobConf);
 
     // Transform all confs to use Tez keys
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
-        null);
-    for (int i = 1; i < stageConfs.length; i++) {
-      MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i],
-          stageConfs[i - 1]);
+    for (int i = 0; i < stageConfs.length; i++) {
+      MRHelpers.translateVertexConfToTez(stageConfs[i]);
     }
 
     // create inputs to tezClient.submit()

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 90a5142..6104b0e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -27,8 +27,6 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 
-import com.google.common.collect.Maps;
-
 public class DeprecatedKeys {
 
   
@@ -39,17 +37,6 @@ public class DeprecatedKeys {
    */
   private static Map<String, String> mrParamToDAGParamMap = new HashMap<String, String>();
 
-  
-  public static enum MultiStageKeys {
-    INPUT, OUTPUT
-  }
-  /**
-   * Keys which are used across an edge. i.e. by an Output-Input pair.
-   */
-  private static Map<String, Map<MultiStageKeys, String>> multiStageParamMap =
-      new HashMap<String, Map<MultiStageKeys, String>>();
-  
-  
   /**
    * Keys used by the Tez Runtime.
    */
@@ -61,51 +48,9 @@ public class DeprecatedKeys {
   static {
     populateMRToTezRuntimeParamMap();
     populateMRToDagParamMap();
-    populateMultiStageParamMap();
     addDeprecatedKeys();
   }
-  
-  
-  private static void populateMultiStageParamMap() {
-    
-    multiStageParamMap.put(
-        MRJobConfig.KEY_COMPARATOR,
-        getDeprecationMap(
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
-    
-    multiStageParamMap.put(
-        MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        getDeprecationMap(
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS));
-    
-    multiStageParamMap.put(
-        MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        getDeprecationMap(
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS));
-
-    multiStageParamMap.put(
-        MRJobConfig.MAP_OUTPUT_COMPRESS,
-        getDeprecationMap(
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED,
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS));
-
-    multiStageParamMap.put(
-        MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
-        getDeprecationMap(
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC,
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC));
-  }
-  
-  private static Map<MultiStageKeys, String> getDeprecationMap(String inputKey, String outputKey) {
-    Map<MultiStageKeys, String>  m = Maps.newEnumMap(MultiStageKeys.class);
-    m.put(MultiStageKeys.INPUT, inputKey);
-    m.put(MultiStageKeys.OUTPUT, outputKey);
-    return m;
-  }
-  
+
   private static void populateMRToDagParamMap() {
     // TODO Default value handling.
     mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
@@ -192,9 +137,20 @@ public class DeprecatedKeys {
     
     registerMRToRuntimeKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
     
-    registerMRToRuntimeKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS);
+    registerMRToRuntimeKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
     
-    registerMRToRuntimeKeyTranslation(TezJobConfig.TEZ_CREDENTIALS_PATH, MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+    registerMRToRuntimeKeyTranslation(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.TEZ_CREDENTIALS_PATH);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.KEY_COMPARATOR, TezJobConfig.TEZ_RUNTIME_KEY_COMPARATOR_CLASS);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_KEY_CLASS, TezJobConfig.TEZ_RUNTIME_KEY_CLASS);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, TezJobConfig.TEZ_RUNTIME_VALUE_CLASS);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_COMPRESS, TezJobConfig.TEZ_RUNTIME_COMPRESS);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC);
+
   }
   
   private static void addDeprecatedKeys() {
@@ -217,10 +173,4 @@ public class DeprecatedKeys {
   public static Map<String, String> getMRToTezRuntimeParamMap() {
     return Collections.unmodifiableMap(mrParamToTezRuntimeParamMap);
   }
-
-  // TODO Ideally, multi-stage should not be exposed.
-  public static Map<String, Map<MultiStageKeys, String>> getMultiStageParamMap() {
-    return Collections.unmodifiableMap(multiStageParamMap);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 7f30e7f..f0eda1f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -30,8 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
-import javax.annotation.Nullable;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -99,6 +97,84 @@ public class MRHelpers {
       "job.splitmetainfo";
 
   /**
+   * Translates MR keys to Tez for the provided vertex conf. The conversion is
+   * done in place.
+   *
+   * @param conf
+   *          Configuration for the vertex being configured.
+   */
+  @LimitedPrivate("Hive, Pig")
+  @Unstable
+  public static void translateVertexConfToTez(Configuration conf) {
+    convertVertexConfToTez(conf);
+  }
+
+  private static void convertVertexConfToTez(Configuration vertexConf) {
+    setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown");
+    processDirectConversion(vertexConf);
+  }
+
+  /**
+   * Pulls in specific keys from the base configuration, if they are not set at
+   * the stage level. An explicit list of keys is copied over (not all), which
+   * require translation to tez keys.
+   */
+  private static void setStageKeysFromBaseConf(Configuration conf,
+                                               Configuration baseConf, String stage) {
+    JobConf jobConf = new JobConf(baseConf);
+    // Don't clobber explicit tez config.
+    if (conf.get(TezJobConfig.TEZ_RUNTIME_KEY_CLASS) == null) {
+      // If this is set, but the comparator is not set, and their types differ -
+      // the job will break.
+      if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) {
+        // Pull this in from the baseConf
+        conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, jobConf
+            .getMapOutputKeyClass().getName());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS
+              + " for stage: " + stage
+              + " based on job level configuration. Value: "
+              + conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS));
+        }
+      }
+    }
+
+    if (conf.get(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS) == null) {
+      if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) {
+        conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf
+            .getMapOutputValueClass().getName());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS
+              + " for stage: " + stage
+              + " based on job level configuration. Value: "
+              + conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS));
+        }
+      }
+    }
+  }
+
+  private static void processDirectConversion(Configuration conf) {
+    for (Map.Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) {
+      if (conf.get(dep.getKey()) != null) {
+        // TODO Deprecation reason does not seem to reflect in the config ?
+        // The ordering is important in case of keys which are also deprecated.
+        // Unset will unset the deprecated keys and all it's variants.
+        final String mrValue = conf.get(dep.getKey());
+        final String tezValue = conf.get(dep.getValue());
+        conf.unset(dep.getKey());
+        if (tezValue == null) {
+          conf.set(dep.getValue(), mrValue, "TRANSLATED_TO_TEZ");
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value="
+              + mrValue + ", tez:" + dep.getValue() + "=" + conf.get(dep.getValue()));
+        }
+      }
+    }
+  }
+
+  /**
    * Comparator for org.apache.hadoop.mapreduce.InputSplit
    */
   private static class InputSplitComparator

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index 3e1980c..a9f3c3f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -18,193 +18,15 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.mapreduce.combine.MRCombiner;
-import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 
-import com.google.common.base.Preconditions;
 
 public class MultiStageMRConfToTezTranslator {
 
-  private static final Log LOG = LogFactory.getLog(MultiStageMRConfToTezTranslator.class);
-
-  private enum DeprecationReason {
-    DEPRECATED_DIRECT_TRANSLATION, DEPRECATED_MULTI_STAGE
-  }
-
-  // FIXME Add unit tests.
-  // This will convert configs to tez.<vertexName>.<OriginalProperty> for
-  // properties which it understands. Doing this for the initial and final task
-  // as well to verify functionality.
-  //
-
-  /**
-   * Converts a single completely configured MR* job to something that can be
-   * understood by the Tez MR runtime.
-   * 
-   * @param srcConf
-   *          the configuration for the entire MR* job. Configs for the first
-   *          and last stage are expected to be set at root level. Configs for
-   *          intermediate stages will be prefixed with the stage number.
-   * @return A translated MR* config with keys translated over to Tez.
-   */
-  // TODO Set the cause properly.
-  public static Configuration convertMRToLinearTez(Configuration srcConf) {
-    Configuration newConf = new Configuration(srcConf);
-
-    int numIntermediateStages = MultiStageMRConfigUtil
-        .getNumIntermediateStages(srcConf);
-    boolean hasFinalReduceStage = (srcConf.getInt(MRJobConfig.NUM_REDUCES, 0) > 0);
-
-    // Assuming no 0 map jobs, and the first stage is always a map.
-    int totalStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1);
-    int numEdges = totalStages - 1;
-
-    Configuration[] allConfs = extractStageConfs(newConf, numEdges);
-
-    for (int i = 0; i < allConfs.length; i++) {
-      setStageKeysFromBaseConf(allConfs[i], srcConf, Integer.toString(i));
-      processDirectConversion(allConfs[i]);
-    }
-    for (int i = 0; i < allConfs.length - 1; i++) {
-      translateMultiStageWithSuccessor(allConfs[i], allConfs[i + 1]);
-
-    }
-    // Unset unnecessary keys in the last stage. Will end up being called for
-    // single stage as well which should be harmless.
-    translateMultiStageWithSuccessor(allConfs[allConfs.length - 1], null);
-
-    for (int i = 0; i < allConfs.length; i++) {
-      String vertexName;
-      if (i == 0) {
-        vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
-      } else if (i == allConfs.length - 1) {
-        vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
-      } else {
-        // Intermediate vertices start at 1
-        vertexName = MultiStageMRConfigUtil.getIntermediateStageVertexName(i);
-      }
-      MultiStageMRConfigUtil.addConfigurationForVertex(newConf, vertexName,
-          allConfs[i]);
-    }
-
-    return newConf;
-  }
-
-  
-
-  /**
-   * Translates MR keys to Tez for the provided vertex conf. The conversion is
-   * done in place.
-   * 
-   * This method should be called for each stage config of the MR* job. The call
-   * for the first vertex should set the predecessorConf as null.
-   * 
-   * Since there's no separation of input / output params at the moment, the
-   * config generated by this can be set as the configuration for Tez
-   * Input/Output and Processor.
-   * 
-   * @param conf
-   *          Configuration for the vertex being configured.
-   * @param predecessorConf
-   *          Configuration for the previous vertex in the MR* chain
-   */
-  @LimitedPrivate("Hive, Pig")
-  @Unstable
-  public static void translateVertexConfToTez(Configuration conf,
-      Configuration predecessorConf) {
-    convertVertexConfToTez(conf, predecessorConf);
-  }
-
-  /**
-   * Given a source and destination vertex, returns the config which should be
-   * used for the Output on this edge. The configs must be configured with tez
-   * keys - or run through translateVertexConfToTez.
-   * 
-   * @param srcVertex
-   *          The tez configuration for the source vertex.
-   * @param destVertex
-   *          The tez configuration for the destination vertex.
-   * @return the output Configuration object for the edge
-   */
-  @LimitedPrivate("Hive, Pig")
-  @Unstable
-  public static Configuration getOutputConfOnEdge(Configuration srcVertex,
-      Configuration destVertex) {
-    Preconditions.checkNotNull(srcVertex, "srcVertex cannot be null for an edge");
-    Preconditions.checkNotNull(destVertex, "destVertex cannot be null for an edge");
-    return srcVertex;
-  }
-
-  /**
-   * Given a source and destination vertex, returns the config which should be
-   * used for the Input on this edge. The configs must be configured with tez
-   * keys - or run through translateVertexConfToTez.
-   * 
-   * @param srcVertex
-   *          The tez configuration for the source vertex.
-   * @param destVertex
-   *          The tez configuration for the destination vertex.
-   * @return the input Configuration object for the edge
-   */
-  @LimitedPrivate("Hive, Pig")
-  @Unstable
-  public static Configuration getInputConfOnEdge(Configuration srcVertex,
-      Configuration destVertex) {
-    Preconditions.checkNotNull(srcVertex, "srcVertex cannot be null for an edge");
-    Preconditions.checkNotNull(destVertex, "destVertex cannot be null for an edge");
-    return destVertex;
-  }
-
-  private static void convertVertexConfToTez(Configuration vertexConf,
-      Configuration predecessorConf) {
-    setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown");
-    processDirectConversion(vertexConf);
-    translateMultiStageWithPredecessor(vertexConf, predecessorConf);
-  }
-
-  /**
-   * Constructs a list containing individual configuration for each stage of the
-   * linear MR job, including the first map and last reduce if applicable.
-   * 
-   * Generates basic configurations - i.e. without inheriting any keys from the
-   * top level conf. // TODO Validate this comment.
-   */
-  private static Configuration[] extractStageConfs(Configuration conf,
-      int totalEdges) {
-    int numStages = totalEdges + 1;
-    Configuration confs[] = new Configuration[numStages];
-    // TODO Make moer efficient instead of multiple scans.
-    Configuration nonIntermediateConf = MultiStageMRConfigUtil
-        .getAndRemoveBasicNonIntermediateStageConf(conf);
-    if (numStages == 1) {
-      confs[0] = nonIntermediateConf;
-    } else {
-      confs[0] = nonIntermediateConf;
-      confs[numStages - 1] = new Configuration(nonIntermediateConf);
-    }
-    if (numStages > 2) {
-      for (int i = 1; i < numStages - 1; i++) {
-        confs[i] = MultiStageMRConfigUtil
-            .getAndRemoveBasicIntermediateStageConf(conf, i);
-      }
-    }
-    return confs;
-  }
-  
-  
-  
   /**
    * Given a single base MRR config, returns a list of complete stage
    * configurations.
@@ -260,131 +82,4 @@ public class MultiStageMRConfToTezTranslator {
     }
     return confs;
   }
-
-  private static void processDirectConversion(Configuration conf) {
-    for (Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap()
-        .entrySet()) {
-      if (conf.get(dep.getKey()) != null) {
-        // TODO Deprecation reason does not seem to reflect in the config ?
-        // The ordering is important in case of keys which are also deprecated.
-        // Unset will unset the deprecated keys and all it's variants.
-        final String mrValue = conf.get(dep.getKey());
-        final String tezValue = conf.get(dep.getValue());
-        conf.unset(dep.getKey());
-        if (tezValue == null) {
-          conf.set(dep.getValue(), mrValue,
-              DeprecationReason.DEPRECATED_DIRECT_TRANSLATION.name());  
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value="
-            + mrValue + ", tez:" + dep.getValue() + "=" + conf.get(dep.getValue()));
-        }
-      }
-    }
-  }
-
-  /**
-   * Takes as parameters configurations for the vertex and it's predecessor
-   * (already translated to Tez). Modifies the vertex conf in place.
-   */
-  private static void translateMultiStageWithPredecessor(
-      Configuration vertexConf, Configuration predecessorConf) {
-    Preconditions.checkNotNull(vertexConf,
-        "Configuration for vertex being translated cannot be null");
-    for (Entry<String, Map<MultiStageKeys, String>> dep : DeprecatedKeys
-        .getMultiStageParamMap().entrySet()) {
-      if (vertexConf.get(dep.getKey()) != null) {
-        String value = vertexConf.get(dep.getKey());
-        vertexConf.unset(dep.getKey());
-        if (vertexConf.get(dep.getValue().get(MultiStageKeys.OUTPUT)) == null) {
-          vertexConf.set(dep.getValue().get(MultiStageKeys.OUTPUT), value,
-            DeprecationReason.DEPRECATED_MULTI_STAGE.name());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value="
-              + value + ", tez:" + dep.getValue().get(MultiStageKeys.OUTPUT) + "="
-              + vertexConf.get(dep.getValue().get(MultiStageKeys.OUTPUT)));
-          }
-        }
-      }
-      // Set keys from the predecessor conf.
-      if (predecessorConf != null) {
-        String expPredecessorKey = dep.getValue().get(MultiStageKeys.OUTPUT);
-        if (predecessorConf.get(expPredecessorKey) != null) {
-          String value = predecessorConf.get(expPredecessorKey);
-          vertexConf.set(dep.getValue().get(MultiStageKeys.INPUT), value);
-        }
-      }
-    }
-  }
-
-  /**
-   * Takes as parameters configurations for the vertex and it's successor.
-   * Modifies both in place.
-   */
-  private static void translateMultiStageWithSuccessor(Configuration srcVertexConf,
-      Configuration destVertexConf) {
-    // All MR keys which need such translation are specified at src - hence,
-    // this is ok.
-    // No key exists in which the map is inferring something based on the reduce
-    // value.
-    for (Entry<String, Map<MultiStageKeys, String>> dep : DeprecatedKeys
-        .getMultiStageParamMap().entrySet()) {
-      if (srcVertexConf.get(dep.getKey()) != null) {
-        if (destVertexConf != null) {
-          String value = srcVertexConf.get(dep.getKey());
-          srcVertexConf.unset(dep.getKey());
-          srcVertexConf.set(dep.getValue().get(MultiStageKeys.OUTPUT), value,
-              DeprecationReason.DEPRECATED_MULTI_STAGE.name());
-          destVertexConf.set(dep.getValue().get(MultiStageKeys.INPUT), value,
-              DeprecationReason.DEPRECATED_MULTI_STAGE.name());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value="
-              + value + ", tez:" + dep.getValue() + "="
-              + destVertexConf.get(dep.getValue().get(MultiStageKeys.INPUT)));
-          }
-        } else { // Last stage. Just remove the key reference.
-          srcVertexConf.unset(dep.getKey());
-        }
-      }
-    }
-  }
-
-  /**
-   * Pulls in specific keys from the base configuration, if they are not set at
-   * the stage level. An explicit list of keys is copied over (not all), which
-   * require translation to tez keys.
-   */
-  private static void setStageKeysFromBaseConf(Configuration conf,
-      Configuration baseConf, String stage) {
-    JobConf jobConf = new JobConf(baseConf);
-    // Don't clobber explicit tez config.
-    if (conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS) == null) {
-      // If this is set, but the comparator is not set, and their types differ -
-      // the job will break.
-      if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) {
-        // Pull this in from the baseConf
-        conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, jobConf
-            .getMapOutputKeyClass().getName());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS
-              + " for stage: " + stage
-              + " based on job level configuration. Value: "
-              + conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS));
-        }
-      }
-    }
-
-    if (conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS) == null) {
-      if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) {
-        conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf
-            .getMapOutputValueClass().getName());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS
-              + " for stage: " + stage
-              + " based on job level configuration. Value: "
-              + conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS));
-        }
-      }
-    }
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
index 0a7a940..13e0b86 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
@@ -24,62 +24,21 @@ import java.util.Map.Entry;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 
+@Private
 public class MultiStageMRConfigUtil {
 
   //////////////////////////////////////////////////////////////////////////////
   //                    Methods based on Stage Num                            //
   //////////////////////////////////////////////////////////////////////////////
 
-  // Returns config settings specific to stage
-  public static Configuration getBasicIntermediateStageConf(
-      Configuration baseConf, int i) {
-    return getBasicIntermediateStageConfInternal(baseConf,
-        getPropertyNameForIntermediateStage(i, ""), false, true);
-  }
-
-  // Returns and removes config settings specific to stage
-  public static Configuration getAndRemoveBasicIntermediateStageConf(
-      Configuration baseConf, int i) {
-    return getBasicIntermediateStageConfInternal(baseConf,
-        getPropertyNameForIntermediateStage(i, ""), true, true);
-  }
-
-  // TODO Get rid of this once YARNRunner starts using VertexNames.
-  public static Configuration getIntermediateStageConf(Configuration baseConf,
-      int i) {
-    return getBasicIntermediateStageConfInternal(baseConf,
-        getPropertyNameForIntermediateStage(i, ""), false, false);
-  }
-
-  // FIXME small perf hit. Change this to parse through all keys once and
-  // generate objects per
-  // stage instead of scanning through conf multiple times.
-  public static Configuration getAndRemoveBasicNonIntermediateStageConf(
-      Configuration baseConf) {
-    Configuration newConf = new Configuration(false);
-    for (String key : DeprecatedKeys.getMRToTezRuntimeParamMap().keySet()) {
-      if (baseConf.get(key) != null) {
-        newConf.set(key, baseConf.get(key));
-        baseConf.unset(key);
-      }
-    }
-
-    for (String key : DeprecatedKeys.getMultiStageParamMap().keySet()) {
-      if (baseConf.get(key) != null) {
-        newConf.set(key, baseConf.get(key));
-        baseConf.unset(key);
-      }
-    }
-    return newConf;
-  }
-
-  // TODO MRR FIXME based on conf format.
+  @Private
   public static int getNumIntermediateStages(Configuration conf) {
     return conf.getInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 0);
   }
 
   // TODO MRR FIXME based on conf format.
   // Intermediate stage numbers should start from 1.
+  @Private
   public static String getPropertyNameForIntermediateStage(
       int intermediateStage, String originalPropertyName) {
     return MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX + intermediateStage + "."
@@ -94,84 +53,20 @@ public class MultiStageMRConfigUtil {
   private static final String FINAL_REDUCE_VERTEX_NAME = "finalreduce";
   private static final String INTERMEDIATE_TASK_VERTEX_NAME_PREFIX = "ivertex";
 
+  @Private
   public static String getInitialMapVertexName() {
     return INITIAL_MAP_VERTEX_NAME;
   }
-  
-  public boolean isInitialMapVertex(String vertexName) {
-    return vertexName.equals(INITIAL_MAP_VERTEX_NAME);
-  }
 
+  @Private
   public static String getFinalReduceVertexName() {
     return FINAL_REDUCE_VERTEX_NAME;
   }
 
-  public boolean isFinalReduceVertex(String vertexName) {
-    return vertexName.equals(FINAL_REDUCE_VERTEX_NAME);
-  }
-
+  @Private
   public static String getIntermediateStageVertexName(int stageNum) {
     return INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + stageNum;
   }
-  
-  public static int getIntermediateStageNum(String vertexName) {
-    if (vertexName.matches(INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + "\\d+")) {
-      return Integer.parseInt(vertexName
-          .substring(INTERMEDIATE_TASK_VERTEX_NAME_PREFIX.length()));
-    } else {
-      return -1;
-    }
-  }
-
-  // Returns config settings specific to named vertex
-  public static Configuration getBasicConfForVertex(Configuration baseConf,
-      String vertexName) {
-    return getBasicIntermediateStageConfInternal(baseConf,
-        getPropertyNameForVertex(vertexName, ""), false, true);
-  }
-
-  // Returns and removes config settings specific to named vertex
-  public static Configuration getAndRemoveBasicConfForVertex(
-      Configuration baseConf, String vertexName) {
-    return getBasicIntermediateStageConfInternal(baseConf,
-        getPropertyNameForVertex(vertexName, ""), true, true);
-  }
-
-  // Returns a config with all parameters, and vertex specific params moved to
-  // the top level.
-  public static Configuration getConfForVertex(Configuration baseConf,
-      String vertexName) {
-    return getBasicIntermediateStageConfInternal(baseConf,
-        getPropertyNameForVertex(vertexName, ""), false, false);
-  }
-
-  public static void addConfigurationForVertex(Configuration baseConf,
-      String vertexName, Configuration vertexConf) {
-    Iterator<Entry<String, String>> confEntries = vertexConf.iterator();
-    while (confEntries.hasNext()) {
-      Entry<String, String> entry = confEntries.next();
-      baseConf.set(getPropertyNameForVertex(vertexName, entry.getKey()),
-          entry.getValue());
-    }
-  }
-
-  // TODO This is TezEngineLand
-  public static String getPropertyNameForVertex(String vertexName,
-      String originalPropertyName) {
-    return MRJobConfig.MRR_VERTEX_PREFIX + vertexName + "."
-        + originalPropertyName;
-  }
-
-  // TODO Get rid of this. Temporary for testing.
-  public static void printConf(Configuration conf) {
-    Iterator<Entry<String, String>> confEntries = conf.iterator();
-    while (confEntries.hasNext()) {
-      Entry<String, String> entry = confEntries.next();
-      String key = entry.getKey();
-      String value = entry.getValue();
-      System.err.println("Key: " + key + ", Value: " + value);
-    }
-  }
 
   @Private
   static Configuration extractStageConf(Configuration baseConf,
@@ -209,37 +104,4 @@ public class MultiStageMRConfigUtil {
     }
     return conf;
   }
-  
-  // TODO MRR FIXME based on conf format.
-  private static Configuration getBasicIntermediateStageConfInternal(
-      Configuration baseConf, String prefix, boolean remove, boolean stageOnly) {
-    Configuration strippedConf = new Configuration(false);
-    Configuration conf = new Configuration(false);
-    Iterator<Entry<String, String>> confEntries = baseConf.iterator();
-    while (confEntries.hasNext()) {
-      Entry<String, String> entry = confEntries.next();
-      String key = entry.getKey();
-      if (key.startsWith(prefix)) {
-        if (remove) {
-          baseConf.unset(key);
-        }
-        String newKey = key.replace(prefix, "");
-        strippedConf.set(newKey, entry.getValue());
-      } else if (!stageOnly) {
-        conf.set(key, entry.getValue());
-      }
-    }
-    // Replace values from strippedConf into the finalConf. Override values
-    // which may have been copied over from the baseConf root level.
-    if (stageOnly) {
-      conf = strippedConf;
-    } else {
-      Iterator<Entry<String, String>> entries = strippedConf.iterator();
-      while (entries.hasNext()) {
-        Entry<String, String> entry = entries.next();
-        conf.set(entry.getKey(), entry.getValue());
-      }
-    }
-    return conf;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index e8fbf55..4e3e5e7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -35,7 +35,6 @@ import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.base.MRInputBase;
 import org.apache.tez.mapreduce.lib.MRInputUtils;
 import org.apache.tez.mapreduce.lib.MRReader;
@@ -115,7 +114,7 @@ public class MRInput extends MRInputBase {
     inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
         configInputFormatClassName);
     inputConf.setBoolean("mapred.mapper.new-api", useNewApi);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(inputConf, null);
+    MRHelpers.translateVertexConfToTez(inputConf);
     MRHelpers.doJobClientMagic(inputConf);
     if (groupSplitsInAM) {
       return MRHelpers.createMRInputPayloadWithGrouping(inputConf,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 987ca09..3cca35c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -43,7 +43,6 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -94,8 +93,7 @@ public class MROutput extends AbstractLogicalOutput {
     Configuration outputConf = new JobConf(conf);
     outputConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName);
     outputConf.setBoolean("mapred.mapper.new-api", useNewApi);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(outputConf,
-        null);
+    MRHelpers.translateVertexConfToTez(outputConf);
     MRHelpers.doJobClientMagic(outputConf);
     return TezUtils.createUserPayloadFromConf(outputConf);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
index 5e3d201..3078e02 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
@@ -19,15 +19,10 @@
 package org.apache.tez.mapreduce.hadoop;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.junit.Test;
@@ -40,122 +35,41 @@ public class TestConfigTranslationMRToTez {
   public void testComplexKeys() {
 
     JobConf confVertex1 = new JobConf();
-    JobConf confVertex2 = new JobConf();
     
     confVertex1.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, IntWritable.class.getName());
-    confVertex2.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, ByteWritable.class.getName());
     
     confVertex1.unset(MRJobConfig.KEY_COMPARATOR);
     confVertex1.unset(MRJobConfig.GROUP_COMPARATOR_CLASS);
-    confVertex2.unset(MRJobConfig.KEY_COMPARATOR);
-    confVertex2.unset(MRJobConfig.GROUP_COMPARATOR_CLASS);
-    
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex1, null);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex2,
-        confVertex1);
     
+    MRHelpers.translateVertexConfToTez(confVertex1);
+
     assertEquals(IntWritable.Comparator.class.getName(), ConfigUtils
         .getIntermediateOutputKeyComparator(confVertex1).getClass().getName());
     assertEquals(IntWritable.Comparator.class.getName(), ConfigUtils
-        .getIntermediateInputKeyComparator(confVertex2).getClass().getName());
+        .getIntermediateInputKeyComparator(confVertex1).getClass().getName());
   }
 
   @Test
-  public void testMultiStageConversion() {
+  public void testMRToTezKeyTranslation() {
     JobConf confVertex1 = new JobConf();
     confVertex1.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
         IntWritable.class.getName());
     confVertex1.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    confVertex1.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
-
-    JobConf confVertex2 = new JobConf();
-    confVertex2.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        LongWritable.class.getName());
-    confVertex2.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
         LongWritable.class.getName());
-    confVertex2.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
-
-    JobConf confVertex3 = new JobConf();
-    confVertex3.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        ByteWritable.class.getName());
-    confVertex3.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        ByteWritable.class.getName());
-    confVertex3.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
-
-    JobConf confVertex4 = new JobConf();
-    confVertex4.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class.getName());
-    confVertex4.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, Text.class.getName());
-
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex1, null);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex2,
-        confVertex1);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex3,
-        confVertex2);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex4,
-        confVertex3);
+    confVertex1.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
 
-    // Verify input params for first vertex.
-    assertFalse(ConfigUtils.isIntermediateInputCompressed(confVertex1));
-    assertNull(ConfigUtils.getIntermediateInputKeyClass(confVertex1));
-    assertNull(ConfigUtils.getIntermediateInputValueClass(confVertex1));
+    MRHelpers.translateVertexConfToTez(confVertex1);
 
-    // Verify edge between v1 and v2
+    // Verify translation
     assertEquals(IntWritable.class.getName(), ConfigUtils
         .getIntermediateOutputKeyClass(confVertex1).getName());
-    assertEquals(IntWritable.class.getName(), ConfigUtils
+    assertEquals(LongWritable.class.getName(), ConfigUtils
         .getIntermediateOutputValueClass(confVertex1).getName());
     assertEquals(IntWritable.class.getName(), ConfigUtils
-        .getIntermediateInputKeyClass(confVertex2).getName());
-    assertEquals(IntWritable.class.getName(), ConfigUtils
-        .getIntermediateInputValueClass(confVertex2).getName());
-    assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1));
-    assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex2));
-
-    // Verify edge between v2 and v3
+        .getIntermediateInputKeyClass(confVertex1).getName());
     assertEquals(LongWritable.class.getName(), ConfigUtils
-        .getIntermediateOutputKeyClass(confVertex2).getName());
-    assertEquals(LongWritable.class.getName(), ConfigUtils
-        .getIntermediateOutputValueClass(confVertex2).getName());
-    assertEquals(LongWritable.class.getName(), ConfigUtils
-        .getIntermediateInputKeyClass(confVertex3).getName());
-    assertEquals(LongWritable.class.getName(), ConfigUtils
-        .getIntermediateInputValueClass(confVertex3).getName());
-    assertFalse(ConfigUtils.shouldCompressIntermediateOutput(confVertex2));
-    assertFalse(ConfigUtils.isIntermediateInputCompressed(confVertex3));
-
-    // Verify edge between v3 and v4
-    assertEquals(ByteWritable.class.getName(), ConfigUtils
-        .getIntermediateOutputKeyClass(confVertex3).getName());
-    assertEquals(ByteWritable.class.getName(), ConfigUtils
-        .getIntermediateOutputValueClass(confVertex3).getName());
-    assertEquals(ByteWritable.class.getName(), ConfigUtils
-        .getIntermediateInputKeyClass(confVertex4).getName());
-    assertEquals(ByteWritable.class.getName(), ConfigUtils
-        .getIntermediateInputValueClass(confVertex4).getName());
-    assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex3));
-    assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex4));
-
-    // Verify output params for first vertex.
-    assertFalse(ConfigUtils.shouldCompressIntermediateOutput(confVertex4));
-    
-    // Verify Edge configuration
-    Configuration edge1OutputConf = MultiStageMRConfToTezTranslator
-        .getOutputConfOnEdge(confVertex1, confVertex2);
-    Configuration edge1InputConf = MultiStageMRConfToTezTranslator
-        .getInputConfOnEdge(confVertex1, confVertex2);
-    
-    assertEquals(IntWritable.class.getName(), ConfigUtils
-        .getIntermediateOutputKeyClass(edge1OutputConf).getName());
-    assertEquals(IntWritable.class.getName(), ConfigUtils
-        .getIntermediateOutputValueClass(edge1OutputConf).getName());
-    assertTrue(ConfigUtils.shouldCompressIntermediateOutput(edge1OutputConf));
-    
-    assertEquals(IntWritable.class.getName(), ConfigUtils
-        .getIntermediateInputKeyClass(edge1InputConf).getName());
-    assertEquals(IntWritable.class.getName(), ConfigUtils
-        .getIntermediateInputValueClass(edge1InputConf).getName());
-    assertTrue(ConfigUtils.isIntermediateInputCompressed(edge1InputConf));
-    
+        .getIntermediateInputValueClass(confVertex1).getName());
+    assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1));
+    assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex1));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
index 2dde271..bbfdc24 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
@@ -42,7 +42,7 @@ public class TestDeprecatedKeys {
     jobConf.setBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, true);
     jobConf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0.33f);
 
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf, null);
+    MRHelpers.translateVertexConfToTez(jobConf);
 
     assertEquals(0.4f, jobConf.getFloat(
         TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0f), 0.01f);
@@ -70,7 +70,6 @@ public class TestDeprecatedKeys {
    */
   public void verifyTezOverridenKeys() {
     JobConf jobConf = new JobConf();
-    JobConf jobConf2 = new JobConf();
     jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, 2000);
     jobConf.setInt(MRJobConfig.IO_SORT_MB, 100);
     jobConf.setInt(MRJobConfig.COUNTERS_MAX_KEY, 100);
@@ -97,14 +96,12 @@ public class TestDeprecatedKeys {
     jobConf.setFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 10.0f);
     jobConf.set(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "DefaultSorter");
     jobConf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, "groupComparator");
-    jobConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS, "SecondaryComparator");
+    jobConf.set(TezJobConfig.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, "SecondaryComparator");
     
     jobConf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
-    jobConf2.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
-    jobConf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, true);
+    jobConf.setBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, true);
 
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf, null);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf2, jobConf);
+    MRHelpers.translateVertexConfToTez(jobConf);
 
     assertEquals(1000, jobConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
     assertEquals(200, jobConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 100));
@@ -127,10 +124,9 @@ public class TestDeprecatedKeys {
     assertEquals(10.0f, jobConf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0.0f), 0.0f);
     assertEquals("DefaultSorter", jobConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, ""));
     assertEquals("groupComparator", jobConf.get(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, ""));
-    assertEquals("SecondaryComparator", jobConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS, ""));
+    assertEquals("SecondaryComparator", jobConf.get(TezJobConfig.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, ""));
     assertEquals("DefaultSorter", jobConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, ""));
-    assertTrue(jobConf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false));
-    assertTrue(jobConf2.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false));
+    assertTrue(jobConf.getBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, false));
 
     assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD));
     assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD_BYTES));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 1bd24e7..a91ad31 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -39,7 +38,6 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
@@ -108,30 +106,26 @@ public class TestMapProcessor {
     JobConf jobConf = new JobConf(defaultConf);
     setUpJobConf(jobConf);
 
-    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+    MRHelpers.translateVertexConfToTez(jobConf);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
 
-    Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
-        vertexName);
-    
-    JobConf job = new JobConf(stageConf);
-    job.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
+    jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
 
-    job.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+    jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
     
     Path mapInput = new Path(workDir, "map0");
     
     
-    MapUtils.generateInputSplit(localFs, workDir, job, mapInput);
+    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
     
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(MRHelpers.createMRInputPayload(job, null)),
+            .setUserPayload(MRHelpers.createMRInputPayload(jobConf, null)),
         1);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
 
-    LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,
+    LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
         new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
         Collections.singletonList(mapInputSpec),
         Collections.singletonList(mapOutputSpec));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 639cfcd..eff108b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -50,7 +49,6 @@ import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
@@ -121,28 +119,23 @@ public class TestReduceProcessor {
     JobConf jobConf = new JobConf(defaultConf);
     setUpJobConf(jobConf);
     
-    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
-    
-    Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
-        mapVertexName);
-    
-    JobConf mapConf = new JobConf(mapStageConf);
-    
-    mapConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+    MRHelpers.translateVertexConfToTez(jobConf);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+
+    jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
-    mapConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
+    jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
     
     Path mapInput = new Path(workDir, "map0");
-    MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
+    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
     
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(MRHelpers.createMRInputPayload(mapConf, null)),
+            .setUserPayload(MRHelpers.createMRInputPayload(jobConf, null)),
         1);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
     // Run a map
-    LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,
+    LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
         mapInput, new TestUmbilical(), dagName, mapVertexName,
         Collections.singletonList(mapInputSpec),
         Collections.singletonList(mapOutputSpec));
@@ -154,16 +147,13 @@ public class TestReduceProcessor {
     LOG.info("Starting reduce...");
     
     Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>();
-    
-    Configuration reduceStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
-        reduceVertexName);
-    JobConf reduceConf = new JobConf(reduceStageConf);
-    reduceConf.setOutputFormat(SequenceFileOutputFormat.class);
-    reduceConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
-    FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
+    FileOutputFormat.setOutputPath(jobConf, new Path(workDir, "output"));
     ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
-        ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
+        ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf));
     
     InputSpec reduceInputSpec = new InputSpec(mapVertexName,
         new InputDescriptor(LocalMergedInput.class.getName()), 1);
@@ -186,7 +176,7 @@ public class TestReduceProcessor {
     LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
         taskSpec,
         0,
-        reduceConf,
+        jobConf,
         new String[] {workDir.toString()},
         new TestUmbilical(),
         serviceConsumerMetadata,
@@ -211,7 +201,7 @@ public class TestReduceProcessor {
     Path reduceOutputFile = new Path(reduceOutputDir, "part-v001-o000-00000");
     
     SequenceFile.Reader reader = new SequenceFile.Reader(localFs,
-        reduceOutputFile, reduceConf);
+        reduceOutputFile, jobConf);
 
     LongWritable key = new LongWritable();
     Text value = new Text();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
index 0749b2c..8077d77 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
@@ -45,7 +45,7 @@ public class ConfigUtils {
       Configuration conf, Class<DefaultCodec> defaultValue) {
     Class<? extends CompressionCodec> codecClass = defaultValue;
     String name = conf
-        .get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC);
+        .get(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC);
     if (name != null) {
       try {
         codecClass = conf.getClassByName(name).asSubclass(
@@ -62,7 +62,7 @@ public class ConfigUtils {
       Configuration conf, Class<DefaultCodec> defaultValue) {
     Class<? extends CompressionCodec> codecClass = defaultValue;
     String name = conf
-        .get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC);
+        .get(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC);
     if (name != null) {
       try {
         codecClass = conf.getClassByName(name).asSubclass(
@@ -80,45 +80,45 @@ public class ConfigUtils {
   
   public static boolean shouldCompressIntermediateOutput(Configuration conf) {
     return conf.getBoolean(
-        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
+        TezJobConfig.TEZ_RUNTIME_COMPRESS, false);
   }
 
   public static boolean isIntermediateInputCompressed(Configuration conf) {
     return conf.getBoolean(
-        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false);
+        TezJobConfig.TEZ_RUNTIME_COMPRESS, false);
   }
 
   public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
     Class<V> retv = (Class<V>) conf.getClass(
-        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, null,
+        TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, null,
         Object.class);
     return retv;
   }
   
   public static <V> Class<V> getIntermediateInputValueClass(Configuration conf) {
     Class<V> retv = (Class<V>) conf.getClass(
-        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, null,
+        TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, null,
         Object.class);
     return retv;
   }
 
   public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
     Class<K> retv = (Class<K>) conf.getClass(
-        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, null,
+        TezJobConfig.TEZ_RUNTIME_KEY_CLASS, null,
         Object.class);
     return retv;
   }
 
   public static <K> Class<K> getIntermediateInputKeyClass(Configuration conf) {
     Class<K> retv = (Class<K>) conf.getClass(
-        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, null,
+        TezJobConfig.TEZ_RUNTIME_KEY_CLASS, null,
         Object.class);
     return retv;
   }
 
   public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
     Class<? extends RawComparator> theClass = conf.getClass(
-        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null,
+        TezJobConfig.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, null,
         RawComparator.class);
     if (theClass != null)
       return ReflectionUtils.newInstance(theClass, conf);
@@ -128,7 +128,7 @@ public class ConfigUtils {
 
   public static <K> RawComparator<K> getIntermediateInputKeyComparator(Configuration conf) {
     Class<? extends RawComparator> theClass = conf.getClass(
-        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, null,
+        TezJobConfig.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, null,
         RawComparator.class);
     if (theClass != null)
       return ReflectionUtils.newInstance(theClass, conf);
@@ -143,7 +143,7 @@ public class ConfigUtils {
       Configuration conf) {
     Class<? extends RawComparator> theClass = conf
         .getClass(
-            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+            TezJobConfig.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS,
             null, RawComparator.class);
     if (theClass == null) {
       return getIntermediateInputKeyComparator(conf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
index 44ebf2f..e6952c7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
@@ -202,14 +202,14 @@ public class OnFileSortedOutputConfiguration {
     @InterfaceAudience.Private
     Builder setKeyClassName(String keyClassName) {
       Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClassName);
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_KEY_CLASS, keyClassName);
       return this;
     }
 
     @InterfaceAudience.Private
     Builder setValueClassName(String valueClassName) {
       Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, valueClassName);
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
       return this;
     }
 
@@ -292,16 +292,16 @@ public class OnFileSortedOutputConfiguration {
      * @return instance of the current builder
      */
     public Builder setKeyComparatorClass(String comparatorClassName) {
-      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
           comparatorClassName);
       return this;
     }
 
     public Builder enableCompression(String compressionCodec) {
-      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, true);
+      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, true);
       if (compressionCodec != null) {
         this.conf
-            .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, compressionCodec);
+            .set(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
       }
       return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
index e13e075..e5a5de9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
@@ -151,14 +151,14 @@ public class OnFileUnorderedKVOutputConfiguration {
     @InterfaceAudience.Private
     Builder setKeyClassName(String keyClassName) {
       Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClassName);
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_KEY_CLASS, keyClassName);
       return this;
     }
 
     @InterfaceAudience.Private
     Builder setValueClassName(String valueClassName) {
       Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, valueClassName);
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
       return this;
     }
 
@@ -200,10 +200,10 @@ public class OnFileUnorderedKVOutputConfiguration {
     }
 
     public Builder enableCompression(String compressionCodec) {
-      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, true);
+      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, true);
       if (compressionCodec != null) {
         this.conf
-            .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, compressionCodec);
+            .set(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
       }
       return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
index dc8f34d..ce6ae03 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
@@ -171,14 +171,14 @@ public class OnFileUnorderedPartitionedKVOutputConfiguration {
     @InterfaceAudience.Private
     Builder setKeyClassName(String keyClassName) {
       Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClassName);
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_KEY_CLASS, keyClassName);
       return this;
     }
 
     @InterfaceAudience.Private
     Builder setValueClassName(String valueClassName) {
       Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, valueClassName);
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
       return this;
     }
 
@@ -239,10 +239,10 @@ public class OnFileUnorderedPartitionedKVOutputConfiguration {
     }
 
     public Builder enableCompression(String compressionCodec) {
-      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, true);
+      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, true);
       if (compressionCodec != null) {
         this.conf
-            .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, compressionCodec);
+            .set(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
       }
       return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java
index be1a92e..94d1d27 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 /**
@@ -74,7 +73,7 @@ public class OrderedPartitionedKVEdgeConfiguration extends HadoopKeyValuesBasedB
 
   @Override
   public String getInputClassName() {
-    return ShuffledMergedInput.class.getName();
+    return inputConf.getInputClassName();
   }
 
   /**


Mime
View raw message