tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: Revert "TEZ-695. Create Abstract class for Input/Processor/Output (Mohammad Kamrul Islam via bikas)"
Date Tue, 15 Apr 2014 23:16:46 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 6e07f0388 -> 0c324736b


Revert "TEZ-695. Create Abstract class for Input/Processor/Output (Mohammad Kamrul Islam via bikas)"

This reverts commit 6e07f0388b4a99a7d921ea697fb0c6cad2cea674.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0c324736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0c324736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0c324736

Branch: refs/heads/master
Commit: 0c324736bbcfa3d945cf9063426f0b04c50fcbf4
Parents: 6e07f03
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Apr 15 16:16:10 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Apr 15 16:16:10 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     | 14 -----
 .../runtime/api/AbstractLogicalIOProcessor.java | 45 ---------------
 .../tez/runtime/api/AbstractLogicalInput.java   | 53 -----------------
 .../tez/runtime/api/AbstractLogicalOutput.java  | 53 -----------------
 .../tez/mapreduce/examples/UnionExample.java    |  3 +
 .../tez/mapreduce/examples/WordCount.java       | 61 +++++++++++---------
 .../org/apache/tez/mapreduce/input/MRInput.java | 50 +++++++++-------
 .../apache/tez/mapreduce/output/MROutput.java   | 39 ++++++++-----
 .../tez/runtime/TestInputReadyTracker.java      | 21 +++++--
 .../TestLogicalIOProcessorRuntimeTask.java      | 55 +++++++++++-------
 .../library/common/shuffle/impl/Fetcher.java    | 56 ++----------------
 .../runtime/library/input/LocalMergedInput.java | 16 ++---
 .../library/input/ShuffledMergedInput.java      | 49 +++++++++-------
 .../input/ShuffledMergedInputLegacy.java        |  2 +-
 .../library/input/ShuffledUnorderedKVInput.java | 50 +++++++++-------
 .../library/output/LocalOnFileSorterOutput.java |  2 +-
 .../library/output/OnFileSortedOutput.java      | 46 +++++++++------
 .../library/output/OnFileUnorderedKVOutput.java | 21 ++++---
 .../java/org/apache/tez/test/TestInput.java     | 56 +++++++++---------
 .../java/org/apache/tez/test/TestOutput.java    | 22 +++++--
 20 files changed, 298 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 153ee5a..e5e409b 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -226,20 +226,6 @@ public class TezJobConfig {
   /**
    * 
    */
-  public static final String TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED =
-      "tez.runtime.shuffle.keep-alive.enabled";
-  public static final boolean DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED = false;
-
-  /**
-   *
-   */
-  public static final String TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS =
-      "tez.runtime.shuffle.keep-alive.max.connections";
-  public static final int DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS = 20;
-
-  /**
-   *
-   */
   public static final String TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT = "tez.runtime.shuffle.read.timeout";
   public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT = 
       3 * 60 * 1000;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
deleted file mode 100644
index 27e1bc8..0000000
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.api;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-/**
- * Abstract representation of the interface {@link LogicalIOProcessor}.
- * Implements the base logic of some methods into this class.
- * It will reduce the code for any processor implementation.
- *
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor {
-  protected TezProcessorContext context;
-
-  @Override
-  public void initialize(TezProcessorContext processorContext) throws Exception {
-    this.context = processorContext;
-    initialize();
-  }
-
-  public abstract void initialize() throws Exception;
-
-  public TezProcessorContext getContext() {
-    return context;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
deleted file mode 100644
index e079587..0000000
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.api;
-
-import java.util.List;
-
-/**
- * The abstract implementation of {@link LogicalInput}. It includes default
- * implementations of few methods for the convenience.
- * 
- */
-public abstract class AbstractLogicalInput implements LogicalInput {
-
-  protected int numPhysicalInputs;
-  protected TezInputContext inputContext;
-
-  @Override
-  public void setNumPhysicalInputs(int numInputs) {
-    this.numPhysicalInputs = numInputs;
-  }
-
-  @Override
-  public List<Event> initialize(TezInputContext _inputContext) throws Exception {
-    this.inputContext = _inputContext;
-    return initialize();
-  }
-
-  public abstract List<Event> initialize() throws Exception;
-
-  public int getNumPhysicalInputs() {
-    return numPhysicalInputs;
-  }
-
-  public TezInputContext getContext() {
-    return inputContext;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
deleted file mode 100644
index abdf2ae..0000000
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.api;
-
-import java.util.List;
-
-/**
- * The abstract implementation of {@link LogicalOutput}. It includes default
- * implementations of few methods for the convenience.
- * 
- */
-public abstract class AbstractLogicalOutput implements LogicalOutput {
-
-  protected int numPhysicalOutputs;
-  protected TezOutputContext outputContext;
-
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    this.numPhysicalOutputs = numOutputs;
-  }
-
-  @Override
-  public List<Event> initialize(TezOutputContext _outputContext) throws Exception {
-    this.outputContext = _outputContext;
-    return initialize();
-  }
-  
-  public abstract List<Event> initialize() throws Exception;
-
-  public int getNumPhysicalOutputs() {
-    return numPhysicalOutputs;
-  }
-
-  public TezOutputContext getContext() {
-    return outputContext;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index d73bbe1..14a9648 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -22,6 +22,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,6 +49,7 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.DAG;
@@ -57,6 +59,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 7dade9b..20d9229 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -18,9 +18,11 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
 
@@ -47,6 +49,7 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -54,12 +57,14 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -67,10 +72,11 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
@@ -81,11 +87,26 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 import com.google.common.base.Preconditions;
 
 public class WordCount {
-  public static class TokenProcessor extends AbstractLogicalIOProcessor {
+  public static class TokenProcessor implements LogicalIOProcessor {
+    TezProcessorContext context;
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
     @Override
+    public void initialize(TezProcessorContext processorContext)
+        throws Exception {
+      this.context = processorContext;
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
+
+    @Override
     public void run(Map<String, LogicalInput> inputs,
         Map<String, LogicalOutput> outputs) throws Exception {
       for (LogicalInput input : inputs.values()) {
@@ -108,25 +129,26 @@ public class WordCount {
         }
       }
     }
-
+    
+  }
+  
+  public static class SumProcessor implements LogicalIOProcessor {
+    TezProcessorContext context;
+    
     @Override
-    public void initialize() throws Exception {
-
+    public void initialize(TezProcessorContext processorContext)
+        throws Exception {
+      this.context = processorContext;
     }
 
     @Override
     public void handleEvents(List<Event> processorEvents) {
-      
     }
 
     @Override
     public void close() throws Exception {
-      
     }
 
-  }
-  
-  public static class SumProcessor extends AbstractLogicalIOProcessor {
     @Override
     public void run(Map<String, LogicalInput> inputs,
         Map<String, LogicalOutput> outputs) throws Exception {
@@ -151,28 +173,13 @@ public class WordCount {
         kvWriter.write(word, new IntWritable(sum));
       }
       if (out.isCommitRequired()) {
-        while (!getContext().canCommit()) {
+        while (!context.canCommit()) {
           Thread.sleep(100);
         }
         out.commit();
       }
     }
-
-    @Override
-    public void initialize() throws Exception {
-
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-      
-    }
-
-    @Override
-    public void close() throws Exception {
-      
-    }
-
+    
   }
   
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 56d55ed..1ac0295 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -56,9 +56,10 @@ import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -74,12 +75,13 @@ import com.google.common.base.Preconditions;
  * This class is not meant to be extended by external projects.
  */
 
-public class MRInput extends AbstractLogicalInput {
+public class MRInput implements LogicalInput {
 
   private static final Log LOG = LogFactory.getLog(MRInput.class);
   
   private final Lock rrLock = new ReentrantLock();
   private Condition rrInited = rrLock.newCondition();
+  private TezInputContext inputContext;
   
   private volatile boolean eventReceived = false;
   
@@ -113,11 +115,12 @@ public class MRInput extends AbstractLogicalInput {
   
   
   @Override
-  public List<Event> initialize() throws IOException {
-    getContext().requestInitialMemory(0l, null); //mandatory call
-    getContext().inputIsReady();
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.inputContext.requestInitialMemory(0l, null); //mandatory call
+    this.inputContext.inputIsReady();
     MRInputUserPayloadProto mrUserPayload =
-      MRHelpers.parseMRInputPayload(getContext().getUserPayload());
+      MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
     Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
         "Split information not expected in MRInput");
     Configuration conf =
@@ -128,21 +131,21 @@ public class MRInput extends AbstractLogicalInput {
 
     TaskAttemptID taskAttemptId = new TaskAttemptID(
       new TaskID(
-        Long.toString(getContext().getApplicationId().getClusterTimestamp()),
-        getContext().getApplicationId().getId(), TaskType.MAP,
-        getContext().getTaskIndex()),
-        getContext().getTaskAttemptNumber());
+        Long.toString(inputContext.getApplicationId().getClusterTimestamp()),
+        inputContext.getApplicationId().getId(), TaskType.MAP,
+        inputContext.getTaskIndex()),
+      inputContext.getTaskAttemptNumber());
 
     jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
       taskAttemptId.toString());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        getContext().getDAGAttemptNumber());
+      inputContext.getDAGAttemptNumber());
 
     // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
     // theory, can be used by the MapProcessor, ReduceProcessor or a custom
     // processor. (The processor could provide the counter though)
 
-    this.inputRecordCounter = getContext().getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
 
     useNewApi = this.jobConf.getUseNewMapper();
     this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
@@ -153,7 +156,7 @@ public class MRInput extends AbstractLogicalInput {
     initializeInternal();
     return null;
   }
-
+  
   @Override
   public void start() {
   }
@@ -172,7 +175,7 @@ public class MRInput extends AbstractLogicalInput {
       } else {
         // Read split information.
         TaskSplitMetaInfo[] allMetaInfo = readSplits(jobConf);
-        TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext()
+        TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext
             .getTaskIndex()];
         this.splitMetaInfo = new TaskSplitIndex(
             thisTaskMetaInfo.getSplitLocation(),
@@ -190,7 +193,7 @@ public class MRInput extends AbstractLogicalInput {
     } finally {
       rrLock.unlock();
     }
-    LOG.info("Initialzed MRInput: " + getContext().getSourceVertexName());
+    LOG.info("Initialzed MRInput: " + inputContext.getSourceVertexName());
   }
 
   private void setupOldInputFormat() {
@@ -200,7 +203,7 @@ public class MRInput extends AbstractLogicalInput {
   private void setupOldRecordReader() throws IOException {
     Preconditions.checkNotNull(oldInputSplit, "Input split hasn't yet been setup");
     oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
-        this.jobConf, new MRReporter(getContext(), oldInputSplit));
+        this.jobConf, new MRReporter(inputContext, oldInputSplit));
     setIncrementalConfigParams(oldInputSplit);
   }
   
@@ -241,7 +244,7 @@ public class MRInput extends AbstractLogicalInput {
     }
 
     LOG.info("Creating reader for MRInput: "
-        + getContext().getSourceVertexName());
+        + inputContext.getSourceVertexName());
     return new MRInputKVReader();
   }
 
@@ -262,6 +265,13 @@ public class MRInput extends AbstractLogicalInput {
     processSplitEvent((RootInputDataInformationEvent)event);
   }
 
+  
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    // Not required at the moment. May be required if splits are sent via events.
+  }
+
   @Override
   public List<Event> close() throws IOException {
     if (useNewApi) {
@@ -298,7 +308,7 @@ public class MRInput extends AbstractLogicalInput {
 
   
   private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, getContext(), true, null);
+    return new TaskAttemptContextImpl(this.jobConf, inputContext, true, null);
   }
   
   void processSplitEvent(RootInputDataInformationEvent event)
@@ -391,7 +401,7 @@ public class MRInput extends AbstractLogicalInput {
     deserializer.open(inFile);
     org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
     long pos = inFile.getPos();
-    getContext().getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
         .increment(pos - offset);
     inFile.close();
     return split;
@@ -437,7 +447,7 @@ public class MRInput extends AbstractLogicalInput {
     org.apache.hadoop.mapreduce.InputSplit split = 
         deserializer.deserialize(null);
     long pos = inFile.getPos();
-    getContext().getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
         .increment(pos - offset);
     inFile.close();
     return split;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 8aeee26..2ecf602 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -46,17 +46,19 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
-public class MROutput extends AbstractLogicalOutput {
+public class MROutput implements LogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(MROutput.class);
 
   private final NumberFormat taskNumberFormat = NumberFormat.getInstance();
   private final NumberFormat nonTaskNumberFormat = NumberFormat.getInstance();
   
+  private TezOutputContext outputContext;
   private JobConf jobConf;
   boolean useNewApi;
   private AtomicBoolean closed = new AtomicBoolean(false);
@@ -81,15 +83,17 @@ public class MROutput extends AbstractLogicalOutput {
   protected OutputCommitter committer;
 
   @Override
-  public List<Event> initialize() throws IOException, InterruptedException {
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException, InterruptedException {
     LOG.info("Initializing Simple Output");
-    getContext().requestInitialMemory(0l, null); //mandatory call
+    outputContext.requestInitialMemory(0l, null); //mandatory call
     taskNumberFormat.setMinimumIntegerDigits(5);
     taskNumberFormat.setGroupingUsed(false);
     nonTaskNumberFormat.setMinimumIntegerDigits(3);
     nonTaskNumberFormat.setGroupingUsed(false);
+    this.outputContext = outputContext;
     Configuration conf = TezUtils.createConfFromUserPayload(
-        getContext().getUserPayload());
+        outputContext.getUserPayload());
     this.jobConf = new JobConf(conf);
     // Add tokens to the jobConf - in case they are accessed within the RW / OF
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
@@ -97,9 +101,9 @@ public class MROutput extends AbstractLogicalOutput {
     this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
         false);
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        getContext().getDAGAttemptNumber());
+        outputContext.getDAGAttemptNumber());
     TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
-        .createMockTaskAttemptID(getContext(), isMapperOutput);
+        .createMockTaskAttemptID(outputContext, isMapperOutput);
     jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
     jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
     jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
@@ -114,7 +118,7 @@ public class MROutput extends AbstractLogicalOutput {
       }
     }
 
-    outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);    
+    outputRecordCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);    
 
     if (useNewApi) {
       newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
@@ -136,7 +140,7 @@ public class MROutput extends AbstractLogicalOutput {
       oldApiTaskAttemptContext =
           new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
               jobConf, taskAttemptId,
-              new MRTaskReporter(getContext()));
+              new MRTaskReporter(outputContext));
       oldOutputFormat = jobConf.getOutputFormat();
 
       FileSystem fs = FileSystem.get(jobConf);
@@ -144,7 +148,7 @@ public class MROutput extends AbstractLogicalOutput {
 
       oldRecordWriter =
           oldOutputFormat.getRecordWriter(
-              fs, jobConf, finalName, new MRReporter(getContext()));
+              fs, jobConf, finalName, new MRReporter(outputContext));
     }
     initCommitter(jobConf, useNewApi);
 
@@ -152,7 +156,7 @@ public class MROutput extends AbstractLogicalOutput {
         + ", using_new_api: " + useNewApi);
     return null;
   }
-
+  
   @Override
   public void start() {
   }
@@ -204,7 +208,7 @@ public class MROutput extends AbstractLogicalOutput {
   }
 
   private TaskAttemptContext createTaskAttemptContext(TaskAttemptID attemptId) {
-    return new TaskAttemptContextImpl(this.jobConf, attemptId, getContext(),
+    return new TaskAttemptContextImpl(this.jobConf, attemptId, outputContext,
         isMapperOutput, null);
   }
 
@@ -212,8 +216,8 @@ public class MROutput extends AbstractLogicalOutput {
     String prefix = jobConf.get(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX);
     if (prefix == null) {
       prefix = "part-v" + 
-          nonTaskNumberFormat.format(getContext().getTaskVertexIndex()) +  
-          "-o" + nonTaskNumberFormat.format(getContext().getOutputIndex());
+          nonTaskNumberFormat.format(outputContext.getTaskVertexIndex()) +  
+          "-o" + nonTaskNumberFormat.format(outputContext.getOutputIndex());
     }
     return prefix;
   }
@@ -221,7 +225,7 @@ public class MROutput extends AbstractLogicalOutput {
   private String getOutputName() {
     // give a unique prefix to the output name
     return getOutputFileNamePrefix() + 
-        "-" + taskNumberFormat.format(getContext().getTaskIndex());
+        "-" + taskNumberFormat.format(outputContext.getTaskIndex());
   }
 
   @Override
@@ -272,6 +276,11 @@ public class MROutput extends AbstractLogicalOutput {
     return null;
   }
 
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    // Nothing to do for now
+  }
+
   /**
    * MROutput expects that a Processor call commit prior to the
    * Processor's completion

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
index cb9f76a..6f5aa2f 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -26,11 +26,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -204,7 +205,7 @@ public class TestInputReadyTracker {
     return startTime;
   }
 
-  private static class ImmediatelyReadyInputForTest extends AbstractLogicalInput {
+  private static class ImmediatelyReadyInputForTest implements LogicalInput {
 
     private volatile boolean isReady = false;
     
@@ -214,7 +215,7 @@ public class TestInputReadyTracker {
     }
 
     @Override
-    public List<Event> initialize() throws Exception {
+    public List<Event> initialize(TezInputContext inputContext) throws Exception {
       return null;
     }
 
@@ -235,9 +236,13 @@ public class TestInputReadyTracker {
     public List<Event> close() throws Exception {
       return null;
     }
+
+    @Override
+    public void setNumPhysicalInputs(int numInputs) {
+    }
   }
   
-  private static class ControlledReadyInputForTest extends AbstractLogicalInput {
+  private static class ControlledReadyInputForTest implements LogicalInput {
 
     private volatile boolean isReady = false;
     private InputReadyTracker inputReadyTracker;
@@ -247,7 +252,7 @@ public class TestInputReadyTracker {
     }
 
     @Override
-    public List<Event> initialize() throws Exception {
+    public List<Event> initialize(TezInputContext inputContext) throws Exception {
       return null;
     }
 
@@ -269,7 +274,11 @@ public class TestInputReadyTracker {
       return null;
     }
 
-   // Used by the test to control when this input will be ready
+    @Override
+    public void setNumPhysicalInputs(int numInputs) {
+    }
+    
+    // Used by the test to control when this input will be ready
     public void setInputIsReady() {
       isReady = true;
       inputReadyTracker.setInputIsReady(this);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 5eb25c8..27a250e 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -34,13 +34,14 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
-import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -135,7 +136,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     return TezDAGID.getInstance("2000", 100, 1);
   }
 
-  public static class TestProcessor extends AbstractLogicalIOProcessor {
+  public static class TestProcessor implements LogicalIOProcessor {
 
     public static volatile int runCount = 0;
 
@@ -143,7 +144,19 @@ public class TestLogicalIOProcessorRuntimeTask {
     }
 
     @Override
-    public void initialize() throws Exception {
+    public void initialize(TezProcessorContext processorContext) throws Exception {
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void close() throws Exception {
+      // TODO Auto-generated method stub
+
     }
 
     @Override
@@ -152,19 +165,9 @@ public class TestLogicalIOProcessorRuntimeTask {
       runCount++;
     }
 
-	@Override
-	public void handleEvents(List<Event> processorEvents) {
-		
-	}
-
-	@Override
-	public void close() throws Exception {
-		
-	}
-
   }
 
-  public static class TestInput extends AbstractLogicalInput {
+  public static class TestInput implements LogicalInput {
 
     public static volatile int startCount = 0;
 
@@ -172,9 +175,9 @@ public class TestLogicalIOProcessorRuntimeTask {
     }
 
     @Override
-    public List<Event> initialize() throws Exception {
-      getContext().requestInitialMemory(0, null);
-      getContext().inputIsReady();
+    public List<Event> initialize(TezInputContext inputContext) throws Exception {
+      inputContext.requestInitialMemory(0, null);
+      inputContext.inputIsReady();
       return null;
     }
 
@@ -198,9 +201,13 @@ public class TestLogicalIOProcessorRuntimeTask {
       return null;
     }
 
+    @Override
+    public void setNumPhysicalInputs(int numInputs) {
+    }
+
   }
 
-  public static class TestOutput extends AbstractLogicalOutput {
+  public static class TestOutput implements LogicalOutput {
 
     public static volatile int startCount = 0;
 
@@ -208,8 +215,8 @@ public class TestLogicalIOProcessorRuntimeTask {
     }
 
     @Override
-    public List<Event> initialize() throws Exception {
-      getContext().requestInitialMemory(0, null);
+    public List<Event> initialize(TezOutputContext outputContext) throws Exception {
+      outputContext.requestInitialMemory(0, null);
       return null;
     }
 
@@ -233,5 +240,9 @@ public class TestLogicalIOProcessorRuntimeTask {
       return null;
     }
 
+    @Override
+    public void setNumPhysicalOutputs(int numOutputs) {
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index b8ab79f..9462d95 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -38,7 +38,6 @@ import javax.net.ssl.HttpsURLConnection;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -65,9 +64,6 @@ class Fetcher extends Thread {
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
-  private final static String URL_CONNECTION_ERROR_STREAM_BUFFER_ENABLED =
-      "sun.net.http.errorstream.enableBuffering";
-  private final static String URL_CONNECTION_MAX_CONNECTIONS = "http.maxConnections";
   private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
   private final TezCounter connectionErrs;
   private final TezCounter ioErrs;
@@ -102,8 +98,6 @@ class Fetcher extends Thread {
   
   private LinkedHashSet<InputAttemptIdentifier> remaining;
 
-  private boolean keepAlive = false;
-
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
       ShuffleClientMetrics metrics,
@@ -138,19 +132,6 @@ class Fetcher extends Thread {
       this.decompressor = null;
     }
 
-    this.keepAlive =
-        job.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
-          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
-    int keepAliveMaxConnections =
-        job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
-          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
-
-    if (keepAlive) {
-      System.setProperty(URL_CONNECTION_ERROR_STREAM_BUFFER_ENABLED, "true");
-      System.setProperty(URL_CONNECTION_MAX_CONNECTIONS, String.valueOf(keepAliveMaxConnections));
-      LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
-    }
-
     this.connectionTimeout = 
         job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
             TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
@@ -264,12 +245,11 @@ class Fetcher extends Thread {
     
     // Construct the url and connect
     DataInputStream input;
-    HttpURLConnection connection = null;
     boolean connectSucceeded = false;
     
     try {
       URL url = getMapOutputURL(host, srcAttempts);
-      connection = openConnection(url);
+      HttpURLConnection connection = openConnection(url);
       
       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -313,11 +293,6 @@ class Fetcher extends Thread {
       SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
       LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
     } catch (IOException ie) {
-      if (keepAlive && connection != null) {
-        //Read the response body in case of error. This helps in connection reuse.
-        //Refer: http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
-        readErrorStream(connection.getErrorStream());
-      }
       ioErrs.increment(1);
       if (!connectSucceeded) {
         LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ie);
@@ -329,6 +304,7 @@ class Fetcher extends Thread {
 
       // At this point, either the connection failed, or the initial header verification failed.
       // The error does not relate to any specific Input. Report all of them as failed.
+      
       // This ends up indirectly penalizing the host (multiple failures reported on the single host)
 
       for(InputAttemptIdentifier left: remaining) {
@@ -362,10 +338,6 @@ class Fetcher extends Thread {
         for(InputAttemptIdentifier left: failedTasks) {
           scheduler.copyFailed(left, host, true, false);
         }
-        //Being defensive: cleanup the error stream in case of keepAlive
-        if (keepAlive && connection != null) {
-          readErrorStream(connection.getErrorStream());
-        }
       }
       
       IOUtils.cleanup(LOG, input);
@@ -379,23 +351,7 @@ class Fetcher extends Thread {
       putBackRemainingMapOutputs(host);
     }
   }
-
-  /**
-   * Cleanup the error stream if any, for keepAlive connections
-   * 
-   * @param errorStream
-   */
-  private void readErrorStream(InputStream errorStream) {
-    try {
-      DataOutputBuffer errorBuffer = new DataOutputBuffer();
-      IOUtils.copyBytes(errorStream, errorBuffer, 4096);
-      IOUtils.closeStream(errorBuffer);
-      IOUtils.closeStream(errorStream);
-    } catch(IOException ioe) {
-      //ignore
-    }
-  }
-
+  
   private void putBackRemainingMapOutputs(MapHost host) {
     // Cycle through remaining MapOutputs
     boolean isFirst = true;
@@ -594,11 +550,7 @@ class Fetcher extends Thread {
       url.append(mapId.getPathComponent());
       first = false;
     }
-    //It is possible to override keep-alive setting in cluster by adding keepAlive in url.
-    //Refer MAPREDUCE-5787 to enable/disable keep-alive in the cluster.
-    if (keepAlive) {
-      url.append("&keepAlive=true");
-    }
+   
     if (LOG.isDebugEnabled()) {
       LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index 93ca93e..d298e8f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
 
 /**
@@ -33,16 +34,17 @@ import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
 public class LocalMergedInput extends ShuffledMergedInputLegacy {
 
   @Override
-  public List<Event> initialize() throws IOException {
-    getContext().requestInitialMemory(0l, null); // mandatory call.
-    getContext().inputIsReady();
-    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.inputContext.requestInitialMemory(0l, null); // mandatory call.
+    this.inputContext.inputIsReady();
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
-    if (getNumPhysicalInputs() == 0) {
+    if (numInputs == 0) {
       return Collections.emptyList();
     }
 
-    LocalShuffle localShuffle = new LocalShuffle(getContext(), conf, getNumPhysicalInputs());
+    LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
     rawIter = localShuffle.run();
     createValuesIterator();
     return Collections.emptyList();
@@ -54,7 +56,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
 
   @Override
   public List<Event> close() throws IOException {
-    if (getNumPhysicalInputs() != 0) {
+    if (numInputs != 0) {
       rawIter.close();
     }
     return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 0c01f92..8a6de52 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -33,8 +33,9 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
@@ -46,7 +47,7 @@ import com.google.common.base.Preconditions;
 
 
 /**
- * <code>ShuffleMergedInput</code> in a {@link AbstractLogicalInput} which shuffles
+ * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
  * intermediate sorted data, merges them and provides key/<values> to the
  * consumer.
  *
@@ -56,12 +57,14 @@ import com.google.common.base.Preconditions;
  * completion. Attempting to get a reader on a non-complete input will block.
  *
  */
-public class ShuffledMergedInput extends AbstractLogicalInput {
+public class ShuffledMergedInput implements LogicalInput {
 
   static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
 
+  protected TezInputContext inputContext;
   protected TezRawKeyValueIterator rawIter = null;
   protected Configuration conf;
+  protected int numInputs = 0;
   protected Shuffle shuffle;
   protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
@@ -75,27 +78,28 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   @Override
-  public synchronized List<Event> initialize() throws IOException {
-    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+  public synchronized List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
-    if (this.getNumPhysicalInputs() == 0) {
-      getContext().requestInitialMemory(0l, null);
+    if (this.numInputs == 0) {
+      inputContext.requestInitialMemory(0l, null);
       isStarted.set(true);
-      getContext().inputIsReady();
+      inputContext.inputIsReady();
       LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
-          + getContext().getSourceVertexName());
+          + inputContext.getSourceVertexName());
       return Collections.emptyList();
     }
     
     long initialMemoryRequest = Shuffle.getInitialMemoryRequirement(conf,
-        getContext().getTotalMemoryAvailableToTask());
+        inputContext.getTotalMemoryAvailableToTask());
     this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
-    getContext().requestInitialMemory(initialMemoryRequest, memoryUpdateCallbackHandler);
+    inputContext.requestInitialMemory(initialMemoryRequest, memoryUpdateCallbackHandler);
 
-    this.inputKeyCounter = getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
-    this.inputValueCounter = getContext().getCounters().findCounter(
+    this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    this.inputValueCounter = inputContext.getCounters().findCounter(
         TaskCounter.REDUCE_INPUT_RECORDS);
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, getContext().getWorkDirs());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
 
     return Collections.emptyList();
   }
@@ -105,7 +109,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
     if (!isStarted.get()) {
       memoryUpdateCallbackHandler.validateUpdateReceived();
       // Start the shuffle - copy and merge
-      shuffle = new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned());
+      shuffle = new Shuffle(inputContext, conf, numInputs, memoryUpdateCallbackHandler.getMemoryAssigned());
       shuffle.run();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
@@ -130,7 +134,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
    */
   public synchronized boolean isInputReady() {
     Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
-    if (getNumPhysicalInputs() == 0) {
+    if (this.numInputs == 0) {
       return true;
     }
     return shuffle.isInputReady();
@@ -146,7 +150,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
     Shuffle localShuffleCopy = null;
     synchronized (this) {
       Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
-      if (getNumPhysicalInputs() == 0) {
+      if (this.numInputs == 0) {
         return;
       }
       localShuffleCopy = shuffle;
@@ -161,7 +165,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
 
   @Override
   public synchronized List<Event> close() throws IOException {
-    if (this.getNumPhysicalInputs() != 0 && rawIter != null) {
+    if (this.numInputs != 0 && rawIter != null) {
       rawIter.close();
     }
     return Collections.emptyList();
@@ -185,7 +189,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
     TezRawKeyValueIterator rawIterLocal;
     synchronized (this) {
       rawIterLocal = rawIter;
-      if (getNumPhysicalInputs() == 0) {
+      if (this.numInputs == 0) {
         return new KeyValuesReader() {
           @Override
           public boolean next() throws IOException {
@@ -223,7 +227,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
   @Override
   public void handleEvents(List<Event> inputEvents) {
     synchronized (this) {
-      if (getNumPhysicalInputs() == 0) {
+      if (numInputs == 0) {
         throw new RuntimeException("No input events expected as numInputs is 0");
       }
       if (!isStarted.get()) {
@@ -237,6 +241,11 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
     shuffle.handleEvents(inputEvents);
   }
 
+  @Override
+  public synchronized void setNumPhysicalInputs(int numInputs) {
+    this.numInputs = numInputs;
+  }
+
   @SuppressWarnings({ "rawtypes", "unchecked" })
   protected synchronized void createValuesIterator()
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
index 2633968..9d89eec 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -35,7 +35,7 @@ public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
   public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
     // wait for input so that iterator is available
     synchronized(this) {
-    if (getNumPhysicalInputs() == 0) {
+    if (this.numInputs == 0) {
       return new TezRawKeyValueIterator() {
         @Override
         public DataInputBuffer getKey() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 16e5d68..61c870f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -1,5 +1,5 @@
 /**
-git diff * Licensed to the Apache Software Foundation (ASF) under one
+* Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
@@ -36,8 +36,9 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
@@ -48,12 +49,13 @@ import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
 import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
 
 import com.google.common.base.Preconditions;
-
-public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
+public class ShuffledUnorderedKVInput implements LogicalInput {
 
   private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
   
   private Configuration conf;
+  private int numInputs = -1;
+  private TezInputContext inputContext;
   private ShuffleManager shuffleManager;
   private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
   private long firstEventReceivedTime = -1;
@@ -71,25 +73,26 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
   }
 
   @Override
-  public synchronized List<Event> initialize() throws Exception {
-    Preconditions.checkArgument(getNumPhysicalInputs() != -1, "Number of Inputs has not been set");
-    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+  public synchronized List<Event> initialize(TezInputContext inputContext) throws Exception {
+    Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
+    this.inputContext = inputContext;
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
-    if (getNumPhysicalInputs() == 0) {
-      getContext().requestInitialMemory(0l, null);
+    if (numInputs == 0) {
+      inputContext.requestInitialMemory(0l, null);
       isStarted.set(true);
-      getContext().inputIsReady();
+      inputContext.inputIsReady();
       LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
-          + getContext().getSourceVertexName());
+          + inputContext.getSourceVertexName());
       return Collections.emptyList();
     } else {
       long initalMemReq = getInitialMemoryReq();
       memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
-      this.getContext().requestInitialMemory(initalMemReq, memoryUpdateCallbackHandler);
+      this.inputContext.requestInitialMemory(initalMemReq, memoryUpdateCallbackHandler);
     }
 
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, getContext().getWorkDirs());
-    this.inputRecordCounter = getContext().getCounters().findCounter(
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
+    this.inputRecordCounter = inputContext.getCounters().findCounter(
         TaskCounter.INPUT_RECORDS_PROCESSED);
     return Collections.emptyList();
   }
@@ -120,14 +123,14 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
       ifileBufferSize = conf.getInt("io.file.buffer.size",
           TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
 
-      this.inputManager = new SimpleFetchedInputAllocator(getContext().getUniqueIdentifier(), conf,
-          getContext().getTotalMemoryAvailableToTask(),
+      this.inputManager = new SimpleFetchedInputAllocator(inputContext.getUniqueIdentifier(), conf,
+          inputContext.getTotalMemoryAvailableToTask(),
           memoryUpdateCallbackHandler.getMemoryAssigned());
 
-      this.shuffleManager = new ShuffleManager(getContext(), conf, getNumPhysicalInputs(), ifileBufferSize,
+      this.shuffleManager = new ShuffleManager(inputContext, conf, numInputs, ifileBufferSize,
           ifileReadAhead, ifileReadAheadLength, codec, inputManager);
 
-      this.inputEventHandler = new ShuffleInputEventHandlerImpl(getContext(), shuffleManager,
+      this.inputEventHandler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager,
           inputManager, codec, ifileReadAhead, ifileReadAheadLength);
 
       ////// End of Initial configuration
@@ -149,7 +152,7 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
   @Override
   public synchronized KeyValueReader getReader() throws Exception {
     Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
-    if (getNumPhysicalInputs() == 0) {
+    if (numInputs == 0) {
       return new KeyValueReader() {
         @Override
         public boolean next() throws IOException {
@@ -173,7 +176,7 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
   @Override
   public void handleEvents(List<Event> inputEvents) throws IOException {
     synchronized (this) {
-      if (getNumPhysicalInputs() == 0) {
+      if (numInputs == 0) {
         throw new RuntimeException("No input events expected as numInputs is 0");
       }
       if (!isStarted.get()) {
@@ -198,9 +201,14 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
     return null;
   }
 
+  @Override
+  public synchronized void setNumPhysicalInputs(int numInputs) {
+    this.numInputs = numInputs;
+  }
+
   private long getInitialMemoryReq() {
     return SimpleFetchedInputAllocator.getInitialMemoryReq(conf,
-        getContext().getTotalMemoryAvailableToTask());
+        inputContext.getTotalMemoryAvailableToTask());
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
index d1c5fc0..d7e017a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -45,7 +45,7 @@ public class LocalOnFileSorterOutput extends OnFileSortedOutput {
     Path src = mapOutputFile.getOutputFile();
     Path dst =
         mapOutputFile.getInputFileForWrite(
-            getContext().getTaskIndex(),
+            outputContext.getTaskIndex(),
             localFs.getFileStatus(src).getLen());
 
     LOG.info("Renaming src = " + src + ", dst = " + dst);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 9cf03ff..c8f2b22 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -32,8 +32,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
@@ -52,15 +53,17 @@ import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 /**
- * <code>OnFileSortedOutput</code> is an {@link AbstractLogicalOutput} which sorts key/value pairs 
+ * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs 
  * written to it and persists it to a file.
  */
-public class OnFileSortedOutput extends AbstractLogicalOutput {
+public class OnFileSortedOutput implements LogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(OnFileSortedOutput.class);
 
   protected ExternalSorter sorter;
   protected Configuration conf;
+  protected int numOutputs;
+  protected TezOutputContext outputContext;
   protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private long startTime;
   private long endTime;
@@ -68,17 +71,19 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   @Override
-  public synchronized List<Event> initialize() throws IOException {
+  public synchronized List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
     this.startTime = System.nanoTime();
-    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    this.outputContext = outputContext;
+    this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
     // Initializing this parametr in this conf since it is used in multiple
     // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
     // TezMerger, etc.
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, getContext().getWorkDirs());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
     this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
-    getContext().requestInitialMemory(
+    outputContext.requestInitialMemory(
         ExternalSorter.getInitialMemoryRequirement(conf,
-            getContext().getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
+            outputContext.getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
 
     sendEmptyPartitionDetails = this.conf.getBoolean(
         TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
@@ -92,10 +97,10 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
       memoryUpdateCallbackHandler.validateUpdateReceived();
       if (this.conf.getInt(TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
           TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS) > 1) {
-        sorter = new PipelinedSorter(getContext(), conf, getNumPhysicalOutputs(),
+        sorter = new PipelinedSorter(outputContext, conf, numOutputs,
             memoryUpdateCallbackHandler.getMemoryAssigned());
       } else {
-        sorter = new DefaultSorter(getContext(), conf, getNumPhysicalOutputs(),
+        sorter = new DefaultSorter(outputContext, conf, numOutputs,
             memoryUpdateCallbackHandler.getMemoryAssigned());
       }
       isStarted.set(true);
@@ -119,6 +124,11 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
   }
 
   @Override
+  public synchronized void setNumPhysicalOutputs(int numOutputs) {
+    this.numOutputs = numOutputs;
+  }
+
+  @Override
   public synchronized List<Event> close() throws IOException {
     if (sorter != null) {
       sorter.flush();
@@ -126,7 +136,7 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
       this.endTime = System.nanoTime();
       return generateEventsOnClose();
     } else {
-      LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
+      LOG.warn("Attempting to close output " + outputContext.getDestinationVertexName()
           + " before it was started");
       return Collections.emptyList();
     }
@@ -135,7 +145,7 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
   protected List<Event> generateEventsOnClose() throws IOException {
     String host = System.getenv(ApplicationConstants.Environment.NM_HOST
         .toString());
-    ByteBuffer shuffleMetadata = getContext()
+    ByteBuffer shuffleMetadata = outputContext
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
 
@@ -159,29 +169,29 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
             TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails));
         payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
         LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs="
-                + getNumPhysicalOutputs() + ", emptyPartitions=" + emptyPartitions
+                + numOutputs + ", emptyPartitions=" + emptyPartitions
               + ", compressedSize=" + emptyPartitionsBytesString.size());
       }
     }
     payloadBuilder.setHost(host);
     payloadBuilder.setPort(shufflePort);
-    payloadBuilder.setPathComponent(getContext().getUniqueIdentifier());
+    payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
     payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
     byte[] payloadBytes = payloadProto.toByteArray();
 
-    long outputSize = getContext().getCounters()
+    long outputSize = outputContext.getCounters()
         .findCounter(TaskCounter.OUTPUT_BYTES).getValue();
     VertexManagerEventPayloadProto.Builder vmBuilder = VertexManagerEventPayloadProto
         .newBuilder();
     vmBuilder.setOutputSize(outputSize);
     VertexManagerEvent vmEvent = new VertexManagerEvent(
-        getContext().getDestinationVertexName(), vmBuilder.build().toByteArray());    
+        outputContext.getDestinationVertexName(), vmBuilder.build().toByteArray());    
 
-    List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs() + 1);
+    List<Event> events = Lists.newArrayListWithCapacity(numOutputs+1);
     events.add(vmEvent);
 
-    CompositeDataMovementEvent csdme = new CompositeDataMovementEvent(0, getNumPhysicalOutputs(), payloadBytes);
+    CompositeDataMovementEvent csdme = new CompositeDataMovementEvent(0, numOutputs, payloadBytes);
     events.add(csdme);
 
     return events;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index af7ca4e..68cfcae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -31,8 +31,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
@@ -45,10 +46,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
-public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
+public class OnFileUnorderedKVOutput implements LogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(OnFileUnorderedKVOutput.class);
 
+  private TezOutputContext outputContext;
   private FileBasedKVWriter kvWriter;
   
   private Configuration conf;
@@ -60,14 +62,15 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
   }
 
   @Override
-  public synchronized List<Event> initialize()
+  public synchronized List<Event> initialize(TezOutputContext outputContext)
       throws Exception {
-    this.conf = TezUtils.createConfFromUserPayload(getContext()
+    this.outputContext = outputContext;
+    this.conf = TezUtils.createConfFromUserPayload(outputContext
         .getUserPayload());
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
-        getContext().getWorkDirs());
+        outputContext.getWorkDirs());
 
-    getContext().requestInitialMemory(0l, null); // mandatory call
+    this.outputContext.requestInitialMemory(0l, null); // mandatory call
 
     this.dataViaEventsEnabled = conf.getBoolean(
         TezJobConfig.TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED,
@@ -80,7 +83,7 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
         + "dataViaEventsEnabled: " + dataViaEventsEnabled
         + ", dataViaEventsMaxSize: " + dataViaEventsMaxSize);
     
-    this.kvWriter = new FileBasedKVWriter(getContext(), conf);
+    this.kvWriter = new FileBasedKVWriter(outputContext, conf);
     return Collections.emptyList();
   }
 
@@ -120,7 +123,7 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
     }
 
     String host = getHost();
-    ByteBuffer shuffleMetadata = getContext()
+    ByteBuffer shuffleMetadata = outputContext
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     int shufflePort = ShuffleUtils
         .deserializeShuffleProviderMetaData(shuffleMetadata);
@@ -136,7 +139,7 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
     if (outputGenerated) {
       payloadBuilder.setHost(host);
       payloadBuilder.setPort(shufflePort);
-      payloadBuilder.setPathComponent(getContext().getUniqueIdentifier());
+      payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
     }
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index ced91f1..90dadb6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -29,9 +29,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -51,11 +52,13 @@ import com.google.common.collect.Sets;
  * are to be injected, then it completes. All configuration items are post-fixed
  * by the name of the vertex that executes this input.
  */
-public class TestInput extends AbstractLogicalInput {
+public class TestInput implements LogicalInput {
   private static final Log LOG = LogFactory
       .getLog(TestInput.class);
   
   Configuration conf;
+  TezInputContext inputContext;
+  int numInputs;
   int numCompletedInputs = 0;
   int[] completedInputVersion;
   AtomicInteger inputReady = new AtomicInteger(-1);
@@ -134,48 +137,48 @@ public class TestInput extends AbstractLogicalInput {
       if (doFail) {
         if (
             (failingTaskIndices.contains(failAll) ||
-            failingTaskIndices.contains(getContext().getTaskIndex())) &&
+            failingTaskIndices.contains(inputContext.getTaskIndex())) &&
             (failingTaskAttempts.contains(failAll) || 
-             failingTaskAttempts.contains(getContext().getTaskAttemptNumber())) &&
+             failingTaskAttempts.contains(inputContext.getTaskAttemptNumber())) &&
              (lastInputReadyValue <= failingInputUpto)) {
           List<Event> events = Lists.newLinkedList();
           if (failingInputIndices.contains(failAll)) {
-            for (int i=0; i<numPhysicalInputs; ++i) {
-              String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
+            for (int i=0; i<numInputs; ++i) {
+              String msg = ("FailingInput: " + inputContext.getUniqueIdentifier() + 
                   " index: " + i + " version: " + lastInputReadyValue);
               events.add(new InputReadErrorEvent(msg, i, lastInputReadyValue));
               LOG.info("Failing input: " + msg);
             }
           } else {
             for (Integer index : failingInputIndices) {
-              if (index.intValue() >= numPhysicalInputs) {
+              if (index.intValue() >= numInputs) {
                 throwException("InputIndex: " + index.intValue() + 
-                    " should be less than numInputs: " + numPhysicalInputs);
+                    " should be less than numInputs: " + numInputs);
               }
               if (completedInputVersion[index.intValue()] < lastInputReadyValue) {
                 continue; // dont fail a previous version now.
               }
-              String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
+              String msg = ("FailingInput: " + inputContext.getUniqueIdentifier() + 
                   " index: " + index.intValue() + " version: " + lastInputReadyValue);
               events.add(new InputReadErrorEvent(msg, index.intValue(), lastInputReadyValue));
               LOG.info("Failing input: " + msg);
             }
           }
-          getContext().sendEvents(events);
+          inputContext.sendEvents(events);
           if (doFailAndExit) {
-            String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier();
+            String msg = "FailingInput exiting: " + inputContext.getUniqueIdentifier();
             LOG.info(msg);
             throwException(msg);
           } else {
             done = false;
           }
         } else if ((failingTaskIndices.contains(failAll) ||
-            failingTaskIndices.contains(getContext().getTaskIndex()))){
+            failingTaskIndices.contains(inputContext.getTaskIndex()))){
           boolean previousAttemptReadFailed = false;
           if (failingTaskAttempts.contains(failAll)) {
             previousAttemptReadFailed = true;
           } else {
-            for (int i=0 ; i<getContext().getTaskAttemptNumber(); ++i) {
+            for (int i=0 ; i<inputContext.getTaskAttemptNumber(); ++i) {
               if (failingTaskAttempts.contains(new Integer(i))) {
                 previousAttemptReadFailed = true;
                 break;
@@ -196,7 +199,7 @@ public class TestInput extends AbstractLogicalInput {
     
     // sum input value given by upstream tasks
     int sum = 0;
-    for (int i=0; i<numPhysicalInputs; ++i) {
+    for (int i=0; i<numInputs; ++i) {
       if (inputValues[i] == -1) {
         throwException("Invalid input value : " + i);
       }
@@ -208,7 +211,7 @@ public class TestInput extends AbstractLogicalInput {
   
   void throwException(String msg) {
     RuntimeException e = new RuntimeException(msg);
-    getContext().fatalError(e , msg);
+    inputContext.fatalError(e , msg);
     throw e;
   }
   
@@ -217,12 +220,13 @@ public class TestInput extends AbstractLogicalInput {
   }
   
   @Override
-  public List<Event> initialize() throws Exception {
-    getContext().requestInitialMemory(0l, null); //Mandatory call.
-    getContext().inputIsReady();
-    if (getContext().getUserPayload() != null) {
-      String vName = getContext().getTaskVertexName();
-      conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+  public List<Event> initialize(TezInputContext inputContext) throws Exception {
+    this.inputContext = inputContext;
+    this.inputContext.requestInitialMemory(0l, null); //Mandatory call.
+    this.inputContext.inputIsReady();
+    if (inputContext.getUserPayload() != null) {
+      String vName = inputContext.getTaskVertexName();
+      conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
       doFail = conf.getBoolean(getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL, vName), false);
       doFailAndExit = conf.getBoolean(
           getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, vName), false);
@@ -272,7 +276,7 @@ public class TestInput extends AbstractLogicalInput {
         LOG.info("Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + 
             " targetId: " + dmEvent.getTargetIndex() +
             " version: " + dmEvent.getVersion() +
-            " numInputs: " + numPhysicalInputs +
+            " numInputs: " + numInputs +
             " numCompletedInputs: " + numCompletedInputs);
         this.completedInputVersion[dmEvent.getTargetIndex()] = dmEvent.getVersion();
         this.inputValues[dmEvent.getTargetIndex()] = 
@@ -283,13 +287,13 @@ public class TestInput extends AbstractLogicalInput {
         LOG.info("Received InputFailed event sourceId : " + ifEvent.getSourceIndex() + 
             " targetId: " + ifEvent.getTargetIndex() +
             " version: " + ifEvent.getVersion() +
-            " numInputs: " + numPhysicalInputs +
+            " numInputs: " + numInputs +
             " numCompletedInputs: " + numCompletedInputs);
       }
     }
-    if (numCompletedInputs == numPhysicalInputs) {
+    if (numCompletedInputs == numInputs) {
       int maxInputVersionSeen = -1;  
-      for (int i=0; i<numPhysicalInputs; ++i) {
+      for (int i=0; i<numInputs; ++i) {
         if (completedInputVersion[i] < 0) {
           LOG.info("Not received completion for input " + i);
           return;
@@ -313,7 +317,7 @@ public class TestInput extends AbstractLogicalInput {
 
   @Override
   public void setNumPhysicalInputs(int numInputs) {
-    this.numPhysicalInputs = numInputs;
+    this.numInputs = numInputs;
     this.completedInputVersion = new int[numInputs];
     this.inputValues = new int[numInputs];
     for (int i=0; i<numInputs; ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0c324736/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 0ab1070..4814d2d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -25,14 +25,15 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 
 import com.google.common.collect.Lists;
 
-public class TestOutput extends AbstractLogicalOutput {
+public class TestOutput implements LogicalOutput {
   private static final Log LOG = LogFactory.getLog(TestOutput.class);
   
   public static OutputDescriptor getOutputDesc(byte[] payload) {
@@ -41,10 +42,14 @@ public class TestOutput extends AbstractLogicalOutput {
   }
   
   int output;
+  int numOutputs;
+  TezOutputContext outputContext;
   
   @Override
-  public List<Event> initialize() throws Exception {
-    getContext().requestInitialMemory(0l, null); //Mandatory call
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws Exception {
+    this.outputContext = outputContext;
+    this.outputContext.requestInitialMemory(0l, null); //Mandatory call
     return Collections.emptyList();
   }
   
@@ -69,12 +74,17 @@ public class TestOutput extends AbstractLogicalOutput {
   public List<Event> close() throws Exception {
     LOG.info("Sending data movement event with value: " + output);
     byte[] result = ByteBuffer.allocate(4).putInt(output).array();
-    List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs());
-    for (int i = 0; i < getNumPhysicalOutputs(); i++) {
+    List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+    for (int i = 0; i < numOutputs; i++) {
       DataMovementEvent event = new DataMovementEvent(i, result);
       events.add(event);
     }
     return events;
   }
 
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numOutputs = numOutputs;
+  }
+
 }


Mime
View raw message