tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] TEZ-1134. InputInitializer and OutputCommitter implicitly use payloads of the input and output (bikas)
Date Thu, 24 Jul 2014 22:35:11 GMT
Repository: tez
Updated Branches:
  refs/heads/master 2e52635d9 -> 74d04a48a


http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 8071e52..7becaad 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -68,8 +68,10 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -436,9 +438,11 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .addInputs(
                     RootInputLeafOutputProto.newBuilder()
-                    .setInitializerClassName(initializerClassName)
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                initializerClassName))
                     .setName("input1")
-                    .setEntityDescriptor(
+                    .setIODescriptor(
                         TezEntityDescriptorProto.newBuilder()
                             .setClassName("InputClazz")
                             .build()
@@ -462,9 +466,11 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .addInputs(
                     RootInputLeafOutputProto.newBuilder()
-                        .setInitializerClassName(initializerClassName)
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                initializerClassName))
                         .setName("input2")
-                        .setEntityDescriptor(
+                        .setIODescriptor(
                             TezEntityDescriptorProto.newBuilder()
                               .setClassName("InputClazz")
                               .build()
@@ -489,9 +495,11 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .addInputs(
                     RootInputLeafOutputProto.newBuilder()
-                        .setInitializerClassName(initializerClassName)
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                initializerClassName))
                         .setName("input3")
-                        .setEntityDescriptor(
+                        .setIODescriptor(
                             TezEntityDescriptorProto.newBuilder()
                               .setClassName("InputClazz")
                               .build()
@@ -518,9 +526,11 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .addInputs(
                     RootInputLeafOutputProto.newBuilder()
-                        .setInitializerClassName(initializerClassName)
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                initializerClassName))
                         .setName("input4")
-                        .setEntityDescriptor(
+                        .setIODescriptor(
                             TezEntityDescriptorProto.newBuilder()
                               .setClassName("InputClazz")
                               .build()
@@ -583,9 +593,11 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .addInputs(
                     RootInputLeafOutputProto.newBuilder()
-                        .setInitializerClassName("IrrelevantInitializerClassName")
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                "IrrelevantInitializerClassName"))
                         .setName("input1")
-                        .setEntityDescriptor(
+                        .setIODescriptor(
                             TezEntityDescriptorProto.newBuilder()
                                 .setClassName("InputClazz")
                                 .build()
@@ -630,9 +642,11 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .addInputs(
                     RootInputLeafOutputProto.newBuilder()
-                        .setInitializerClassName(initializerClassName)
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                initializerClassName))
                         .setName("input1")
-                        .setEntityDescriptor(
+                        .setIODescriptor(
                             TezEntityDescriptorProto.newBuilder()
                               .setClassName("InputClazz")
                               .build()
@@ -697,9 +711,11 @@ public class TestVertexImpl {
       numTasks = -1;
       v1Builder.addInputs(
           RootInputLeafOutputProto.newBuilder()
-          .setInitializerClassName(initializerClassName)
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                initializerClassName))
           .setName("input1")
-          .setEntityDescriptor(
+          .setIODescriptor(
               TezEntityDescriptorProto.newBuilder()
                   .setClassName("InputClazz")
                   .build()
@@ -1026,11 +1042,13 @@ public class TestVertexImpl {
                 )
                 .addOutputs(
                     DAGProtos.RootInputLeafOutputProto.newBuilder()
-                        .setEntityDescriptor(
+                        .setIODescriptor(
                             TezEntityDescriptorProto.newBuilder().setClassName("output").build()
                         )
                         .setName("outputx")
-                        .setInitializerClassName(CountingOutputCommitter.class.getName())
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                CountingOutputCommitter.class.getName()))
                 )
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
@@ -2340,13 +2358,14 @@ public class TestVertexImpl {
     List<RootInputLeafOutputProto> outputs =
         new ArrayList<RootInputLeafOutputProto>();
     outputs.add(RootInputLeafOutputProto.newBuilder()
-        .setInitializerClassName(CountingOutputCommitter.class.getName())
-        .setName("output_v2")
-        .setEntityDescriptor(
-            TezEntityDescriptorProto.newBuilder()
-                .setUserPayload(ByteString.copyFrom(
+        .setControllerDescriptor(
+            TezEntityDescriptorProto.newBuilder().setClassName(
+                CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
                     new CountingOutputCommitter.CountingOutputCommitterConfig()
-                        .toUserPayload())).build())
+                    .toUserPayload())).build())
+        .setName("output_v2")
+        .setIODescriptor(
+            TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
         .build());
     v.setAdditionalOutputs(outputs);
 
@@ -2452,13 +2471,14 @@ public class TestVertexImpl {
     List<RootInputLeafOutputProto> outputs =
         new ArrayList<RootInputLeafOutputProto>();
     outputs.add(RootInputLeafOutputProto.newBuilder()
-        .setInitializerClassName(CountingOutputCommitter.class.getName())
-        .setName("output_v2")
-        .setEntityDescriptor(
-            TezEntityDescriptorProto.newBuilder()
-                .setUserPayload(ByteString.copyFrom(
+        .setControllerDescriptor(
+            TezEntityDescriptorProto.newBuilder().setClassName(
+                CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
                     new CountingOutputCommitter.CountingOutputCommitterConfig(
                         true, true, false).toUserPayload())).build())
+        .setName("output_v2")
+        .setIODescriptor(
+            TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
         .build());
     v.setAdditionalOutputs(outputs);
 
@@ -2495,13 +2515,14 @@ public class TestVertexImpl {
     List<RootInputLeafOutputProto> outputs =
         new ArrayList<RootInputLeafOutputProto>();
     outputs.add(RootInputLeafOutputProto.newBuilder()
-        .setInitializerClassName(CountingOutputCommitter.class.getName())
-        .setName("output_v2")
-        .setEntityDescriptor(
-            TezEntityDescriptorProto.newBuilder()
-                .setUserPayload(ByteString.copyFrom(
+        .setControllerDescriptor(
+            TezEntityDescriptorProto.newBuilder().setClassName(
+                CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
                     new CountingOutputCommitter.CountingOutputCommitterConfig(
                         true, true, true).toUserPayload())).build())
+        .setName("output_v2")
+        .setIODescriptor(
+            TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
         .build());
     v.setAdditionalOutputs(outputs);
 
@@ -2793,6 +2814,7 @@ public class TestVertexImpl {
     Assert.assertEquals(true, initializerManager2.hasShutDown);
   }
 
+  @SuppressWarnings("unchecked")
   @Test(timeout = 10000)
   public void testRootInputInitializerEvent() throws Exception {
     useCustomInitializer = true;
@@ -2814,7 +2836,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
     dispatcher.await();
 
-    RootInputInitializerManagerWithRunningInitializer manager2 = v2.getRootInputInitializerManager();
     // Wait for the initializer to be invoked - which may be a separate thread.
     while (!initializer.initStarted.get()) {
       Thread.sleep(10);
@@ -3052,6 +3073,7 @@ public class TestVertexImpl {
     }
   }
 
+  @SuppressWarnings("rawtypes")
   private static class VertexImplWithRunningInputInitializer extends VertexImpl {
 
     private RootInputInitializerManagerWithRunningInitializer rootInputInitializerManager;
@@ -3087,10 +3109,6 @@ public class TestVertexImpl {
       }
       return rootInputInitializerManager;
     }
-
-    RootInputInitializerManagerWithRunningInitializer getRootInputInitializerManager() {
-      return rootInputInitializerManager;
-    }
   }
 
   @SuppressWarnings("rawtypes")
@@ -3150,7 +3168,7 @@ public class TestVertexImpl {
 
     @Override
     protected TezRootInputInitializer createInitializer(
-        RootInputLeafOutputDescriptor<InputDescriptor> input) {
+        RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input) {
       return presetInitializer;
     }
   }
@@ -3159,7 +3177,7 @@ public class TestVertexImpl {
   private static class RootInputInitializerManagerControlled extends
       RootInputInitializerManager {
 
-    private List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs;
+    private List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs;
     private final EventHandler eventHandler;
     private final DrainDispatcher dispatcher;
     private final TezVertexID vertexID;
@@ -3177,13 +3195,13 @@ public class TestVertexImpl {
 
     @Override
     public void runInputInitializers(
-        List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
+        List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) {
       this.inputs = inputs;
     }
 
     @Override
     protected TezRootInputInitializer createInitializer(
-        RootInputLeafOutputDescriptor<InputDescriptor> input) {
+        RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input) {
 
       return new TezRootInputInitializer() {
         @Override
@@ -3207,14 +3225,14 @@ public class TestVertexImpl {
     public void failInputInitialization() {
       super.runInputInitializers(inputs);
       eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs
-          .get(0).getEntityName(),
+          .get(0).getName(),
           new RuntimeException("MockInitializerFailed")));
       dispatcher.await();
     }
 
     public void completeInputInitialization() {
       eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs.get(0)
-          .getEntityName(), null));
+          .getName(), null));
       dispatcher.await();
     }
 
@@ -3223,7 +3241,7 @@ public class TestVertexImpl {
       RootInputUpdatePayloadEvent event = new RootInputUpdatePayloadEvent(payload);
       events.add(event);
       eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs
-          .get(0).getEntityName(), events));
+          .get(0).getName(), events));
       dispatcher.await();
     }
 
@@ -3240,7 +3258,7 @@ public class TestVertexImpl {
         events.add(diEvent);
       }
       eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs
-          .get(initializerIndex).getEntityName(), events));
+          .get(initializerIndex).getName(), events));
       dispatcher.await();
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index f926471..0b7b395 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -33,12 +33,12 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -68,8 +68,9 @@ public class TestDAGUtils {
     org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor("output.class")
         .setHistoryText("uvOut HistoryText");
-    uv12.addOutput("uvOut", outDesc, OutputCommitter.class);
-    v3.addOutput("uvOut", outDesc, OutputCommitter.class);
+    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(OutputCommitter.class.getName());
+    uv12.addOutput("uvOut", outDesc, ocd);
+    v3.addOutput("uvOut", outDesc, ocd);
 
     GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
         new EdgeProperty(DataMovementType.SCATTER_GATHER,
@@ -86,6 +87,7 @@ public class TestDAGUtils {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testConvertDAGPlanToATSMap() throws IOException, JSONException {
     DAGPlan dagPlan = createDAG();
     Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan);

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 78f8a0f..3a26171 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -56,6 +56,8 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -200,7 +202,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     stage1Vertex.addInput("MRInput",
         new InputDescriptor(MRInputLegacy.class.getName())
             .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
-        initializerClazz);
+        (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName())));
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
@@ -212,7 +214,8 @@ public class FilterLinesByWord extends Configured implements Tool {
     // Configure the Output for stage2
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
         .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf));
-    stage2Vertex.addOutput("MROutput", od, MROutputCommitter.class);
+    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
+    stage2Vertex.addOutput("MROutput", od, ocd);
 
     UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 1289b58..53eb590 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -48,6 +48,8 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -186,7 +188,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     stage1Vertex.addInput("MRInput",
         new InputDescriptor(MRInputLegacy.class.getName())
             .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
-        initializerClazz);
+            (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName())));
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
@@ -199,7 +201,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     stage2Vertex.addOutput("MROutput",
         new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers
             .createUserPayloadFromConf(stage2Conf)),
-        MROutputCommitter.class);
+            new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
 
     UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index e5ab546..58b952b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -214,13 +215,13 @@ public class IntersectDataGen extends Configured implements Tool {
         largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf));
     genDataVertex.addOutput(STREAM_OUTPUT_NAME,
         new OutputDescriptor(MROutput.class.getName()).setUserPayload(streamOutputPayload),
-        MROutputCommitter.class);
+        new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
     genDataVertex.addOutput(HASH_OUTPUT_NAME,
         new OutputDescriptor(MROutput.class.getName()).setUserPayload(hashOutputPayload),
-        MROutputCommitter.class);
+        new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
     genDataVertex.addOutput(EXPECTED_OUTPUT_NAME,
         new OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedOutputPayload),
-        MROutputCommitter.class);
+        new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
 
     dag.addVertex(genDataVertex);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index 6c76354..1353080 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -45,6 +45,8 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -212,19 +214,22 @@ public class IntersectExample extends Configured implements Tool {
         new ProcessorDescriptor(ForwardingProcessor.class.getName()), -1,
         MRHelpers.getMapResource(tezConf)).addInput("streamfile",
         new InputDescriptor(MRInput.class.getName())
-            .setUserPayload(streamInputPayload), MRInputAMSplitGenerator.class);
+            .setUserPayload(streamInputPayload), 
+            new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
 
     Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
         ForwardingProcessor.class.getName()), -1,
         MRHelpers.getMapResource(tezConf)).addInput("hashfile",
         new InputDescriptor(MRInput.class.getName())
-            .setUserPayload(hashInputPayload), MRInputAMSplitGenerator.class);
+            .setUserPayload(hashInputPayload), 
+            new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
 
     Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
         IntersectProcessor.class.getName()), numPartitions,
         MRHelpers.getReduceResource(tezConf)).addOutput("finalOutput",
         new OutputDescriptor(MROutput.class.getName())
-            .setUserPayload(finalOutputPayload), MROutputCommitter.class);
+            .setUserPayload(finalOutputPayload), 
+        new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
 
     Edge e1 = new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index 3a74429..0b91efb 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -42,6 +42,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -213,13 +214,13 @@ public class IntersectValidate extends Configured implements Tool {
         ForwardingProcessor.class.getName()), -1,
         MRHelpers.getMapResource(tezConf)).addInput("lhs", new InputDescriptor(
         MRInput.class.getName()).setUserPayload(streamInputPayload),
-        MRInputAMSplitGenerator.class);
+        new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
 
     Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
         ForwardingProcessor.class.getName()), -1,
         MRHelpers.getMapResource(tezConf)).addInput("rhs", new InputDescriptor(
         MRInput.class.getName()).setUserPayload(hashInputPayload),
-        MRInputAMSplitGenerator.class);
+        new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
 
     Vertex intersectValidateVertex = new Vertex("intersectvalidate",
         new ProcessorDescriptor(IntersectValidateProcessor.class.getName()),

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 015e8e3..14fe441 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -65,6 +64,7 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -575,12 +575,14 @@ public class MRRSleepJob extends Configured implements Tool {
     }
 
     if (generateSplitsInAM) {
-      MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputAMSplitGenerator.class);
+      MRHelpers.addMRInput(mapVertex, mapInputPayload, 
+          new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
     } else {
       if (writeSplitsToDFS) {
         MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
       } else {
-        MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputSplitDistributor.class);
+        MRHelpers.addMRInput(mapVertex, mapInputPayload, 
+            new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()));
       }
     }
     vertices.add(mapVertex);

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 051bfee..f66e60f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -59,6 +59,7 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -224,7 +225,9 @@ public class OrderedWordCount extends Configured implements Tool {
 
     Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
         : MRInputAMSplitGenerator.class;
-    MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz);
+    MRHelpers.addMRInput(mapVertex, mapInputPayload, 
+        (initializerClazz==null) ? null : 
+          new InputInitializerDescriptor(initializerClazz.getName()));
     vertices.add(mapVertex);
 
     ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 74bf570..e2f073f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
@@ -180,17 +182,19 @@ public class UnionExample {
     Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
         numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex1.addInput("MRInput", id, MRInputAMSplitGenerator.class);
+    InputInitializerDescriptor iid = 
+        new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName());
+    mapVertex1.addInput("MRInput", id, iid);
 
     Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
         numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex2.addInput("MRInput", id, MRInputAMSplitGenerator.class);
+    mapVertex2.addInput("MRInput", id, iid);
 
     Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
         numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex3.addInput("MRInput", id, MRInputAMSplitGenerator.class);
+    mapVertex3.addInput("MRInput", id, iid);
 
     Vertex checkerVertex = new Vertex("checker",
         new ProcessorDescriptor(
@@ -202,14 +206,15 @@ public class UnionExample {
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
       .setUserPayload(MROutput.createUserPayload(
           outputConf, TextOutputFormat.class.getName(), true));
-    checkerVertex.addOutput("union", od, MROutputCommitter.class);
+    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
+    checkerVertex.addOutput("union", od, ocd);
 
     Configuration allPartsConf = new Configuration(tezConf);
     allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
     OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName())
       .setUserPayload(MROutput.createUserPayload(
           allPartsConf, TextOutputFormat.class.getName(), true));
-    checkerVertex.addOutput("all-parts", od2, MROutputCommitter.class);
+    checkerVertex.addOutput("all-parts", od2, ocd);
 
     Configuration partsConf = new Configuration(tezConf);
     partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts");
@@ -218,7 +223,7 @@ public class UnionExample {
     OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName())
       .setUserPayload(MROutput.createUserPayload(
           partsConf, TextOutputFormat.class.getName(), true));
-    unionVertex.addOutput("parts", od1, MROutputCommitter.class);
+    unionVertex.addOutput("parts", od1, ocd);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index d5e6154..61f5cd9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -42,6 +42,8 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -61,6 +63,7 @@ import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
 
@@ -116,21 +119,24 @@ public class WordCount extends Configured implements Tool {
     InputDescriptor id = new InputDescriptor(MRInput.class.getName())
         .setUserPayload(MRInput.createUserPayload(inputConf,
             TextInputFormat.class.getName(), true, true));
+    InputInitializerDescriptor iid = new InputInitializerDescriptor(
+        MRInputAMSplitGenerator.class.getName());
 
     Configuration outputConf = new Configuration(tezConf);
     outputConf.set(FileOutputFormat.OUTDIR, outputPath);
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
       .setUserPayload(MROutput.createUserPayload(
           outputConf, TextOutputFormat.class.getName(), true));
+    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
 
     Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
         TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
-    tokenizerVertex.addInput("MRInput", id, MRInputAMSplitGenerator.class);
+    tokenizerVertex.addInput("MRInput", id, iid);
 
     Vertex summerVertex = new Vertex("summer",
         new ProcessorDescriptor(
             SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
-    summerVertex.addOutput("MROutput", od, MROutputCommitter.class);
+    summerVertex.addOutput("MROutput", od, ocd);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index e919516..9311754 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -56,12 +56,12 @@ public class MROutputCommitter extends OutputCommitter {
 
   @Override
   public void initialize(OutputCommitterContext context) throws IOException {
-    byte[] userPayload = context.getUserPayload();
+    byte[] userPayload = context.getOutputUserPayload();
     if (userPayload == null) {
       jobConf = new JobConf();
     } else {
       jobConf = new JobConf(
-          MRHelpers.createConfFromUserPayload(context.getUserPayload()));
+          MRHelpers.createConfFromUserPayload(context.getOutputUserPayload()));
     }
 
     // Read all credentials into the credentials instance stored in JobConf.

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index 87a98b9..4e1e0b6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -66,7 +66,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
       sw = new Stopwatch().start();
     }
     MRInputUserPayloadProto userPayloadProto = MRHelpers
-        .parseMRInputPayload(rootInputContext.getUserPayload());
+        .parseMRInputPayload(rootInputContext.getInputUserPayload());
     if (LOG.isDebugEnabled()) {
       sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index 7d11ab3..a2aa5d8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -59,7 +59,7 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
     if (LOG.isDebugEnabled()) {
       sw = new Stopwatch().start();
     }
-    MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+    MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
     if (LOG.isDebugEnabled()) {
       sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 38d0711..29ea6a6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -64,6 +64,8 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
@@ -77,7 +79,6 @@ import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
 import com.google.common.base.Function;
@@ -979,7 +980,7 @@ public class MRHelpers {
    * @param initClazz class to init the input in the AM
    */
   public static void addMRInput(Vertex vertex, byte[] userPayload,
-      Class<? extends TezRootInputInitializer> initClazz) {
+      InputInitializerDescriptor initClazz) {
     InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName())
         .setUserPayload(userPayload);
     vertex.addInput("MRInput", id, initClazz);
@@ -997,14 +998,14 @@ public class MRHelpers {
   public static void addMROutput(Vertex vertex, byte[] userPayload) {
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
         .setUserPayload(userPayload);
-    vertex.addOutput("MROutput", od, MROutputCommitter.class);
+    vertex.addOutput("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
   }
 
   @Private
   public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) {
     OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
         .setUserPayload(userPayload);
-    vertex.addOutput("MROutput", od, MROutputCommitter.class);
+    vertex.addOutput("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 4e3e5e7..eb2ba89 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -94,8 +94,10 @@ public class MRInput extends MRInputBase {
    *          the InputFormat will be grouped in the AM based on available
    *          resources, locality etc. This option may be set to true only when
    *          using MRInputAMSplitGenerator as the initializer class in
-   *          {@link Vertex#addInput(String, org.apache.tez.dag.api.InputDescriptor, Class)}
-   * @return returns the user payload to be set on the InputDescriptor of  MRInput
+   *          {@link Vertex#addInput(String, org.apache.tez.dag.api.InputDescriptor, 
+   *          org.apache.tez.dag.api.InputInitializerDescriptor)}
+   * @return returns the user payload to be set on the InputDescriptor of
+   *         MRInput
    * @throws IOException
    */
   public static byte[] createUserPayload(Configuration conf,

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index 5d6ec0d..84c945e 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -168,7 +168,7 @@ public class TestMRInputSplitDistributor {
     }
 
     @Override
-    public byte[] getUserPayload() {
+    public byte[] getInputUserPayload() {
       return payload;
     }
 
@@ -202,6 +202,11 @@ public class TestMRInputSplitDistributor {
       throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock");
     }
 
+    @Override
+    public byte[] getUserPayload() {
+      throw new UnsupportedOperationException("getUserPayload not implemented in this mock");
+    }
+
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index ea0378d..67fc6a5 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -75,6 +75,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -472,7 +473,9 @@ public class TestMRRJobsDAGApi {
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(stage1Payload),
         stage1NumTasks, Resource.newInstance(256, 1));
-    MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
+    MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, 
+        (inputInitializerClazz==null) ? null : 
+          new InputInitializerDescriptor(inputInitializerClazz.getName()));
     Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName()).setUserPayload(stage2Payload),
         1, Resource.newInstance(256, 1));
@@ -666,7 +669,7 @@ public class TestMRRJobsDAGApi {
   public static class MRInputAMSplitGeneratorRelocalizationTest extends MRInputAMSplitGenerator {
     public List<Event> initialize(TezRootInputInitializerContext rootInputContext)  throws Exception {
       MRInputUserPayloadProto userPayloadProto = MRHelpers
-          .parseMRInputPayload(rootInputContext.getUserPayload());
+          .parseMRInputPayload(rootInputContext.getInputUserPayload());
       Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
           .getConfigurationBytes());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 87e6472..b0f937a 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -29,6 +29,7 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -174,7 +175,7 @@ public class TestDAGRecovery {
     DAG dag = SimpleVTestDAG.createDAG("DelayedInitDAG", null);
     dag.getVertex("v1").addInput("i1",
         new InputDescriptor(NoOpInput.class.getName()),
-        FailingInputInitializer.class);
+        new InputInitializerDescriptor(FailingInputInitializer.class.getName()));
     runDAGAndVerify(dag, State.SUCCEEDED);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index 06e4a87..6b3727a 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -177,8 +178,9 @@ public class TestDAGRecovery2 {
     od.setUserPayload(new
         MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true)
             .toUserPayload());
-    dag.getVertex("v3").addOutput("FailingOutput", od,
-        MultiAttemptDAG.FailingOutputCommitter.class);
+    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(
+        MultiAttemptDAG.FailingOutputCommitter.class.getName());
+    dag.getVertex("v3").addOutput("FailingOutput", od, ocd);
     runDAGAndVerify(dag, State.FAILED);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 51d3d9e..1dc8879 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -154,7 +154,7 @@ public class MultiAttemptDAG {
     public void initialize(OutputCommitterContext context) throws Exception {
       FailingOutputCommitterConfig config = new
           FailingOutputCommitterConfig();
-      config.fromUserPayload(context.getUserPayload());
+      config.fromUserPayload(context.getOutputUserPayload());
       failOnCommit = config.failOnCommit;
     }
 


Mime
View raw message