tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/2] TEZ-1303. Change Inputs, Outputs, InputInitializer, OutputCommitter, VertexManagerPlugin, EdgeManager to require constructors for creation, and remove the initialize methods. (sseth)
Date Wed, 30 Jul 2014 23:27:04 GMT
Repository: tez
Updated Branches:
  refs/heads/master 770e3058a -> 2213c109e


http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index a72dafa..5511b72 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -52,7 +53,6 @@ import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.InputFrameworkInterface;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalInputFrameworkInterface;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.LogicalOutputFrameworkInterface;
 import org.apache.tez.runtime.api.MergedLogicalInput;
@@ -60,6 +60,7 @@ import org.apache.tez.runtime.api.Output;
 import org.apache.tez.runtime.api.OutputFrameworkInterface;
 import org.apache.tez.runtime.api.Processor;
 import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezMergedInputContext;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -368,17 +369,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     public Void call() throws Exception {
       LOG.info("Initializing Input using InputSpec: " + inputSpec);
       String edgeName = inputSpec.getSourceVertexName();
-      LogicalInput input = createInput(inputSpec);
-      TezInputContext inputContext = createInputContext(input, inputSpec, inputIndex);
+      TezInputContext inputContext = createInputContext(inputsMap, inputSpec, inputIndex);
+      LogicalInput input = createInput(inputSpec, inputContext);
+
       inputsMap.put(edgeName, input);
       inputContextMap.put(edgeName, inputContext);
 
-      if (input instanceof LogicalInputFrameworkInterface) {
-        ((LogicalInputFrameworkInterface) input).setNumPhysicalInputs(inputSpec
-            .getPhysicalEdgeCount());
-      }
       LOG.info("Initializing Input with src edge: " + edgeName);
-      List<Event> events = ((InputFrameworkInterface)input).initialize(inputContext);
+      List<Event> events = ((InputFrameworkInterface)input).initialize();
       sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
           inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
           taskSpec.getTaskAttemptID());
@@ -419,17 +417,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     public Void call() throws Exception {
       LOG.info("Initializing Output using OutputSpec: " + outputSpec);
       String edgeName = outputSpec.getDestinationVertexName();
-      LogicalOutput output = createOutput(outputSpec);
       TezOutputContext outputContext = createOutputContext(outputSpec, outputIndex);
+      LogicalOutput output = createOutput(outputSpec, outputContext);
+
       outputsMap.put(edgeName, output);
       outputContextMap.put(edgeName, outputContext);
 
-      if (output instanceof LogicalOutputFrameworkInterface) {
-        ((LogicalOutputFrameworkInterface) output).setNumPhysicalOutputs(outputSpec
-            .getPhysicalEdgeCount());
-      }
       LOG.info("Initializing Output with dest edge: " + edgeName);
-      List<Event> events = ((OutputFrameworkInterface)output).initialize(outputContext);
+      List<Event> events = ((OutputFrameworkInterface)output).initialize();
       sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
           outputContext.getTaskVertexName(),
           outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
@@ -451,15 +446,18 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
      groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size());
      for (GroupInputSpec groupInputSpec : groupInputSpecs) {
         LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
-        MergedLogicalInput groupInput = (MergedLogicalInput) createInputFromDescriptor(
-            groupInputSpec.getMergedInputDescriptor());
-        List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size());
-        for (String groupVertex : groupInputSpec.getGroupVertices()) {
-          inputs.add(inputsMap.get(groupVertex));
-        }
-        groupInput.initialize(inputs, new TezMergedInputContextImpl(
-            groupInputSpec.getMergedInputDescriptor().getUserPayload(),
-            groupInput, inputReadyTracker, localDirs));
+       TezMergedInputContext mergedInputContext =
+           new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(),
+               groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs);
+       List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size());
+       for (String groupVertex : groupInputSpec.getGroupVertices()) {
+         inputs.add(inputsMap.get(groupVertex));
+       }
+
+       MergedLogicalInput groupInput =
+           (MergedLogicalInput) createMergedInput(groupInputSpec.getMergedInputDescriptor(),
+               mergedInputContext, inputs);
+
         groupInputsMap.put(groupInputSpec.getGroupName(), groupInput);
       }
     }
@@ -475,7 +473,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         + processorDescriptor.getClassName());
   }
 
-  private TezInputContext createInputContext(Input input, InputSpec inputSpec, int inputIndex) {
+  private TezInputContext createInputContext(Map<String, LogicalInput> inputMap,
+                                             InputSpec inputSpec, int inputIndex) {
     TezInputContext inputContext = new TezInputContextImpl(tezConf, localDirs,
         appAttemptNumber, tezUmbilical,
         taskSpec.getDAGName(), taskSpec.getVertexName(),
@@ -483,7 +482,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         tezCounters, inputIndex,
         inputSpec.getInputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
-        inputSpec.getInputDescriptor(), input, inputReadyTracker, objectRegistry);
+        inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry);
     return inputContext;
   }
 
@@ -510,25 +509,41 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return processorContext;
   }
 
-  private LogicalInput createInput(InputSpec inputSpec) {
+  private LogicalInput createInput(InputSpec inputSpec, TezInputContext inputContext) {
     LOG.info("Creating Input");
-    return createInputFromDescriptor(inputSpec.getInputDescriptor());
+    InputDescriptor inputDesc = inputSpec.getInputDescriptor();
+    Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(),
+        new Class[]{TezInputContext.class, Integer.TYPE},
+        new Object[]{inputContext, inputSpec.getPhysicalEdgeCount()});
+    if (!(input instanceof LogicalInput)) {
+      throw new TezUncheckedException(inputDesc.getClass().getName()
+          + " is not a sub-type of LogicalInput."
+          + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
+    }
+    return (LogicalInput) input;
   }
 
-  private LogicalInput createInputFromDescriptor(InputDescriptor inputDesc) {
-    Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName());
+  private LogicalInput createMergedInput(InputDescriptor inputDesc,
+                                         TezMergedInputContext mergedInputContext,
+                                         List<Input> constituentInputs) {
+    LogicalInput input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(),
+        new Class[]{TezMergedInputContext.class, List.class},
+        new Object[]{mergedInputContext, constituentInputs});
     if (!(input instanceof LogicalInput)) {
       throw new TezUncheckedException(inputDesc.getClass().getName()
           + " is not a sub-type of LogicalInput."
           + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
     }
-    return (LogicalInput)input;
+    return input;
   }
 
-  private LogicalOutput createOutput(OutputSpec outputSpec) {
+  private LogicalOutput createOutput(OutputSpec outputSpec, TezOutputContext outputContext) {
     LOG.info("Creating Output");
-    Output output = ReflectionUtils.createClazzInstance(outputSpec
-        .getOutputDescriptor().getClassName());
+    OutputDescriptor outputDesc = outputSpec.getOutputDescriptor();
+    Output output = ReflectionUtils.createClazzInstance(outputDesc.getClassName(),
+        new Class[]{TezOutputContext.class, Integer.TYPE},
+        new Object[]{outputContext, outputSpec.getPhysicalEdgeCount()});
+
     if (!(output instanceof LogicalOutput)) {
       throw new TezUncheckedException(output.getClass().getName()
           + " is not a sub-type of LogicalOutput."

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index e98d694..627e830 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -40,7 +40,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
@@ -53,24 +53,27 @@ public class TezInputContextImpl extends TezTaskContextImpl
   private final String sourceVertexName;
   private final EventMetaData sourceInfo;
   private final int inputIndex;
-  private final Input input;
+  private final Map<String, LogicalInput> inputs;
   private final InputReadyTracker inputReadyTracker;
 
   @Private
-  public TezInputContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String dagName, String taskVertexName,
-      String sourceVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters, int inputIndex, @Nullable byte[] userPayload,
-      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata,
-      Map<String, String> auxServiceEnv, MemoryDistributor memDist,
-      InputDescriptor inputDescriptor,  Input input, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry) {
+  public TezInputContextImpl(Configuration conf, String[] workDirs,
+                             int appAttemptNumber,
+                             TezUmbilical tezUmbilical, String dagName, String taskVertexName,
+                             String sourceVertexName, TezTaskAttemptID taskAttemptID,
+                             TezCounters counters, int inputIndex, @Nullable byte[] userPayload,
+                             RuntimeTask runtimeTask,
+                             Map<String, ByteBuffer> serviceConsumerMetadata,
+                             Map<String, String> auxServiceEnv, MemoryDistributor memDist,
+                             InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs,
+                             InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, taskAttemptID,
         wrapCounters(counters, taskVertexName, sourceVertexName, conf),
         runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, inputDescriptor, objectRegistry);
     checkNotNull(inputIndex, "inputIndex is null");
     checkNotNull(sourceVertexName, "sourceVertexName is null");
-    checkNotNull(input, "input is null");
+    checkNotNull(inputs, "input map is null");
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     this.inputIndex = inputIndex;
@@ -78,7 +81,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
     this.sourceInfo = new EventMetaData(
         EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
         taskAttemptID);
-    this.input = input;
+    this.inputs = inputs;
     this.inputReadyTracker = inputReadyTracker;
   }
 
@@ -126,6 +129,6 @@ public class TezInputContextImpl extends TezTaskContextImpl
 
   @Override
   public void inputIsReady() {
-    inputReadyTracker.setInputIsReady(input);
+    inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName));
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index 8582307..cf55d39 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -4,26 +4,34 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.TezMergedInputContext;
 
 
 public class TezMergedInputContextImpl implements TezMergedInputContext {
 
   private final TezUserPayload userPayload;
-  private final Input input;
+  private final String groupInputName;
+  private final Map<String, MergedLogicalInput> groupInputsMap;
   private final InputReadyTracker inputReadyTracker;
   private final String[] workDirs;
 
-  public TezMergedInputContextImpl(@Nullable byte[] userPayload,
-      Input input, InputReadyTracker inputReadyTracker, String[] workDirs) {
-    checkNotNull(input, "input is null");
+  public TezMergedInputContextImpl(@Nullable byte[] userPayload, String groupInputName,
+                                   Map<String, MergedLogicalInput> groupInputsMap,
+                                   InputReadyTracker inputReadyTracker, String[] workDirs) {
+    checkNotNull(groupInputName, "groupInputName is null");
+    checkNotNull(groupInputsMap, "input-group map is null");
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
+    this.groupInputName = groupInputName;
+    this.groupInputsMap = groupInputsMap;
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
-    this.input = input;
     this.inputReadyTracker = inputReadyTracker;
     this.workDirs = workDirs;
   }
@@ -36,7 +44,7 @@ public class TezMergedInputContextImpl implements TezMergedInputContext {
   
   @Override
   public void inputIsReady() {
-    inputReadyTracker.setInputIsReady(input);
+    inputReadyTracker.setInputIsReady(groupInputsMap.get(groupInputName));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
index 03e597b..352d09c 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.tez.runtime.api.AbstractLogicalInput;
@@ -31,6 +33,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezMergedInputContext;
 import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
 import org.junit.Test;
 
@@ -142,8 +145,7 @@ public class TestInputReadyTracker {
     ImmediatelyReadyInputForTest input3 = new ImmediatelyReadyInputForTest(inputReadyTracker);
     ControlledReadyInputForTest input4 = new ControlledReadyInputForTest(inputReadyTracker);
     
-    AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest();
-    AllMergedInputForTest group2 = new AllMergedInputForTest();
+
     
     List<Input> group1Inputs = new ArrayList<Input>();
     group1Inputs.add(input1);
@@ -153,9 +155,15 @@ public class TestInputReadyTracker {
     group2Inputs.add(input3);
     group2Inputs.add(input4);
 
-    group1.initialize(group1Inputs, new TezMergedInputContextImpl(null, group1, inputReadyTracker, null));
-    group2.initialize(group2Inputs, new TezMergedInputContextImpl(null, group2, inputReadyTracker, null));
-    
+    Map<String, MergedLogicalInput> mergedInputMap = new HashMap<String, MergedLogicalInput>();
+    TezMergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(null, "group1", mergedInputMap, inputReadyTracker, null);
+    TezMergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(null, "group2", mergedInputMap, inputReadyTracker, null);
+
+    AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest(mergedInputContext1, group1Inputs);
+    AllMergedInputForTest group2 = new AllMergedInputForTest(mergedInputContext2, group2Inputs);
+    mergedInputMap.put("group1", group1);
+    mergedInputMap.put("group2", group2);
+
     // Register groups with tracker
     List<MergedLogicalInput> groups = Lists.newArrayList(group1, group2);
     inputReadyTracker.setGroupedInputs(groups);
@@ -210,6 +218,7 @@ public class TestInputReadyTracker {
     private volatile boolean isReady = false;
     
     ImmediatelyReadyInputForTest(InputReadyTracker inputReadyTracker) {
+      super(null, 0);
       isReady = true;
       inputReadyTracker.setInputIsReady(this);
     }
@@ -244,6 +253,7 @@ public class TestInputReadyTracker {
     private InputReadyTracker inputReadyTracker;
     
     ControlledReadyInputForTest(InputReadyTracker inputReadyTracker) {
+      super(null, 0);
       this.inputReadyTracker = inputReadyTracker;
     }
 
@@ -280,7 +290,11 @@ public class TestInputReadyTracker {
   private static class AnyOneMergedInputForTest extends MergedLogicalInput {
 
     private volatile boolean isReady = false;
-    
+
+    public AnyOneMergedInputForTest(TezMergedInputContext context, List<Input> inputs) {
+      super(context, inputs);
+    }
+
     @Override
     public Reader getReader() throws Exception {
       return null;
@@ -297,7 +311,11 @@ public class TestInputReadyTracker {
 
     private volatile boolean isReady = false;
     private Set<Input> readyInputs = Sets.newHashSet();
-    
+
+    public AllMergedInputForTest(TezMergedInputContext context, List<Input> inputs) {
+      super(context, inputs);
+    }
+
     @Override
     public Reader getReader() throws Exception {
       return null;

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index a2876a7..dcf3303 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -41,6 +41,8 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -168,7 +170,8 @@ public class TestLogicalIOProcessorRuntimeTask {
 
     public static volatile int startCount = 0;
 
-    public TestInput() {
+    public TestInput(TezInputContext inputContext, int numPhysicalInputs) {
+      super(inputContext, numPhysicalInputs);
     }
 
     @Override
@@ -204,9 +207,11 @@ public class TestLogicalIOProcessorRuntimeTask {
 
     public static volatile int startCount = 0;
 
-    public TestOutput() {
+    public TestOutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+      super(outputContext, numPhysicalOutputs);
     }
 
+
     @Override
     public List<Event> initialize() throws Exception {
       getContext().requestInitialMemory(0, null);

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index e5439de..c19f3a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -42,13 +42,16 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   private static final Log LOG = 
       LogFactory.getLog(InputReadyVertexManager.class);
 
-  VertexManagerPluginContext context;
   Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
   boolean taskIsStarted[];
   int oneToOneSrcTasksDoneCount[];
   Container oneToOneLocationHints[];
   int numOneToOneEdges;
-  
+
+  public InputReadyVertexManager(VertexManagerPluginContext context) {
+    super(context);
+  }
+
   class SourceVertexInfo {
     EdgeProperty edgeProperty;
     int numTasks;
@@ -64,25 +67,24 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   }
   
   @Override
-  public void initialize(VertexManagerPluginContext context) {
-    this.context = context;
+  public void initialize() {
   }
 
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
-    int numManagedTasks = context.getVertexNumTasks(context.getVertexName());
-    LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + context.getVertexName());
+    int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName());
     taskIsStarted = new boolean[numManagedTasks];
 
     // find out about all input edge types. If there is a custom edge then 
     // TODO Until TEZ-1013 we cannot handle custom input formats
-    Map<String, EdgeProperty> edges = context.getInputVertexEdgeProperties();
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
     int oneToOneSrcTaskCount = 0;
     numOneToOneEdges = 0;
     for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
       EdgeProperty edgeProp = entry.getValue();
       String srcVertex = entry.getKey();
-      int numSrcTasks = context.getVertexNumTasks(srcVertex);
+      int numSrcTasks = getContext().getVertexNumTasks(srcVertex);
       switch (edgeProp.getDataMovementType()) {
       case CUSTOM:
         throw new TezUncheckedException("Cannot handle custom edge");
@@ -145,7 +147,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
         oneToOneSrcTasksDoneCount[taskId.intValue()]++;
         // keep the latest container that completed as the location hint
         // After there is standard data size info available then use it
-        oneToOneLocationHints[taskId.intValue()] = context.getTaskContainer(vertex, taskId);
+        oneToOneLocationHints[taskId.intValue()] = getContext().getTaskContainer(vertex, taskId);
       }
     }
     
@@ -174,7 +176,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     if (numOneToOneEdges == 0) {
       // no 1-1 dependency. Start all tasks
       int numTasks = taskIsStarted.length;
-      LOG.info("Starting all " + numTasks + "tasks for vertex: " + context.getVertexName());
+      LOG.info("Starting all " + numTasks + "tasks for vertex: " + getContext().getVertexName());
       tasksToStart = Lists.newArrayListWithCapacity(numTasks);
       for (int i=0; i<numTasks; ++i) {
         taskIsStarted[i] = true;
@@ -191,7 +193,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
             locationHint = new TaskLocationHint(oneToOneLocationHints[i].getId());
           }
           LOG.info("Starting task " + i + " for vertex: "
-              + context.getVertexName() + " with location: "
+              + getContext().getVertexName() + " with location: "
               + ((locationHint != null) ? locationHint.getAffinitizedContainer() : "null"));
           tasksToStart.add(new TaskWithLocationHint(new Integer(i), locationHint));
         }
@@ -199,7 +201,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     }
     
     if (tasksToStart != null && !tasksToStart.isEmpty()) {
-      context.scheduleVertexTasks(tasksToStart);
+      getContext().scheduleVertexTasks(tasksToStart);
     }
     
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 2dc42d0..83c96e6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -105,7 +105,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   private static final Log LOG = 
                    LogFactory.getLog(ShuffleVertexManager.class);
 
-  VertexManagerPluginContext context;
   float slowStartMinSrcCompletionFraction;
   float slowStartMaxSrcCompletionFraction;
   long desiredTaskInputDataSize = 1024*1024*100L;
@@ -121,11 +120,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   
   Map<String, Set<Integer>> bipartiteSources = Maps.newHashMap();
   long completedSourceTasksOutputSize = 0;
-  
-  public ShuffleVertexManager() {
+
+  public ShuffleVertexManager(VertexManagerPluginContext context) {
+    super(context);
   }
-  
-  
+
   public static class CustomShuffleEdgeManager extends EdgeManager {
     int numSourceTaskOutputs;
     int numDestinationTasks;
@@ -133,13 +132,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     int remainderRangeForLastShuffler;
     int numSourceTasks;
 
-    public CustomShuffleEdgeManager() {
+    public CustomShuffleEdgeManager(EdgeManagerContext context) {
+      super(context);
     }
 
     @Override
-    public void initialize(EdgeManagerContext edgeManagerContext) {
+    public void initialize() {
       // Nothing to do. This class isn't currently designed to be used at the DAG API level.
-      byte[] userPayload = edgeManagerContext.getUserPayload();
+      byte[] userPayload = getContext().getUserPayload();
       if (userPayload == null
         || userPayload.length == 0) {
         throw new RuntimeException("Could not initialize CustomShuffleEdgeManager"
@@ -298,12 +298,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
     pendingTasks = Lists.newArrayListWithCapacity(
-        context.getVertexNumTasks(context.getVertexName()));
+        getContext().getVertexNumTasks(getContext().getVertexName()));
     // track the tasks in this vertex
     updatePendingTasks();
     updateSourceTaskCount();
     
-    LOG.info("OnVertexStarted vertex: " + context.getVertexName() + 
+    LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() +
              " with " + totalNumSourceTasks + " source tasks and " + 
              totalTasksToSchedule + " pending tasks");
     
@@ -357,7 +357,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   
   void updatePendingTasks() {
     pendingTasks.clear();
-    for (int i=0; i<context.getVertexNumTasks(context.getVertexName()); ++i) {
+    for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
       pendingTasks.add(new Integer(i));
     }
     totalTasksToSchedule = pendingTasks.size();
@@ -367,7 +367,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     // track source vertices
     int numSrcTasks = 0;
     for(String vertex : bipartiteSources.keySet()) {
-      numSrcTasks += context.getVertexNumTasks(vertex);
+      numSrcTasks += getContext().getVertexNumTasks(vertex);
     }
     totalNumSourceTasks = numSrcTasks;
   }
@@ -410,7 +410,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
           (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
 
-    LOG.info("Reduce auto parallelism for vertex: " + context.getVertexName() 
+    LOG.info("Reduce auto parallelism for vertex: " + getContext().getVertexName()
         + " to " + finalTaskParallelism + " from " + pendingTasks.size() 
         + " . Expected output: " + expectedTotalSourceTasksOutputSize 
         + " based on actual output: " + completedSourceTasksOutputSize
@@ -427,7 +427,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         CustomShuffleEdgeManagerConfig edgeManagerConfig =
             new CustomShuffleEdgeManagerConfig(
                 currentParallelism, finalTaskParallelism, 
-                context.getVertexNumTasks(vertex), basePartitionRange,
+                getContext().getVertexNumTasks(vertex), basePartitionRange,
                 ((remainderRangeForLastShuffler > 0) ?
                     remainderRangeForLastShuffler : basePartitionRange));
         EdgeManagerDescriptor edgeManagerDescriptor =
@@ -436,7 +436,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         edgeManagers.put(vertex, edgeManagerDescriptor);
       }
       
-      context.setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);
+      getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);
       updatePendingTasks();      
     }
   }
@@ -460,7 +460,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       scheduledTasks.add(new TaskWithLocationHint(pendingTasks.get(0), null));
       pendingTasks.remove(0);
     }
-    context.scheduleVertexTasks(scheduledTasks);
+    getContext().scheduleVertexTasks(scheduledTasks);
   }
   
   void schedulePendingTasks() {
@@ -472,7 +472,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     if (numSourceTasksCompleted == totalNumSourceTasks && numPendingTasks > 0) {
       LOG.info("All source tasks assigned. " +
           "Ramping up " + numPendingTasks + 
-          " remaining tasks for vertex: " + context.getVertexName());
+          " remaining tasks for vertex: " + getContext().getVertexName());
       schedulePendingTasks(numPendingTasks);
       return;
     }
@@ -515,7 +515,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       // numTasksToSchedule can be -ve if numSourceTasksCompleted does not 
       // does not increase monotonically
       LOG.info("Scheduling " + numTasksToSchedule + " tasks for vertex: " + 
-               context.getVertexName() + " with totalTasks: " + 
+               getContext().getVertexName() + " with totalTasks: " +
                totalTasksToSchedule + ". " + numSourceTasksCompleted + 
                " source tasks completed out of " + totalNumSourceTasks + 
                ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + 
@@ -526,15 +526,13 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   }
 
   @Override
-  public void initialize(VertexManagerPluginContext context) {
+  public void initialize() {
     Configuration conf;
     try {
-      conf = TezUtils.createConfFromUserPayload(context.getUserPayload());
+      conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
-    
-    this.context = context;
 
     this.slowStartMinSrcCompletionFraction = conf
         .getFloat(
@@ -571,7 +569,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:"
         + minTaskParallelism);
     
-    Map<String, EdgeProperty> inputs = context.getInputVertexEdgeProperties();
+    Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties();
     for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
       if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
         String vertex = entry.getKey();

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index f142ee9..5c2e82f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -19,15 +19,22 @@
 package org.apache.tez.runtime.library.input;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezMergedInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
 
+  public ConcatenatedMergedKeyValueInput(TezMergedInputContext context,
+                                         List<Input> inputs) {
+    super(context, inputs);
+  }
+
   public class ConcatenatedMergedKeyValueReader implements KeyValueReader {
     private int currentReaderIndex = 0;
     private KeyValueReader currentReader;

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 8affa14..3c56d07 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -19,15 +19,22 @@
 package org.apache.tez.runtime.library.input;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezMergedInputContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
 
+  public ConcatenatedMergedKeyValuesInput(TezMergedInputContext context,
+                                          List<Input> inputs) {
+    super(context, inputs);
+  }
+
   public class ConcatenatedMergedKeyValuesReader implements KeyValuesReader {
     private int currentReaderIndex = 0;
     private KeyValuesReader currentReader;

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index 93ca93e..e1f825e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
 
 /**
@@ -32,6 +33,10 @@ import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
  */
 public class LocalMergedInput extends ShuffledMergedInputLegacy {
 
+  public LocalMergedInput(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
   @Override
   public List<Event> initialize() throws IOException {
     getContext().requestInitialMemory(0l, null); // mandatory call.

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 6c50b93..01b9de7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -39,6 +39,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -79,6 +80,10 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
 
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
+  public ShuffledMergedInput(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
   @Override
   public synchronized List<Event> initialize() throws IOException {
     this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
index 2633968..612bab5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.util.Progress;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 
 @LimitedPrivate("mapreduce")
@@ -31,6 +32,10 @@ public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
 
   private final Progress progress = new Progress();
 
+  public ShuffledMergedInputLegacy(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
   @Private
   public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
     // wait for input so that iterator is available

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index e2d73e6..ff076ca 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -38,6 +38,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -68,7 +69,8 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
   private SimpleFetchedInputAllocator inputManager;
   private ShuffleEventHandler inputEventHandler;
 
-  public ShuffledUnorderedKVInput() {
+  public ShuffledUnorderedKVInput(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
index 66273d2..197664e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.TezMergedInputContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 /**
@@ -48,6 +49,10 @@ public class SortedGroupedMergedInput extends MergedLogicalInput {
   private final Set<Input> completedInputs = Collections
       .newSetFromMap(new IdentityHashMap<Input, Boolean>());
 
+  public SortedGroupedMergedInput(TezMergedInputContext context, List<Input> inputs) {
+    super(context, inputs);
+  }
+
   @Override
   public KeyValuesReader getReader() throws Exception {
     return new SortedGroupedMergedKeyValuesReader(getInputs());

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
index d1c5fc0..550448e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -26,12 +26,16 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 
 public class LocalOnFileSorterOutput extends OnFileSortedOutput {
 
   private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
 
+  public LocalOnFileSorterOutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
 
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index f10cb20..8536746 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -43,9 +43,9 @@ import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
@@ -74,6 +74,10 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
   private boolean sendEmptyPartitionDetails;
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
+  public OnFileSortedOutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
+
   @Override
   public synchronized List<Event> initialize() throws IOException {
     this.startTime = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index e9956e4..32592bb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -47,7 +48,6 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
@@ -62,9 +62,11 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
   private boolean dataViaEventsEnabled;
   private int dataViaEventsMaxSize;
 
-  public OnFileUnorderedKVOutput() {
+  public OnFileUnorderedKVOutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
   }
 
+
   @Override
   public synchronized List<Event> initialize()
       throws Exception {
@@ -153,12 +155,6 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
     return events;
   }
 
-  @Override
-  public synchronized void setNumPhysicalOutputs(int numOutputs) {
-    Preconditions.checkArgument(numOutputs == 1,
-        "Number of outputs can only be 1 for " + this.getClass().getName());
-  }
-  
   @VisibleForTesting
   @Private
   String getHost() {

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
index f666924..75d6e00 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
@@ -51,37 +51,33 @@ public class OnFileUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(OnFileUnorderedPartitionedKVOutput.class);
 
-  private TezOutputContext outputContext;
   private Configuration conf;
-  private int numPhysicalOutputs;
   private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private UnorderedPartitionedKVWriter kvWriter;
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
+  public OnFileUnorderedPartitionedKVOutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
+
   @Override
-  public synchronized List<Event> initialize(TezOutputContext outputContext) throws Exception {
-    this.outputContext = outputContext;
-    this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
-    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, outputContext.getWorkDirs());
+  public synchronized List<Event> initialize() throws Exception {
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
     this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS,
-        this.numPhysicalOutputs);
+        getNumPhysicalOutputs());
     this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
-    outputContext.requestInitialMemory(
+    getContext().requestInitialMemory(
         UnorderedPartitionedKVWriter.getInitialMemoryRequirement(conf,
-            outputContext.getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
+            getContext().getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
     return Collections.emptyList();
   }
 
   @Override
-  public List<Event> initialize() throws Exception {
-    return null;
-  }
-
-  @Override
   public synchronized void start() throws Exception {
     if (!isStarted.get()) {
       memoryUpdateCallbackHandler.validateUpdateReceived();
-      this.kvWriter = new UnorderedPartitionedKVWriter(outputContext, conf, numPhysicalOutputs,
+      this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, getNumPhysicalOutputs(),
           memoryUpdateCallbackHandler.getMemoryAssigned());
       isStarted.set(true);
     }
@@ -106,11 +102,6 @@ public class OnFileUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     }
   }
 
-  @Override
-  public synchronized void setNumPhysicalOutputs(int numOutputs) {
-    this.numPhysicalOutputs = numOutputs;
-  }
-
   private static final Set<String> confKeys = new HashSet<String>();
 
   static {

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index 65e63d6..09b7d4b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -78,8 +78,8 @@ public class TestInputReadyVertexManager {
     Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
     initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
     
-    InputReadyVertexManager manager = new InputReadyVertexManager();
-    manager.initialize(mockContext);
+    InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
     manager.onVertexStarted(initialCompletions);
     manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
     verify(mockContext, times(0)).scheduleVertexTasks(anyList());
@@ -121,8 +121,8 @@ public class TestInputReadyVertexManager {
     Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
     initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
     
-    InputReadyVertexManager manager = new InputReadyVertexManager();
-    manager.initialize(mockContext);
+    InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
     manager.onVertexStarted(initialCompletions);
     verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
@@ -196,9 +196,9 @@ public class TestInputReadyVertexManager {
     Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
     
     // 1-1 sources do not match managed tasks
-    InputReadyVertexManager manager = new InputReadyVertexManager();
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-    manager.initialize(mockContext);
+    InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
     try {
       manager.onVertexStarted(initialCompletions);
       Assert.assertTrue("Should have exception", false);
@@ -207,10 +207,10 @@ public class TestInputReadyVertexManager {
     }
     
     // 1-1 sources do not match
-    manager = new InputReadyVertexManager();
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
-    manager.initialize(mockContext);
+    manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
     try {
       manager.onVertexStarted(initialCompletions);
       Assert.assertTrue("Should have exception", false);
@@ -221,8 +221,8 @@ public class TestInputReadyVertexManager {
     initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
     initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
-    manager = new InputReadyVertexManager();
-    manager.initialize(mockContext);
+    manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
     manager.onVertexStarted(initialCompletions);
     // all 1-1 0's done but not scheduled because v1 is not done
     manager.onSourceTaskCompleted(mockSrcVertexId3, 0);

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 5c3a83a..caed74a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -125,10 +125,10 @@ public class TestShuffleVertexManager {
           newEdgeManagers.clear();
           for (Entry<String, EdgeManagerDescriptor> entry :
               ((Map<String, EdgeManagerDescriptor>)invocation.getArguments()[2]).entrySet()) {
-            EdgeManager edgeManager = ReflectionUtils.createClazzInstance(
-                entry.getValue().getClassName());
+
+
             final byte[] userPayload = entry.getValue().getUserPayload();
-            edgeManager.initialize(new EdgeManagerContext() {
+            EdgeManagerContext emContext = new EdgeManagerContext() {
               @Override
               public byte[] getUserPayload() {
                 return userPayload;
@@ -153,7 +153,11 @@ public class TestShuffleVertexManager {
               public int getDestinationVertexNumTasks() {
                 return 0;
               }
-            });
+            };
+            EdgeManager edgeManager = ReflectionUtils
+                .createClazzInstance(entry.getValue().getClassName(),
+                    new Class[]{EdgeManagerContext.class}, new Object[]{emContext});
+            edgeManager.initialize();
             newEdgeManagers.put(entry.getKey(), edgeManager);
           }
           return null;
@@ -486,8 +490,7 @@ public class TestShuffleVertexManager {
   private ShuffleVertexManager createManager(Configuration conf, 
       VertexManagerPluginContext context, float min, float max) {
     conf.setFloat(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
-    conf.setFloat(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);    
-    ShuffleVertexManager manager = new ShuffleVertexManager();
+    conf.setFloat(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
     byte[] payload;
     try {
       payload = TezUtils.createUserPayloadFromConf(conf);
@@ -495,7 +498,8 @@ public class TestShuffleVertexManager {
       throw new RuntimeException(e);
     }
     when(context.getUserPayload()).thenReturn(payload);
-    manager.initialize(context);
+    ShuffleVertexManager manager = new ShuffleVertexManager(context);
+    manager.initialize();
     return manager;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
index d8ebcb6..890b342 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
@@ -24,15 +24,17 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.io.RawComparator;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
-import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.TezMergedInputContext;
 import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -40,14 +42,13 @@ import org.junit.Test;
 
 public class TestSortedGroupedMergedInput {
 
-  TezMergedInputContext createMergedInputContext(Input input) {
-    return new TezMergedInputContextImpl(null, input, mock(InputReadyTracker.class), null);
+  TezMergedInputContext createMergedInputContext() {
+    return new TezMergedInputContextImpl(null, "mergedInputName", new HashMap<String, MergedLogicalInput>(),
+        mock(InputReadyTracker.class), null);
   }
   
   @Test
   public void testSimple() throws Exception {
-    SortedGroupedMergedInput input = new SortedGroupedMergedInput();
-
     SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
         new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
 
@@ -65,8 +66,8 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput1);
     sInputs.add(sInput2);
     sInputs.add(sInput3);
+    SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs);
 
-    input.initialize(sInputs, createMergedInputContext(input));
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
     while (kvsReader.next()) {
@@ -86,7 +87,7 @@ public class TestSortedGroupedMergedInput {
 
   @Test
   public void testSkippedKey() throws Exception {
-    SortedGroupedMergedInput input = new SortedGroupedMergedInput();
+
 
     SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
         new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
@@ -106,7 +107,8 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput2);
     sInputs.add(sInput3);
 
-    input.initialize(sInputs, createMergedInputContext(input));
+    SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs);
+
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
     while (kvsReader.next()) {
@@ -129,7 +131,6 @@ public class TestSortedGroupedMergedInput {
 
   @Test
   public void testPartialValuesSkip() throws Exception {
-    SortedGroupedMergedInput input = new SortedGroupedMergedInput();
 
     SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
         new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
@@ -149,7 +150,7 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput2);
     sInputs.add(sInput3);
 
-    input.initialize(sInputs, createMergedInputContext(input));
+    SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs);
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
     while (kvsReader.next()) {
@@ -176,7 +177,6 @@ public class TestSortedGroupedMergedInput {
 
   @Test
   public void testOrdering() throws Exception {
-    SortedGroupedMergedInput input = new SortedGroupedMergedInput();
 
     SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 2, 4 },
         new int[][] { { 2, 2 }, { 4, 4 } });
@@ -196,7 +196,7 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput2);
     sInputs.add(sInput3);
 
-    input.initialize(sInputs, createMergedInputContext(input));
+    SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs);
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
     while (kvsReader.next()) {
@@ -228,7 +228,6 @@ public class TestSortedGroupedMergedInput {
 
   @Test
   public void testSkippedKey2() throws Exception {
-    SortedGroupedMergedInput input = new SortedGroupedMergedInput();
 
     SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 2, 4 },
         new int[][] { { 2, 2 }, { 4, 4 } });
@@ -248,7 +247,7 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput2);
     sInputs.add(sInput3);
 
-    input.initialize(sInputs, createMergedInputContext(input));
+    SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs);
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
     while (kvsReader.next()) {
@@ -284,7 +283,6 @@ public class TestSortedGroupedMergedInput {
   // Reads all values for a key, but doesn't trigger the last hasNext() call.
   @Test
   public void testSkippedKey3() throws Exception {
-    SortedGroupedMergedInput input = new SortedGroupedMergedInput();
 
     SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3, 4 },
         new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 }, {4, 4} });
@@ -304,7 +302,8 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput2);
     sInputs.add(sInput3);
 
-    input.initialize(sInputs, createMergedInputContext(input));
+    SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs);
+
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
     while (kvsReader.next()) {
@@ -330,7 +329,6 @@ public class TestSortedGroupedMergedInput {
 
   @Test
   public void testEmptySources() throws Exception {
-    SortedGroupedMergedInput input = new SortedGroupedMergedInput();
 
     SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] {},
         new int[][] {});
@@ -350,7 +348,8 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput2);
     sInputs.add(sInput3);
 
-    input.initialize(sInputs, createMergedInputContext(input));
+    SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs);
+
     KeyValuesReader kvsReader = input.getReader();
     assertFalse(kvsReader.next());
   }
@@ -360,12 +359,12 @@ public class TestSortedGroupedMergedInput {
     final SortedTestKeyValuesReader reader;
 
     SortedTestInput(SortedTestKeyValuesReader reader) {
+      super(null, 0);
       this.reader = reader;
     }
 
     @Override
-    public List<Event> initialize(TezInputContext inputContext) throws Exception {
-      inputContext.inputIsReady();
+    public List<Event> initialize() throws IOException {
       return null;
     }
 
@@ -387,10 +386,6 @@ public class TestSortedGroupedMergedInput {
       return null;
     }
 
-    @Override
-    public void setNumPhysicalInputs(int numInputs) {
-    }
-
     @SuppressWarnings("rawtypes")
     public RawComparator getInputKeyComparator() {
       return new RawComparatorForTest();

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 3eb92c9..51c6a25 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.runtime.library.output;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,23 +63,6 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 
 @RunWith(Parameterized.class)
 public class TestOnFileSortedOutput {
@@ -147,10 +148,9 @@ public class TestOnFileSortedOutput {
   }
 
   private void startSortedOutput(int partitions) throws Exception {
-    sortedOutput = new OnFileSortedOutput();
-    sortedOutput.setNumPhysicalOutputs(partitions);
     TezOutputContext context = createTezOutputContext();
-    sortedOutput.initialize(context);
+    sortedOutput = new OnFileSortedOutput(context, partitions);
+    sortedOutput.initialize();
     sortedOutput.start();
     writer = sortedOutput.getWriter();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index fbe3b03..ac91e44 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -91,9 +91,6 @@ public class TestOnFileUnorderedKVOutput {
 
   @Test
   public void testGeneratedDataMovementEvent() throws Exception {
-
-    OnFileUnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest();
-
     Configuration conf = new Configuration();
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
@@ -127,9 +124,11 @@ public class TestOnFileUnorderedKVOutput {
         taskAttemptID, counters, 0, userPayload, runtimeTask,
         null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null);
 
+    OnFileUnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest(outputContext, 1);
+
     List<Event> events = null;
 
-    events = kvOutput.initialize(outputContext);
+    events = kvOutput.initialize();
     assertTrue(events != null && events.size() == 0);
 
     KeyValueWriter kvWriter = kvOutput.getWriter();
@@ -154,6 +153,11 @@ public class TestOnFileUnorderedKVOutput {
   }
 
   private static class OnFileUnorderedKVOutputForTest extends OnFileUnorderedKVOutput {
+
+    public OnFileUnorderedKVOutputForTest(TezOutputContext outputContext, int numPhysicalOutputs) {
+      super(outputContext, numPhysicalOutputs);
+    }
+
     @Override
     String getHost() {
       return "host";

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 67fc6a5..b0d5061 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -667,9 +667,16 @@ public class TestMRRJobsDAGApi {
   // the path it writes to is not dynamic.
   private static String RELOCALIZATION_TEST_CLASS_NAME = "AMClassloadTestDummyClass";
   public static class MRInputAMSplitGeneratorRelocalizationTest extends MRInputAMSplitGenerator {
-    public List<Event> initialize(TezRootInputInitializerContext rootInputContext)  throws Exception {
+
+    public MRInputAMSplitGeneratorRelocalizationTest(
+        TezRootInputInitializerContext initializerContext) {
+      super(initializerContext);
+    }
+
+    @Override
+    public List<Event> initialize()  throws Exception {
       MRInputUserPayloadProto userPayloadProto = MRHelpers
-          .parseMRInputPayload(rootInputContext.getInputUserPayload());
+          .parseMRInputPayload(getContext().getInputUserPayload());
       Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
           .getConfigurationBytes());
 
@@ -682,7 +689,7 @@ public class TestMRRJobsDAGApi {
         LOG.info("Class not found");
       }
 
-      return super.initialize(rootInputContext);
+      return super.initialize();
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index e8c8ed1..51d24bc 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -32,6 +32,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -108,7 +109,17 @@ public class TestInput extends AbstractLogicalInput {
    */
   public static String TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT =
       "tez.failing-input.failing-task-attempt";
-  
+
+  public TestInput(TezInputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+    this.completedInputVersion = new int[numPhysicalInputs];
+    this.inputValues = new int[numPhysicalInputs];
+    for (int i=0; i<numPhysicalInputs; ++i) {
+      this.completedInputVersion[i] = -1;
+      this.inputValues[i] = -1;
+    }
+  }
+
   public static InputDescriptor getInputDesc(byte[] payload) {
     return new InputDescriptor(TestInput.class.getName()).
         setUserPayload(payload);
@@ -140,7 +151,7 @@ public class TestInput extends AbstractLogicalInput {
              (lastInputReadyValue <= failingInputUpto)) {
           List<Event> events = Lists.newLinkedList();
           if (failingInputIndices.contains(failAll)) {
-            for (int i=0; i<numPhysicalInputs; ++i) {
+            for (int i=0; i<getNumPhysicalInputs(); ++i) {
               String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
                   " index: " + i + " version: " + lastInputReadyValue);
               events.add(new InputReadErrorEvent(msg, i, lastInputReadyValue));
@@ -148,9 +159,9 @@ public class TestInput extends AbstractLogicalInput {
             }
           } else {
             for (Integer index : failingInputIndices) {
-              if (index.intValue() >= numPhysicalInputs) {
+              if (index.intValue() >= getNumPhysicalInputs()) {
                 throwException("InputIndex: " + index.intValue() + 
-                    " should be less than numInputs: " + numPhysicalInputs);
+                    " should be less than numInputs: " + getNumPhysicalInputs());
               }
               if (completedInputVersion[index.intValue()] < lastInputReadyValue) {
                 continue; // dont fail a previous version now.
@@ -196,7 +207,7 @@ public class TestInput extends AbstractLogicalInput {
     
     // sum input value given by upstream tasks
     int sum = 0;
-    for (int i=0; i<numPhysicalInputs; ++i) {
+    for (int i=0; i<getNumPhysicalInputs(); ++i) {
       if (inputValues[i] == -1) {
         throwException("Invalid input value : " + i);
       }
@@ -215,7 +226,7 @@ public class TestInput extends AbstractLogicalInput {
   public static String getVertexConfName(String confName, String vertexName) {
     return confName + "." + vertexName;
   }
-  
+
   @Override
   public List<Event> initialize() throws Exception {
     getContext().requestInitialMemory(0l, null); //Mandatory call.
@@ -272,7 +283,7 @@ public class TestInput extends AbstractLogicalInput {
         LOG.info("Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + 
             " targetId: " + dmEvent.getTargetIndex() +
             " version: " + dmEvent.getVersion() +
-            " numInputs: " + numPhysicalInputs +
+            " numInputs: " + getNumPhysicalInputs() +
             " numCompletedInputs: " + numCompletedInputs);
         this.completedInputVersion[dmEvent.getTargetIndex()] = dmEvent.getVersion();
         this.inputValues[dmEvent.getTargetIndex()] = 
@@ -282,13 +293,13 @@ public class TestInput extends AbstractLogicalInput {
         numCompletedInputs--;
         LOG.info("Received InputFailed event targetId: " + ifEvent.getTargetIndex() +
             " version: " + ifEvent.getVersion() +
-            " numInputs: " + numPhysicalInputs +
+            " numInputs: " + getNumPhysicalInputs() +
             " numCompletedInputs: " + numCompletedInputs);
       }
     }
-    if (numCompletedInputs == numPhysicalInputs) {
+    if (numCompletedInputs == getNumPhysicalInputs()) {
       int maxInputVersionSeen = -1;  
-      for (int i=0; i<numPhysicalInputs; ++i) {
+      for (int i=0; i<getNumPhysicalInputs(); ++i) {
         if (completedInputVersion[i] < 0) {
           LOG.info("Not received completion for input " + i);
           return;
@@ -310,14 +321,4 @@ public class TestInput extends AbstractLogicalInput {
     return null;
   }
 
-  @Override
-  public void setNumPhysicalInputs(int numInputs) {
-    this.numPhysicalInputs = numInputs;
-    this.completedInputVersion = new int[numInputs];
-    this.inputValues = new int[numInputs];
-    for (int i=0; i<numInputs; ++i) {
-      this.completedInputVersion[i] = -1;
-      this.inputValues[i] = -1;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 0ab1070..064119f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 
@@ -34,7 +35,11 @@ import com.google.common.collect.Lists;
 
 public class TestOutput extends AbstractLogicalOutput {
   private static final Log LOG = LogFactory.getLog(TestOutput.class);
-  
+
+  public TestOutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
+
   public static OutputDescriptor getOutputDesc(byte[] payload) {
     return new OutputDescriptor(TestOutput.class.getName()).
         setUserPayload(payload);

http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index a6768a7..76e2bc6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -45,6 +45,8 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.Writer;
@@ -76,16 +78,18 @@ public class MultiAttemptDAG {
   public static class FailOnAttemptVertexManagerPlugin extends VertexManagerPlugin {
     private int numSourceTasks = 0;
     private AtomicInteger numCompletions = new AtomicInteger();
-    private VertexManagerPluginContext context;
     private boolean tasksScheduled = false;
 
+    public FailOnAttemptVertexManagerPlugin(VertexManagerPluginContext context) {
+      super(context);
+    }
+
     @Override
-    public void initialize(VertexManagerPluginContext context) {
-      this.context = context;
+    public void initialize() {
       for (String input :
-          context.getInputVertexEdgeProperties().keySet()) {
+          getContext().getInputVertexEdgeProperties().keySet()) {
         LOG.info("Adding sourceTasks for Vertex " + input);
-        numSourceTasks += context.getVertexNumTasks(input);
+        numSourceTasks += getContext().getVertexNumTasks(input);
         LOG.info("Current numSourceTasks=" + numSourceTasks);
       }
     }
@@ -107,21 +111,21 @@ public class MultiAttemptDAG {
       if (numCompletions.get() >= numSourceTasks
           && !tasksScheduled) {
         tasksScheduled = true;
-        String payload = new String(context.getUserPayload());
+        String payload = new String(getContext().getUserPayload());
         int successAttemptId = Integer.valueOf(payload);
         LOG.info("Checking whether to crash AM or schedule tasks"
             + ", successfulAttemptID=" + successAttemptId
-            + ", currentAttempt=" + context.getDAGAttemptNumber());
-        if (successAttemptId > context.getDAGAttemptNumber()) {
+            + ", currentAttempt=" + getContext().getDAGAttemptNumber());
+        if (successAttemptId > getContext().getDAGAttemptNumber()) {
           Runtime.getRuntime().halt(-1);
-        } else if (successAttemptId == context.getDAGAttemptNumber()) {
-          LOG.info("Scheduling tasks for vertex=" + context.getVertexName());
-          int numTasks = context.getVertexNumTasks(context.getVertexName());
+        } else if (successAttemptId == getContext().getDAGAttemptNumber()) {
+          LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName());
+          int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
           List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
           for (int i=0; i<numTasks; ++i) {
             scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
           }
-          context.scheduleVertexTasks(scheduledTasks);
+          getContext().scheduleVertexTasks(scheduledTasks);
         }
       }
     }
@@ -150,11 +154,15 @@ public class MultiAttemptDAG {
 
     boolean failOnCommit = false;
 
+    public FailingOutputCommitter(OutputCommitterContext committerContext) {
+      super(committerContext);
+    }
+
     @Override
-    public void initialize(OutputCommitterContext context) throws Exception {
+    public void initialize() throws Exception {
       FailingOutputCommitterConfig config = new
           FailingOutputCommitterConfig();
-      config.fromUserPayload(context.getOutputUserPayload());
+      config.fromUserPayload(getContext().getOutputUserPayload());
       failOnCommit = config.failOnCommit;
     }
 
@@ -204,14 +212,18 @@ public class MultiAttemptDAG {
 
   public static class FailingInputInitializer extends TezRootInputInitializer {
 
+    public FailingInputInitializer(TezRootInputInitializerContext initializerContext) {
+      super(initializerContext);
+    }
+
     @Override
-    public List<Event> initialize(TezRootInputInitializerContext inputVertexContext) throws Exception {
+    public List<Event> initialize() throws Exception {
       try {
         Thread.sleep(2000l);
       } catch (InterruptedException e) {
         // Ignore
       }
-      if (inputVertexContext.getDAGAttemptNumber() == 1) {
+      if (getContext().getDAGAttemptNumber() == 1) {
         LOG.info("Shutting down the AM in 1st attempt");
         Runtime.getRuntime().halt(-1);
       }
@@ -227,9 +239,13 @@ public class MultiAttemptDAG {
 
   public static class NoOpInput extends AbstractLogicalInput {
 
+    public NoOpInput(TezInputContext inputContext, int numPhysicalInputs) {
+      super(inputContext, numPhysicalInputs);
+    }
+
     @Override
     public List<Event> initialize() throws Exception {
-      inputContext.requestInitialMemory(1l, new MemoryUpdateCallback() {
+      getContext().requestInitialMemory(1l, new MemoryUpdateCallback() {
         @Override
         public void memoryAssigned(long assignedSize) {}
       });
@@ -259,11 +275,17 @@ public class MultiAttemptDAG {
 
   public static class NoOpOutput extends AbstractLogicalOutput {
 
+    public NoOpOutput(TezOutputContext outputContext,
+                      int numPhysicalOutputs) {
+      super(outputContext, numPhysicalOutputs);
+    }
+
     @Override
     public List<Event> initialize() throws Exception {
-      outputContext.requestInitialMemory(1l, new MemoryUpdateCallback() {
+      getContext().requestInitialMemory(1l, new MemoryUpdateCallback() {
         @Override
-        public void memoryAssigned(long assignedSize) {}
+        public void memoryAssigned(long assignedSize) {
+        }
       });
       return null;
     }


Mime
View raw message