tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] git commit: TEZ-1303. Change Inputs, Outputs, InputInitializer, OutputCommitter, VertexManagerPlugin, EdgeManager to require constructors for creation, and remove the initialize methods. (sseth)
Date Wed, 30 Jul 2014 23:27:05 GMT
TEZ-1303. Change Inputs, Outputs, InputInitializer, OutputCommitter,
VertexManagerPlugin, EdgeManager to require constructors for creation,
and remove the initialize methods. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2213c109
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2213c109
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2213c109

Branch: refs/heads/master
Commit: 2213c109edce1380dfab9fe103d7441bb5e1e399
Parents: 770e305
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jul 30 16:26:43 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jul 30 16:26:43 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/tez/dag/api/EdgeManager.java     | 36 +++++++--
 .../apache/tez/dag/api/VertexManagerPlugin.java | 29 ++++++-
 .../tez/runtime/api/AbstractLogicalInput.java   | 52 +++++++++----
 .../tez/runtime/api/AbstractLogicalOutput.java  | 43 +++++++----
 .../runtime/api/InputFrameworkInterface.java    |  9 +--
 .../api/LogicalInputFrameworkInterface.java     |  9 ---
 .../api/LogicalOutputFrameworkInterface.java    |  9 ---
 .../tez/runtime/api/MergedLogicalInput.java     | 27 +++++--
 .../apache/tez/runtime/api/OutputCommitter.java | 28 ++++++-
 .../runtime/api/OutputFrameworkInterface.java   | 10 +--
 .../runtime/api/TezRootInputInitializer.java    | 28 ++++++-
 .../app/dag/RootInputInitializerManager.java    | 34 ++++----
 .../dag/app/dag/impl/BroadcastEdgeManager.java  | 16 ++--
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 29 +++----
 .../dag/impl/ImmediateStartVertexManager.java   | 16 ++--
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  8 +-
 .../app/dag/impl/RootInputVertexManager.java    | 26 ++++---
 .../app/dag/impl/ScatterGatherEdgeManager.java  | 16 ++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 21 +++--
 .../tez/dag/app/dag/impl/VertexManager.java     | 17 +---
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  5 +-
 .../dag/impl/TestRootInputVertexManager.java    |  8 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 65 +++++++++-------
 .../org/apache/tez/test/EdgeManagerForTest.java | 17 ++--
 .../tez/test/VertexManagerPluginForTest.java    |  6 +-
 .../mapreduce/committer/MROutputCommitter.java  | 24 +++---
 .../common/MRInputAMSplitGenerator.java         | 18 ++---
 .../common/MRInputSplitDistributor.java         | 12 +--
 .../org/apache/tez/mapreduce/input/MRInput.java |  7 +-
 .../tez/mapreduce/input/MRInputLegacy.java      |  5 ++
 .../tez/mapreduce/input/MultiMRInput.java       |  6 +-
 .../tez/mapreduce/input/base/MRInputBase.java   |  5 ++
 .../apache/tez/mapreduce/output/MROutput.java   |  7 +-
 .../tez/mapreduce/output/MROutputLegacy.java    |  5 ++
 .../common/TestMRInputSplitDistributor.java     |  8 +-
 .../tez/mapreduce/input/TestMultiMRInput.java   | 15 ++--
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 81 ++++++++++++--------
 .../runtime/api/impl/TezInputContextImpl.java   | 29 +++----
 .../api/impl/TezMergedInputContextImpl.java     | 20 +++--
 .../tez/runtime/TestInputReadyTracker.java      | 32 ++++++--
 .../TestLogicalIOProcessorRuntimeTask.java      |  9 ++-
 .../vertexmanager/InputReadyVertexManager.java  | 26 ++++---
 .../vertexmanager/ShuffleVertexManager.java     | 44 +++++------
 .../input/ConcatenatedMergedKeyValueInput.java  |  7 ++
 .../input/ConcatenatedMergedKeyValuesInput.java |  7 ++
 .../runtime/library/input/LocalMergedInput.java |  5 ++
 .../library/input/ShuffledMergedInput.java      |  5 ++
 .../input/ShuffledMergedInputLegacy.java        |  5 ++
 .../library/input/ShuffledUnorderedKVInput.java |  4 +-
 .../library/input/SortedGroupedMergedInput.java |  5 ++
 .../library/output/LocalOnFileSorterOutput.java |  4 +
 .../library/output/OnFileSortedOutput.java      |  6 +-
 .../library/output/OnFileUnorderedKVOutput.java | 12 +--
 .../OnFileUnorderedPartitionedKVOutput.java     | 31 +++-----
 .../TestInputReadyVertexManager.java            | 20 ++---
 .../vertexmanager/TestShuffleVertexManager.java | 18 +++--
 .../input/TestSortedGroupedMergedInput.java     | 43 +++++------
 .../library/output/TestOnFileSortedOutput.java  | 40 +++++-----
 .../output/TestOnFileUnorderedKVOutput.java     | 12 ++-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 13 +++-
 .../java/org/apache/tez/test/TestInput.java     | 41 +++++-----
 .../java/org/apache/tez/test/TestOutput.java    |  7 +-
 .../apache/tez/test/dag/MultiAttemptDAG.java    | 60 ++++++++++-----
 64 files changed, 763 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4b93e3f..203382e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -36,6 +36,8 @@ INCOMPATIBLE CHANGES
   includes Hadoop libs.
   TEZ-1278. TezClient#waitTillReady() should not swallow interrupts
   TEZ-1058. Replace user land interfaces with abstract classes
+  TEZ-1303. Change Inputs, Outputs, InputInitializer, OutputCommitter, VertexManagerPlugin, EdgeManager
+  to require constructors for creation, and remove the initialize methods.
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
index 92188e5..c447ca5 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
@@ -34,7 +34,22 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
  */
 @InterfaceStability.Unstable
 public abstract class EdgeManager {
-  
+
+  private final EdgeManagerContext context;
+
+  /**
+   * Create an instance of the EdgeManager. Classes extending this one to create an
+   * EdgeManager must provide the same constructor so that Tez can create an instance of
+   * the class at runtime.
+   *
+   * @param context the context within which this EdgeManager will run. Includes
+   *                  information like configuration which the user may have specified
+   *                  while setting up the edge.
+   */
+  public EdgeManager(EdgeManagerContext context) {
+    this.context = context;
+  }
+
   /**
    * Initializes the EdgeManager. This method is called in the following
    * circumstances </p> 1. when initializing an Edge Manager for the first time.
@@ -42,13 +57,9 @@ public abstract class EdgeManager {
    * EdgeManager instance is created and setup by the user. The initialize
    * method will be called with the original {@link EdgeManagerContext} when the
    * edgeManager is replaced.
-   * 
-   * @param edgeManagerContext
-   *          the context within which this EdgeManager will run. Includes
-   *          information like configuration which the user may have specified
-   *          while setting up the edge.
+   *
    */
-  public abstract void initialize(EdgeManagerContext edgeManagerContext);
+  public abstract void initialize();
   
   /**
    * Get the number of physical inputs on the destination task
@@ -124,5 +135,14 @@ public abstract class EdgeManager {
    */
   public abstract int routeInputErrorEventToSource(InputReadErrorEvent event,
       int destinationTaskIndex, int destinationFailedInputIndex);
-  
+
+  /**
+   * Return the {@link org.apache.tez.dag.api.EdgeManagerContext} for this specific instance of
+   * the edge manager.
+   *
+   * @return the {@link org.apache.tez.dag.api.EdgeManagerContext} for the edge manager
+   */
+  public EdgeManagerContext getContext() {
+    return this.context;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
index b628e74..6aec78e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
@@ -33,12 +33,27 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
  */
 @InterfaceStability.Unstable
 public abstract class VertexManagerPlugin {
+
+  private final VertexManagerPluginContext context;
+
+  /**
+   * Create an instance of the VertexManagerPlugin. Classes extending this one to create a
+   * VertexManagerPlugin, must provide the same constructor so that Tez can create an instance of
+   * the class at runtime.
+   *
+   * @param context vertex manager plugin context which can be used to access the payload,
+   *                        vertex
+   *                        properties, etc
+   */
+  public VertexManagerPlugin(VertexManagerPluginContext context) {
+    this.context = context;
+  }
+
   /**
    * Initialize the plugin. Called when the vertex is initializing. This happens 
    * after all source vertices and inputs have initialized
-   * @param context
    */
-  public abstract void initialize(VertexManagerPluginContext context);
+  public abstract void initialize();
 
   /**
    * Notification that the vertex is ready to start running tasks
@@ -67,4 +82,14 @@ public abstract class VertexManagerPlugin {
    */
   public abstract void onRootVertexInitialized(String inputName,
       InputDescriptor inputDescriptor, List<Event> events);
+
+  /**
+   * Return the {@link org.apache.tez.dag.api.VertexManagerPluginContext} for this specific instance of
+   * the vertex manager.
+   *
+   * @return the {@link org.apache.tez.dag.api.VertexManagerPluginContext} for the vertex manager
+   */
+  public final VertexManagerPluginContext getContext() {
+    return this.context;
+  }
  }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
index 7afd61d..8b6edda 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -26,31 +26,55 @@ import java.util.List;
  * implemented by all LogicalInputs.
  *
  * This includes default implementations of a new method for convenience.
- * 
+ *
+ * <code>Input</code> classes must provide a 2 argument public constructor for Tez to create the
+ * Input. The parameters to this constructor are 1) an instance of
+ * {@link org.apache.tez.runtime.api.TezInputContext} and 2) an integer which is used to
+ * setup the number of physical inputs that the logical input will see.
+ * Tez will take care of initializing and closing the Input after a {@link Processor} completes. </p>
+ * <p/>
+ *
  */
 public abstract class AbstractLogicalInput implements LogicalInput, LogicalInputFrameworkInterface {
 
-  protected int numPhysicalInputs;
-  protected TezInputContext inputContext;
+  private final int numPhysicalInputs;
+  private final TezInputContext inputContext;
 
-  @Override
-  public void setNumPhysicalInputs(int numInputs) {
-    this.numPhysicalInputs = numInputs;
+  /**
+   * Construct an instance of the LogicalInput. Classes extending this one to create a
+   * LogicalInput, must provide the same constructor so that Tez can create an instance of the
+   * class at runtime.
+   *
+   * @param inputContext      the {@link org.apache.tez.runtime.api.TezInputContext} which provides
+   *                          the Input with context information within the running task.
+   * @param numPhysicalInputs the number of physical inputs that the logical input will
+   *                          receive. This is typically determined by Edge Routing.
+   */
+  public AbstractLogicalInput(TezInputContext inputContext, int numPhysicalInputs) {
+    this.inputContext = inputContext;
+    this.numPhysicalInputs = numPhysicalInputs;
   }
 
   @Override
-  public List<Event> initialize(TezInputContext _inputContext) throws Exception {
-    this.inputContext = _inputContext;
-    return initialize();
-  }
-
   public abstract List<Event> initialize() throws Exception;
 
-  public int getNumPhysicalInputs() {
+  /**
+   * Get the number of physical inputs that this LogicalInput will receive. This is
+   * typically determined by Edge routing, and number of upstream tasks
+   *
+   * @return the number of physical inputs
+   */
+  public final int getNumPhysicalInputs() {
     return numPhysicalInputs;
   }
 
-  public TezInputContext getContext() {
+  /**
+   * Return the {@link org.apache.tez.runtime.api.TezInputContext} for this specific instance of
+   * the LogicalInput
+   *
+   * @return the {@link org.apache.tez.runtime.api.TezInputContext} for the input
+   */
+  public final TezInputContext getContext() {
     return inputContext;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
index c9ae11d..d88e57f 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
@@ -30,27 +30,44 @@ import java.util.List;
  */
 public abstract class AbstractLogicalOutput implements LogicalOutput, LogicalOutputFrameworkInterface {
 
-  protected int numPhysicalOutputs;
-  protected TezOutputContext outputContext;
+  private final int numPhysicalOutputs;
+  private final TezOutputContext outputContext;
 
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    this.numPhysicalOutputs = numOutputs;
+  /**
+   * Construct an instance of the LogicalOutput. Classes extending this one to create a
+   * LogicalOutput, must provide the same constructor so that Tez can create an instance of the
+   * class at runtime.
+   *
+   * @param outputContext      the {@link org.apache.tez.runtime.api.TezOutputContext} which
+   *                           provides
+   *                           the Output with context information within the running task.
+   * @param numPhysicalOutputs the number of physical outputs that the logical output will
+   *                           generate. This is typically determined by Edge Routing.
+   */
+  public AbstractLogicalOutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+    this.outputContext = outputContext;
+    this.numPhysicalOutputs = numPhysicalOutputs;
   }
 
-  @Override
-  public List<Event> initialize(TezOutputContext _outputContext) throws Exception {
-    this.outputContext = _outputContext;
-    return initialize();
-  }
-  
   public abstract List<Event> initialize() throws Exception;
 
-  public int getNumPhysicalOutputs() {
+  /**
+   * Get the number of physical outputs that this LogicalOutput is expected to generate. This is
+   * typically determined by Edge routing, and number of downstream tasks
+   *
+   * @return the number of physical outputs
+   */
+  public final int getNumPhysicalOutputs() {
     return numPhysicalOutputs;
   }
 
-  public TezOutputContext getContext() {
+  /**
+   * Return the {@link org.apache.tez.runtime.api.TezOutputContext} for this specific instance of
+   * the LogicalOutput
+   *
+   * @return the {@link org.apache.tez.runtime.api.TezOutputContext} for the output
+   */
+  public final TezOutputContext getContext() {
     return outputContext;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
index bf5d373..5192252 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
@@ -34,10 +34,6 @@ import java.util.List;
  * {@link TezInputContext}.requestInitialMemory
  * <p/>
  *
- * <code>Input</code> classes must have a 0 argument public constructor for Tez
- * to construct the <code>Input</code>. Tez will take care of initializing and
- * closing the Input after a {@link Processor} completes. </p>
- * <p/>
  *
  * Inputs must also inform the framework once they are ready to be consumed.
  * This typically means that the Processor will not block when reading from the
@@ -48,14 +44,11 @@ public interface InputFrameworkInterface {
   /**
    * Initializes the <code>Input</code>.
    *
-   * @param inputContext
-   *          the {@link TezInputContext}
    * @return list of events that were generated during initialization
    * @throws Exception
    *           if an error occurs
    */
-  public List<Event> initialize(TezInputContext inputContext)
-      throws Exception;
+  public List<Event> initialize() throws Exception;
 
   /**
    * Handles user and system generated {@link Event}s, which typically carry

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java
index 96aa256..33a0f40 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java
@@ -21,13 +21,4 @@
 package org.apache.tez.runtime.api;
 
 public interface LogicalInputFrameworkInterface extends InputFrameworkInterface {
-  /**
-   * Sets the number of physical inputs that this <code>LogicalInput</code> will
-   * receive. This will be called by the Tez framework before initializing the
-   * <code>LogicalInput</code>
-   *
-   * @param numInputs
-   *          the number of physical inputs.
-   */
-  public void setNumPhysicalInputs(int numInputs);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java
index bc110c8..b5855de 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java
@@ -21,13 +21,4 @@
 package org.apache.tez.runtime.api;
 
 public interface LogicalOutputFrameworkInterface extends OutputFrameworkInterface {
-  /**
-   * Sets the number of physical outputs that this <code>LogicalOutput</code>
-   * will receive. This will be called by the Tez framework before initializing
-   * the <code>LogicalOutput</code>
-   *
-   * @param numOutputs
-   *          the number of physical outputs
-   */
-  public void setNumPhysicalOutputs(int numOutputs);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 4503418..6f7b14c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -23,23 +23,40 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * A LogicalInput that is used to merge the data from multiple inputs and provide a 
+ * A LogicalInput that is used to merge the data from multiple inputs and provide a
  * single <code>Reader</code> to read that data.
- * This Input is not initialized or closed. It is only expected to provide a 
+ * This Input is not initialized or closed. It is only expected to provide a
  * merged view of the real inputs. It cannot send or receive events
+ * <p/>
+ * <code>MergedLogicalInput</code> implementations must provide a 2 argument public constructor for
+ * Tez to create the Input. The parameters to this constructor are 1) an instance of {@link
+ * org.apache.tez.runtime.api.TezMergedInputContext} and 2) a list of constituent inputs. Tez will
+ * take care of initializing and closing the Input after a {@link Processor} completes. </p>
+ * <p/>
  */
 public abstract class MergedLogicalInput implements LogicalInput {
 
+
   private AtomicBoolean notifiedInputReady = new AtomicBoolean(false);
   private List<Input> inputs;
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
-  private TezMergedInputContext context;
+  private final TezMergedInputContext context;
 
-  public final void initialize(List<Input> inputs, TezMergedInputContext context) {
+  /**
+   * Construct an instance of the MergedLogicalInput. Classes extending this one to create a
+   * MergedLogicalInput, must provide the same constructor so that Tez can create an instance of
+   * the
+   * class at runtime.
+   *
+   * @param context the {@link org.apache.tez.runtime.api.TezMergedInputContext} which provides
+   *                the Input with context information within the running task.
+   * @param inputs  the list of constituent Inputs.
+   */
+  public MergedLogicalInput(TezMergedInputContext context, List<Input> inputs) {
     this.inputs = Collections.unmodifiableList(inputs);
     this.context = context;
   }
-  
+
   public final List<Input> getInputs() {
     return inputs;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
index 445a69a..7e21345 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
@@ -28,14 +28,26 @@ import org.apache.tez.dag.api.client.VertexStatus;
 @InterfaceStability.Unstable
 public abstract class OutputCommitter {
 
+  private final OutputCommitterContext outputCommitterContext;
+
+  /**
+   * Construct an instance of the OutputCommitter. Classes extending this one to create an
+   * OutputCommitter, must provide the same constructor so that Tez can create an instance of
+   * the class at runtime.
+   *
+   * @param committerContext committer context which can be used to access the payload, vertex
+   *                         properties, etc
+   */
+  public OutputCommitter(OutputCommitterContext committerContext) {
+    this.outputCommitterContext = committerContext;
+  }
+
   /**
    * Setup up the Output committer.
    *
-   * @param context Context of the output that is being acted upon
    * @throws java.lang.Exception
    */
-  public abstract void initialize(OutputCommitterContext context)
-      throws Exception;
+  public abstract void initialize() throws Exception;
 
   /**
    * For the framework to setup the output during initialization. This is
@@ -89,4 +101,14 @@ public abstract class OutputCommitter {
   public void recoverTask(int taskIndex, int previousDAGAttempt)  throws Exception {
   }
 
+  /**
+   * Return the {@link org.apache.tez.runtime.api.OutputCommitterContext} for this specific instance of
+   * the Committer.
+   *
+   * @return the {@link org.apache.tez.runtime.api.OutputCommitterContext} for the committer
+   */
+  public final OutputCommitterContext getContext() {
+    return this.outputCommitterContext;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
index ed37f6d..873fa0c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
@@ -33,25 +33,17 @@ import java.util.List;
  * {@link TezOutputContext}.requestInitialMemory
  * <p/>
  *
- * <code>Output</code> classes must have a 0 argument public constructor for Tez
- * to construct the <code>Output</code>. Tez will take care of initializing and
- * closing the Output after a {@link Processor} completes. </p>
- * <p/>
- *
  */
 public interface OutputFrameworkInterface {
 
   /**
    * Initializes the <code>Output</code>
    *
-   * @param outputContext
-   *          the {@link TezOutputContext}
    * @return list of events that were generated during initialization
    * @throws Exception
    *           if an error occurs
    */
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws Exception;
+  public List<Event> initialize() throws Exception;
 
   /**
    * Handles user and system generated {@link Event}s, which typically carry

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
index d73531a..cfd192e 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
@@ -32,19 +32,31 @@ import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 @InterfaceStability.Unstable
 public abstract class TezRootInputInitializer {
 
+  private final TezRootInputInitializerContext initializerContext;
+
+  /**
+   * Construct an instance of the RootInputInitializer. Classes extending this one to create a
+   * RootInputInitializer, must provide the same constructor so that Tez can create an instance of
+   * the class at runtime.
+   *
+   * @param initializerContext initializer context which can be used to access the payload, vertex
+   *                           properties, etc
+   */
+  public TezRootInputInitializer(TezRootInputInitializerContext initializerContext) {
+    this.initializerContext = initializerContext;
+  }
+
   /**
    * Run the root input initializer. This is the main method where initialization takes place. If an
    * Initializer is written to accept events, a notification mechanism should be setup, with the
    * heavy lifting of processing the event being done via this method. The moment this method
    * returns a list of events, RootInputInitialization is considered to be complete.
    *
-   * @param inputVertexContext initializer context which can be used to access the payload, vertex
-   *                           properties, etc
    * @return a list of events which are eventually routed to a {@link org.apache.tez.dag.api.VertexManagerPlugin}
    * for routing
    * @throws Exception
    */
-  public abstract List<Event> initialize(TezRootInputInitializerContext inputVertexContext)
+  public abstract List<Event> initialize()
       throws Exception;
 
   /**
@@ -57,5 +69,15 @@ public abstract class TezRootInputInitializer {
    * @throws Exception
    */
   public abstract void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws Exception;
+
+  /**
+   * Return the {@link org.apache.tez.runtime.api.TezRootInputInitializerContext} for this specific instance of
+   * the Initializer.
+   *
+   * @return the {@link org.apache.tez.runtime.api.TezRootInputInitializerContext} for the initializer
+   */
+  public final TezRootInputInitializerContext getContext() {
+    return this.initializerContext;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 5c604cb..a1ae243 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -88,8 +88,12 @@ public class RootInputInitializerManager {
   public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
       inputs) {
     for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
-      TezRootInputInitializer initializer = createInitializer(input);
-      InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, vertex, appContext);
+
+      TezRootInputInitializerContext context =
+          new TezRootInputInitializerContextImpl(input, vertex, appContext);
+      TezRootInputInitializer initializer = createInitializer(input, context);
+
+      InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, context, vertex);
       initializerMap.put(input.getName(), initializerWrapper);
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(initializerWrapper, dagUgi));
@@ -100,20 +104,10 @@ public class RootInputInitializerManager {
 
   @VisibleForTesting
   protected TezRootInputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
-      input) {
-    String className = input.getControllerDescriptor().getClassName();
-    @SuppressWarnings("unchecked")
-    Class<? extends TezRootInputInitializer> clazz =
-        (Class<? extends TezRootInputInitializer>) ReflectionUtils
-            .getClazz(className);
-    TezRootInputInitializer initializer = null;
-    try {
-      initializer = clazz.newInstance();
-    } catch (InstantiationException e) {
-      throw new TezUncheckedException("Failed to create input initializerWrapper", e);
-    } catch (IllegalAccessException e) {
-      throw new TezUncheckedException("Failed to create input initializerWrapper", e);
-    }
+      input, TezRootInputInitializerContext context) {
+    TezRootInputInitializer initializer = ReflectionUtils
+        .createClazzInstance(input.getControllerDescriptor().getClassName(),
+            new Class[]{TezRootInputInitializerContext.class}, new Object[]{context});
     return initializer;
   }
 
@@ -178,7 +172,7 @@ public class RootInputInitializerManager {
           LOG.info(
               "Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() +
                   " on vertex " + initializerWrapper.getVertexLogIdentifier());
-          return initializerWrapper.getInitializer().initialize(initializerWrapper.context);
+          return initializerWrapper.getInitializer().initialize();
         }
       });
       return events;
@@ -234,11 +228,11 @@ public class RootInputInitializerManager {
     private final String vertexLogIdentifier;
 
     InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
-                       TezRootInputInitializer initializer, Vertex vertex,
-                       AppContext appContext) {
+                       TezRootInputInitializer initializer, TezRootInputInitializerContext context,
+                       Vertex vertex) {
       this.input = input;
       this.initializer = initializer;
-      this.context = new TezRootInputInitializerContextImpl(input, vertex, appContext);
+      this.context = context;
       this.vertexLogIdentifier = vertex.getLogIdentifier();
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 4650faa..074b1bc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -29,15 +29,17 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 public class BroadcastEdgeManager extends EdgeManager {
 
-  EdgeManagerContext context;
+  public BroadcastEdgeManager(EdgeManagerContext context) {
+    super(context);
+  }
+
   @Override
-  public void initialize(EdgeManagerContext edgeManagerContext) {
-    this.context = edgeManagerContext;
+  public void initialize() {
   }
   
   @Override
   public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
-    return context.getSourceVertexNumTasks();
+    return getContext().getSourceVertexNumTasks();
   }
   
   @Override
@@ -52,7 +54,7 @@ public class BroadcastEdgeManager extends EdgeManager {
     List<Integer> inputIndices = 
         Collections.unmodifiableList(Collections.singletonList(sourceTaskIndex));
     // for each task make the i-th source task as the i-th physical input
-    for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) {
+    for (int i=0; i<getContext().getDestinationVertexNumTasks(); ++i) {
       destinationTaskAndInputIndices.put(i, inputIndices);
     }
   }
@@ -63,7 +65,7 @@ public class BroadcastEdgeManager extends EdgeManager {
     List<Integer> inputIndices = 
         Collections.unmodifiableList(Collections.singletonList(sourceTaskIndex));
     // for each task make the i-th source task as the i-th physical input
-    for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) {
+    for (int i=0; i<getContext().getDestinationVertexNumTasks(); ++i) {
       destinationTaskAndInputIndices.put(i, inputIndices);
     }
   }
@@ -76,7 +78,7 @@ public class BroadcastEdgeManager extends EdgeManager {
   
   @Override
   public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
-    return context.getDestinationVertexNumTasks();
+    return getContext().getDestinationVertexNumTasks();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index e2a9a27..0907b28 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -33,7 +33,6 @@ import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
@@ -114,18 +113,28 @@ public class Edge {
   private void createEdgeManager() {
     switch (edgeProperty.getDataMovementType()) {
       case ONE_TO_ONE:
-        edgeManager = new OneToOneEdgeManager();
+        edgeManagerContext = new EdgeManagerContextImpl(null);
+        edgeManager = new OneToOneEdgeManager(edgeManagerContext);
         break;
       case BROADCAST:
-        edgeManager = new BroadcastEdgeManager();
+        edgeManagerContext = new EdgeManagerContextImpl(null);
+        edgeManager = new BroadcastEdgeManager(edgeManagerContext);
         break;
       case SCATTER_GATHER:
-        edgeManager = new ScatterGatherEdgeManager();
+        edgeManagerContext = new EdgeManagerContextImpl(null);
+        edgeManager = new ScatterGatherEdgeManager(edgeManagerContext);
         break;
       case CUSTOM:
         if (edgeProperty.getEdgeManagerDescriptor() != null) {
+          byte []bb = null;
+          if (edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null) {
+            bb = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
+          }
+          edgeManagerContext = new EdgeManagerContextImpl(bb);
           String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();
-          edgeManager = ReflectionUtils.createClazzInstance(edgeManagerClassName);
+          edgeManager = ReflectionUtils
+              .createClazzInstance(edgeManagerClassName, new Class[]{EdgeManagerContext.class},
+                  new Object[]{edgeManagerContext});
         }
         break;
       default:
@@ -136,16 +145,8 @@ public class Edge {
   }
 
   public void initialize() {
-    byte[] bb = null;
-    if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM) {
-      if (edgeProperty.getEdgeManagerDescriptor() != null && 
-          edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null) {
-        bb = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
-      }
-    }
-    edgeManagerContext = new EdgeManagerContextImpl(bb);
     if (edgeManager != null) {
-      edgeManager.initialize(edgeManagerContext);
+      edgeManager.initialize();
     }
     destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT, 
         destinationVertex.getName(), 

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index 2933d5a..b202d70 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -34,20 +34,19 @@ import com.google.common.collect.Lists;
  * Starts all tasks immediately on vertex start
  */
 public class ImmediateStartVertexManager extends VertexManagerPlugin {
-  
-  private VertexManagerPluginContext context;
-  
-  ImmediateStartVertexManager() {
+
+  public ImmediateStartVertexManager(VertexManagerPluginContext context) {
+    super(context);
   }
-  
+
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
-    int numTasks = context.getVertexNumTasks(context.getVertexName());
+    int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
     List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
     for (int i=0; i<numTasks; ++i) {
       scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
     }
-    context.scheduleVertexTasks(scheduledTasks);
+    getContext().scheduleVertexTasks(scheduledTasks);
   }
 
   @Override
@@ -55,8 +54,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
   }
 
   @Override
-  public void initialize(VertexManagerPluginContext context) {
-    this.context = context;
+  public void initialize() {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 68ab0d3..db2804c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -30,10 +30,14 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 public class OneToOneEdgeManager extends EdgeManager {
 
   List<Integer> destinationInputIndices = 
-      Collections.unmodifiableList(Collections.singletonList(0)); 
+      Collections.unmodifiableList(Collections.singletonList(0));
+
+  public OneToOneEdgeManager(EdgeManagerContext context) {
+    super(context);
+  }
 
   @Override
-  public void initialize(EdgeManagerContext edgeManagerContext) {
+  public void initialize() {
     // Nothing to do.
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 637deaa..5b773ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -39,22 +39,24 @@ import com.google.common.collect.Lists;
 
 public class RootInputVertexManager extends VertexManagerPlugin {
 
-  VertexManagerPluginContext context;
   private String configuredInputName;
 
+  public RootInputVertexManager(VertexManagerPluginContext context) {
+    super(context);
+  }
+
   @Override
-  public void initialize(VertexManagerPluginContext context) {
-    this.context = context;
+  public void initialize() {
   }
 
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
-    int numTasks = context.getVertexNumTasks(context.getVertexName());
+    int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
     List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
     for (int i=0; i<numTasks; ++i) {
       scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
     }
-    context.scheduleVertexTasks(scheduledTasks);
+    getContext().scheduleVertexTasks(scheduledTasks);
   }
 
   @Override
@@ -74,12 +76,12 @@ public class RootInputVertexManager extends VertexManagerPlugin {
       if (event instanceof RootInputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state check.
         Preconditions.checkState(dataInformationEventSeen == false);
-        Preconditions.checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
+        Preconditions.checkState(getContext().getVertexNumTasks(getContext().getVertexName()) == -1,
             "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism"
-                + ", VertexName: " + context.getVertexName());
+                + ", VertexName: " + getContext().getVertexName());
         Preconditions.checkState(configuredInputName == null,
             "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"
-                + ", VertexName: " + context.getVertexName() + ", ConfiguredInput: "
+                + ", VertexName: " + getContext().getVertexName() + ", ConfiguredInput: "
                 + configuredInputName + ", CurrentInput: " + inputName);
         configuredInputName = inputName;
         RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
@@ -88,7 +90,7 @@ public class RootInputVertexManager extends VertexManagerPlugin {
             inputName,
             cEvent.getRootInputSpecUpdate() == null ? RootInputSpecUpdate
                 .getDefaultSinglePhysicalInputSpecUpdate() : cEvent.getRootInputSpecUpdate());
-        context.setVertexParallelism(cEvent.getNumTasks(),
+        getContext().setVertexParallelism(cEvent.getNumTasks(),
             new VertexLocationHint(cEvent.getTaskLocationHints()), null, rootInputSpecUpdate);
       }
       if (event instanceof RootInputUpdatePayloadEvent) {
@@ -99,11 +101,11 @@ public class RootInputVertexManager extends VertexManagerPlugin {
       } else if (event instanceof RootInputDataInformationEvent) {
         dataInformationEventSeen = true;
         // # Tasks should have been set by this point.
-        Preconditions.checkState(context.getVertexNumTasks(context.getVertexName()) != 0);
+        Preconditions.checkState(getContext().getVertexNumTasks(getContext().getVertexName()) != 0);
         Preconditions.checkState(
             configuredInputName == null || configuredInputName.equals(inputName),
             "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"
-                + ", VertexName:" + context.getVertexName() + ", ConfiguredInput: "
+                + ", VertexName:" + getContext().getVertexName() + ", ConfiguredInput: "
                 + configuredInputName + ", CurrentInput: " + inputName);
         configuredInputName = inputName;
         
@@ -112,6 +114,6 @@ public class RootInputVertexManager extends VertexManagerPlugin {
         riEvents.add(rEvent);
       }
     }
-    context.addRootInputEvents(inputName, riEvents);
+    getContext().addRootInputEvents(inputName, riEvents);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index fcc4347..e5e620c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -29,20 +29,22 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 public class ScatterGatherEdgeManager extends EdgeManager {
 
-  EdgeManagerContext context;
+  public ScatterGatherEdgeManager(EdgeManagerContext context) {
+    super(context);
+  }
+
   @Override
-  public void initialize(EdgeManagerContext edgeManagerContext) {
-    this.context = edgeManagerContext;
+  public void initialize() {
   }
 
   @Override
   public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
-    return context.getSourceVertexNumTasks();
+    return getContext().getSourceVertexNumTasks();
   }
   
   @Override
   public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
-    return context.getDestinationVertexNumTasks();
+    return getContext().getDestinationVertexNumTasks();
   }
 
   @Override
@@ -56,7 +58,7 @@ public class ScatterGatherEdgeManager extends EdgeManager {
   @Override
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
-    for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) {
+    for (int i=0; i<getContext().getDestinationVertexNumTasks(); ++i) {
       destinationTaskAndInputIndices.put(i, Collections.singletonList(sourceTaskIndex));
     }
   }
@@ -69,7 +71,7 @@ public class ScatterGatherEdgeManager extends EdgeManager {
 
   @Override
   public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
-    return context.getDestinationVertexNumTasks();
+    return getContext().getDestinationVertexNumTasks();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3a9f0fe..e520c2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1662,8 +1662,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
           @Override
           public Void run() throws Exception {
-            OutputCommitter outputCommitter = ReflectionUtils.createClazzInstance(
-                od.getControllerDescriptor().getClassName());
             OutputCommitterContext outputCommitterContext =
                 new OutputCommitterContextImpl(appContext.getApplicationID(),
                     appContext.getApplicationAttemptId().getAttemptId(),
@@ -1671,10 +1669,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                     vertexName,
                     od,
                     vertexId.getId());
-
+            OutputCommitter outputCommitter = ReflectionUtils
+                .createClazzInstance(od.getControllerDescriptor().getClassName(),
+                    new Class[]{OutputCommitterContext.class},
+                    new Object[]{outputCommitterContext});
             LOG.info("Invoking committer init for output=" + outputName
                 + ", vertexId=" + logIdentifier);
-            outputCommitter.initialize(outputCommitterContext);
+            outputCommitter.initialize();
             outputCommitters.put(outputName, outputCommitter);
             LOG.info("Invoking committer setup for output=" + outputName
                 + ", vertexId=" + logIdentifier);
@@ -1897,24 +1898,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (inputsWithInitializers != null) {
         LOG.info("Setting vertexManager to RootInputVertexManager for "
             + logIdentifier);
-        vertexManager = new VertexManager(new RootInputVertexManager(),
+        vertexManager = new VertexManager(
+            new VertexManagerPluginDescriptor(RootInputVertexManager.class.getName()),
             this, appContext);
       } else if (hasOneToOne && !hasCustom) {
         LOG.info("Setting vertexManager to InputReadyVertexManager for "
             + logIdentifier);
-        vertexManager = new VertexManager(new InputReadyVertexManager(),
+        vertexManager = new VertexManager(
+            new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()),
             this, appContext);
       } else if (hasBipartite && !hasCustom) {
         LOG.info("Setting vertexManager to ShuffleVertexManager for "
             + logIdentifier);
-        vertexManager = new VertexManager(new ShuffleVertexManager(),
+        vertexManager = new VertexManager(
+            new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName()),
             this, appContext);
       } else {
         // schedule all tasks upon vertex start. Default behavior.
         LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
             + logIdentifier);
         vertexManager = new VertexManager(
-            new ImmediateStartVertexManager(), this, appContext);
+            new VertexManagerPluginDescriptor(ImmediateStartVertexManager.class.getName()),
+            this, appContext);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 5ecc06b..645f440 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -205,17 +205,7 @@ public class VertexManager {
       return null;
     }
   }
-  
-  public VertexManager(VertexManagerPlugin plugin, 
-      Vertex managedVertex, AppContext appContext) {
-    checkNotNull(plugin, "plugin is null");
-    checkNotNull(managedVertex, "managedVertex is null");
-    checkNotNull(appContext, "appContext is null");
-    this.plugin = plugin;
-    this.managedVertex = managedVertex;
-    this.appContext = appContext;
-  }
-  
+
   public VertexManager(VertexManagerPluginDescriptor pluginDesc, 
       Vertex managedVertex, AppContext appContext) {
     checkNotNull(pluginDesc, "pluginDesc is null");
@@ -233,7 +223,8 @@ public class VertexManager {
   public void initialize() {
     pluginContext = new VertexManagerPluginContextImpl();
     if (pluginDesc != null) {
-      plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName());
+      plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
+          new Class[]{VertexManagerPluginContext.class}, new Object[]{pluginContext});
       payload = DagTypeConverters.convertToTezUserPayload(pluginDesc.getUserPayload());
     }
     if (payload == null || payload.getPayload() == null) {
@@ -246,7 +237,7 @@ public class VertexManager {
         throw new TezUncheckedException(e);
       }
     }
-    plugin.initialize(pluginContext);
+    plugin.initialize();
   }
 
   public void onVertexStarted(List<TezTaskAttemptID> completions) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index a50771b..c7fb4b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -95,6 +95,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -329,8 +330,8 @@ public class TestDAGImpl {
   
   public static class TotalCountingOutputCommitter extends CountingOutputCommitter {
     static int totalCommitCounter = 0;
-    public TotalCountingOutputCommitter() {
-      super();
+    public TotalCountingOutputCommitter(OutputCommitterContext context) {
+      super(context);
     }
     @Override
     public void commitOutput() throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
index c37cc00..e55351e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
@@ -43,8 +43,8 @@ public class TestRootInputVertexManager {
     doReturn("vertex1").when(context).getVertexName();
     doReturn(1).when(context).getVertexNumTasks(eq("vertex1"));
 
-    RootInputVertexManager rootInputVertexManager = new RootInputVertexManager();
-    rootInputVertexManager.initialize(context);
+    RootInputVertexManager rootInputVertexManager = new RootInputVertexManager(context);
+    rootInputVertexManager.initialize();
 
     InputDescriptor id1 = mock(InputDescriptor.class);
     List<Event> events1 = new LinkedList<Event>();
@@ -74,8 +74,8 @@ public class TestRootInputVertexManager {
     doReturn("vertex1").when(context).getVertexName();
     doReturn(-1).when(context).getVertexNumTasks(eq("vertex1"));
 
-    RootInputVertexManager rootInputVertexManager = new RootInputVertexManager();
-    rootInputVertexManager.initialize(context);
+    RootInputVertexManager rootInputVertexManager = new RootInputVertexManager(context);
+    rootInputVertexManager.initialize();
 
     InputDescriptor id1 = mock(InputDescriptor.class);
     List<Event> events1 = new LinkedList<Event>();

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 7ca8c60..f2adae8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -79,6 +79,7 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos;
@@ -194,8 +195,7 @@ public class TestVertexImpl {
   private HistoryEventHandler historyEventHandler;
   private static JavaProfilerOptions javaProfilerOptions;
 
-  public static class CountingOutputCommitter extends
-      OutputCommitter {
+  public static class CountingOutputCommitter extends OutputCommitter {
 
     public int initCounter = 0;
     public int setupCounter = 0;
@@ -205,23 +205,18 @@ public class TestVertexImpl {
     private boolean throwErrorOnAbort;
     private boolean throwRuntimeException;
 
-    public CountingOutputCommitter(boolean throwError,
-        boolean throwOnAbort,
-        boolean throwRuntimeException) {
-      this.throwError = throwError;
-      this.throwErrorOnAbort = throwOnAbort;
-      this.throwRuntimeException = throwRuntimeException;
-    }
-
-    public CountingOutputCommitter() {
-      this(false, false, false);
+    public CountingOutputCommitter(OutputCommitterContext context) {
+      super(context);
+      this.throwError = false;
+      this.throwErrorOnAbort = false;
+      this.throwRuntimeException = false;
     }
 
     @Override
-    public void initialize(OutputCommitterContext context) throws IOException {
-      if (context.getUserPayload() != null) {
+    public void initialize() throws IOException {
+      if (getContext().getUserPayload() != null) {
         CountingOutputCommitterConfig conf =
-            new CountingOutputCommitterConfig(context.getUserPayload());
+            new CountingOutputCommitterConfig(getContext().getUserPayload());
         this.throwError = conf.throwError;
         this.throwErrorOnAbort = conf.throwErrorOnAbort;
         this.throwRuntimeException = conf.throwRuntimeException;
@@ -2702,8 +2697,10 @@ public class TestVertexImpl {
     initAllVertices(VertexState.INITED);
     
     // fudge vertex manager so that tasks dont start running
-    v1.vertexManager = new VertexManager(new VertexManagerPluginForTest(),
+    v1.vertexManager = new VertexManager(
+        new VertexManagerPluginDescriptor(VertexManagerPluginForTest.class.getName()),
         v1, appContext);
+    v1.vertexManager.initialize();
     startVertex(v1);
     
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
@@ -2737,8 +2734,10 @@ public class TestVertexImpl {
     initAllVertices(VertexState.INITED);
     
     // fudge vertex manager so that tasks dont start running
-    v1.vertexManager = new VertexManager(new VertexManagerPluginForTest(),
+    v1.vertexManager = new VertexManager(
+        new VertexManagerPluginDescriptor(VertexManagerPluginForTest.class.getName()),
         v1, appContext);
+    v1.vertexManager.initialize();
     
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
     Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
@@ -2818,7 +2817,7 @@ public class TestVertexImpl {
   @Test(timeout = 10000)
   public void testRootInputInitializerEvent() throws Exception {
     useCustomInitializer = true;
-    customInitializer = new EventHandlingRootInputInitializer();
+    customInitializer = new EventHandlingRootInputInitializer(null);
     EventHandlingRootInputInitializer initializer =
         (EventHandlingRootInputInitializer) customInitializer;
     setupPreDagCreation();
@@ -3168,7 +3167,8 @@ public class TestVertexImpl {
 
     @Override
     protected TezRootInputInitializer createInitializer(
-        RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input) {
+        RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
+        TezRootInputInitializerContext context) {
       return presetInitializer;
     }
   }
@@ -3201,11 +3201,12 @@ public class TestVertexImpl {
 
     @Override
     protected TezRootInputInitializer createInitializer(
-        RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input) {
+        RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
+        TezRootInputInitializerContext context) {
 
-      return new TezRootInputInitializer() {
+      return new TezRootInputInitializer(context) {
         @Override
-        public List<Event> initialize(TezRootInputInitializerContext inputVertexContext) throws
+        public List<Event> initialize() throws
             Exception {
           return null;
         }
@@ -3393,12 +3394,14 @@ public class TestVertexImpl {
   @InterfaceAudience.Private
   public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin {
 
-    private VertexManagerPluginContext context;
     private static final int NUM_TASKS = 5;
 
+    public RootInputSpecUpdaterVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
     @Override
-    public void initialize(VertexManagerPluginContext context) {
-      this.context = context;
+    public void initialize() {
     }
 
     @Override
@@ -3417,7 +3420,7 @@ public class TestVertexImpl {
     public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
         List<Event> events) {
       Map<String, RootInputSpecUpdate> map = new HashMap<String, RootInputSpecUpdate>();
-      if (context.getUserPayload()[0] == 0) {
+      if (getContext().getUserPayload()[0] == 0) {
         map.put("input3", RootInputSpecUpdate.createAllTaskRootInputSpecUpdate(4));
       } else {
         List<Integer> pInputList = new LinkedList<Integer>();
@@ -3426,7 +3429,7 @@ public class TestVertexImpl {
         }
         map.put("input4", RootInputSpecUpdate.createPerTaskRootInputSpecUpdate(pInputList));
       }
-      context.setVertexParallelism(NUM_TASKS, null, null, map);
+      getContext().setVertexParallelism(NUM_TASKS, null, null, map);
     }
   }
 
@@ -3440,9 +3443,13 @@ public class TestVertexImpl {
     private final ReentrantLock lock = new ReentrantLock();
     private final Condition eventCondition = lock.newCondition();
 
+    public EventHandlingRootInputInitializer(
+        TezRootInputInitializerContext initializerContext) {
+      super(initializerContext);
+    }
+
     @Override
-    public List<Event> initialize(TezRootInputInitializerContext inputVertexContext) throws
-        Exception {
+    public List<Event> initialize() throws Exception {
       initStarted.set(true);
       lock.lock();
       try {

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
index 222c2f5..a777022 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
@@ -28,20 +28,14 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 public class EdgeManagerForTest extends EdgeManager {
 
-  private EdgeManagerContext edgeManagerContext = null;
   private byte[] userPayload;
 
-  public static EdgeManagerForTest createInstance() {
-    EdgeManagerForTest e = new EdgeManagerForTest();
-    return e;
+  public EdgeManagerForTest(EdgeManagerContext context) {
+    super(context);
   }
 
   public EdgeManagerContext getEdgeManagerContext() {
-    return edgeManagerContext;
-  }
-
-  
-  public EdgeManagerForTest() {
+    return getContext();
   }
 
   public byte[] getUserPayload() {
@@ -50,9 +44,8 @@ public class EdgeManagerForTest extends EdgeManager {
 
   // Overridden methods
   @Override
-  public void initialize(EdgeManagerContext edgeManagerContext) {
-    this.edgeManagerContext = edgeManagerContext;
-    this.userPayload = edgeManagerContext.getUserPayload();
+  public void initialize() {
+    this.userPayload = getContext().getUserPayload();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
index be0d408..422d785 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
@@ -29,8 +29,12 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 public class VertexManagerPluginForTest extends VertexManagerPlugin {
 
+  public VertexManagerPluginForTest(VertexManagerPluginContext context) {
+    super(context);
+  }
+
   @Override
-  public void initialize(VertexManagerPluginContext context) {}
+  public void initialize() {}
 
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {}

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 9311754..1f6c8bd 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -47,30 +47,32 @@ public class MROutputCommitter extends OutputCommitter {
 
   private static final Log LOG = LogFactory.getLog(MROutputCommitter.class);
 
-  private OutputCommitterContext context;
   private org.apache.hadoop.mapreduce.OutputCommitter committer = null;
   private JobContext jobContext = null;
   private volatile boolean initialized = false;
   private JobConf jobConf = null;
   private boolean newApiCommitter;
 
+  public MROutputCommitter(OutputCommitterContext committerContext) {
+    super(committerContext);
+  }
+
   @Override
-  public void initialize(OutputCommitterContext context) throws IOException {
-    byte[] userPayload = context.getOutputUserPayload();
+  public void initialize() throws IOException {
+    byte[] userPayload = getContext().getOutputUserPayload();
     if (userPayload == null) {
       jobConf = new JobConf();
     } else {
       jobConf = new JobConf(
-          MRHelpers.createConfFromUserPayload(context.getOutputUserPayload()));
+          MRHelpers.createConfFromUserPayload(getContext().getOutputUserPayload()));
     }
 
     // Read all credentials into the credentials instance stored in JobConf.
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        context.getDAGAttemptNumber());
-    this.context = context;
-    committer = getOutputCommitter(this.context);
-    jobContext = getJobContextFromVertexContext(context);
+        getContext().getDAGAttemptNumber());
+    committer = getOutputCommitter(getContext());
+    jobContext = getJobContextFromVertexContext(getContext());
     initialized = true;
   }
 
@@ -196,9 +198,9 @@ public class MROutputCommitter extends OutputCommitter {
       throw new RuntimeException("Committer not initialized");
     }
     TaskAttemptID taskAttemptID = new TaskAttemptID(
-        Long.toString(context.getApplicationId().getClusterTimestamp())
-        + String.valueOf(context.getVertexIndex()),
-        context.getApplicationId().getId(),
+        Long.toString(getContext().getApplicationId().getClusterTimestamp())
+        + String.valueOf(getContext().getVertexIndex()),
+        getContext().getApplicationId().getId(),
         ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
             TaskType.MAP : TaskType.REDUCE)),
         taskIndex, attemptId);

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index ba3ee7a..0777c73 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -52,21 +52,21 @@ public class MRInputAMSplitGenerator extends TezRootInputInitializer {
 
   private boolean sendSerializedEvents;
   
-  private static final Log LOG = LogFactory
-      .getLog(MRInputAMSplitGenerator.class);
+  private static final Log LOG = LogFactory.getLog(MRInputAMSplitGenerator.class);
 
-  public MRInputAMSplitGenerator() {
+  public MRInputAMSplitGenerator(
+      TezRootInputInitializerContext initializerContext) {
+    super(initializerContext);
   }
 
   @Override
-  public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
-      throws Exception {
+  public List<Event> initialize() throws Exception {
     Stopwatch sw = null;
     if (LOG.isDebugEnabled()) {
       sw = new Stopwatch().start();
     }
     MRInputUserPayloadProto userPayloadProto = MRHelpers
-        .parseMRInputPayload(rootInputContext.getInputUserPayload());
+        .parseMRInputPayload(getContext().getInputUserPayload());
     if (LOG.isDebugEnabled()) {
       sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "
@@ -91,15 +91,15 @@ public class MRInputAMSplitGenerator extends TezRootInputInitializer {
       sw.reset().start();
     }
 
-    int totalResource = rootInputContext.getTotalAvailableResource().getMemory();
-    int taskResource = rootInputContext.getVertexTaskResource().getMemory();
+    int totalResource = getContext().getTotalAvailableResource().getMemory();
+    int taskResource = getContext().getVertexTaskResource().getMemory();
     float waves = conf.getFloat(
         TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
         TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
 
     int numTasks = (int)((totalResource*waves)/taskResource);
 
-    LOG.info("Input " + rootInputContext.getInputName() + " asking for " + numTasks
+    LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks
         + " tasks. Headroom: " + totalResource + " Task Resource: "
         + taskResource + " waves: " + waves);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index 87d88a6..68c3f05 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -47,19 +47,19 @@ public class MRInputSplitDistributor extends TezRootInputInitializer {
   
   private boolean sendSerializedEvents;
 
-  public MRInputSplitDistributor() {
-  }
-
   private MRSplitsProto splitsProto;
 
+  public MRInputSplitDistributor(TezRootInputInitializerContext initializerContext) {
+    super(initializerContext);
+  }
+
   @Override
-  public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
-      throws IOException {
+  public List<Event> initialize() throws IOException {
     Stopwatch sw = null;
     if (LOG.isDebugEnabled()) {
       sw = new Stopwatch().start();
     }
-    MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
+    MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(getContext().getInputUserPayload());
     if (LOG.isDebugEnabled()) {
       sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 2f7c13e..e36eb4d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -43,6 +43,7 @@ import org.apache.tez.mapreduce.lib.MRReaderMapred;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -77,7 +78,11 @@ public class MRInput extends MRInputBase {
   
   @Private
   volatile boolean splitInfoViaEvents;
-  
+
+  public MRInput(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
   /**
    * Helper API to generate the user payload for the MRInput and
    * MRInputAMSplitGenerator (if used). The InputFormat will be invoked by Tez

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 9171492..d999780 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
 @LimitedPrivate("Hive")
@@ -41,6 +42,10 @@ public class MRInputLegacy extends MRInput {
   private ReentrantLock eventLock = new ReentrantLock();
   private Condition eventCondition = eventLock.newCondition();
 
+  public MRInputLegacy(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
   @Private
   protected void initializeInternal() throws IOException {
     LOG.info("MRInputLegacy deferring initialization");

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 8a759f8..b4df3f0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -39,6 +39,7 @@ import org.apache.tez.mapreduce.lib.MRReaderMapred;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -46,9 +47,8 @@ public class MultiMRInput extends MRInputBase {
 
   private static final Log LOG = LogFactory.getLog(MultiMRInput.class);
 
-  @Override
-  public int getNumPhysicalInputs() {
-    return super.getNumPhysicalInputs();
+  public MultiMRInput(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
   }
 
   private final ReentrantLock lock = new ReentrantLock();

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
index a6a0d83..115b45b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -34,6 +34,7 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
 
 import java.io.IOException;
 import java.util.List;
@@ -45,6 +46,10 @@ public abstract class MRInputBase extends AbstractLogicalInput {
   protected JobConf jobConf;
   protected TezCounter inputRecordCounter;
 
+  public MRInputBase(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
   @Override
   public Reader getReader() throws Exception {
     return null;

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 3cca35c..ed473e7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -48,6 +48,7 @@ import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 public class MROutput extends AbstractLogicalOutput {
@@ -79,7 +80,11 @@ public class MROutput extends AbstractLogicalOutput {
   private boolean isMapperOutput;
 
   protected OutputCommitter committer;
-  
+
+  public MROutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
+
   /**
    * Creates the user payload to be set on the OutputDescriptor for MROutput
    * @param conf Configuration for the OutputFormat

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
index 7c9f804..f6ac07f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
@@ -19,9 +19,14 @@
 package org.apache.tez.mapreduce.output;
 
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.tez.runtime.api.TezOutputContext;
 
 public class MROutputLegacy extends MROutput {
 
+  public MROutputLegacy(TezOutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
+
   public OutputCommitter getOutputCommitter() {
     return committer;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index 84c945e..5573d77 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -68,9 +68,9 @@ public class TestMRInputSplitDistributor {
     byte[] userPayload = payloadProto.build().toByteArray();
 
     TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
-    MRInputSplitDistributor splitDist = new MRInputSplitDistributor();
+    MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
 
-    List<Event> events = splitDist.initialize(context);
+    List<Event> events = splitDist.initialize();
 
     assertEquals(3, events.size());
     assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent);
@@ -116,9 +116,9 @@ public class TestMRInputSplitDistributor {
     byte[] userPayload = payloadProto.build().toByteArray();
 
     TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
-    MRInputSplitDistributor splitDist = new MRInputSplitDistributor();
+    MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
 
-    List<Event> events = splitDist.initialize(context);
+    List<Event> events = splitDist.initialize();
 
     assertEquals(3, events.size());
     assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 3aa5ddc..57501e3 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -96,9 +96,8 @@ public class TestMultiMRInput {
 
     TezInputContext inputContext = createTezInputContext(payload);
 
-    MultiMRInput input = new MultiMRInput();
-    input.setNumPhysicalInputs(1);
-    input.initialize(inputContext);
+    MultiMRInput input = new MultiMRInput(inputContext, 1);
+    input.initialize();
     List<Event> eventList = new ArrayList<Event>();
 
     String file1 = "file1";
@@ -147,9 +146,8 @@ public class TestMultiMRInput {
 
     TezInputContext inputContext = createTezInputContext(payload);
 
-    MultiMRInput input = new MultiMRInput();
-    input.setNumPhysicalInputs(2);
-    input.initialize(inputContext);
+    MultiMRInput input = new MultiMRInput(inputContext, 2);
+    input.initialize();
     List<Event> eventList = new ArrayList<Event>();
 
     LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
@@ -212,9 +210,8 @@ public class TestMultiMRInput {
 
     TezInputContext inputContext = createTezInputContext(payload);
 
-    MultiMRInput input = new MultiMRInput();
-    input.setNumPhysicalInputs(1);
-    input.initialize(inputContext);
+    MultiMRInput input = new MultiMRInput(inputContext, 1);
+    input.initialize();
     List<Event> eventList = new ArrayList<Event>();
 
     String file1 = "file1";


Mime
View raw message