tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] git commit: TEZ-1262. Change Tez examples to use Edge configs. (sseth)
Date Thu, 10 Jul 2014 01:21:36 GMT
TEZ-1262. Change Tez examples to use Edge configs. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7296793c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7296793c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7296793c

Branch: refs/heads/master
Commit: 7296793c87fc37f3fa3d588104abd8c3b0648bfd
Parents: cd474e0
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jul 9 18:21:11 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jul 9 18:21:11 2014 -0700

----------------------------------------------------------------------
 .../examples/BroadcastAndOneToOneExample.java   |  57 +---
 .../mapreduce/examples/FilterLinesByWord.java   |  29 +-
 .../examples/FilterLinesByWordOneToOne.java     |  30 +--
 .../mapreduce/examples/IntersectValidate.java   |  41 +--
 .../tez/mapreduce/examples/MRRSleepJob.java     |  31 +--
 .../mapreduce/examples/OrderedWordCount.java    |  32 +--
 .../tez/mapreduce/examples/UnionExample.java    |  40 +--
 .../tez/mapreduce/examples/WordCount.java       |  31 +--
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  51 ----
 ...TestOrderedPartitionedEdgeConfiguration.java | 266 -------------------
 ...stOrderedPartitionedKVEdgeConfiguration.java | 266 +++++++++++++++++++
 ...stUnorderedPartitionedEdgeConfiguration.java | 205 --------------
 ...UnorderedPartitionedKVEdgeConfiguration.java | 205 ++++++++++++++
 ...UnorderedUnpartitionedEdgeConfiguration.java | 175 ------------
 ...orderedUnpartitionedKVEdgeConfiguration.java | 175 ++++++++++++
 15 files changed, 716 insertions(+), 918 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index e3cdf71..816540d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -36,30 +36,22 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfiguration;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
@@ -127,21 +119,8 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
 
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
       Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {
-    Configuration kvInputConf = new JobConf((Configuration)tezConf);
-    kvInputConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        Text.class.getName());
-    kvInputConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(kvInputConf,
-        null);
 
-    Configuration kvOneToOneConf = new JobConf((Configuration)tezConf);
-
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(kvOneToOneConf,
-        kvInputConf);
-
-    MRHelpers.doJobClientMagic(kvInputConf);
-    MRHelpers.doJobClientMagic(kvOneToOneConf);
+    JobConf mrConf = new JobConf(tezConf);
 
     int numBroadcastTasks = 2;
     int numOneToOneTasks = 3;
@@ -161,43 +140,33 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
 
     System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
 
-    byte[] kvInputPayload = MRHelpers.createUserPayloadFromConf(kvInputConf);
     Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
         InputProcessor.class.getName()),
-        numBroadcastTasks, MRHelpers.getMapResource(kvInputConf));
+        numBroadcastTasks, MRHelpers.getMapResource(mrConf));
     
     Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
         InputProcessor.class.getName()).setUserPayload(procPayload),
-        numOneToOneTasks, MRHelpers.getMapResource(kvInputConf));
-    
-    byte[] kvOneToOnePayload = MRHelpers.createUserPayloadFromConf(kvOneToOneConf);
+        numOneToOneTasks, MRHelpers.getMapResource(mrConf));
+
     Vertex oneToOneVertex = new Vertex("OneToOne",
         new ProcessorDescriptor(
             OneToOneProcessor.class.getName()).setUserPayload(procPayload),
-            -1, MRHelpers.getReduceResource(kvOneToOneConf));
+            -1, MRHelpers.getReduceResource(mrConf));
     oneToOneVertex.setVertexManagerPlugin(
             new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));
-    
+
+    UnorderedUnpartitionedKVEdgeConfiguration edgeConf = UnorderedUnpartitionedKVEdgeConfiguration
+        .newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
+
     DAG dag = new DAG("BroadcastAndOneToOneExample");
     dag.addVertex(inputVertex)
         .addVertex(broadcastVertex)
         .addVertex(oneToOneVertex)
         .addEdge(
-            new Edge(inputVertex, oneToOneVertex, new EdgeProperty(
-                DataMovementType.ONE_TO_ONE, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL, 
-                new OutputDescriptor(OnFileUnorderedKVOutput.class.getName())
-                        .setUserPayload(kvInputPayload), 
-                new InputDescriptor(ShuffledUnorderedKVInput.class.getName())
-                        .setUserPayload(kvOneToOnePayload))))
+            new Edge(inputVertex, oneToOneVertex, edgeConf.createDefaultOneToOneEdgeProperty()))
         .addEdge(
-            new Edge(broadcastVertex, oneToOneVertex, new EdgeProperty(
-                DataMovementType.BROADCAST, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL, 
-                new OutputDescriptor(OnFileUnorderedKVOutput.class.getName())
-                        .setUserPayload(kvInputPayload), 
-                new InputDescriptor(ShuffledUnorderedKVInput.class.getName())
-                        .setUserPayload(kvOneToOnePayload))));
+            new Edge(broadcastVertex, oneToOneVertex,
+                edgeConf.createDefaultBroadcastEdgeProperty()));
     return dag;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 4acf161..34d1e11 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -52,14 +52,9 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -75,14 +70,12 @@ import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.processor.FilterByWordInputProcessor;
 import org.apache.tez.processor.FilterByWordOutputProcessor;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfiguration;
 
 import com.google.common.collect.Sets;
 
@@ -171,8 +164,6 @@ public class FilterLinesByWord extends Configured implements Tool {
     Configuration stage1Conf = new JobConf(conf);
     stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
     stage1Conf.setBoolean("mapred.mapper.new-api", false);
-    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, Text.class.getName());
-    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, TextLongPair.class.getName());
     stage1Conf.set(FILTER_PARAM_NAME, filterWord);
 
     InputSplitInfo inputSplitInfo = null;
@@ -182,19 +173,10 @@ public class FilterLinesByWord extends Configured implements Tool {
         credentials.addAll(inputSplitInfo.getCredentials());
       }
     }
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
-
-
 
     Configuration stage2Conf = new JobConf(conf);
-    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, Text.class.getName());
-    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, TextLongPair.class.getName());
     stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
     stage2Conf.setBoolean("mapred.mapper.new-api", false);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf, stage1Conf);
-
-    MRHelpers.doJobClientMagic(stage1Conf);
-    MRHelpers.doJobClientMagic(stage2Conf);
 
     byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
@@ -232,12 +214,11 @@ public class FilterLinesByWord extends Configured implements Tool {
         .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf));
     stage2Vertex.addOutput("MROutput", od, MROutputCommitter.class);
 
+    UnorderedUnpartitionedKVEdgeConfiguration edgeConf = UnorderedUnpartitionedKVEdgeConfiguration
+        .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
+
     DAG dag = new DAG("FilterLinesByWord");
-    Edge edge = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
-        DataMovementType.BROADCAST, DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, new OutputDescriptor(
-            OnFileUnorderedKVOutput.class.getName()), new InputDescriptor(
-            ShuffledUnorderedKVInput.class.getName())));
+    Edge edge = new Edge(stage1Vertex, stage2Vertex, edgeConf.createDefaultBroadcastEdgeProperty());
     dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
 
     LOG.info("Submitting DAG to Tez Session");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 4620680..23fa0e0 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -45,13 +45,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -67,14 +62,12 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.processor.FilterByWordInputProcessor;
 import org.apache.tez.processor.FilterByWordOutputProcessor;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfiguration;
 
 public class FilterLinesByWordOneToOne extends Configured implements Tool {
 
@@ -159,27 +152,17 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     Configuration stage1Conf = new JobConf(conf);
     stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
     stage1Conf.setBoolean("mapred.mapper.new-api", false);
-    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, Text.class.getName());
-    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, TextLongPair.class.getName());
     stage1Conf.set(FILTER_PARAM_NAME, filterWord);
 
     InputSplitInfo inputSplitInfo = null;
     if (generateSplitsInClient) {
       inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
     }
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
-
-
 
     Configuration stage2Conf = new JobConf(conf);
-    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, Text.class.getName());
-    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, TextLongPair.class.getName());
+
     stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
     stage2Conf.setBoolean("mapred.mapper.new-api", false);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf, stage1Conf);
-
-    MRHelpers.doJobClientMagic(stage1Conf);
-    MRHelpers.doJobClientMagic(stage2Conf);
 
     byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
@@ -218,12 +201,11 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
             .createUserPayloadFromConf(stage2Conf)),
         MROutputCommitter.class);
 
+    UnorderedUnpartitionedKVEdgeConfiguration edgeConf = UnorderedUnpartitionedKVEdgeConfiguration
+        .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
+
     DAG dag = new DAG("FilterLinesByWord");
-    Edge edge = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
-        DataMovementType.ONE_TO_ONE, DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, new OutputDescriptor(
-            OnFileUnorderedKVOutput.class.getName()), new InputDescriptor(
-            ShuffledUnorderedKVInput.class.getName())));
+    Edge edge = new Edge(stage1Vertex, stage2Vertex, edgeConf.createDefaultOneToOneEdgeProperty());
     dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
 
     LOG.info("Submitting DAG to Tez Session");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index dab39a5..79dc6d2 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -38,17 +38,10 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -63,8 +56,7 @@ import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfiguration;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
@@ -212,21 +204,9 @@ public class IntersectValidate extends Configured implements Tool {
     // Configuration for intermediate output - shared by Vertex1 and Vertex2
     // This should only be setting selective keys from the underlying conf. Fix after there's a
     // better mechanism to configure the IOs.
-    Configuration intermediateOutputConf = new Configuration(tezConf);
-    intermediateOutputConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
-        Text.class.getName());
-    intermediateOutputConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
-        NullWritable.class.getName());
-    intermediateOutputConf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
-        HashPartitioner.class.getName());
-    byte[] intermediateOutputPayload = TezUtils.createUserPayloadFromConf(intermediateOutputConf);
-
-    Configuration intermediateInputConf = new Configuration(tezConf);
-    intermediateInputConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
-        Text.class.getName());
-    intermediateInputConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
-        NullWritable.class.getName());
-    byte[] intermediateInputPayload = TezUtils.createUserPayloadFromConf(intermediateInputConf);
+    OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder(Text.class.getName(), NullWritable.class.getName()).configureOutput(
+            HashPartitioner.class.getName(), null).done().build();
 
     // Change the way resources are setup - no MRHelpers
     Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
@@ -245,17 +225,8 @@ public class IntersectValidate extends Configured implements Tool {
         new ProcessorDescriptor(IntersectValidateProcessor.class.getName()),
         numPartitions, MRHelpers.getReduceResource(tezConf));
 
-    Edge e1 = new Edge(lhsVertex, intersectValidateVertex, new EdgeProperty(
-        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(OnFileSortedOutput.class.getName())
-            .setUserPayload(intermediateOutputPayload), new InputDescriptor(
-            ShuffledMergedInput.class.getName()).setUserPayload(intermediateInputPayload)));
-
-    Edge e2 = new Edge(rhsVertex, intersectValidateVertex, new EdgeProperty(
-        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(OnFileSortedOutput.class.getName())
-            .setUserPayload(intermediateOutputPayload), new InputDescriptor(
-            ShuffledMergedInput.class.getName()).setUserPayload(intermediateInputPayload)));
+    Edge e1 = new Edge(lhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
+    Edge e2 = new Edge(rhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
 
     dag.addVertex(lhsVertex).addVertex(rhsVertex).addVertex(intersectValidateVertex).addEdge(e1)
         .addEdge(e2);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 8971979..e0bcca2 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -65,16 +65,10 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
@@ -83,13 +77,13 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfiguration;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -439,14 +433,8 @@ public class MRRSleepJob extends Configured implements Tool {
     mapStageConf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
     mapStageConf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
-    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        IntWritable.class.getName());
-    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
     mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
         SleepInputFormat.class.getName());
-    mapStageConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
-        MRRSleepJobPartitioner.class.getName());
     if (numIReducer == 0 && numReducer == 0) {
       mapStageConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
           NullOutputFormat.class.getName());
@@ -641,17 +629,18 @@ public class MRRSleepJob extends Configured implements Tool {
       MRHelpers.addMROutputLegacy(mapVertex, mapUserPayload);
     }
 
+
+    Configuration partitionerConf = new Configuration(false);
+    partitionerConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR, MRRSleepJobPartitioner.class.getName());
+    OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder(IntWritable.class.getName(), IntWritable.class.getName()).configureOutput(
+            MRPartitioner.class.getName(), partitionerConf).done().build();
+
     for (int i = 0; i < vertices.size(); ++i) {
       dag.addVertex(vertices.get(i));
       if (i != 0) {
         dag.addEdge(new Edge(vertices.get(i-1),
-            vertices.get(i), new EdgeProperty(
-                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL, 
-                new OutputDescriptor(
-                    OnFileSortedOutput.class.getName()),
-                new InputDescriptor(
-                    ShuffledMergedInputLegacy.class.getName()))));
+            vertices.get(i), edgeConf.createDefaultEdgeProperty()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 672d685..995a1ba 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -58,12 +58,6 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -81,8 +75,8 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfiguration;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 
 /**
@@ -158,10 +152,6 @@ public class OrderedWordCount extends Configured implements Tool {
     Configuration mapStageConf = new JobConf(conf);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
         TokenizerMapper.class.getName());
-    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        Text.class.getName());
-    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
     if (generateSplitsInClient) {
       mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
           TextInputFormat.class.getName());
@@ -198,10 +188,6 @@ public class OrderedWordCount extends Configured implements Tool {
     finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
     finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
         MyOrderByNoOpReducer.class.getName());
-    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        Text.class.getName());
-    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
     finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
         TextOutputFormat.class.getName());
     finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
@@ -257,18 +243,16 @@ public class OrderedWordCount extends Configured implements Tool {
     MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
     vertices.add(finalReduceVertex);
 
+    OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder(IntWritable.class.getName(), Text.class.getName()).configureOutput(
+            HashPartitioner.class.getName(), null).done().build();
+
     DAG dag = new DAG("OrderedWordCount" + dagIndex);
     for (int i = 0; i < vertices.size(); ++i) {
       dag.addVertex(vertices.get(i));
       if (i != 0) {
-        dag.addEdge(new Edge(vertices.get(i-1),
-            vertices.get(i), new EdgeProperty(
-                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL,
-                new OutputDescriptor(
-                    OnFileSortedOutput.class.getName()),
-                new InputDescriptor(
-                    ShuffledMergedInputLegacy.class.getName()))));
+        dag.addEdge(new Edge(vertices.get(i - 1),
+            vertices.get(i), edgeConf.createDefaultEdgeProperty()));
       }
     }
     return dag;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index ff082b8..8802cee 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -41,16 +41,12 @@ import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -61,12 +57,13 @@ import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Output;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfiguration;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
 import com.google.common.base.Preconditions;
@@ -89,8 +86,8 @@ public class UnionExample {
       Preconditions.checkArgument(getOutputs().containsKey("checker"));
       MRInput input = (MRInput) getInputs().values().iterator().next();
       KeyValueReader kvReader = input.getReader();
-      OnFileSortedOutput output = (OnFileSortedOutput) getOutputs().get("checker");
-      KeyValueWriter kvWriter = output.getWriter();
+      Output output =  getOutputs().get("checker");
+      KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
       MROutput parts = null;
       KeyValueWriter partsWriter = null;
       if (inUnion) {
@@ -222,34 +219,19 @@ public class UnionExample {
       .setUserPayload(MROutput.createUserPayload(
           partsConf, TextOutputFormat.class.getName(), true));
     unionVertex.addOutput("parts", od1, MROutputCommitter.class);
-    
-    byte[] intermediateDataPayloadIn = 
-        MRHelpers.createMRIntermediateDataPayload(tezConf, Text.class.getName(), 
-            IntWritable.class.getName(), true, null, null);
-    byte[] intermediateDataPayloadOut = 
-        MRHelpers.createMRIntermediateDataPayload(tezConf, Text.class.getName(), 
-            IntWritable.class.getName(), true, null, null);
+
+    OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder(Text.class.getName(), IntWritable.class.getName()).configureOutput(
+            HashPartitioner.class.getName(), null).done().build();
 
     dag.addVertex(mapVertex1)
         .addVertex(mapVertex2)
         .addVertex(mapVertex3)
         .addVertex(checkerVertex)
         .addEdge(
-            new Edge(mapVertex3, checkerVertex, new EdgeProperty(
-                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL, 
-                new OutputDescriptor(OnFileSortedOutput.class.getName())
-                        .setUserPayload(intermediateDataPayloadIn), 
-                new InputDescriptor(ShuffledMergedInput.class.getName())
-                        .setUserPayload(intermediateDataPayloadOut))))
+            new Edge(mapVertex3, checkerVertex, edgeConf.createDefaultEdgeProperty()))
         .addEdge(
-            new GroupInputEdge(unionVertex, checkerVertex, new EdgeProperty(
-                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL,
-                new OutputDescriptor(OnFileSortedOutput.class.getName())
-                    .setUserPayload(intermediateDataPayloadIn), 
-                new InputDescriptor(ShuffledMergedInput.class.getName())
-                    .setUserPayload(intermediateDataPayloadOut)),
+            new GroupInputEdge(unionVertex, checkerVertex, edgeConf.createDefaultEdgeProperty(),
                 new InputDescriptor(
                     ConcatenatedMergedKeyValuesInput.class.getName())));
     return dag;  

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index b34357c..088c624 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -41,15 +41,11 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
@@ -58,13 +54,14 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.Output;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfiguration;
 
 import com.google.common.base.Preconditions;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
 
 public class WordCount extends Configured implements Tool {
@@ -78,7 +75,7 @@ public class WordCount extends Configured implements Tool {
       Preconditions.checkArgument(getOutputs().size() == 1);
       MRInput input = (MRInput) getInputs().values().iterator().next();
       KeyValueReader kvReader = input.getReader();
-      OnFileSortedOutput output = (OnFileSortedOutput) getOutputs().values().iterator().next();
+      Output output = getOutputs().values().iterator().next();
       KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
       while (kvReader.next()) {
         StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
@@ -125,11 +122,7 @@ public class WordCount extends Configured implements Tool {
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
       .setUserPayload(MROutput.createUserPayload(
           outputConf, TextOutputFormat.class.getName(), true));
-    
-    byte[] intermediateDataPayload = 
-        MRHelpers.createMRIntermediateDataPayload(tezConf, Text.class.getName(), 
-            IntWritable.class.getName(), true, null, null);
-    
+
     Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
         TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
     tokenizerVertex.addInput("MRInput", id, MRInputAMSplitGenerator.class);
@@ -138,18 +131,16 @@ public class WordCount extends Configured implements Tool {
         new ProcessorDescriptor(
             SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
     summerVertex.addOutput("MROutput", od, MROutputCommitter.class);
-    
+
+    OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder(Text.class.getName(), IntWritable.class.getName()).configureOutput(
+            HashPartitioner.class.getName(), null).done().build();
+
     DAG dag = new DAG("WordCount");
     dag.addVertex(tokenizerVertex)
         .addVertex(summerVertex)
         .addEdge(
-            new Edge(tokenizerVertex, summerVertex, new EdgeProperty(
-                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL, 
-                new OutputDescriptor(OnFileSortedOutput.class.getName())
-                        .setUserPayload(intermediateDataPayload), 
-                new InputDescriptor(ShuffledMergedInput.class.getName())
-                        .setUserPayload(intermediateDataPayload))));
+            new Edge(tokenizerVertex, summerVertex, edgeConf.createDefaultEdgeProperty()));
     return dag;  
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 6355430..7f30e7f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -148,57 +148,6 @@ public class MRHelpers {
     }
   }
 
-  /**
-   * Create the user payload to be set on intermediate edge Input/Output classes
-   * that use MapReduce Key-Value data types. If the input and output have
-   * different configurations then this method may be called separately for both
-   * to get different payloads. If the input and output have no special
-   * configuration then this method may be called once to get the common payload
-   * for both input and output.
-   * 
-   * @param conf
-   *          Configuration for the class
-   * @param keyClassName
-   *          Class name of the Key
-   * @param valueClassName
-   *          Class name of the Value
-   * @param useNewApi
-   *          use new mapreduce API or old mapred API
-   * @param keyComparatorClassName
-   *          Optional key comparator class name
-   * @param compressionCodecClassName
-   *          Optional compression codec
-   * @return
-   * @throws IOException
-   */
-  public static byte[] createMRIntermediateDataPayload(Configuration conf,
-      String keyClassName, String valueClassName, boolean useNewApi,
-      @Nullable String keyComparatorClassName,
-      @Nullable String compressionCodecClassName) throws IOException {
-    Preconditions.checkNotNull(conf);
-    Preconditions.checkNotNull(keyClassName);
-    Preconditions.checkNotNull(valueClassName);
-    Configuration intermediateDataConf = new JobConf(conf);
-    intermediateDataConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, keyClassName);
-    intermediateDataConf
-        .set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, valueClassName);
-    if (keyComparatorClassName != null) {
-      intermediateDataConf.set(MRJobConfig.KEY_COMPARATOR,
-          keyComparatorClassName);
-    }
-    if (compressionCodecClassName != null) {
-      intermediateDataConf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
-      intermediateDataConf.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
-          compressionCodecClassName);
-    }
-    intermediateDataConf.setBoolean("mapred.mapper.new-api", useNewApi);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(
-        intermediateDataConf, intermediateDataConf);
-    MRHelpers.doJobClientMagic(intermediateDataConf);
-
-    return TezUtils.createUserPayloadFromConf(intermediateDataConf);
-  }
-
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Private
   public static org.apache.hadoop.mapreduce.InputSplit[] generateNewSplits(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java
deleted file mode 100644
index 79036d3..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.junit.Test;
-
-public class TestOrderedPartitionedEdgeConfiguration {
-
-  @Test
-  public void testIncompleteParameters() {
-    OrderedPartitionedKVEdgeConfiguration.Builder builder =
-        OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE");
-    try {
-      builder.build();
-      fail("Should have failed since the partitioner has not been specified");
-    } catch (Exception e) {
-      assertTrue(e.getMessage().contains("Output must be configured - partitioner"));
-    }
-  }
-
-  @Test
-  public void testNullParams() {
-    try {
-      OrderedPartitionedKVEdgeConfiguration.newBuilder(null, "VALUE");
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-
-    try {
-      OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", null);
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-
-    try {
-      OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE").configureOutput(null, null);
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-  }
-
-  @Test
-  public void testDefaultConfigsUsed() {
-    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
-        .newBuilder("KEY", "VALUE")
-        .configureOutput("PARTITIONER", null).done();
-
-    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
-
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
-
-    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
-    rebuiltOutput.fromByteArray(outputBytes);
-    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
-    rebuiltInput.fromByteArray(inputBytes);
-
-    Configuration outputConf = rebuiltOutput.conf;
-    assertEquals(true, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
-    assertEquals("TestCodec",
-        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
-
-    Configuration inputConf = rebuiltInput.conf;
-    assertEquals(true, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
-    assertEquals("TestCodec",
-        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
-  }
-
-  @Test
-  public void testSpecificIOConfs() {
-    // Ensures that Output and Input confs are not mixed.
-    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
-        .newBuilder("KEY", "VALUE")
-        .configureOutput("PARTITIONER", null).done();
-
-    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
-
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
-
-    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
-    rebuiltOutput.fromByteArray(outputBytes);
-    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
-    rebuiltInput.fromByteArray(inputBytes);
-
-    Configuration outputConf = rebuiltOutput.conf;
-    assertEquals("DEFAULT",
-        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT"));
-
-    Configuration inputConf = rebuiltInput.conf;
-    assertEquals("DEFAULT",
-        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT"));
-  }
-
-  @Test
-  public void tetCommonConf() {
-
-    Configuration fromConf = new Configuration(false);
-    fromConf.set("test.conf.key.1", "confkey1");
-    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 3);
-    fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
-    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 123);
-    fromConf.set("io.shouldExist", "io");
-    Map<String, String> additionalConfs = new HashMap<String, String>();
-    additionalConfs.put("test.key.2", "key2");
-    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
-    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
-    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter");
-    additionalConfs.put("file.shouldExist", "file");
-
-    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
-        .newBuilder("KEY", "VALUE")
-        .configureOutput("PARTITIONER", null).done()
-        .setAdditionalConfiguration("fs.shouldExist", "fs")
-        .setAdditionalConfiguration("test.key.1", "key1")
-        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "2222")
-        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
-        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, "3333")
-        .setAdditionalConfiguration(additionalConfs)
-        .setFromConfiguration(fromConf);
-
-    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
-
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
-
-    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
-    rebuiltOutput.fromByteArray(outputBytes);
-    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
-    rebuiltInput.fromByteArray(inputBytes);
-
-    Configuration outputConf = rebuiltOutput.conf;
-    Configuration inputConf = rebuiltInput.conf;
-
-    assertEquals(3, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
-    assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
-    assertEquals(2222, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
-    assertEquals(123, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0));
-    assertEquals("CustomSorter", outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS));
-    assertEquals(3333,
-        outputConf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 0));
-    assertEquals("io", outputConf.get("io.shouldExist"));
-    assertEquals("file", outputConf.get("file.shouldExist"));
-    assertEquals("fs", outputConf.get("fs.shouldExist"));
-
-
-    assertEquals(3, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
-    assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
-    assertEquals(2222, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
-    assertEquals(0.11f,
-        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.22f,
-        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
-        0.001f);
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB));
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS));
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES));
-    assertEquals("io", inputConf.get("io.shouldExist"));
-    assertEquals("file", inputConf.get("file.shouldExist"));
-    assertEquals("fs", inputConf.get("fs.shouldExist"));
-
-  }
-
-  @Test
-  public void testSetters() {
-    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
-        .newBuilder("KEY", "VALUE")
-        .setKeyComparatorClass("KEY_COMPARATOR")
-        .configureOutput("PARTITIONER", null).setSortBufferSize(1111).setSorterNumThreads(2).done()
-        .configureInput().setMaxSingleMemorySegmentFraction(0.11f).setMergeFraction(0.22f)
-        .setPostMergeBufferFraction(0.33f).setShuffleBufferFraction(0.44f).done()
-        .enableCompression("CustomCodec");
-
-    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
-
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
-
-    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
-    rebuiltOutput.fromByteArray(outputBytes);
-    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
-    rebuiltInput.fromByteArray(inputBytes);
-
-    Configuration outputConf = rebuiltOutput.conf;
-    Configuration inputConf = rebuiltInput.conf;
-
-    assertEquals("KEY", outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
-    assertEquals("VALUE",
-        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
-    assertEquals("PARTITIONER", outputConf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
-    assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0));
-    assertEquals("CustomCodec",
-        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
-    assertEquals(true,
-        outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
-            false));
-    assertEquals("KEY_COMPARATOR",
-        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
-
-
-    assertEquals("KEY", inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, ""));
-    assertEquals("VALUE",
-        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, ""));
-    assertEquals("CustomCodec",
-        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
-    assertEquals(true,
-        inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED,
-            false));
-    assertEquals(0.11f,
-        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.22f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
-        0.001f);
-    assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0.0f),
-        0.001f);
-    assertEquals(0.44f,
-        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.00f), 0.001f);
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB));
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC));
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS));
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfiguration.java
new file mode 100644
index 0000000..21ea50a
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfiguration.java
@@ -0,0 +1,266 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+public class TestOrderedPartitionedKVEdgeConfiguration {
+
+  @Test
+  public void testIncompleteParameters() {
+    OrderedPartitionedKVEdgeConfiguration.Builder builder =
+        OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE");
+    try {
+      builder.build();
+      fail("Should have failed since the partitioner has not been specified");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Output must be configured - partitioner"));
+    }
+  }
+
+  @Test
+  public void testNullParams() {
+    try {
+      OrderedPartitionedKVEdgeConfiguration.newBuilder(null, "VALUE");
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE").configureOutput(null, null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+  }
+
+  @Test
+  public void testDefaultConfigsUsed() {
+    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder("KEY", "VALUE")
+        .configureOutput("PARTITIONER", null).done();
+
+    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
+
+    byte[] outputBytes = configuration.getOutputPayload();
+    byte[] inputBytes = configuration.getInputPayload();
+
+    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
+    rebuiltOutput.fromByteArray(outputBytes);
+    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
+    rebuiltInput.fromByteArray(inputBytes);
+
+    Configuration outputConf = rebuiltOutput.conf;
+    assertEquals(true, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals("TestCodec",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+
+    Configuration inputConf = rebuiltInput.conf;
+    assertEquals(true, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals("TestCodec",
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
+  }
+
+  @Test
+  public void testSpecificIOConfs() {
+    // Ensures that Output and Input confs are not mixed, i.e. no codec key leaks across sides.
+    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder("KEY", "VALUE")
+        .configureOutput("PARTITIONER", null).done();
+    // Only the partitioner is configured here; the builder is never asked to enable a codec.
+    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
+
+    byte[] outputBytes = configuration.getOutputPayload();
+    byte[] inputBytes = configuration.getInputPayload();
+    // Rebuild the output-side and input-side configurations from their byte[] payloads.
+    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
+    rebuiltOutput.fromByteArray(outputBytes);
+    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
+    rebuiltInput.fromByteArray(inputBytes);
+    // Output side: the INPUT codec key must be unset, so get() returns the DEFAULT fallback.
+    Configuration outputConf = rebuiltOutput.conf;
+    assertEquals("DEFAULT",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT"));
+    // Input side: the OUTPUT codec key must be unset, so get() returns the DEFAULT fallback.
+    Configuration inputConf = rebuiltInput.conf;
+    assertEquals("DEFAULT",
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT"));
+  }
+
+  @Test
+  public void tetCommonConf() { // NOTE(review): name looks like a typo of testCommonConf
+    // Checks which keys from a Configuration, a Map and single key/value calls reach each side.
+    Configuration fromConf = new Configuration(false);
+    fromConf.set("test.conf.key.1", "confkey1");
+    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 3);
+    fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
+    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 123);
+    fromConf.set("io.shouldExist", "io");
+    Map<String, String> additionalConfs = new HashMap<String, String>();
+    additionalConfs.put("test.key.2", "key2");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter");
+    additionalConfs.put("file.shouldExist", "file");
+    // Feed the builder from all three sources: single key/value pairs, the map, then fromConf.
+    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder("KEY", "VALUE")
+        .configureOutput("PARTITIONER", null).done()
+        .setAdditionalConfiguration("fs.shouldExist", "fs")
+        .setAdditionalConfiguration("test.key.1", "key1")
+        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "2222")
+        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
+        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, "3333")
+        .setAdditionalConfiguration(additionalConfs)
+        .setFromConfiguration(fromConf);
+
+    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
+    // NOTE(review): the test.* keys set above are never asserted below - confirm intent.
+    byte[] outputBytes = configuration.getOutputPayload();
+    byte[] inputBytes = configuration.getInputPayload();
+    // Rebuild the output-side and input-side configurations from their byte[] payloads.
+    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
+    rebuiltOutput.fromByteArray(outputBytes);
+    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
+    rebuiltInput.fromByteArray(inputBytes);
+
+    Configuration outputConf = rebuiltOutput.conf;
+    Configuration inputConf = rebuiltInput.conf;
+    // Output side: the three TEZ_RUNTIME_SHUFFLE_* keys must be absent, the other keys present.
+    assertEquals(3, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
+    assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(2222, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
+    assertEquals(123, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0));
+    assertEquals("CustomSorter", outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS));
+    assertEquals(3333,
+        outputConf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 0));
+    assertEquals("io", outputConf.get("io.shouldExist"));
+    assertEquals("file", outputConf.get("file.shouldExist"));
+    assertEquals("fs", outputConf.get("fs.shouldExist"));
+
+    // Input side: shuffle keys present; sort MB, sorter class and index cache limit absent.
+    assertEquals(3, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
+    assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(2222, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertEquals(0.11f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f);
+    assertEquals(0.22f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f);
+    assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
+        0.001f);
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB));
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS));
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES));
+    assertEquals("io", inputConf.get("io.shouldExist"));
+    assertEquals("file", inputConf.get("file.shouldExist"));
+    assertEquals("fs", inputConf.get("fs.shouldExist"));
+
+  }
+
+  @Test
+  public void testSetters() { // builder setters must map to per-side TezJobConfig keys
+    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder("KEY", "VALUE")
+        .setKeyComparatorClass("KEY_COMPARATOR")
+        .configureOutput("PARTITIONER", null).setSortBufferSize(1111).setSorterNumThreads(2).done()
+        .configureInput().setMaxSingleMemorySegmentFraction(0.11f).setMergeFraction(0.22f)
+        .setPostMergeBufferFraction(0.33f).setShuffleBufferFraction(0.44f).done()
+        .enableCompression("CustomCodec");
+    // NOTE(review): setSorterNumThreads(2) has no matching assertion below - confirm intent.
+    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
+
+    byte[] outputBytes = configuration.getOutputPayload();
+    byte[] inputBytes = configuration.getInputPayload();
+    // Rebuild the output-side and input-side configurations from their byte[] payloads.
+    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
+    rebuiltOutput.fromByteArray(outputBytes);
+    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
+    rebuiltInput.fromByteArray(inputBytes);
+
+    Configuration outputConf = rebuiltOutput.conf;
+    Configuration inputConf = rebuiltInput.conf;
+    // Output side: values set on the edge/output builders are present, input-only keys are null.
+    assertEquals("KEY", outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
+    assertEquals("VALUE",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER", outputConf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+    assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0));
+    assertEquals("CustomCodec",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+    assertEquals(true,
+        outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
+            false));
+    assertEquals("KEY_COMPARATOR",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
+
+    // Input side: the 0.11-0.44 fractions from configureInput() are read back; output keys null.
+    assertEquals("KEY", inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, ""));
+    assertEquals("VALUE",
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, ""));
+    assertEquals("CustomCodec",
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
+    assertEquals(true,
+        inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED,
+            false));
+    assertEquals(0.11f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f);
+    assertEquals(0.22f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
+        0.001f);
+    assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0.0f),
+        0.001f);
+    assertEquals(0.44f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.00f), 0.001f);
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB));
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC));
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS));
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7296793c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java
deleted file mode 100644
index 23acf04..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.junit.Test;
-
-public class TestUnorderedPartitionedEdgeConfiguration { // UnorderedPartitionedKVEdgeConfiguration tests
-  @Test
-  public void testIncompleteParameters() {
-    UnorderedPartitionedKVEdgeConfiguration.Builder builder =
-        UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE");
-    try {
-      builder.build(); // configureOutput(...) was never called, so this is expected to throw
-      fail("Should have failed since the partitioner has not been specified");
-    } catch (Exception e) {
-      assertTrue(e.getMessage().contains("Output must be configured - partitioner"));
-    }
-  }
-
-  @Test
-  public void testNullParams() { // null arguments must raise a descriptive NPE
-    try {
-      UnorderedPartitionedKVEdgeConfiguration.newBuilder(null, "VALUE");
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-    // Same check with the second newBuilder argument set to null.
-    try {
-      UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", null);
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-    // Same check for configureOutput(...) when its first argument is null as well.
-    try {
-      UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE").configureOutput(null, null);
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-  }
-
-  @Test
-  public void testDefaultConfigsUsed() {
-    UnorderedPartitionedKVEdgeConfiguration.Builder builder =
-        UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
-            .configureOutput("PARTITIONER", null).done();
-    // Nothing except the mandatory partitioner is configured on the builder.
-    UnorderedPartitionedKVEdgeConfiguration configuration = builder.build();
-
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
-    // Rebuild the output-side and input-side configurations from their byte[] payloads.
-    OnFileUnorderedPartitionedKVOutputConfiguration rebuiltOutput =
-        new OnFileUnorderedPartitionedKVOutputConfiguration();
-    rebuiltOutput.fromByteArray(outputBytes);
-    ShuffledUnorderedKVInputConfiguration rebuiltInput =
-        new ShuffledUnorderedKVInputConfiguration();
-    rebuiltInput.fromByteArray(inputBytes);
-    // NOTE(review): TestCodec is not set in this method; presumably a test resource - confirm.
-    Configuration outputConf = rebuiltOutput.conf;
-    assertEquals(true, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
-    assertEquals("TestCodec",
-        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
-    // The input side is expected to see the same read-ahead flag and codec under its own key.
-    Configuration inputConf = rebuiltInput.conf;
-    assertEquals(true, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
-    assertEquals("TestCodec",
-        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
-  }
-
-  @Test
-  public void testSpecificIOConfs() {
-    // Ensures that Output and Input confs are not mixed, i.e. no codec key leaks across sides.
-    UnorderedPartitionedKVEdgeConfiguration.Builder builder =
-        UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
-            .configureOutput("PARTITIONER", null).done();
-    // Only the partitioner is configured here; the builder is never asked to enable a codec.
-    UnorderedPartitionedKVEdgeConfiguration configuration = builder.build();
-
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
-    // Rebuild the output-side and input-side configurations from their byte[] payloads.
-    OnFileUnorderedPartitionedKVOutputConfiguration rebuiltOutput =
-        new OnFileUnorderedPartitionedKVOutputConfiguration();
-    rebuiltOutput.fromByteArray(outputBytes);
-    ShuffledUnorderedKVInputConfiguration rebuiltInput =
-        new ShuffledUnorderedKVInputConfiguration();
-    rebuiltInput.fromByteArray(inputBytes);
-    // Output side: the INPUT codec key must be unset, so get() returns the DEFAULT fallback.
-    Configuration outputConf = rebuiltOutput.conf;
-    assertEquals("DEFAULT",
-        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT"));
-    // Input side: the OUTPUT codec key must be unset, so get() returns the DEFAULT fallback.
-    Configuration inputConf = rebuiltInput.conf;
-    assertEquals("DEFAULT",
-        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT"));
-  }
-
-  @Test
-  public void tetCommonConf() { // NOTE(review): name looks like a typo of testCommonConf
-    // Checks which keys from a Configuration, a Map and single key/value calls reach each side.
-    Configuration fromConf = new Configuration(false);
-    fromConf.set("test.conf.key.1", "confkey1");
-    fromConf.setBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, false);
-    fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
-    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 123);
-    fromConf.set("io.shouldExist", "io");
-    Map<String, String> additionalConfs = new HashMap<String, String>();
-    additionalConfs.put("test.key.2", "key2");
-    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
-    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
-    additionalConfs
-        .put(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222");
-    additionalConfs.put("file.shouldExist", "file");
-    // Feed the builder from all three sources: single key/value pairs, the map, then fromConf.
-    UnorderedPartitionedKVEdgeConfiguration.Builder builder = UnorderedPartitionedKVEdgeConfiguration
-        .newBuilder("KEY",
-            "VALUE")
-        .configureOutput("PARTITIONER", null).done()
-        .setAdditionalConfiguration("fs.shouldExist", "fs")
-        .setAdditionalConfiguration("test.key.1", "key1")
-        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "3333")
-        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
-        .setAdditionalConfiguration(additionalConfs)
-        .setFromConfiguration(fromConf);
-    // NOTE(review): the test.* keys set above are never asserted below - confirm intent.
-    UnorderedPartitionedKVEdgeConfiguration configuration = builder.build();
-
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
-    // Rebuild the output-side and input-side configurations from their byte[] payloads.
-    OnFileUnorderedPartitionedKVOutputConfiguration rebuiltOutput =
-        new OnFileUnorderedPartitionedKVOutputConfiguration();
-    rebuiltOutput.fromByteArray(outputBytes);
-    ShuffledUnorderedKVInputConfiguration rebuiltInput =
-        new ShuffledUnorderedKVInputConfiguration();
-    rebuiltInput.fromByteArray(inputBytes);
-
-    Configuration outputConf = rebuiltOutput.conf;
-    Configuration inputConf = rebuiltInput.conf;
-    // Output side: the three TEZ_RUNTIME_SHUFFLE_* keys must be absent, the other keys present.
-    assertEquals(false, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true));
-    assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
-    assertEquals(3333, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
-    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
-    assertEquals(123,
-        outputConf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0));
-    assertEquals(2222,
-        outputConf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
-    assertEquals("io", outputConf.get("io.shouldExist"));
-    assertEquals("file", outputConf.get("file.shouldExist"));
-    assertEquals("fs", outputConf.get("fs.shouldExist"));
-
-    // Input side: shuffle keys present; the two TEZ_RUNTIME_UNORDERED_OUTPUT_* keys absent.
-    assertEquals(false, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true));
-    assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
-    assertEquals(3333, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
-    assertEquals(0.11f,
-        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.22f,
-        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
-        0.001f);
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB));
-    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES));
-    assertEquals("io", inputConf.get("io.shouldExist"));
-    assertEquals("file", inputConf.get("file.shouldExist"));
-    assertEquals("fs", inputConf.get("fs.shouldExist"));
-
-  }
-}


Mime
View raw message