tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-695. Create Abstract class for Input/Processor/Output (Mohammad Kamrul Islam via bikas)
Date Tue, 15 Apr 2014 23:18:59 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 0c324736b -> 7083f0119


TEZ-695. Create Abstract class for Input/Processor/Output (Mohammad Kamrul Islam via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7083f011
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7083f011
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7083f011

Branch: refs/heads/master
Commit: 7083f0119e48de342fad8ac1173ae18366ddd801
Parents: 0c32473
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Apr 15 16:18:58 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Apr 15 16:18:58 2014 -0700

----------------------------------------------------------------------
 .../runtime/api/AbstractLogicalIOProcessor.java | 45 +++++++++++++++
 .../tez/runtime/api/AbstractLogicalInput.java   | 53 +++++++++++++++++
 .../tez/runtime/api/AbstractLogicalOutput.java  | 53 +++++++++++++++++
 .../tez/mapreduce/examples/UnionExample.java    |  3 -
 .../tez/mapreduce/examples/WordCount.java       | 61 +++++++++-----------
 .../org/apache/tez/mapreduce/input/MRInput.java | 50 +++++++---------
 .../apache/tez/mapreduce/output/MROutput.java   | 39 +++++--------
 .../tez/runtime/TestInputReadyTracker.java      | 21 ++-----
 .../TestLogicalIOProcessorRuntimeTask.java      | 55 +++++++-----------
 .../runtime/library/input/LocalMergedInput.java | 16 +++--
 .../library/input/ShuffledMergedInput.java      | 49 +++++++---------
 .../input/ShuffledMergedInputLegacy.java        |  2 +-
 .../library/input/ShuffledUnorderedKVInput.java | 50 +++++++---------
 .../library/output/LocalOnFileSorterOutput.java |  2 +-
 .../library/output/OnFileSortedOutput.java      | 46 ++++++---------
 .../library/output/OnFileUnorderedKVOutput.java | 21 +++----
 .../java/org/apache/tez/test/TestInput.java     | 56 +++++++++---------
 .../java/org/apache/tez/test/TestOutput.java    | 22 ++-----
 18 files changed, 350 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
new file mode 100644
index 0000000..27e1bc8
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+/**
+ * Abstract base implementation of the {@link LogicalIOProcessor} interface.
+ * It stores the context passed to initialize and exposes it via getContext(),
+ * which reduces the boilerplate needed in any processor implementation.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor {
+  protected TezProcessorContext context;
+
+  @Override
+  public void initialize(TezProcessorContext processorContext) throws Exception {
+    this.context = processorContext;
+    initialize();
+  }
+
+  public abstract void initialize() throws Exception;
+
+  public TezProcessorContext getContext() {
+    return context;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
new file mode 100644
index 0000000..e079587
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+/**
+ * An abstract implementation of {@link LogicalInput}. It includes default
+ * implementations of a few methods for convenience.
+ * 
+ */
+public abstract class AbstractLogicalInput implements LogicalInput {
+
+  protected int numPhysicalInputs;
+  protected TezInputContext inputContext;
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    this.numPhysicalInputs = numInputs;
+  }
+
+  @Override
+  public List<Event> initialize(TezInputContext _inputContext) throws Exception {
+    this.inputContext = _inputContext;
+    return initialize();
+  }
+
+  public abstract List<Event> initialize() throws Exception;
+
+  public int getNumPhysicalInputs() {
+    return numPhysicalInputs;
+  }
+
+  public TezInputContext getContext() {
+    return inputContext;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
new file mode 100644
index 0000000..abdf2ae
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+/**
+ * An abstract implementation of {@link LogicalOutput}. It includes default
+ * implementations of a few methods for convenience.
+ * 
+ */
+public abstract class AbstractLogicalOutput implements LogicalOutput {
+
+  protected int numPhysicalOutputs;
+  protected TezOutputContext outputContext;
+
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numPhysicalOutputs = numOutputs;
+  }
+
+  @Override
+  public List<Event> initialize(TezOutputContext _outputContext) throws Exception {
+    this.outputContext = _outputContext;
+    return initialize();
+  }
+  
+  public abstract List<Event> initialize() throws Exception;
+
+  public int getNumPhysicalOutputs() {
+    return numPhysicalOutputs;
+  }
+
+  public TezOutputContext getContext() {
+    return outputContext;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 14a9648..d73bbe1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -22,7 +22,6 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -49,7 +48,6 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
-import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.DAG;
@@ -59,7 +57,6 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 20d9229..7dade9b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -18,11 +18,9 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
 
@@ -49,7 +47,6 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
-import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -57,14 +54,12 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -72,11 +67,10 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
@@ -87,26 +81,11 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 import com.google.common.base.Preconditions;
 
 public class WordCount {
-  public static class TokenProcessor implements LogicalIOProcessor {
-    TezProcessorContext context;
+  public static class TokenProcessor extends AbstractLogicalIOProcessor {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
     @Override
-    public void initialize(TezProcessorContext processorContext)
-        throws Exception {
-      this.context = processorContext;
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-    }
-
-    @Override
-    public void close() throws Exception {
-    }
-
-    @Override
     public void run(Map<String, LogicalInput> inputs,
         Map<String, LogicalOutput> outputs) throws Exception {
       for (LogicalInput input : inputs.values()) {
@@ -129,26 +108,25 @@ public class WordCount {
         }
       }
     }
-    
-  }
-  
-  public static class SumProcessor implements LogicalIOProcessor {
-    TezProcessorContext context;
-    
+
     @Override
-    public void initialize(TezProcessorContext processorContext)
-        throws Exception {
-      this.context = processorContext;
+    public void initialize() throws Exception {
+
     }
 
     @Override
     public void handleEvents(List<Event> processorEvents) {
+      
     }
 
     @Override
     public void close() throws Exception {
+      
     }
 
+  }
+  
+  public static class SumProcessor extends AbstractLogicalIOProcessor {
     @Override
     public void run(Map<String, LogicalInput> inputs,
         Map<String, LogicalOutput> outputs) throws Exception {
@@ -173,13 +151,28 @@ public class WordCount {
         kvWriter.write(word, new IntWritable(sum));
       }
       if (out.isCommitRequired()) {
-        while (!context.canCommit()) {
+        while (!getContext().canCommit()) {
           Thread.sleep(100);
         }
         out.commit();
       }
     }
-    
+
+    @Override
+    public void initialize() throws Exception {
+
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+      
+    }
+
+    @Override
+    public void close() throws Exception {
+      
+    }
+
   }
   
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 1ac0295..56d55ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -56,10 +56,9 @@ import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -75,13 +74,12 @@ import com.google.common.base.Preconditions;
  * This class is not meant to be extended by external projects.
  */
 
-public class MRInput implements LogicalInput {
+public class MRInput extends AbstractLogicalInput {
 
   private static final Log LOG = LogFactory.getLog(MRInput.class);
   
   private final Lock rrLock = new ReentrantLock();
   private Condition rrInited = rrLock.newCondition();
-  private TezInputContext inputContext;
   
   private volatile boolean eventReceived = false;
   
@@ -115,12 +113,11 @@ public class MRInput implements LogicalInput {
   
   
   @Override
-  public List<Event> initialize(TezInputContext inputContext) throws IOException {
-    this.inputContext = inputContext;
-    this.inputContext.requestInitialMemory(0l, null); //mandatory call
-    this.inputContext.inputIsReady();
+  public List<Event> initialize() throws IOException {
+    getContext().requestInitialMemory(0l, null); //mandatory call
+    getContext().inputIsReady();
     MRInputUserPayloadProto mrUserPayload =
-      MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
+      MRHelpers.parseMRInputPayload(getContext().getUserPayload());
     Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
         "Split information not expected in MRInput");
     Configuration conf =
@@ -131,21 +128,21 @@ public class MRInput implements LogicalInput {
 
     TaskAttemptID taskAttemptId = new TaskAttemptID(
       new TaskID(
-        Long.toString(inputContext.getApplicationId().getClusterTimestamp()),
-        inputContext.getApplicationId().getId(), TaskType.MAP,
-        inputContext.getTaskIndex()),
-      inputContext.getTaskAttemptNumber());
+        Long.toString(getContext().getApplicationId().getClusterTimestamp()),
+        getContext().getApplicationId().getId(), TaskType.MAP,
+        getContext().getTaskIndex()),
+        getContext().getTaskAttemptNumber());
 
     jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
       taskAttemptId.toString());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-      inputContext.getDAGAttemptNumber());
+        getContext().getDAGAttemptNumber());
 
     // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
     // theory, can be used by the MapProcessor, ReduceProcessor or a custom
     // processor. (The processor could provide the counter though)
 
-    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+    this.inputRecordCounter = getContext().getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
 
     useNewApi = this.jobConf.getUseNewMapper();
     this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
@@ -156,7 +153,7 @@ public class MRInput implements LogicalInput {
     initializeInternal();
     return null;
   }
-  
+
   @Override
   public void start() {
   }
@@ -175,7 +172,7 @@ public class MRInput implements LogicalInput {
       } else {
         // Read split information.
         TaskSplitMetaInfo[] allMetaInfo = readSplits(jobConf);
-        TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext
+        TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext()
             .getTaskIndex()];
         this.splitMetaInfo = new TaskSplitIndex(
             thisTaskMetaInfo.getSplitLocation(),
@@ -193,7 +190,7 @@ public class MRInput implements LogicalInput {
     } finally {
       rrLock.unlock();
     }
-    LOG.info("Initialzed MRInput: " + inputContext.getSourceVertexName());
+    LOG.info("Initialzed MRInput: " + getContext().getSourceVertexName());
   }
 
   private void setupOldInputFormat() {
@@ -203,7 +200,7 @@ public class MRInput implements LogicalInput {
   private void setupOldRecordReader() throws IOException {
     Preconditions.checkNotNull(oldInputSplit, "Input split hasn't yet been setup");
     oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
-        this.jobConf, new MRReporter(inputContext, oldInputSplit));
+        this.jobConf, new MRReporter(getContext(), oldInputSplit));
     setIncrementalConfigParams(oldInputSplit);
   }
   
@@ -244,7 +241,7 @@ public class MRInput implements LogicalInput {
     }
 
     LOG.info("Creating reader for MRInput: "
-        + inputContext.getSourceVertexName());
+        + getContext().getSourceVertexName());
     return new MRInputKVReader();
   }
 
@@ -265,13 +262,6 @@ public class MRInput implements LogicalInput {
     processSplitEvent((RootInputDataInformationEvent)event);
   }
 
-  
-
-  @Override
-  public void setNumPhysicalInputs(int numInputs) {
-    // Not required at the moment. May be required if splits are sent via events.
-  }
-
   @Override
   public List<Event> close() throws IOException {
     if (useNewApi) {
@@ -308,7 +298,7 @@ public class MRInput implements LogicalInput {
 
   
   private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, inputContext, true, null);
+    return new TaskAttemptContextImpl(this.jobConf, getContext(), true, null);
   }
   
   void processSplitEvent(RootInputDataInformationEvent event)
@@ -401,7 +391,7 @@ public class MRInput implements LogicalInput {
     deserializer.open(inFile);
     org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
     long pos = inFile.getPos();
-    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+    getContext().getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
         .increment(pos - offset);
     inFile.close();
     return split;
@@ -447,7 +437,7 @@ public class MRInput implements LogicalInput {
     org.apache.hadoop.mapreduce.InputSplit split = 
         deserializer.deserialize(null);
     long pos = inFile.getPos();
-    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+    getContext().getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
         .increment(pos - offset);
     inFile.close();
     return split;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 2ecf602..8aeee26 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -46,19 +46,17 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
-public class MROutput implements LogicalOutput {
+public class MROutput extends AbstractLogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(MROutput.class);
 
   private final NumberFormat taskNumberFormat = NumberFormat.getInstance();
   private final NumberFormat nonTaskNumberFormat = NumberFormat.getInstance();
   
-  private TezOutputContext outputContext;
   private JobConf jobConf;
   boolean useNewApi;
   private AtomicBoolean closed = new AtomicBoolean(false);
@@ -83,17 +81,15 @@ public class MROutput implements LogicalOutput {
   protected OutputCommitter committer;
 
   @Override
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws IOException, InterruptedException {
+  public List<Event> initialize() throws IOException, InterruptedException {
     LOG.info("Initializing Simple Output");
-    outputContext.requestInitialMemory(0l, null); //mandatory call
+    getContext().requestInitialMemory(0l, null); //mandatory call
     taskNumberFormat.setMinimumIntegerDigits(5);
     taskNumberFormat.setGroupingUsed(false);
     nonTaskNumberFormat.setMinimumIntegerDigits(3);
     nonTaskNumberFormat.setGroupingUsed(false);
-    this.outputContext = outputContext;
     Configuration conf = TezUtils.createConfFromUserPayload(
-        outputContext.getUserPayload());
+        getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
     // Add tokens to the jobConf - in case they are accessed within the RW / OF
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
@@ -101,9 +97,9 @@ public class MROutput implements LogicalOutput {
     this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
         false);
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        outputContext.getDAGAttemptNumber());
+        getContext().getDAGAttemptNumber());
     TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
-        .createMockTaskAttemptID(outputContext, isMapperOutput);
+        .createMockTaskAttemptID(getContext(), isMapperOutput);
     jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
     jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
     jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
@@ -118,7 +114,7 @@ public class MROutput implements LogicalOutput {
       }
     }
 
-    outputRecordCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);    
+    outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);    
 
     if (useNewApi) {
       newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
@@ -140,7 +136,7 @@ public class MROutput implements LogicalOutput {
       oldApiTaskAttemptContext =
           new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
               jobConf, taskAttemptId,
-              new MRTaskReporter(outputContext));
+              new MRTaskReporter(getContext()));
       oldOutputFormat = jobConf.getOutputFormat();
 
       FileSystem fs = FileSystem.get(jobConf);
@@ -148,7 +144,7 @@ public class MROutput implements LogicalOutput {
 
       oldRecordWriter =
           oldOutputFormat.getRecordWriter(
-              fs, jobConf, finalName, new MRReporter(outputContext));
+              fs, jobConf, finalName, new MRReporter(getContext()));
     }
     initCommitter(jobConf, useNewApi);
 
@@ -156,7 +152,7 @@ public class MROutput implements LogicalOutput {
         + ", using_new_api: " + useNewApi);
     return null;
   }
-  
+
   @Override
   public void start() {
   }
@@ -208,7 +204,7 @@ public class MROutput implements LogicalOutput {
   }
 
   private TaskAttemptContext createTaskAttemptContext(TaskAttemptID attemptId) {
-    return new TaskAttemptContextImpl(this.jobConf, attemptId, outputContext,
+    return new TaskAttemptContextImpl(this.jobConf, attemptId, getContext(),
         isMapperOutput, null);
   }
 
@@ -216,8 +212,8 @@ public class MROutput implements LogicalOutput {
     String prefix = jobConf.get(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX);
     if (prefix == null) {
       prefix = "part-v" + 
-          nonTaskNumberFormat.format(outputContext.getTaskVertexIndex()) +  
-          "-o" + nonTaskNumberFormat.format(outputContext.getOutputIndex());
+          nonTaskNumberFormat.format(getContext().getTaskVertexIndex()) +  
+          "-o" + nonTaskNumberFormat.format(getContext().getOutputIndex());
     }
     return prefix;
   }
@@ -225,7 +221,7 @@ public class MROutput implements LogicalOutput {
   private String getOutputName() {
     // give a unique prefix to the output name
     return getOutputFileNamePrefix() + 
-        "-" + taskNumberFormat.format(outputContext.getTaskIndex());
+        "-" + taskNumberFormat.format(getContext().getTaskIndex());
   }
 
   @Override
@@ -276,11 +272,6 @@ public class MROutput implements LogicalOutput {
     return null;
   }
 
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    // Nothing to do for now
-  }
-
   /**
    * MROutput expects that a Processor call commit prior to the
    * Processor's completion

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
index 6f5aa2f..cb9f76a 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -26,12 +26,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
-import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -205,7 +204,7 @@ public class TestInputReadyTracker {
     return startTime;
   }
 
-  private static class ImmediatelyReadyInputForTest implements LogicalInput {
+  private static class ImmediatelyReadyInputForTest extends AbstractLogicalInput {
 
     private volatile boolean isReady = false;
     
@@ -215,7 +214,7 @@ public class TestInputReadyTracker {
     }
 
     @Override
-    public List<Event> initialize(TezInputContext inputContext) throws Exception {
+    public List<Event> initialize() throws Exception {
       return null;
     }
 
@@ -236,13 +235,9 @@ public class TestInputReadyTracker {
     public List<Event> close() throws Exception {
       return null;
     }
-
-    @Override
-    public void setNumPhysicalInputs(int numInputs) {
-    }
   }
   
-  private static class ControlledReadyInputForTest implements LogicalInput {
+  private static class ControlledReadyInputForTest extends AbstractLogicalInput {
 
     private volatile boolean isReady = false;
     private InputReadyTracker inputReadyTracker;
@@ -252,7 +247,7 @@ public class TestInputReadyTracker {
     }
 
     @Override
-    public List<Event> initialize(TezInputContext inputContext) throws Exception {
+    public List<Event> initialize() throws Exception {
       return null;
     }
 
@@ -274,11 +269,7 @@ public class TestInputReadyTracker {
       return null;
     }
 
-    @Override
-    public void setNumPhysicalInputs(int numInputs) {
-    }
-    
-    // Used by the test to control when this input will be ready
+    // Used by the test to control when this input will be ready
     public void setInputIsReady() {
       isReady = true;
       inputReadyTracker.setInputIsReady(this);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 27a250e..5eb25c8 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -34,14 +34,13 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -136,7 +135,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     return TezDAGID.getInstance("2000", 100, 1);
   }
 
-  public static class TestProcessor implements LogicalIOProcessor {
+  public static class TestProcessor extends AbstractLogicalIOProcessor {
 
     public static volatile int runCount = 0;
 
@@ -144,19 +143,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     }
 
     @Override
-    public void initialize(TezProcessorContext processorContext) throws Exception {
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-      // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void close() throws Exception {
-      // TODO Auto-generated method stub
-
+    public void initialize() throws Exception {
     }
 
     @Override
@@ -165,9 +152,19 @@ public class TestLogicalIOProcessorRuntimeTask {
       runCount++;
     }
 
+	@Override
+	public void handleEvents(List<Event> processorEvents) {
+		
+	}
+
+	@Override
+	public void close() throws Exception {
+		
+	}
+
   }
 
-  public static class TestInput implements LogicalInput {
+  public static class TestInput extends AbstractLogicalInput {
 
     public static volatile int startCount = 0;
 
@@ -175,9 +172,9 @@ public class TestLogicalIOProcessorRuntimeTask {
     }
 
     @Override
-    public List<Event> initialize(TezInputContext inputContext) throws Exception {
-      inputContext.requestInitialMemory(0, null);
-      inputContext.inputIsReady();
+    public List<Event> initialize() throws Exception {
+      getContext().requestInitialMemory(0, null);
+      getContext().inputIsReady();
       return null;
     }
 
@@ -201,13 +198,9 @@ public class TestLogicalIOProcessorRuntimeTask {
       return null;
     }
 
-    @Override
-    public void setNumPhysicalInputs(int numInputs) {
-    }
-
   }
 
-  public static class TestOutput implements LogicalOutput {
+  public static class TestOutput extends AbstractLogicalOutput {
 
     public static volatile int startCount = 0;
 
@@ -215,8 +208,8 @@ public class TestLogicalIOProcessorRuntimeTask {
     }
 
     @Override
-    public List<Event> initialize(TezOutputContext outputContext) throws Exception {
-      outputContext.requestInitialMemory(0, null);
+    public List<Event> initialize() throws Exception {
+      getContext().requestInitialMemory(0, null);
       return null;
     }
 
@@ -240,9 +233,5 @@ public class TestLogicalIOProcessorRuntimeTask {
       return null;
     }
 
-    @Override
-    public void setNumPhysicalOutputs(int numOutputs) {
-    }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index d298e8f..93ca93e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -24,7 +24,6 @@ import java.util.List;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
 
 /**
@@ -34,17 +33,16 @@ import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
 public class LocalMergedInput extends ShuffledMergedInputLegacy {
 
   @Override
-  public List<Event> initialize(TezInputContext inputContext) throws IOException {
-    this.inputContext = inputContext;
-    this.inputContext.requestInitialMemory(0l, null); // mandatory call.
-    this.inputContext.inputIsReady();
-    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+  public List<Event> initialize() throws IOException {
+    getContext().requestInitialMemory(0l, null); // mandatory call.
+    getContext().inputIsReady();
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
 
-    if (numInputs == 0) {
+    if (getNumPhysicalInputs() == 0) {
       return Collections.emptyList();
     }
 
-    LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
+    LocalShuffle localShuffle = new LocalShuffle(getContext(), conf, getNumPhysicalInputs());
     rawIter = localShuffle.run();
     createValuesIterator();
     return Collections.emptyList();
@@ -56,7 +54,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
 
   @Override
   public List<Event> close() throws IOException {
-    if (numInputs != 0) {
+    if (getNumPhysicalInputs() != 0) {
       rawIter.close();
     }
     return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 8a6de52..0c01f92 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -33,9 +33,8 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
@@ -47,7 +46,7 @@ import com.google.common.base.Preconditions;
 
 
 /**
- * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
+ * <code>ShuffledMergedInput</code> is an {@link AbstractLogicalInput} which shuffles
  * intermediate sorted data, merges them and provides key/<values> to the
  * consumer.
  *
@@ -57,14 +56,12 @@ import com.google.common.base.Preconditions;
  * completion. Attempting to get a reader on a non-complete input will block.
  *
  */
-public class ShuffledMergedInput implements LogicalInput {
+public class ShuffledMergedInput extends AbstractLogicalInput {
 
   static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
 
-  protected TezInputContext inputContext;
   protected TezRawKeyValueIterator rawIter = null;
   protected Configuration conf;
-  protected int numInputs = 0;
   protected Shuffle shuffle;
   protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
@@ -78,28 +75,27 @@ public class ShuffledMergedInput implements LogicalInput {
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   @Override
-  public synchronized List<Event> initialize(TezInputContext inputContext) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+  public synchronized List<Event> initialize() throws IOException {
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
 
-    if (this.numInputs == 0) {
-      inputContext.requestInitialMemory(0l, null);
+    if (this.getNumPhysicalInputs() == 0) {
+      getContext().requestInitialMemory(0l, null);
       isStarted.set(true);
-      inputContext.inputIsReady();
+      getContext().inputIsReady();
       LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
-          + inputContext.getSourceVertexName());
+          + getContext().getSourceVertexName());
       return Collections.emptyList();
     }
     
     long initialMemoryRequest = Shuffle.getInitialMemoryRequirement(conf,
-        inputContext.getTotalMemoryAvailableToTask());
+        getContext().getTotalMemoryAvailableToTask());
     this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
-    inputContext.requestInitialMemory(initialMemoryRequest, memoryUpdateCallbackHandler);
+    getContext().requestInitialMemory(initialMemoryRequest, memoryUpdateCallbackHandler);
 
-    this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
-    this.inputValueCounter = inputContext.getCounters().findCounter(
+    this.inputKeyCounter = getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    this.inputValueCounter = getContext().getCounters().findCounter(
         TaskCounter.REDUCE_INPUT_RECORDS);
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, getContext().getWorkDirs());
 
     return Collections.emptyList();
   }
@@ -109,7 +105,7 @@ public class ShuffledMergedInput implements LogicalInput {
     if (!isStarted.get()) {
       memoryUpdateCallbackHandler.validateUpdateReceived();
       // Start the shuffle - copy and merge
-      shuffle = new Shuffle(inputContext, conf, numInputs, memoryUpdateCallbackHandler.getMemoryAssigned());
+      shuffle = new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned());
       shuffle.run();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
@@ -134,7 +130,7 @@ public class ShuffledMergedInput implements LogicalInput {
    */
   public synchronized boolean isInputReady() {
     Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
-    if (this.numInputs == 0) {
+    if (getNumPhysicalInputs() == 0) {
       return true;
     }
     return shuffle.isInputReady();
@@ -150,7 +146,7 @@ public class ShuffledMergedInput implements LogicalInput {
     Shuffle localShuffleCopy = null;
     synchronized (this) {
       Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
-      if (this.numInputs == 0) {
+      if (getNumPhysicalInputs() == 0) {
         return;
       }
       localShuffleCopy = shuffle;
@@ -165,7 +161,7 @@ public class ShuffledMergedInput implements LogicalInput {
 
   @Override
   public synchronized List<Event> close() throws IOException {
-    if (this.numInputs != 0 && rawIter != null) {
+    if (this.getNumPhysicalInputs() != 0 && rawIter != null) {
       rawIter.close();
     }
     return Collections.emptyList();
@@ -189,7 +185,7 @@ public class ShuffledMergedInput implements LogicalInput {
     TezRawKeyValueIterator rawIterLocal;
     synchronized (this) {
       rawIterLocal = rawIter;
-      if (this.numInputs == 0) {
+      if (getNumPhysicalInputs() == 0) {
         return new KeyValuesReader() {
           @Override
           public boolean next() throws IOException {
@@ -227,7 +223,7 @@ public class ShuffledMergedInput implements LogicalInput {
   @Override
   public void handleEvents(List<Event> inputEvents) {
     synchronized (this) {
-      if (numInputs == 0) {
+      if (getNumPhysicalInputs() == 0) {
         throw new RuntimeException("No input events expected as numInputs is 0");
       }
       if (!isStarted.get()) {
@@ -241,11 +237,6 @@ public class ShuffledMergedInput implements LogicalInput {
     shuffle.handleEvents(inputEvents);
   }
 
-  @Override
-  public synchronized void setNumPhysicalInputs(int numInputs) {
-    this.numInputs = numInputs;
-  }
-
   @SuppressWarnings({ "rawtypes", "unchecked" })
   protected synchronized void createValuesIterator()
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
index 9d89eec..2633968 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -35,7 +35,7 @@ public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
   public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
     // wait for input so that iterator is available
     synchronized(this) {
-    if (this.numInputs == 0) {
+    if (getNumPhysicalInputs() == 0) {
       return new TezRawKeyValueIterator() {
         @Override
         public DataInputBuffer getKey() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 61c870f..16e5d68 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -1,5 +1,5 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
+* Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
@@ -36,9 +36,8 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
@@ -49,13 +48,12 @@ import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
 import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
 
 import com.google.common.base.Preconditions;
-public class ShuffledUnorderedKVInput implements LogicalInput {
+
+public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
 
   private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
   
   private Configuration conf;
-  private int numInputs = -1;
-  private TezInputContext inputContext;
   private ShuffleManager shuffleManager;
   private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
   private long firstEventReceivedTime = -1;
@@ -73,26 +71,25 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   }
 
   @Override
-  public synchronized List<Event> initialize(TezInputContext inputContext) throws Exception {
-    Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
-    this.inputContext = inputContext;
-    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+  public synchronized List<Event> initialize() throws Exception {
+    Preconditions.checkArgument(getNumPhysicalInputs() != -1, "Number of Inputs has not been set");
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
 
-    if (numInputs == 0) {
-      inputContext.requestInitialMemory(0l, null);
+    if (getNumPhysicalInputs() == 0) {
+      getContext().requestInitialMemory(0l, null);
       isStarted.set(true);
-      inputContext.inputIsReady();
+      getContext().inputIsReady();
       LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
-          + inputContext.getSourceVertexName());
+          + getContext().getSourceVertexName());
       return Collections.emptyList();
     } else {
       long initalMemReq = getInitialMemoryReq();
       memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
-      this.inputContext.requestInitialMemory(initalMemReq, memoryUpdateCallbackHandler);
+      this.getContext().requestInitialMemory(initalMemReq, memoryUpdateCallbackHandler);
     }
 
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
-    this.inputRecordCounter = inputContext.getCounters().findCounter(
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, getContext().getWorkDirs());
+    this.inputRecordCounter = getContext().getCounters().findCounter(
         TaskCounter.INPUT_RECORDS_PROCESSED);
     return Collections.emptyList();
   }
@@ -123,14 +120,14 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
       ifileBufferSize = conf.getInt("io.file.buffer.size",
           TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
 
-      this.inputManager = new SimpleFetchedInputAllocator(inputContext.getUniqueIdentifier(), conf,
-          inputContext.getTotalMemoryAvailableToTask(),
+      this.inputManager = new SimpleFetchedInputAllocator(getContext().getUniqueIdentifier(), conf,
+          getContext().getTotalMemoryAvailableToTask(),
           memoryUpdateCallbackHandler.getMemoryAssigned());
 
-      this.shuffleManager = new ShuffleManager(inputContext, conf, numInputs, ifileBufferSize,
+      this.shuffleManager = new ShuffleManager(getContext(), conf, getNumPhysicalInputs(), ifileBufferSize,
           ifileReadAhead, ifileReadAheadLength, codec, inputManager);
 
-      this.inputEventHandler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager,
+      this.inputEventHandler = new ShuffleInputEventHandlerImpl(getContext(), shuffleManager,
           inputManager, codec, ifileReadAhead, ifileReadAheadLength);
 
       ////// End of Initial configuration
@@ -152,7 +149,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   @Override
   public synchronized KeyValueReader getReader() throws Exception {
     Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
-    if (numInputs == 0) {
+    if (getNumPhysicalInputs() == 0) {
       return new KeyValueReader() {
         @Override
         public boolean next() throws IOException {
@@ -176,7 +173,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   @Override
   public void handleEvents(List<Event> inputEvents) throws IOException {
     synchronized (this) {
-      if (numInputs == 0) {
+      if (getNumPhysicalInputs() == 0) {
         throw new RuntimeException("No input events expected as numInputs is 0");
       }
       if (!isStarted.get()) {
@@ -201,14 +198,9 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
     return null;
   }
 
-  @Override
-  public synchronized void setNumPhysicalInputs(int numInputs) {
-    this.numInputs = numInputs;
-  }
-
   private long getInitialMemoryReq() {
     return SimpleFetchedInputAllocator.getInitialMemoryReq(conf,
-        inputContext.getTotalMemoryAvailableToTask());
+        getContext().getTotalMemoryAvailableToTask());
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
index d7e017a..d1c5fc0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -45,7 +45,7 @@ public class LocalOnFileSorterOutput extends OnFileSortedOutput {
     Path src = mapOutputFile.getOutputFile();
     Path dst =
         mapOutputFile.getInputFileForWrite(
-            outputContext.getTaskIndex(),
+            getContext().getTaskIndex(),
             localFs.getFileStatus(src).getLen());
 
     LOG.info("Renaming src = " + src + ", dst = " + dst);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index c8f2b22..9cf03ff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -32,9 +32,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
@@ -53,17 +52,15 @@ import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 /**
- * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs 
+ * <code>OnFileSortedOutput</code> is an {@link AbstractLogicalOutput} which sorts key/value pairs 
  * written to it and persists it to a file.
  */
-public class OnFileSortedOutput implements LogicalOutput {
+public class OnFileSortedOutput extends AbstractLogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(OnFileSortedOutput.class);
 
   protected ExternalSorter sorter;
   protected Configuration conf;
-  protected int numOutputs;
-  protected TezOutputContext outputContext;
   protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private long startTime;
   private long endTime;
@@ -71,19 +68,17 @@ public class OnFileSortedOutput implements LogicalOutput {
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   @Override
-  public synchronized List<Event> initialize(TezOutputContext outputContext)
-      throws IOException {
+  public synchronized List<Event> initialize() throws IOException {
     this.startTime = System.nanoTime();
-    this.outputContext = outputContext;
-    this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     // Initializing this parametr in this conf since it is used in multiple
     // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
     // TezMerger, etc.
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, getContext().getWorkDirs());
     this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
-    outputContext.requestInitialMemory(
+    getContext().requestInitialMemory(
         ExternalSorter.getInitialMemoryRequirement(conf,
-            outputContext.getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
+            getContext().getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
 
     sendEmptyPartitionDetails = this.conf.getBoolean(
         TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
@@ -97,10 +92,10 @@ public class OnFileSortedOutput implements LogicalOutput {
       memoryUpdateCallbackHandler.validateUpdateReceived();
       if (this.conf.getInt(TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
           TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS) > 1) {
-        sorter = new PipelinedSorter(outputContext, conf, numOutputs,
+        sorter = new PipelinedSorter(getContext(), conf, getNumPhysicalOutputs(),
             memoryUpdateCallbackHandler.getMemoryAssigned());
       } else {
-        sorter = new DefaultSorter(outputContext, conf, numOutputs,
+        sorter = new DefaultSorter(getContext(), conf, getNumPhysicalOutputs(),
             memoryUpdateCallbackHandler.getMemoryAssigned());
       }
       isStarted.set(true);
@@ -124,11 +119,6 @@ public class OnFileSortedOutput implements LogicalOutput {
   }
 
   @Override
-  public synchronized void setNumPhysicalOutputs(int numOutputs) {
-    this.numOutputs = numOutputs;
-  }
-
-  @Override
   public synchronized List<Event> close() throws IOException {
     if (sorter != null) {
       sorter.flush();
@@ -136,7 +126,7 @@ public class OnFileSortedOutput implements LogicalOutput {
       this.endTime = System.nanoTime();
       return generateEventsOnClose();
     } else {
-      LOG.warn("Attempting to close output " + outputContext.getDestinationVertexName()
+      LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
           + " before it was started");
       return Collections.emptyList();
     }
@@ -145,7 +135,7 @@ public class OnFileSortedOutput implements LogicalOutput {
   protected List<Event> generateEventsOnClose() throws IOException {
     String host = System.getenv(ApplicationConstants.Environment.NM_HOST
         .toString());
-    ByteBuffer shuffleMetadata = outputContext
+    ByteBuffer shuffleMetadata = getContext()
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
 
@@ -169,29 +159,29 @@ public class OnFileSortedOutput implements LogicalOutput {
             TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails));
         payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
         LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs="
-                + numOutputs + ", emptyPartitions=" + emptyPartitions
+                + getNumPhysicalOutputs() + ", emptyPartitions=" + emptyPartitions
               + ", compressedSize=" + emptyPartitionsBytesString.size());
       }
     }
     payloadBuilder.setHost(host);
     payloadBuilder.setPort(shufflePort);
-    payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+    payloadBuilder.setPathComponent(getContext().getUniqueIdentifier());
     payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
     byte[] payloadBytes = payloadProto.toByteArray();
 
-    long outputSize = outputContext.getCounters()
+    long outputSize = getContext().getCounters()
         .findCounter(TaskCounter.OUTPUT_BYTES).getValue();
     VertexManagerEventPayloadProto.Builder vmBuilder = VertexManagerEventPayloadProto
         .newBuilder();
     vmBuilder.setOutputSize(outputSize);
     VertexManagerEvent vmEvent = new VertexManagerEvent(
-        outputContext.getDestinationVertexName(), vmBuilder.build().toByteArray());    
+        getContext().getDestinationVertexName(), vmBuilder.build().toByteArray());    
 
-    List<Event> events = Lists.newArrayListWithCapacity(numOutputs+1);
+    List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs() + 1);
     events.add(vmEvent);
 
-    CompositeDataMovementEvent csdme = new CompositeDataMovementEvent(0, numOutputs, payloadBytes);
+    CompositeDataMovementEvent csdme = new CompositeDataMovementEvent(0, getNumPhysicalOutputs(), payloadBytes);
     events.add(csdme);
 
     return events;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 68cfcae..af7ca4e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -31,9 +31,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
@@ -46,11 +45,10 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
-public class OnFileUnorderedKVOutput implements LogicalOutput {
+public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(OnFileUnorderedKVOutput.class);
 
-  private TezOutputContext outputContext;
   private FileBasedKVWriter kvWriter;
   
   private Configuration conf;
@@ -62,15 +60,14 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
   }
 
   @Override
-  public synchronized List<Event> initialize(TezOutputContext outputContext)
+  public synchronized List<Event> initialize()
       throws Exception {
-    this.outputContext = outputContext;
-    this.conf = TezUtils.createConfFromUserPayload(outputContext
+    this.conf = TezUtils.createConfFromUserPayload(getContext()
         .getUserPayload());
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
-        outputContext.getWorkDirs());
+        getContext().getWorkDirs());
 
-    this.outputContext.requestInitialMemory(0l, null); // mandatory call
+    getContext().requestInitialMemory(0l, null); // mandatory call
 
     this.dataViaEventsEnabled = conf.getBoolean(
         TezJobConfig.TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED,
@@ -83,7 +80,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
         + "dataViaEventsEnabled: " + dataViaEventsEnabled
         + ", dataViaEventsMaxSize: " + dataViaEventsMaxSize);
     
-    this.kvWriter = new FileBasedKVWriter(outputContext, conf);
+    this.kvWriter = new FileBasedKVWriter(getContext(), conf);
     return Collections.emptyList();
   }
 
@@ -123,7 +120,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
     }
 
     String host = getHost();
-    ByteBuffer shuffleMetadata = outputContext
+    ByteBuffer shuffleMetadata = getContext()
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     int shufflePort = ShuffleUtils
         .deserializeShuffleProviderMetaData(shuffleMetadata);
@@ -139,7 +136,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
     if (outputGenerated) {
       payloadBuilder.setHost(host);
       payloadBuilder.setPort(shufflePort);
-      payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+      payloadBuilder.setPathComponent(getContext().getUniqueIdentifier());
     }
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 90dadb6..ced91f1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -29,10 +29,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -52,13 +51,11 @@ import com.google.common.collect.Sets;
  * are to be injected, then it completes. All configuration items are post-fixed
  * by the name of the vertex that executes this input.
  */
-public class TestInput implements LogicalInput {
+public class TestInput extends AbstractLogicalInput {
   private static final Log LOG = LogFactory
       .getLog(TestInput.class);
   
   Configuration conf;
-  TezInputContext inputContext;
-  int numInputs;
   int numCompletedInputs = 0;
   int[] completedInputVersion;
   AtomicInteger inputReady = new AtomicInteger(-1);
@@ -137,48 +134,48 @@ public class TestInput implements LogicalInput {
       if (doFail) {
         if (
             (failingTaskIndices.contains(failAll) ||
-            failingTaskIndices.contains(inputContext.getTaskIndex())) &&
+            failingTaskIndices.contains(getContext().getTaskIndex())) &&
             (failingTaskAttempts.contains(failAll) || 
-             failingTaskAttempts.contains(inputContext.getTaskAttemptNumber())) &&
+             failingTaskAttempts.contains(getContext().getTaskAttemptNumber())) &&
              (lastInputReadyValue <= failingInputUpto)) {
           List<Event> events = Lists.newLinkedList();
           if (failingInputIndices.contains(failAll)) {
-            for (int i=0; i<numInputs; ++i) {
-              String msg = ("FailingInput: " + inputContext.getUniqueIdentifier() + 
+            for (int i=0; i<numPhysicalInputs; ++i) {
+              String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
                   " index: " + i + " version: " + lastInputReadyValue);
               events.add(new InputReadErrorEvent(msg, i, lastInputReadyValue));
               LOG.info("Failing input: " + msg);
             }
           } else {
             for (Integer index : failingInputIndices) {
-              if (index.intValue() >= numInputs) {
+              if (index.intValue() >= numPhysicalInputs) {
                 throwException("InputIndex: " + index.intValue() + 
-                    " should be less than numInputs: " + numInputs);
+                    " should be less than numInputs: " + numPhysicalInputs);
               }
               if (completedInputVersion[index.intValue()] < lastInputReadyValue) {
                 continue; // dont fail a previous version now.
               }
-              String msg = ("FailingInput: " + inputContext.getUniqueIdentifier() + 
+              String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
                   " index: " + index.intValue() + " version: " + lastInputReadyValue);
               events.add(new InputReadErrorEvent(msg, index.intValue(), lastInputReadyValue));
               LOG.info("Failing input: " + msg);
             }
           }
-          inputContext.sendEvents(events);
+          getContext().sendEvents(events);
           if (doFailAndExit) {
-            String msg = "FailingInput exiting: " + inputContext.getUniqueIdentifier();
+            String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier();
             LOG.info(msg);
             throwException(msg);
           } else {
             done = false;
           }
         } else if ((failingTaskIndices.contains(failAll) ||
-            failingTaskIndices.contains(inputContext.getTaskIndex()))){
+            failingTaskIndices.contains(getContext().getTaskIndex()))){
           boolean previousAttemptReadFailed = false;
           if (failingTaskAttempts.contains(failAll)) {
             previousAttemptReadFailed = true;
           } else {
-            for (int i=0 ; i<inputContext.getTaskAttemptNumber(); ++i) {
+            for (int i=0 ; i<getContext().getTaskAttemptNumber(); ++i) {
               if (failingTaskAttempts.contains(new Integer(i))) {
                 previousAttemptReadFailed = true;
                 break;
@@ -199,7 +196,7 @@ public class TestInput implements LogicalInput {
     
     // sum input value given by upstream tasks
     int sum = 0;
-    for (int i=0; i<numInputs; ++i) {
+    for (int i=0; i<numPhysicalInputs; ++i) {
       if (inputValues[i] == -1) {
         throwException("Invalid input value : " + i);
       }
@@ -211,7 +208,7 @@ public class TestInput implements LogicalInput {
   
   void throwException(String msg) {
     RuntimeException e = new RuntimeException(msg);
-    inputContext.fatalError(e , msg);
+    getContext().fatalError(e , msg);
     throw e;
   }
   
@@ -220,13 +217,12 @@ public class TestInput implements LogicalInput {
   }
   
   @Override
-  public List<Event> initialize(TezInputContext inputContext) throws Exception {
-    this.inputContext = inputContext;
-    this.inputContext.requestInitialMemory(0l, null); //Mandatory call.
-    this.inputContext.inputIsReady();
-    if (inputContext.getUserPayload() != null) {
-      String vName = inputContext.getTaskVertexName();
-      conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+  public List<Event> initialize() throws Exception {
+    getContext().requestInitialMemory(0l, null); //Mandatory call.
+    getContext().inputIsReady();
+    if (getContext().getUserPayload() != null) {
+      String vName = getContext().getTaskVertexName();
+      conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
       doFail = conf.getBoolean(getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL, vName), false);
       doFailAndExit = conf.getBoolean(
           getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, vName), false);
@@ -276,7 +272,7 @@ public class TestInput implements LogicalInput {
         LOG.info("Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + 
             " targetId: " + dmEvent.getTargetIndex() +
             " version: " + dmEvent.getVersion() +
-            " numInputs: " + numInputs +
+            " numInputs: " + numPhysicalInputs +
             " numCompletedInputs: " + numCompletedInputs);
         this.completedInputVersion[dmEvent.getTargetIndex()] = dmEvent.getVersion();
         this.inputValues[dmEvent.getTargetIndex()] = 
@@ -287,13 +283,13 @@ public class TestInput implements LogicalInput {
         LOG.info("Received InputFailed event sourceId : " + ifEvent.getSourceIndex() + 
             " targetId: " + ifEvent.getTargetIndex() +
             " version: " + ifEvent.getVersion() +
-            " numInputs: " + numInputs +
+            " numInputs: " + numPhysicalInputs +
             " numCompletedInputs: " + numCompletedInputs);
       }
     }
-    if (numCompletedInputs == numInputs) {
+    if (numCompletedInputs == numPhysicalInputs) {
       int maxInputVersionSeen = -1;  
-      for (int i=0; i<numInputs; ++i) {
+      for (int i=0; i<numPhysicalInputs; ++i) {
         if (completedInputVersion[i] < 0) {
           LOG.info("Not received completion for input " + i);
           return;
@@ -317,7 +313,7 @@ public class TestInput implements LogicalInput {
 
   @Override
   public void setNumPhysicalInputs(int numInputs) {
-    this.numInputs = numInputs;
+    this.numPhysicalInputs = numInputs;
     this.completedInputVersion = new int[numInputs];
     this.inputValues = new int[numInputs];
     for (int i=0; i<numInputs; ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7083f011/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 4814d2d..0ab1070 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -25,15 +25,14 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 
 import com.google.common.collect.Lists;
 
-public class TestOutput implements LogicalOutput {
+public class TestOutput extends AbstractLogicalOutput {
   private static final Log LOG = LogFactory.getLog(TestOutput.class);
   
   public static OutputDescriptor getOutputDesc(byte[] payload) {
@@ -42,14 +41,10 @@ public class TestOutput implements LogicalOutput {
   }
   
   int output;
-  int numOutputs;
-  TezOutputContext outputContext;
   
   @Override
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws Exception {
-    this.outputContext = outputContext;
-    this.outputContext.requestInitialMemory(0l, null); //Mandatory call
+  public List<Event> initialize() throws Exception {
+    getContext().requestInitialMemory(0l, null); //Mandatory call
     return Collections.emptyList();
   }
   
@@ -74,17 +69,12 @@ public class TestOutput implements LogicalOutput {
   public List<Event> close() throws Exception {
     LOG.info("Sending data movement event with value: " + output);
     byte[] result = ByteBuffer.allocate(4).putInt(output).array();
-    List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
-    for (int i = 0; i < numOutputs; i++) {
+    List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs());
+    for (int i = 0; i < getNumPhysicalOutputs(); i++) {
       DataMovementEvent event = new DataMovementEvent(i, result);
       events.add(event);
     }
     return events;
   }
 
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    this.numOutputs = numOutputs;
-  }
-
 }


Mime
View raw message