tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [2/2] git commit: TEZ-1134. InputInitializer and OutputCommitter implicitly use payloads of the input and output (bikas)
Date Thu, 24 Jul 2014 22:35:12 GMT
TEZ-1134. InputInitializer and OutputCommitter implicitly use payloads of the input and output (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/74d04a48
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/74d04a48
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/74d04a48

Branch: refs/heads/master
Commit: 74d04a48ac8572631b6b73fc27350526397c00e7
Parents: 2e52635
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Jul 24 15:34:57 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Jul 24 15:34:57 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../main/java/org/apache/tez/dag/api/DAG.java   |  16 +--
 .../apache/tez/dag/api/DagTypeConverters.java   |  35 ++++--
 .../tez/dag/api/InputInitializerDescriptor.java |  33 ++++++
 .../tez/dag/api/OutputCommitterDescriptor.java  |  33 ++++++
 .../apache/tez/dag/api/RootInputLeafOutput.java |  20 ++--
 .../apache/tez/dag/api/TezEntityDescriptor.java |   1 +
 .../java/org/apache/tez/dag/api/Vertex.java     |  68 +++++-------
 .../org/apache/tez/dag/api/VertexGroup.java     |  20 ++--
 .../tez/runtime/api/OutputCommitterContext.java |  10 +-
 .../api/TezRootInputInitializerContext.java     |  12 ++-
 tez-api/src/main/proto/DAGApiRecords.proto      |   4 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |   8 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   8 +-
 .../app/dag/RootInputInitializerManager.java    |  41 ++++---
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  10 +-
 .../dag/impl/OutputCommitterContextImpl.java    |  27 ++---
 .../dag/impl/RootInputLeafOutputDescriptor.java |  46 --------
 .../TezRootInputInitializerContextImpl.java     |  19 +++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  91 +++++++++-------
 .../tez/dag/app/dag/impl/VertexManager.java     |  12 +--
 .../history/events/VertexInitializedEvent.java  |  42 ++++----
 .../apache/tez/dag/history/utils/DAGUtils.java  |  20 ++--
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  45 +++++---
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 108 +++++++++++--------
 .../tez/dag/history/utils/TestDAGUtils.java     |   8 +-
 .../mapreduce/examples/FilterLinesByWord.java   |   7 +-
 .../examples/FilterLinesByWordOneToOne.java     |   6 +-
 .../mapreduce/examples/IntersectDataGen.java    |   7 +-
 .../mapreduce/examples/IntersectExample.java    |  11 +-
 .../mapreduce/examples/IntersectValidate.java   |   5 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |   8 +-
 .../mapreduce/examples/OrderedWordCount.java    |   5 +-
 .../tez/mapreduce/examples/UnionExample.java    |  17 +--
 .../tez/mapreduce/examples/WordCount.java       |  10 +-
 .../mapreduce/committer/MROutputCommitter.java  |   4 +-
 .../common/MRInputAMSplitGenerator.java         |   2 +-
 .../common/MRInputSplitDistributor.java         |   2 +-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |   9 +-
 .../org/apache/tez/mapreduce/input/MRInput.java |   6 +-
 .../common/TestMRInputSplitDistributor.java     |   7 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   7 +-
 .../org/apache/tez/test/TestDAGRecovery.java    |   3 +-
 .../org/apache/tez/test/TestDAGRecovery2.java   |   6 +-
 .../apache/tez/test/dag/MultiAttemptDAG.java    |   2 +-
 45 files changed, 505 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0a76df8..e7a67db 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -28,6 +28,8 @@ INCOMPATIBLE CHANGES
   TEZ-866. Add a TezMergedInputContext for MergedInputs (bikas)
   TEZ-1137. Move TezJobConfig to runtime-library and rename to
   TezRuntimeConfiguration (bikas)
+  TEZ-1134. InputInitializer and OutputCommitter implicitly use payloads of
+  the input and output (bikas)
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 79c474a..7e35c57 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -366,7 +366,8 @@ public class DAG {
 
     // check input and output names don't collide with vertex names
     for (Vertex vertex : vertices.values()) {
-      for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
+      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
+           input : vertex.getInputs()) {
         if (vertexMap.containsKey(input.getName())) {
           throw new IllegalStateException("Vertex: "
               + vertex.getName()
@@ -374,7 +375,8 @@ public class DAG {
               + input.getName());
         }
       }
-      for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
+      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> 
+            output : vertex.getOutputs()) {
         if (vertexMap.containsKey(output.getName())) {
           throw new IllegalStateException("Vertex: "
               + vertex.getName()
@@ -387,7 +389,8 @@ public class DAG {
     // Check for valid InputNames
     for (Entry<Vertex, Set<String>> entry : inboundVertexMap.entrySet()) {
       Vertex vertex = entry.getKey();
-      for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
+      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
+           input : vertex.getInputs()) {
         if (entry.getValue().contains(input.getName())) {
           throw new IllegalStateException("Vertex: "
               + vertex.getName()
@@ -400,7 +403,8 @@ public class DAG {
     // Check for valid OutputNames
     for (Entry<Vertex, Set<String>> entry : outboundVertexMap.entrySet()) {
       Vertex vertex = entry.getKey();
-      for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
+      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> 
+            output : vertex.getOutputs()) {
         if (entry.getValue().contains(output.getName())) {
           throw new IllegalStateException("Vertex: "
               + vertex.getName()
@@ -532,12 +536,12 @@ public class DAG {
       vertexBuilder.setProcessorDescriptor(DagTypeConverters
         .convertToDAGPlan(vertex.getProcessorDescriptor()));
       if (vertex.getInputs().size() > 0) {
-        for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
+        for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : vertex.getInputs()) {
           vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
         }
       }
       if (vertex.getOutputs().size() > 0) {
-        for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
+        for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output : vertex.getOutputs()) {
           vertexBuilder.addOutputs(DagTypeConverters.convertToDAGPlan(output));
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 87592e7..dbec44d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -270,7 +270,7 @@ public class DagTypeConverters {
   }
 
   public static TezEntityDescriptorProto convertToDAGPlan(
-      TezEntityDescriptor descriptor) {
+      TezEntityDescriptor<?> descriptor) {
     TezEntityDescriptorProto.Builder builder = TezEntityDescriptorProto
         .newBuilder();
     builder.setClassName(descriptor.getClassName());
@@ -303,13 +303,12 @@ public class DagTypeConverters {
 
 
   public static RootInputLeafOutputProto convertToDAGPlan(
-      RootInputLeafOutput<? extends TezEntityDescriptor> descriptor) {
+      RootInputLeafOutput<? extends TezEntityDescriptor<?>, ? extends TezEntityDescriptor<?>> rootIO) {
     RootInputLeafOutputProto.Builder builder = RootInputLeafOutputProto.newBuilder();
-    builder.setName(descriptor.getName());
-    builder.setEntityDescriptor(convertToDAGPlan(descriptor.getDescriptor()));
-    if (descriptor.getInitializerClass() != null) {
-      builder.setInitializerClassName(descriptor.getInitializerClass()
-          .getName());
+    builder.setName(rootIO.getName());
+    builder.setIODescriptor(convertToDAGPlan(rootIO.getIODescriptor()));
+    if (rootIO.getControllerDescriptor() != null) {
+      builder.setControllerDescriptor(convertToDAGPlan(rootIO.getControllerDescriptor()));
     }
     return builder.build();
   }
@@ -333,7 +332,27 @@ public class DagTypeConverters {
     }
     return new OutputDescriptor(className).setUserPayload(bb);
   }
-  
+
+  public static InputInitializerDescriptor convertInputInitializerDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    byte[] bb = null;
+    if (proto.hasUserPayload()) {
+      bb = proto.getUserPayload().toByteArray();
+    }
+    return new InputInitializerDescriptor(className).setUserPayload(bb);
+  }
+
+  public static OutputCommitterDescriptor convertOutputCommitterDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    byte[] bb = null;
+    if (proto.hasUserPayload()) {
+      bb = proto.getUserPayload().toByteArray();
+    }
+    return new OutputCommitterDescriptor(className).setUserPayload(bb);
+  }
+
   public static VertexManagerPluginDescriptor convertVertexManagerPluginDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/dag/api/InputInitializerDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputInitializerDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputInitializerDescriptor.java
new file mode 100644
index 0000000..50e2227
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputInitializerDescriptor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+public class InputInitializerDescriptor extends TezEntityDescriptor<InputInitializerDescriptor> {
+
+  @Private // for Writable
+  public InputInitializerDescriptor() {
+    super();
+  }
+
+  public InputInitializerDescriptor(String initializerClassName) {
+    super(initializerClassName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/dag/api/OutputCommitterDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputCommitterDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputCommitterDescriptor.java
new file mode 100644
index 0000000..843b2b7
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputCommitterDescriptor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+public class OutputCommitterDescriptor extends TezEntityDescriptor<OutputCommitterDescriptor> {
+
+  @Private // for Writable
+  public OutputCommitterDescriptor() {
+    super();
+  }
+
+  public OutputCommitterDescriptor(String committerClassName) {
+    super(committerClassName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java b/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java
index 11929b9..9b006e3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java
@@ -21,28 +21,28 @@ package org.apache.tez.dag.api;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 
 @Private
-class  RootInputLeafOutput <T extends TezEntityDescriptor> {
+public class RootInputLeafOutput <T extends TezEntityDescriptor<T>, S extends TezEntityDescriptor<S>> {
 
   private final String name;
-  private final T descriptor;
-  private final Class<?> initializerClazz;
+  private final T ioDescriptor;
+  private final S controllerDescriptor;
 
-  RootInputLeafOutput(String name, T descriptor, Class<?> initializerClazz) {
+  public RootInputLeafOutput(String name, T ioDescriptor, S controllerDescriptor) {
     this.name = name;
-    this.descriptor = descriptor;
-    this.initializerClazz = initializerClazz;
+    this.ioDescriptor = ioDescriptor;
+    this.controllerDescriptor = controllerDescriptor;
   }
   
   public String getName() {
     return this.name;
   }
 
-  public T getDescriptor() {
-    return this.descriptor;
+  public T getIODescriptor() {
+    return this.ioDescriptor;
   }
   
-  public Class<?> getInitializerClass() {
-    return this.initializerClazz;
+  public S getControllerDescriptor() {
+    return this.controllerDescriptor;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index f6fb671..f47190a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.common.TezUserPayload;
 
+@SuppressWarnings("unchecked")
 public abstract class TezEntityDescriptor<T extends TezEntityDescriptor<T>> implements Writable {
 
   protected TezUserPayload userPayload;

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 00b10ef..4dca9ef 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -24,12 +24,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.VertexGroup.GroupInfo;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
-import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
@@ -46,10 +47,10 @@ public class Vertex {
   private final Resource taskResource;
   private Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
   private Map<String, String> taskEnvironment = new HashMap<String, String>();
-  private final List<RootInputLeafOutput<InputDescriptor>> additionalInputs 
-                      = new ArrayList<RootInputLeafOutput<InputDescriptor>>();
-  private final List<RootInputLeafOutput<OutputDescriptor>> additionalOutputs 
-                      = new ArrayList<RootInputLeafOutput<OutputDescriptor>>();
+  private final List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs 
+                      = new ArrayList<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
+  private final List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs 
+                      = new ArrayList<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>();
   private VertexManagerPluginDescriptor vertexManagerPlugin;
 
   private final List<Vertex> inputVertices = new ArrayList<Vertex>();
@@ -232,7 +233,7 @@ public class Vertex {
    *          in the {@link LogicalIOProcessor}
    * @param inputDescriptor
    *          the inputDescriptor for this input
-   * @param inputInitializer
+   * @param inputInitializerDescriptor
    *          An initializer for this Input which may run within the AM. This
    *          can be used to set the parallelism for this vertex and generate
    *          {@link RootInputDataInformationEvent}s for the actual Input.</p>
@@ -240,13 +241,14 @@ public class Vertex {
    *          vertex. In addition, the Input should know how to access data for
    *          each of it's tasks. </p> If a {@link TezRootInputInitializer} is
    *          meant to determine the parallelism of the vertex, the initial
-   *          vertex parallelism should be set to -1.
+   *          vertex parallelism should be set to -1. Can be null.
    * @return this Vertex
    */
   public Vertex addInput(String inputName, InputDescriptor inputDescriptor,
-      Class<? extends TezRootInputInitializer> inputInitializer) {
-    additionalInputs.add(new RootInputLeafOutput<InputDescriptor>(inputName,
-        inputDescriptor, inputInitializer));
+      @Nullable InputInitializerDescriptor inputInitializerDescriptor) {
+    additionalInputs
+        .add(new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(
+            inputName, inputDescriptor, inputInitializerDescriptor));
     return this;
   }
 
@@ -265,42 +267,30 @@ public class Vertex {
    *          the name of the output. This will be used when accessing the
    *          output in the {@link LogicalIOProcessor}
    * @param outputDescriptor
-   * @param outputCommitterClazz Class to be used for the OutputCommitter.
-   *                             Can be null.
+   * @param outputCommitterDescriptor Specify committer to be used for the output
+   *           Can be null. After all tasks in the vertex (or in the DAG) have 
+   *           completed, the committer (if specified) is invoked to commit the 
+   *           outputs. Commit is a data sink specific operation that usually 
+   *           determines the visibility of the output to external observers.
+   *           E.g. moving output files from temporary dirs to the real output 
+   *           dir. When there are multiple executions of a task, the commit 
+   *           process also helps decide which execution will be included in the 
+   *           final output. Users should consider whether their application or 
+   *           data sink need a commit operation.
    * @return this Vertex
    */
   public Vertex addOutput(String outputName, OutputDescriptor outputDescriptor,
-      Class<? extends OutputCommitter> outputCommitterClazz) {
-    additionalOutputs.add(new RootInputLeafOutput<OutputDescriptor>(outputName,
-        outputDescriptor, outputCommitterClazz));
+      @Nullable OutputCommitterDescriptor outputCommitterDescriptor) {
+    additionalOutputs
+        .add(new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(
+            outputName, outputDescriptor, outputCommitterDescriptor));
     return this;
   }
   
-  Vertex addAdditionalOutput(RootInputLeafOutput<OutputDescriptor> output) {
+  Vertex addAdditionalOutput(RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
     additionalOutputs.add(output);
     return this;
   }
-
-  /**
-   * Specifies an Output for a Vertex. This is meant to be used when a Vertex
-   * writes Output directly to an external destination. </p>
-   * 
-   * If an output of the vertex is meant to be consumed by another Vertex in the
-   * DAG - use the {@link DAG addEdge} method.
-   * 
-   * If a vertex needs generate data to an external source as well as for
-   * another Vertex in the DAG, a combination of this API and the DAG.addEdge
-   * API can be used.
-   * 
-   * @param outputName
-   *          the name of the output. This will be used when accessing the
-   *          output in the {@link LogicalIOProcessor}
-   * @param outputDescriptor
-   * @return this Vertex
-   */
-  public Vertex addOutput(String outputName, OutputDescriptor outputDescriptor) {
-    return addOutput(outputName, outputDescriptor, null);
-  }
   
   /**
    * Specifies a {@link VertexManagerPlugin} for the vertex. This plugin can be
@@ -371,11 +361,11 @@ public class Vertex {
     return outputEdges;
   }
   
-  List<RootInputLeafOutput<InputDescriptor>> getInputs() {
+  List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> getInputs() {
     return additionalInputs;
   }
 
-  List<RootInputLeafOutput<OutputDescriptor>> getOutputs() {
+  List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> getOutputs() {
     return additionalOutputs;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
index 3baa778..0952ab3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
@@ -19,13 +19,11 @@
 package org.apache.tez.dag.api;
 
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.tez.runtime.api.OutputCommitter;
+import javax.annotation.Nullable;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -67,8 +65,6 @@ public class VertexGroup {
     }
   }
   
-  List<RootInputLeafOutput<OutputDescriptor>> outputs = Lists.newLinkedList();
-  
   GroupInfo groupInfo;
   
   /**
@@ -92,13 +88,13 @@ public class VertexGroup {
   
   /**
    * Add an common output to the group of vertices.
-   * Refer to {@link Vertex#addOutput(String, OutputDescriptor, Class)}
+   * Refer to {@link Vertex#addOutput(String, OutputDescriptor, OutputCommitterDescriptor)}
    */
   public VertexGroup addOutput(String outputName, OutputDescriptor outputDescriptor,
-      Class<? extends OutputCommitter> outputCommitterClazz) {
-    RootInputLeafOutput<OutputDescriptor> leafOutput = new RootInputLeafOutput<OutputDescriptor>(outputName,
-        outputDescriptor, outputCommitterClazz);
-    outputs.add(leafOutput);
+      @Nullable OutputCommitterDescriptor committerDescriptor) {
+    RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> leafOutput = 
+        new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(outputName,
+        outputDescriptor, committerDescriptor);
     this.groupInfo.outputs.add(outputName);
     
     // also add output to its members
@@ -114,10 +110,6 @@ public class VertexGroup {
     return "[ VertexGroup: " + groupInfo.getGroupName() + "]";
   }
 
-  List<RootInputLeafOutput<OutputDescriptor>> getOutputs() {
-    return outputs;
-  }
-  
   GroupInfo getGroupInfo() {
     return groupInfo;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
index b5837e8..7189c2e 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -63,9 +65,15 @@ public interface OutputCommitterContext {
    * Get the User Payload for the Output
    * @return User Payload
    */
-  public byte[] getUserPayload();
+  @Nullable public byte[] getOutputUserPayload();
 
   /**
+   * Get the User Payload for the OutputCommitter
+   * @return User Payload
+   */
+  @Nullable public byte[] getUserPayload();
+  
+  /**
    * Get Vertex Index in the DAG
    * @return Vertex index
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
index 399579a..9513a80 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 
@@ -42,10 +44,16 @@ public interface TezRootInputInitializerContext {
   String getInputName();
 
   /**
-   * Get the user payload
+   * Get the user payload for the input
+   * @return User payload
+   */
+  @Nullable byte[] getInputUserPayload();
+  
+  /**
+   * Get the user payload for the initializer
    * @return User payload
    */
-  byte[] getUserPayload();
+  @Nullable byte[] getUserPayload();
   
   /**
    * Get the number of tasks in this vertex. Maybe -1 if the vertex has not been

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 9fa9cf2..13b25c0 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -118,8 +118,8 @@ message TezEntityDescriptorProto {
 
 message RootInputLeafOutputProto {
   optional string name = 1;
-  optional TezEntityDescriptorProto entity_descriptor= 2;
-  optional string initializer_class_name = 3;
+  optional TezEntityDescriptorProto i_o_descriptor = 2;
+  optional TezEntityDescriptorProto controller_descriptor = 3;
 }
 
 message VertexPlan {

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 6e65599..cd3414b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -506,7 +506,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addOutput("v2", new OutputDescriptor());
+    v1.addOutput("v2", new OutputDescriptor(), null);
     
     Edge e1 = new Edge(v1, v2,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 
@@ -530,7 +530,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addOutput("v2", new OutputDescriptor());
+    v1.addOutput("v2", new OutputDescriptor(), null);
     
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
@@ -701,8 +701,8 @@ public class TestDAGVerify {
     Assert.assertFalse(dag.edges.contains(e2));
     Assert.assertEquals(1, v1.getOutputs().size());
     Assert.assertEquals(1, v2.getOutputs().size());
-    Assert.assertEquals(outDesc, v1.getOutputs().get(0).getDescriptor());
-    Assert.assertEquals(outDesc, v2.getOutputs().get(0).getDescriptor());
+    Assert.assertEquals(outDesc, v1.getOutputs().get(0).getIODescriptor());
+    Assert.assertEquals(outDesc, v2.getOutputs().get(0).getIODescriptor());
     Assert.assertEquals(1, v1.getOutputVertices().size());
     Assert.assertEquals(1, v3.getOutputVertices().size());
     Assert.assertEquals(2, v2.getOutputVertices().size());

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 41b808b..4e55376 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -700,8 +700,8 @@ public class DAGAppMaster extends AbstractService {
             + "[" + sanitizeLabelForViz(input.getName()) + "]");
         inputNode.setShape("box");
         inputNode.addEdge(n, "Input"
-            + " [inputClass=" + getShortClassName(input.getEntityDescriptor().getClassName())
-            + ", initializer=" + getShortClassName(input.getInitializerClassName()) + "]");
+            + " [inputClass=" + getShortClassName(input.getIODescriptor().getClassName())
+            + ", initializer=" + getShortClassName(input.getControllerDescriptor().getClassName()) + "]");
       }
       for (DAGProtos.RootInputLeafOutputProto output : v.getOutputsList()) {
         Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getName())
@@ -710,8 +710,8 @@ public class DAGAppMaster extends AbstractService {
             + "[" + sanitizeLabelForViz(output.getName()) + "]");
         outputNode.setShape("box");
         n.addEdge(outputNode, "Output"
-            + " [outputClass=" + getShortClassName(output.getEntityDescriptor().getClassName())
-            + ", initializer=" + getShortClassName(output.getInitializerClassName()) + "]");
+            + " [outputClass=" + getShortClassName(output.getIODescriptor().getClassName())
+            + ", initializer=" + getShortClassName(output.getControllerDescriptor().getClassName()) + "]");
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 3d20881..5c604cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -29,17 +29,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
-import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.Event;
@@ -53,6 +55,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 
 public class RootInputInitializerManager {
@@ -71,7 +74,6 @@ public class RootInputInitializerManager {
 
   private final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
 
-  @SuppressWarnings("rawtypes")
   public RootInputInitializerManager(Vertex vertex, AppContext appContext,
                                      UserGroupInformation dagUgi) {
     this.appContext = appContext;
@@ -83,12 +85,12 @@ public class RootInputInitializerManager {
     this.dagUgi = dagUgi;
   }
   
-  public void runInputInitializers(List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
-
-    for (RootInputLeafOutputDescriptor<InputDescriptor> input : inputs) {
+  public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+      inputs) {
+    for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
       TezRootInputInitializer initializer = createInitializer(input);
       InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, vertex, appContext);
-      initializerMap.put(input.getEntityName(), initializerWrapper);
+      initializerMap.put(input.getName(), initializerWrapper);
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(initializerWrapper, dagUgi));
       Futures.addCallback(future, createInputInitializerCallback(initializerWrapper));
@@ -97,8 +99,9 @@ public class RootInputInitializerManager {
 
 
   @VisibleForTesting
-  protected TezRootInputInitializer createInitializer(RootInputLeafOutputDescriptor<InputDescriptor> input) {
-    String className = input.getInitializerClassName();
+  protected TezRootInputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
+      input) {
+    String className = input.getControllerDescriptor().getClassName();
     @SuppressWarnings("unchecked")
     Class<? extends TezRootInputInitializer> clazz =
         (Class<? extends TezRootInputInitializer>) ReflectionUtils
@@ -131,7 +134,7 @@ public class RootInputInitializerManager {
     if (initializer.isComplete()) {
       LOG.warn(
           "Event targeted at vertex " + vertex.getLogIdentifier() + ", initializerWrapper for Input: " +
-              initializer.getInput().getEntityName() +
+              initializer.getInput().getName() +
               " will be dropped, since Input has already been initialized. [" + event + "]");
     }
     try {
@@ -173,7 +176,7 @@ public class RootInputInitializerManager {
         @Override
         public List<Event> run() throws Exception {
           LOG.info(
-              "Starting InputInitializer for Input: " + initializerWrapper.getInput().getEntityName() +
+              "Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() +
                   " on vertex " + initializerWrapper.getVertexLogIdentifier());
           return initializerWrapper.getInitializer().initialize(initializerWrapper.context);
         }
@@ -203,10 +206,10 @@ public class RootInputInitializerManager {
     public void onSuccess(List<Event> result) {
       initializer.setComplete();
       LOG.info(
-          "Succeeded InputInitializer for Input: " + initializer.getInput().getEntityName() +
+          "Succeeded InputInitializer for Input: " + initializer.getInput().getName() +
               " on vertex " + initializer.getVertexLogIdentifier());
       eventHandler.handle(new VertexEventRootInputInitialized(vertexID,
-          initializer.getInput().getEntityName(), result));
+          initializer.getInput().getName(), result));
     }
 
     @SuppressWarnings("unchecked")
@@ -214,23 +217,23 @@ public class RootInputInitializerManager {
     public void onFailure(Throwable t) {
       initializer.setComplete();
       LOG.info(
-          "Failed InputInitializer for Input: " + initializer.getInput().getEntityName() +
+          "Failed InputInitializer for Input: " + initializer.getInput().getName() +
               " on vertex " + initializer.getVertexLogIdentifier());
       eventHandler
-          .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getEntityName(), t));
+          .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getName(), t));
     }
   }
 
   private static class InitializerWrapper {
 
 
-    private final RootInputLeafOutputDescriptor<InputDescriptor> input;
+    private final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
     private final TezRootInputInitializer initializer;
     private final TezRootInputInitializerContext context;
     private final AtomicBoolean isComplete = new AtomicBoolean(false);
     private final String vertexLogIdentifier;
 
-    InitializerWrapper(RootInputLeafOutputDescriptor<InputDescriptor> input,
+    InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
                        TezRootInputInitializer initializer, Vertex vertex,
                        AppContext appContext) {
       this.input = input;
@@ -239,7 +242,7 @@ public class RootInputInitializerManager {
       this.vertexLogIdentifier = vertex.getLogIdentifier();
     }
 
-    public RootInputLeafOutputDescriptor<InputDescriptor> getInput() {
+    public RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> getInput() {
       return input;
     }
 
@@ -247,10 +250,6 @@ public class RootInputInitializerManager {
       return initializer;
     }
 
-    public TezRootInputInitializerContext getContext() {
-      return context;
-    }
-
     public String getVertexLogIdentifier() {
       return vertexLogIdentifier;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 2ee3483..5f97d6b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -28,8 +28,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
@@ -40,7 +43,6 @@ import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.impl.Edge;
-import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -102,9 +104,11 @@ public interface Vertex extends Comparable<Vertex> {
   void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs);
 
   @Nullable
-  Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs();
+  public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+    getAdditionalInputs();
   @Nullable
-  Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> getAdditionalOutputs();
+  public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> 
+    getAdditionalOutputs();
 
   List<InputSpec> getInputSpecList(int taskIndex);
   List<OutputSpec> getOutputSpecList(int taskIndex);

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
index 07e19a7..0dbafed 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
@@ -19,10 +19,11 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import javax.annotation.Nullable;
+
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.TezUserPayload;
-import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 
 public class OutputCommitterContextImpl implements OutputCommitterContext {
@@ -31,27 +32,24 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
   private final int dagAttemptNumber;
   private final String dagName;
   private final String vertexName;
-  private final String outputName;
-  private final TezUserPayload userPayload;
   private final int vertexIdx;
+  private final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output;
 
   public OutputCommitterContextImpl(ApplicationId applicationId,
       int dagAttemptNumber,
       String dagName,
       String vertexName,
-      String outputName,
-      @Nullable byte[] userPayload,
+      RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output,
       int vertexIdx) {
     checkNotNull(applicationId, "applicationId is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(vertexName, "vertexName is null");
-    checkNotNull(outputName, "outputName is null");
+    checkNotNull(output, "output is null");
     this.applicationId = applicationId;
     this.dagAttemptNumber = dagAttemptNumber;
     this.dagName = dagName;
     this.vertexName = vertexName;
-    this.outputName = outputName;
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    this.output = output;
     this.vertexIdx = vertexIdx;
   }
 
@@ -77,12 +75,17 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
 
   @Override
   public String getOutputName() {
-    return outputName;
+    return output.getName();
   }
 
   @Override
+  public byte[] getOutputUserPayload() {
+    return output.getIODescriptor().getUserPayload();
+  }
+  
+  @Override
   public byte[] getUserPayload() {
-    return userPayload.getPayload();
+    return output.getControllerDescriptor().getUserPayload();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
deleted file mode 100644
index 98872a7..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import org.apache.tez.dag.api.TezEntityDescriptor;
-
-public class RootInputLeafOutputDescriptor<T extends TezEntityDescriptor> {
-  private final String entityName;
-  private final T descriptor;
-  private final String initializerClassName;
-
-  public RootInputLeafOutputDescriptor(String entityName, T descriptor,
-      String initializerClassName) {
-    this.entityName = entityName;
-    this.descriptor = descriptor;
-    this.initializerClassName = initializerClassName;
-  }
-
-  public String getEntityName() {
-    return this.entityName;
-  }
-
-  public T getDescriptor() {
-    return this.descriptor;
-  }
-
-  public String getInitializerClassName() {
-    return this.initializerClassName;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index ab21dfb..cbc6cf3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -19,9 +19,12 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
@@ -29,14 +32,15 @@ import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 public class TezRootInputInitializerContextImpl implements
     TezRootInputInitializerContext {
 
-  private RootInputLeafOutputDescriptor<InputDescriptor> input;
+  private RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
   private final Vertex vertex;
   private final AppContext appContext;
 
   // TODO Add support for counters - merged with the Vertex counters.
 
-  public TezRootInputInitializerContextImpl(RootInputLeafOutputDescriptor<InputDescriptor> input, Vertex vertex,
-                                            AppContext appContext) {
+  public TezRootInputInitializerContextImpl(
+      RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
+      Vertex vertex, AppContext appContext) {
     checkNotNull(input, "input is null");
     checkNotNull(vertex, "vertex is null");
     checkNotNull(appContext, "appContext is null");
@@ -57,12 +61,17 @@ public class TezRootInputInitializerContextImpl implements
 
   @Override
   public String getInputName() {
-    return this.input.getEntityName();
+    return this.input.getName();
   }
 
   @Override
+  public byte[] getInputUserPayload() {
+    return this.input.getIODescriptor().getUserPayload();
+  }
+  
+  @Override
   public byte[] getUserPayload() {
-    return this.input.getDescriptor().getUserPayload();
+    return this.input.getControllerDescriptor().getUserPayload();
   }
   
   @Override 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3162d6c..3a9f0fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -60,8 +60,11 @@ import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -552,8 +555,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Map<Vertex, Edge> targetVertices;
   Set<Edge> uninitializedEdges = Sets.newHashSet();
 
-  private Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> rootInputDescriptors;
-  private Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> additionalOutputs;
+  private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+    rootInputDescriptors;
+  private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> 
+    additionalOutputs;
   private Map<String, OutputCommitter> outputCommitters;
   private Map<String, RootInputSpecUpdate> rootInputSpecs = new HashMap<String, RootInputSpecUpdate>();
   private static final RootInputSpecUpdate DEFAULT_ROOT_INPUT_SPECS = RootInputSpecUpdate
@@ -1639,12 +1644,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private void initializeCommitters() throws Exception {
     if (!this.additionalOutputSpecs.isEmpty()) {
       LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
-      for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
+      for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry:
           additionalOutputs.entrySet())  {
         final String outputName = entry.getKey();
-        final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
-        if (od.getInitializerClassName() == null
-            || od.getInitializerClassName().isEmpty()) {
+        final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue();
+        if (od.getControllerDescriptor() == null
+            || od.getControllerDescriptor().getClassName() == null) {
           LOG.info("Ignoring committer as none specified for output="
               + outputName
               + ", vertexId=" + logIdentifier);
@@ -1652,20 +1657,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
         LOG.info("Instantiating committer for output=" + outputName
             + ", vertexId=" + logIdentifier
-            + ", committerClass=" + od.getInitializerClassName());
+            + ", committerClass=" + od.getControllerDescriptor().getClassName());
 
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
           @Override
           public Void run() throws Exception {
             OutputCommitter outputCommitter = ReflectionUtils.createClazzInstance(
-                od.getInitializerClassName());
+                od.getControllerDescriptor().getClassName());
             OutputCommitterContext outputCommitterContext =
                 new OutputCommitterContextImpl(appContext.getApplicationID(),
                     appContext.getApplicationAttemptId().getAttemptId(),
                     appContext.getCurrentDAG().getName(),
                     vertexName,
-                    outputName,
-                    od.getDescriptor().getUserPayload(),
+                    od,
                     vertexId.getId());
 
             LOG.info("Invoking committer init for output=" + outputName
@@ -1783,15 +1787,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (rootInputDescriptors != null) {
         LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
             + rootInputDescriptors);
-        for (RootInputLeafOutputDescriptor<InputDescriptor> input : rootInputDescriptors.values()) {
-          if (input.getInitializerClassName() != null) {
+        for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input 
+            : rootInputDescriptors.values()) {
+          if (input.getControllerDescriptor() != null && 
+              input.getControllerDescriptor().getClassName() != null) {
             if (inputsWithInitializers == null) {
               inputsWithInitializers = Sets.newHashSet();
             }
-            inputsWithInitializers.add(input.getEntityName());
+            inputsWithInitializers.add(input.getName());
             LOG.info("Starting root input initializer for input: "
-                + input.getEntityName() + ", with class: ["
-                + input.getInitializerClassName() + "]");
+                + input.getName() + ", with class: ["
+                + input.getControllerDescriptor().getClassName() + "]");
           }
         }
       }
@@ -2597,8 +2603,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               vertex.appContext.getTaskScheduler().getNumClusterNodes(),
               vertex.getTaskResource(),
               vertex.appContext.getTaskScheduler().getTotalResources());
-          List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
-              .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
+          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+              inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
           for (String inputName : vertex.inputsWithInitializers) {
             inputList.add(vertex.rootInputDescriptors.get(inputName));
           }
@@ -2640,8 +2646,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               vertex.appContext.getTaskScheduler().getNumClusterNodes(),
               vertex.getTaskResource(),
               vertex.appContext.getTaskScheduler().getTotalResources());
-          List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
-              .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
+          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+          inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
           for (String inputName : vertex.inputsWithInitializers) {
             inputList.add(vertex.rootInputDescriptors.get(inputName));
           }
@@ -2729,7 +2735,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.vertexManager.onRootVertexInitialized(
             liInitEvent.getInputName(),
             vertex.getAdditionalInputs().get(liInitEvent.getInputName())
-                .getDescriptor(), liInitEvent.getEvents());
+                .getIODescriptor(), liInitEvent.getEvents());
       }
 
       vertex.numInitializedInputs++;
@@ -3476,12 +3482,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     for (RootInputLeafOutputProto input : inputs) {
 
       InputDescriptor id = DagTypeConverters
-          .convertInputDescriptorFromDAGPlan(input.getEntityDescriptor());
-
-      this.rootInputDescriptors.put(input.getName(),
-          new RootInputLeafOutputDescriptor<InputDescriptor>(input.getName(), id,
-              input.hasInitializerClassName() ? input.getInitializerClassName()
-                  : null));
+          .convertInputDescriptorFromDAGPlan(input.getIODescriptor());
+
+      this.rootInputDescriptors
+          .put(
+              input.getName(),
+              new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(
+                  input.getName(), id,
+                  input.hasControllerDescriptor() ? DagTypeConverters
+                      .convertInputInitializerDescriptorFromDAGPlan(input
+                          .getControllerDescriptor()) : null));
       this.rootInputSpecs.put(input.getName(), DEFAULT_ROOT_INPUT_SPECS);
     }
   }
@@ -3509,13 +3519,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size());
     for (RootInputLeafOutputProto output : outputs) {
       OutputDescriptor od = DagTypeConverters
-          .convertOutputDescriptorFromDAGPlan(output.getEntityDescriptor());
-
-      this.additionalOutputs.put(
-          output.getName(),
-          new RootInputLeafOutputDescriptor<OutputDescriptor>(output.getName(), od,
-              output.hasInitializerClassName() ? output
-                  .getInitializerClassName() : null));
+          .convertOutputDescriptorFromDAGPlan(output.getIODescriptor());
+
+      this.additionalOutputs
+          .put(
+              output.getName(),
+              new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(
+                  output.getName(), od,
+                  output.hasControllerDescriptor() ? DagTypeConverters
+                      .convertOutputCommitterDescriptorFromDAGPlan(output
+                          .getControllerDescriptor()) : null));
       OutputSpec outputSpec = new OutputSpec(output.getName(), od, 0);
       additionalOutputSpecs.add(outputSpec);
     }
@@ -3523,13 +3536,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Nullable
   @Override
-  public Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs() {
+  public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+    getAdditionalInputs() {
     return this.rootInputDescriptors;
   }
 
   @Nullable
   @Override
-  public Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> getAdditionalOutputs() {
+  public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> 
+    getAdditionalOutputs() {
     return this.additionalOutputs;
   }
 
@@ -3618,10 +3633,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
         + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
     if (rootInputDescriptors != null) {
-      for (Entry<String, RootInputLeafOutputDescriptor<InputDescriptor>> rootInputDescriptorEntry : rootInputDescriptors
-          .entrySet()) {
+      for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+           rootInputDescriptorEntry : rootInputDescriptors.entrySet()) {
         inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(),
-            rootInputDescriptorEntry.getValue().getDescriptor(), rootInputSpecs.get(
+            rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get(
                 rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 3eb9ca1..5ecc06b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -28,8 +28,6 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.ReflectionUtils;
@@ -39,6 +37,8 @@ import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
@@ -72,9 +72,7 @@ public class VertexManager {
   VertexManagerPluginContextImpl pluginContext;
   TezUserPayload payload = null;
   AppContext appContext;
-  
-  private static final Log LOG = LogFactory.getLog(VertexManager.class);
-  
+    
   class VertexManagerPluginContextImpl implements VertexManagerPluginContext {
     // TODO Add functionality to allow VertexManagers to send VertexManagerEvents
     
@@ -121,8 +119,8 @@ public class VertexManager {
     @Override
     public Set<String> getVertexInputNames() {
       Set<String> inputNames = null;
-      Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> inputs = 
-          managedVertex.getAdditionalInputs();
+      Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+          inputs = managedVertex.getAdditionalInputs();
       if (inputs != null) {
         inputNames = inputs.keySet();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 46afa6b..01e0d3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -24,12 +24,11 @@ import java.io.OutputStream;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
-import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezVertexID;
@@ -38,15 +37,13 @@ import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto
 
 public class VertexInitializedEvent implements HistoryEvent {
 
-  private static final Log LOG = LogFactory.getLog(VertexInitializedEvent.class);
-
   private TezVertexID vertexID;
   private String vertexName;
   private long initRequestedTime;
   private long initedTime;
   private int numTasks;
   private String processorName;
-  private Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs;
+  private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs;
 
   public VertexInitializedEvent() {
   }
@@ -54,7 +51,7 @@ public class VertexInitializedEvent implements HistoryEvent {
   public VertexInitializedEvent(TezVertexID vertexId,
       String vertexName, long initRequestedTime, long initedTime,
       int numTasks, String processorName,
-      Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs) {
+      Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.initRequestedTime = initRequestedTime;
@@ -83,16 +80,17 @@ public class VertexInitializedEvent implements HistoryEvent {
     VertexInitializedProto.Builder builder = VertexInitializedProto.newBuilder();
     if (additionalInputs != null
       && !additionalInputs.isEmpty()) {
-      for (RootInputLeafOutputDescriptor<InputDescriptor> input :
+      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input :
         additionalInputs.values()) {
         RootInputLeafOutputProto.Builder inputBuilder
             = RootInputLeafOutputProto.newBuilder();
-        inputBuilder.setName(input.getEntityName());
-        if (input.getInitializerClassName() != null) {
-          inputBuilder.setInitializerClassName(input.getInitializerClassName());
+        inputBuilder.setName(input.getName());
+        if (input.getControllerDescriptor() != null) {
+          inputBuilder.setControllerDescriptor(DagTypeConverters
+              .convertToDAGPlan(input.getControllerDescriptor()));
         }
-        inputBuilder.setEntityDescriptor(
-            DagTypeConverters.convertToDAGPlan(input.getDescriptor()));
+        inputBuilder.setIODescriptor(
+            DagTypeConverters.convertToDAGPlan(input.getIODescriptor()));
         builder.addInputs(inputBuilder.build());
       }
     }
@@ -112,16 +110,17 @@ public class VertexInitializedEvent implements HistoryEvent {
     this.numTasks = proto.getNumTasks();
     if (proto.getInputsCount() > 0) {
       this.additionalInputs =
-          new LinkedHashMap<String, RootInputLeafOutputDescriptor<InputDescriptor>>();
+          new LinkedHashMap<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
       for (RootInputLeafOutputProto inputProto : proto.getInputsList()) {
-        RootInputLeafOutputDescriptor<InputDescriptor> input =
-            new RootInputLeafOutputDescriptor<InputDescriptor>(
+        RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input =
+            new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(
                 inputProto.getName(),
                 DagTypeConverters.convertInputDescriptorFromDAGPlan(
-                    inputProto.getEntityDescriptor()),
-                inputProto.hasInitializerClassName() ?
-                    inputProto.getInitializerClassName() : null);
-        additionalInputs.put(input.getEntityName(), input);
+                    inputProto.getIODescriptor()),
+                inputProto.hasControllerDescriptor() ? DagTypeConverters
+                    .convertInputInitializerDescriptorFromDAGPlan(inputProto
+                        .getControllerDescriptor()) : null);
+        additionalInputs.put(input.getName(), input);
       }
     }
   }
@@ -169,7 +168,8 @@ public class VertexInitializedEvent implements HistoryEvent {
     return numTasks;
   }
 
-  public Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs() {
+  public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+    getAdditionalInputs() {
     return additionalInputs;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index b58ee9b..232a3b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -156,14 +156,14 @@ public class DAGUtils {
           vertexPlan.getInputsList()) {
         Map<String,Object> inputMap = new LinkedHashMap<String, Object>();
         inputMap.put(NAME_KEY, input.getName());
-        inputMap.put(CLASS_KEY, input.getEntityDescriptor().getClassName());
-        if (input.hasInitializerClassName()) {
-          inputMap.put(INITIALIZER_KEY, input.getInitializerClassName());
+        inputMap.put(CLASS_KEY, input.getIODescriptor().getClassName());
+        if (input.hasControllerDescriptor()) {
+          inputMap.put(INITIALIZER_KEY, input.getControllerDescriptor().getClassName());
         }
-        if (input.getEntityDescriptor().hasHistoryText()) {
+        if (input.getIODescriptor().hasHistoryText()) {
           inputMap.put(USER_PAYLOAD_AS_TEXT,
               DagTypeConverters.getHistoryTextFromProto(
-                  input.getEntityDescriptor()));
+                  input.getIODescriptor()));
         }
         inputsList.add(inputMap);
       }
@@ -174,14 +174,14 @@ public class DAGUtils {
           vertexPlan.getOutputsList()) {
         Map<String,Object> outputMap = new LinkedHashMap<String, Object>();
         outputMap.put(NAME_KEY, output.getName());
-        outputMap.put(CLASS_KEY, output.getEntityDescriptor().getClassName());
-        if (output.hasInitializerClassName()) {
-          outputMap.put(INITIALIZER_KEY, output.getInitializerClassName());
+        outputMap.put(CLASS_KEY, output.getIODescriptor().getClassName());
+        if (output.hasControllerDescriptor()) {
+          outputMap.put(INITIALIZER_KEY, output.getControllerDescriptor().getClassName());
         }
-        if (output.getEntityDescriptor().hasHistoryText()) {
+        if (output.getIODescriptor().hasHistoryText()) {
           outputMap.put(USER_PAYLOAD_AS_TEXT,
               DagTypeConverters.getHistoryTextFromProto(
-                  output.getEntityDescriptor()));
+                  output.getIODescriptor()));
         }
         outputsList.add(outputMap);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index e979eef..0bc42c1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -166,7 +167,6 @@ public class TestDAGImpl {
   }
 
   private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
-    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskAttemptEvent event) {
       // Ignore
@@ -221,12 +221,14 @@ public class TestDAGImpl {
                     )
                 .addOutputs(
                     DAGProtos.RootInputLeafOutputProto.newBuilder()
-                    .setEntityDescriptor(
+                    .setIODescriptor(
                         TezEntityDescriptorProto.newBuilder().setClassName("output1").build()
                     )
                     .setName("output1")
-                    .setInitializerClassName(CountingOutputCommitter.class.getName())
-                 )
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                CountingOutputCommitter.class.getName()))
+                    )
                     .addOutEdgeId("e1")
                     .build()
             )
@@ -251,11 +253,13 @@ public class TestDAGImpl {
                     )
                 .addOutputs(
                     DAGProtos.RootInputLeafOutputProto.newBuilder()
-                    .setEntityDescriptor(
+                    .setIODescriptor(
                         TezEntityDescriptorProto.newBuilder().setClassName("output2").build()
                     )
                     .setName("output2")
-                    .setInitializerClassName(CountingOutputCommitter.class.getName())
+                    .setControllerDescriptor(
+                        TezEntityDescriptorProto.newBuilder().setClassName(
+                            CountingOutputCommitter.class.getName()))
                  )
                    .addInEdgeId("e1")
                     .addOutEdgeId("e2")
@@ -283,11 +287,13 @@ public class TestDAGImpl {
                     )
                .addOutputs(
                     DAGProtos.RootInputLeafOutputProto.newBuilder()
-                    .setEntityDescriptor(
+                    .setIODescriptor(
                         TezEntityDescriptorProto.newBuilder().setClassName("output3").build()
                     )
                     .setName("output3")
-                    .setInitializerClassName(CountingOutputCommitter.class.getName())
+                    .setControllerDescriptor(
+                        TezEntityDescriptorProto.newBuilder().setClassName(
+                            CountingOutputCommitter.class.getName()))
                )
                .addInEdgeId("e2")
                .build()
@@ -350,10 +356,12 @@ public class TestDAGImpl {
     
     DAG dag = new DAG("testDag");
     String groupName1 = "uv12";
+    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(
+        TotalCountingOutputCommitter.class.getName());
     org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor("output.class");
-    uv12.addOutput("uvOut", outDesc, TotalCountingOutputCommitter.class);
-    v3.addOutput("uvOut", outDesc, TotalCountingOutputCommitter.class);
+    uv12.addOutput("uvOut", outDesc, ocd);
+    v3.addOutput("uvOut", outDesc, ocd);
     
     GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 
@@ -807,13 +815,18 @@ public class TestDAGImpl {
     List<RootInputLeafOutputProto> outputs =
         new ArrayList<RootInputLeafOutputProto>();
     outputs.add(RootInputLeafOutputProto.newBuilder()
-        .setInitializerClassName(CountingOutputCommitter.class.getName())
+        .setControllerDescriptor(
+            TezEntityDescriptorProto
+                .newBuilder()
+                .setClassName(CountingOutputCommitter.class.getName())
+                .setUserPayload(
+                    ByteString
+                        .copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                            true, false, false).toUserPayload())).build())
         .setName("output3")
-        .setEntityDescriptor(
-            TezEntityDescriptorProto.newBuilder()
-                .setUserPayload(ByteString.copyFrom(
-                    new CountingOutputCommitter.CountingOutputCommitterConfig(
-                        true, false, false).toUserPayload())).build())
+        .setIODescriptor(
+            TezEntityDescriptorProto.newBuilder().setClassName("output.class")
+                )
         .build());
     badVertex.setAdditionalOutputs(outputs);
     


Mime
View raw message