tez-commits mailing list archives

From ss...@apache.org
Subject git commit: TEZ-1168. Add MultiMRInput, which can process multiple splits, and returns individual readers for each of these. (sseth)
Date Wed, 18 Jun 2014 19:55:10 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master b25c8668d -> 5bd8e4fd7


TEZ-1168. Add MultiMRInput, which can process multiple splits, and
returns individual readers for each of these. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5bd8e4fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5bd8e4fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5bd8e4fd

Branch: refs/heads/master
Commit: 5bd8e4fd789efe0b3aea1ac2c8da9c00d6f63d23
Parents: b25c866
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jun 18 12:53:06 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jun 18 12:53:06 2014 -0700

----------------------------------------------------------------------
 .../mapreduce/examples/IntersectExample.java    |   2 +-
 .../common/MRInputSplitDistributor.java         |   8 +-
 .../tez/mapreduce/hadoop/mapred/MRReporter.java |  23 +-
 .../mapreduce/TaskAttemptContextImpl.java       |  36 +-
 .../mapreduce/TaskInputOutputContextImpl.java   |   2 +-
 .../org/apache/tez/mapreduce/input/MRInput.java | 396 +++----------------
 .../tez/mapreduce/input/MRInputLegacy.java      |   8 +-
 .../tez/mapreduce/input/MultiMRInput.java       | 150 +++++++
 .../tez/mapreduce/input/base/MRInputBase.java   |  86 ++++
 .../apache/tez/mapreduce/lib/MRInputUtils.java  | 138 +++++++
 .../org/apache/tez/mapreduce/lib/MRReader.java  |  34 ++
 .../tez/mapreduce/lib/MRReaderMapReduce.java    | 149 +++++++
 .../tez/mapreduce/lib/MRReaderMapred.java       | 164 ++++++++
 .../apache/tez/mapreduce/output/MROutput.java   |   9 +-
 .../tez/mapreduce/processor/MRTaskReporter.java |   6 +-
 .../common/TestMRInputSplitDistributor.java     |   6 +-
 .../tez/mapreduce/input/TestMultiMRInput.java   | 291 ++++++++++++++
 .../processor/map/TestMapProcessor.java         |   2 +-
 .../processor/reduce/TestReduceProcessor.java   |   2 +-
 19 files changed, 1118 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index 45885f2..a412df4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -88,7 +88,7 @@ public class IntersectExample extends Configured implements Tool {
   }
 
   private static void printUsage() {
-    System.err.println("Usage: " + "intersectlines <file1> <file2> <numPartitions> <outPath>");
+    System.err.println("Usage: " + "intersect <file1> <file2> <numPartitions> <outPath>");
     ToolRunner.printGenericCommandUsage(System.err);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index c8cca9d..0b5e345 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.lib.MRInputUtils;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -95,12 +95,12 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
         diEvent = new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
       } else {
         if (useNewApi) {
-          org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInput
+          org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils
               .getNewSplitDetailsFromEvent(mrSplit, conf);
           diEvent = new RootInputDataInformationEvent(count++, newInputSplit);
         } else {
-          org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInput.getOldSplitDetailsFromEvent(
-              mrSplit, conf);
+          org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
+              .getOldSplitDetailsFromEvent(mrSplit, conf);
           diEvent = new RootInputDataInformationEvent(count++, oldInputSplit);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
index 11dcb97..28c8369 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
@@ -21,27 +21,22 @@ package org.apache.tez.mapreduce.hadoop.mapred;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.mapreduce.common.Utils;
-import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.api.TezTaskContext;
 
 public class MRReporter implements Reporter {
 
-  private TezTaskContext tezTaskContext;
+  private TezCounters tezCounters;
   private InputSplit split;
   private float progress = 0f;
-  private boolean isProcessorContext = false;
   
-  public MRReporter(TezProcessorContext tezProcContext) {
-    this(tezProcContext, null);
-    isProcessorContext = true;
-  }
-  public MRReporter(TezTaskContext tezTaskContext) {
-    this(tezTaskContext, null);
+  
+  public MRReporter(TezCounters tezCounters) {
+    this(tezCounters, null);
   }
 
-  public MRReporter(TezTaskContext tezTaskContext, InputSplit split) {
-    this.tezTaskContext = tezTaskContext;
+  public MRReporter(TezCounters tezCounters, InputSplit split) {
+    this.tezCounters = tezCounters;
     this.split = split;
   }
   
@@ -58,12 +53,12 @@ public class MRReporter implements Reporter {
 
   @Override
   public Counter getCounter(Enum<?> name) {
-    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(name));
+    return Utils.getMRCounter(tezCounters.findCounter(name));
   }
 
   @Override
   public Counter getCounter(String group, String name) {
-    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(group,
+    return Utils.getMRCounter(tezCounters.findCounter(group,
         name));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
index b3287b8..e5e7022 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
@@ -25,11 +25,11 @@ import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.mapreduce.common.Utils;
-import org.apache.tez.runtime.api.TezTaskContext;
 
 // NOTE: NEWTEZ: This is a copy of org.apache.tez.mapreduce.hadoop.mapred (not mapreduce). mapred likely does not need its own copy of this class.
 // Meant for use by the "mapreduce" API
@@ -39,18 +39,16 @@ import org.apache.tez.runtime.api.TezTaskContext;
 public class TaskAttemptContextImpl
        extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
 
-  private final TezTaskContext taskContext;
+  private final TezCounters tezCounters;
   private final Reporter reporter;
   
   public static org.apache.hadoop.mapred.TaskAttemptID createMockTaskAttemptID(
-      TezTaskContext taskContext, boolean isMap) {
+      long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber, boolean isMap) {
     return new org.apache.hadoop.mapred.TaskAttemptID(
-        new org.apache.hadoop.mapred.TaskID(String.valueOf(taskContext
-            .getApplicationId().getClusterTimestamp())
-            + String.valueOf(taskContext.getTaskVertexIndex()), taskContext
-            .getApplicationId().getId(),
-            isMap ? TaskType.MAP : TaskType.REDUCE, taskContext.getTaskIndex()),
-        taskContext.getTaskAttemptNumber());
+        new org.apache.hadoop.mapred.TaskID(String.valueOf(clusterId)
+            + String.valueOf(vertexIndex), appId,
+            isMap ? TaskType.MAP : TaskType.REDUCE, taskIndex),
+        taskAttemptNumber);
   }
   
   public static org.apache.hadoop.mapred.TaskAttemptID 
@@ -74,22 +72,24 @@ public class TaskAttemptContextImpl
   }
 
   // FIXME we need to use DAG Id but we are using App Id
-  public TaskAttemptContextImpl(Configuration conf, 
-      TezTaskContext taskContext, boolean isMap, Reporter reporter) {
+  public TaskAttemptContextImpl(Configuration conf, TezCounters tezCounters, long clusterId,
+      int vertexIndex, int appId, int taskIndex, int taskAttemptNumber, boolean isMap,
+      Reporter reporter) {
     // TODO NEWTEZ Can the jt Identifier string be taskContext.getUniqueId ?
-    this(conf, createMockTaskAttemptID(taskContext, isMap), taskContext, reporter);
+    this(conf, createMockTaskAttemptID(clusterId, vertexIndex, appId, taskIndex, taskAttemptNumber,
+        isMap), tezCounters, reporter);
   }
 
   //FIXME we need to use DAG Id but we are using App Id
    public TaskAttemptContextImpl(Configuration conf, TaskAttemptID attemptId, 
-       TezTaskContext taskContext, boolean isMap, Reporter reporter) {
+       TezCounters tezCounters, boolean isMap, Reporter reporter) {
      // TODO NEWTEZ Can the jt Identifier string be taskContext.getUniqueId ?
-     this(conf, attemptId, taskContext, reporter);
+     this(conf, attemptId, tezCounters, reporter);
    }
  
-  public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezTaskContext context, Reporter reporter) {
+  public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezCounters tezCounters, Reporter reporter) {
     super(conf, taId);
-    this.taskContext = context;
+    this.tezCounters = tezCounters;
     this.reporter = reporter;
   }
 
@@ -101,12 +101,12 @@ public class TaskAttemptContextImpl
 
   @Override
   public Counter getCounter(Enum<?> counterName) {
-    return Utils.getMRCounter(taskContext.getCounters().findCounter(counterName));
+    return Utils.getMRCounter(tezCounters.findCounter(counterName));
   }
 
   @Override
   public Counter getCounter(String groupName, String counterName) {
-    return Utils.getMRCounter(taskContext.getCounters().findCounter(groupName, counterName));
+    return Utils.getMRCounter(tezCounters.findCounter(groupName, counterName));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
index 9d83435..44e30a8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
@@ -52,7 +52,7 @@ public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
                                     RecordWriter<KEYOUT,VALUEOUT> output,
                                     OutputCommitter committer,
                                     TezTaskContext context, Reporter reporter) {
-    super(conf, taskid, context, reporter);
+    super(conf, taskid, context.getCounters(), reporter);
     this.output = output;
     this.committer = committer;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 0eebfb4..e8fbf55 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -20,46 +20,28 @@ package org.apache.tez.mapreduce.input;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
-import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.input.base.MRInputBase;
+import org.apache.tez.mapreduce.lib.MRInputUtils;
+import org.apache.tez.mapreduce.lib.MRReader;
+import org.apache.tez.mapreduce.lib.MRReaderMapReduce;
+import org.apache.tez.mapreduce.lib.MRReaderMapred;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
@@ -77,38 +59,21 @@ import com.google.common.base.Preconditions;
  * This class is not meant to be extended by external projects.
  */
 
-public class MRInput extends AbstractLogicalInput {
+public class MRInput extends MRInputBase {
 
   private static final Log LOG = LogFactory.getLog(MRInput.class);
   
-  private final Lock rrLock = new ReentrantLock();
-  private Condition rrInited = rrLock.newCondition();
+  private final ReentrantLock rrLock = new ReentrantLock();
+  private final Condition rrInited = rrLock.newCondition();
   
   private volatile boolean eventReceived = false;
-  
-  private JobConf jobConf;
-  private Configuration incrementalConf;
-  private boolean readerCreated = false;
-  
-  boolean useNewApi;
-  
-  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
 
-  @SuppressWarnings("rawtypes")
-  private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
-  @SuppressWarnings("rawtypes")
-  protected org.apache.hadoop.mapreduce.RecordReader newRecordReader;
-  protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+  private boolean readerCreated = false;
 
-  @SuppressWarnings("rawtypes")
-  private InputFormat oldInputFormat;
-  @SuppressWarnings("rawtypes")
-  protected RecordReader oldRecordReader;
-  protected InputSplit oldInputSplit;
+  protected MRReader mrReader;
 
   protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
-  
-  private TezCounter inputRecordCounter;
+
   // Potential counters - #splits, #totalSize, #actualBytesRead
   
   @Private
@@ -162,48 +127,19 @@ public class MRInput extends AbstractLogicalInput {
   
   @Override
   public List<Event> initialize() throws IOException {
-    getContext().requestInitialMemory(0l, null); //mandatory call
+    super.initialize();
     getContext().inputIsReady();
-    MRInputUserPayloadProto mrUserPayload =
-      MRHelpers.parseMRInputPayload(getContext().getUserPayload());
-    Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
-        "Split information not expected in MRInput");
-    Configuration conf =
-      MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
-    this.jobConf = new JobConf(conf);
-    // Add tokens to the jobConf - in case they are accessed within the RR / IF
-    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
-
-    TaskAttemptID taskAttemptId = new TaskAttemptID(
-      new TaskID(
-        Long.toString(getContext().getApplicationId().getClusterTimestamp()),
-        getContext().getApplicationId().getId(), TaskType.MAP,
-        getContext().getTaskIndex()),
-        getContext().getTaskAttemptNumber());
-
-    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
-      taskAttemptId.toString());
-    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        getContext().getDAGAttemptNumber());
-
-    // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
-    // theory, can be used by the MapProcessor, ReduceProcessor or a custom
-    // processor. (The processor could provide the counter though)
-
-    this.inputRecordCounter = getContext().getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
-
-    useNewApi = this.jobConf.getUseNewMapper();
     this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
         MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
     LOG.info("Using New mapreduce API: " + useNewApi
         + ", split information via event: " + splitInfoViaEvents);
-
     initializeInternal();
     return null;
   }
 
   @Override
   public void start() {
+    Preconditions.checkState(getNumPhysicalInputs() == 1, "Expecting only 1 physical input for MRInput");
   }
 
   @Private
@@ -211,28 +147,34 @@ public class MRInput extends AbstractLogicalInput {
     // Primarily for visibility
     rrLock.lock();
     try {
+      
       if (splitInfoViaEvents) {
         if (useNewApi) {
-          setupNewInputFormat();
+          mrReader = new MRReaderMapReduce(jobConf, getContext().getCounters(), inputRecordCounter,
+              getContext().getApplicationId().getClusterTimestamp(), getContext()
+                  .getTaskVertexIndex(), getContext().getApplicationId().getId(), getContext()
+                  .getTaskIndex(), getContext().getTaskAttemptNumber());
         } else {
-          setupOldInputFormat();
+          mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter);
         }
       } else {
-        // Read split information.
-        TaskSplitMetaInfo[] allMetaInfo = readSplits(jobConf);
-        TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext()
-            .getTaskIndex()];
-        this.splitMetaInfo = new TaskSplitIndex(
-            thisTaskMetaInfo.getSplitLocation(),
+        TaskSplitMetaInfo[] allMetaInfo = MRInputUtils.readSplits(jobConf);
+        TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext().getTaskIndex()];
+        TaskSplitIndex splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
             thisTaskMetaInfo.getStartOffset());
         if (useNewApi) {
-          setupNewInputFormat();
-          newInputSplit = getNewSplitDetailsFromDisk(splitMetaInfo);
-          setupNewRecordReader();
+          org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils
+              .getNewSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
+                  .findCounter(TaskCounter.SPLIT_RAW_BYTES));
+          mrReader = new MRReaderMapReduce(jobConf, newInputSplit, getContext().getCounters(),
+              inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
+              getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
+              getContext().getTaskIndex(), getContext().getTaskAttemptNumber());
         } else {
-          setupOldInputFormat();
-          oldInputSplit = getOldSplitDetailsFromDisk(splitMetaInfo);
-          setupOldRecordReader();
+          org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
+              .getOldSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
+                  .findCounter(TaskCounter.SPLIT_RAW_BYTES));
+          mrReader = new MRReaderMapred(jobConf, oldInputSplit, getContext().getCounters(), inputRecordCounter);
         }
       }
     } finally {
@@ -241,39 +183,6 @@ public class MRInput extends AbstractLogicalInput {
     LOG.info("Initialzed MRInput: " + getContext().getSourceVertexName());
   }
 
-  private void setupOldInputFormat() {
-    oldInputFormat = this.jobConf.getInputFormat();
-  }
-  
-  private void setupOldRecordReader() throws IOException {
-    Preconditions.checkNotNull(oldInputSplit, "Input split hasn't yet been setup");
-    oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
-        this.jobConf, new MRReporter(getContext(), oldInputSplit));
-    setIncrementalConfigParams(oldInputSplit);
-  }
-  
-  private void setupNewInputFormat() throws IOException {
-    taskAttemptContext = createTaskAttemptContext();
-    Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
-    try {
-      inputFormatClazz = taskAttemptContext.getInputFormatClass();
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Unable to instantiate InputFormat class", e);
-    }
-
-    newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
-  }
-  
-  private void setupNewRecordReader() throws IOException {
-    Preconditions.checkNotNull(newInputSplit, "Input split hasn't yet been setup");    
-    try {
-      newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
-      newRecordReader.initialize(newInputSplit, taskAttemptContext);
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while creating record reader", e);
-    }
-  }
-
   @Override
   public KeyValueReader getReader() throws IOException {
     Preconditions
@@ -282,15 +191,13 @@ public class MRInput extends AbstractLogicalInput {
     readerCreated = true;
     rrLock.lock();
     try {
-      if (newRecordReader == null && oldRecordReader == null)
+      if (!mrReader.isSetup())
         checkAndAwaitRecordReaderInitialization();
     } finally {
       rrLock.unlock();
     }
 
-    LOG.info("Creating reader for MRInput: "
-        + getContext().getSourceVertexName());
-    return new MRInputKVReader();
+    return mrReader;
   }
 
   @Override
@@ -312,11 +219,7 @@ public class MRInput extends AbstractLogicalInput {
 
   @Override
   public List<Event> close() throws IOException {
-    if (useNewApi) {
-      newRecordReader.close();
-    } else {
-      oldRecordReader.close();
-    }
+    mrReader.close();
     return null;
   }
 
@@ -328,27 +231,17 @@ public class MRInput extends AbstractLogicalInput {
    * @return the additional fields set by {@link MRInput}
    */
   public Configuration getConfigUpdates() {
-    if (incrementalConf != null) {
-      return new Configuration(incrementalConf);
+    if (!useNewApi) {
+      return ((MRReaderMapred) mrReader).getConfigUpdates();
+    } else {
+      return null;
     }
-    return null;
   }
-  
-  
 
   public float getProgress() throws IOException, InterruptedException {
-    if (useNewApi) {
-      return newRecordReader.getProgress();
-    } else {
-      return oldRecordReader.getProgress();
-    }
+    return mrReader.getProgress();
   }
 
-  
-  private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, getContext(), true, null);
-  }
-  
   void processSplitEvent(RootInputDataInformationEvent event)
       throws IOException {
     rrLock.lock();
@@ -362,12 +255,16 @@ public class MRInput extends AbstractLogicalInput {
   }
   
   void checkAndAwaitRecordReaderInitialization() throws IOException {
+    assert rrLock.getHoldCount() == 1;
+    rrLock.lock();
     try {
       LOG.info("Awaiting RecordReader initialization");
       rrInited.await();
     } catch (Exception e) {
       throw new IOException(
           "Interrupted waiting for RecordReader initiailization");
+    } finally {
+      rrLock.unlock();
     }
   }
 
@@ -382,203 +279,22 @@ public class MRInput extends AbstractLogicalInput {
     }
   }
   
-  private void initFromEventInternal(RootInputDataInformationEvent initEvent)
-      throws IOException {
+  private void initFromEventInternal(RootInputDataInformationEvent initEvent) throws IOException {
     LOG.info("Initializing RecordReader from event");
     Preconditions.checkState(initEvent != null, "InitEvent must be specified");
-    MRSplitProto splitProto = MRSplitProto
-        .parseFrom(initEvent.getUserPayload());
+    MRSplitProto splitProto = MRSplitProto.parseFrom(initEvent.getUserPayload());
+    Object split = null;
     if (useNewApi) {
-      newInputSplit = getNewSplitDetailsFromEvent(splitProto, jobConf);
-      LOG.info("Split Details -> SplitClass: "
-          + newInputSplit.getClass().getName() + ", NewSplit: " + newInputSplit);
-      setupNewRecordReader();
+      split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf);
+      LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: "
+          + split);
+
     } else {
-      oldInputSplit = getOldSplitDetailsFromEvent(splitProto, jobConf);
-      LOG.info("Split Details -> SplitClass: "
-          + oldInputSplit.getClass().getName() + ", OldSplit: " + oldInputSplit);
-      setupOldRecordReader();
+      split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, jobConf);
+      LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: "
+          + split);
     }
+    mrReader.setSplit(split);
     LOG.info("Initialized RecordReader from event");
   }
-
-  @Private
-  public static InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto, Configuration conf)
-      throws IOException {
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-    return MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
-  }
-  
-  @SuppressWarnings("unchecked")
-  private InputSplit getOldSplitDetailsFromDisk(TaskSplitIndex splitMetaInfo)
-      throws IOException {
-    Path file = new Path(splitMetaInfo.getSplitLocation());
-    FileSystem fs = FileSystem.getLocal(jobConf);
-    file = fs.makeQualified(file);
-    LOG.info("Reading input split file from : " + file);
-    long offset = splitMetaInfo.getStartOffset();
-    
-    FSDataInputStream inFile = fs.open(file);
-    inFile.seek(offset);
-    String className = Text.readString(inFile);
-    Class<org.apache.hadoop.mapred.InputSplit> cls;
-    try {
-      cls = 
-          (Class<org.apache.hadoop.mapred.InputSplit>) 
-          jobConf.getClassByName(className);
-    } catch (ClassNotFoundException ce) {
-      IOException wrap = new IOException("Split class " + className + 
-          " not found");
-      wrap.initCause(ce);
-      throw wrap;
-    }
-    SerializationFactory factory = new SerializationFactory(jobConf);
-    Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = 
-        (Deserializer<org.apache.hadoop.mapred.InputSplit>) 
-        factory.getDeserializer(cls);
-    deserializer.open(inFile);
-    org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
-    long pos = inFile.getPos();
-    getContext().getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
-        .increment(pos - offset);
-    inFile.close();
-    return split;
-  }
-
-  @Private
-  public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
-      MRSplitProto splitProto, Configuration conf) throws IOException {
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-    return MRHelpers.createNewFormatSplitFromUserPayload(
-        splitProto, serializationFactory);
-  }
-  
-  @SuppressWarnings("unchecked")
-  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk(
-      TaskSplitIndex splitMetaInfo) throws IOException {
-    Path file = new Path(splitMetaInfo.getSplitLocation());
-    long offset = splitMetaInfo.getStartOffset();
-    
-    // Split information read from local filesystem.
-    FileSystem fs = FileSystem.getLocal(jobConf);
-    file = fs.makeQualified(file);
-    LOG.info("Reading input split file from : " + file);
-    FSDataInputStream inFile = fs.open(file);
-    inFile.seek(offset);
-    String className = Text.readString(inFile);
-    Class<org.apache.hadoop.mapreduce.InputSplit> cls;
-    try {
-      cls = 
-          (Class<org.apache.hadoop.mapreduce.InputSplit>) 
-          jobConf.getClassByName(className);
-    } catch (ClassNotFoundException ce) {
-      IOException wrap = new IOException("Split class " + className + 
-          " not found");
-      wrap.initCause(ce);
-      throw wrap;
-    }
-    SerializationFactory factory = new SerializationFactory(jobConf);
-    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = 
-        (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) 
-        factory.getDeserializer(cls);
-    deserializer.open(inFile);
-    org.apache.hadoop.mapreduce.InputSplit split = 
-        deserializer.deserialize(null);
-    long pos = inFile.getPos();
-    getContext().getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
-        .increment(pos - offset);
-    inFile.close();
-    return split;
-  }
-
-  private void setIncrementalConfigParams(InputSplit inputSplit) {
-    if (inputSplit instanceof FileSplit) {
-      FileSplit fileSplit = (FileSplit) inputSplit;
-      this.incrementalConf = new Configuration(false);
-
-      this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
-          .toString());
-      this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
-          fileSplit.getStart());
-      this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
-          fileSplit.getLength());
-    }
-    LOG.info("Processing split: " + inputSplit);
-  }
-
-  protected TaskSplitMetaInfo[] readSplits(Configuration conf)
-      throws IOException {
-    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
-    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
-        FileSystem.getLocal(conf));
-    return allTaskSplitMetaInfo;
-  }
-  
-  private class MRInputKVReader implements KeyValueReader {
-    
-    Object key;
-    Object value;
-
-    private final boolean localNewApi;
-    
-    MRInputKVReader() {
-      localNewApi = useNewApi;
-      if (!localNewApi) {
-        key = oldRecordReader.createKey();
-        value =oldRecordReader.createValue();
-      }
-    }
-    
-    // Setup the values iterator once, and set value on the same object each time
-    // to prevent lots of objects being created.
-
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public boolean next() throws IOException {
-      boolean hasNext = false;
-      if (localNewApi) {
-        try {
-          hasNext = newRecordReader.nextKeyValue();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while checking for next key-value", e);
-        }
-      } else {
-        hasNext = oldRecordReader.next(key, value);
-      }
-      if (hasNext) {
-        inputRecordCounter.increment(1);
-      }
-      
-      return hasNext;
-    }
-
-    @Override
-    public Object getCurrentKey() throws IOException {
-      if (localNewApi) {
-        try {
-          return newRecordReader.getCurrentKey();
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted while fetching next key", e);
-        }
-      } else {
-        return key;
-      }
-    }
-
-    @Override
-    public Object getCurrentValue() throws IOException {
-      if (localNewApi) {
-        try {
-          return newRecordReader.getCurrentValue();
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted while fetching next value", e);
-        }
-      } else {
-        return value;
-      }
-    }
-  }
-
 }
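
The refactor above keeps MRInput's blocking contract intact: getReader() waits on the rrLock/rrInited pair until processSplitEvent() has attached the split to the MRReader. A minimal, self-contained model of that handshake; class and field names are illustrative, not the actual MRInput members:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Simplified model of the rrLock/rrInited handshake used by MRInput.
    class ReaderInitHandshake {
      private final ReentrantLock rrLock = new ReentrantLock();
      private final Condition rrInited = rrLock.newCondition();
      private boolean readerSetup = false;

      // Reader side (~ getReader): block until the event thread finishes setup.
      void awaitReaderSetup() throws InterruptedException {
        rrLock.lock();
        try {
          while (!readerSetup) {
            rrInited.await(); // atomically releases rrLock while waiting
          }
        } finally {
          rrLock.unlock();
        }
      }

      // Event side (~ processSplitEvent): complete setup, then wake the reader.
      void onSplitEvent() {
        rrLock.lock();
        try {
          readerSetup = true; // the real code calls mrReader.setSplit(...) here
          rrInited.signal();
        } finally {
          rrLock.unlock();
        }
      }
    }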

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 7ecfd6e..9171492 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -48,18 +48,18 @@ public class MRInputLegacy extends MRInput {
   
   @Private
   public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
-    return this.newInputSplit;
+    return (org.apache.hadoop.mapreduce.InputSplit) mrReader.getSplit();
   }  
 
   @SuppressWarnings("rawtypes")
   @Unstable
   public org.apache.hadoop.mapreduce.RecordReader getNewRecordReader() {
-    return this.newRecordReader;
+    return (org.apache.hadoop.mapreduce.RecordReader) mrReader.getRecordReader();
   }
 
   @Private
   public InputSplit getOldInputSplit() {
-    return this.oldInputSplit;
+    return (InputSplit) mrReader.getSplit();
   }
 
   @Unstable
@@ -70,7 +70,7 @@ public class MRInputLegacy extends MRInput {
   @SuppressWarnings("rawtypes")
   @Private
   public RecordReader getOldRecordReader() {
-    return this.oldRecordReader;
+    return (RecordReader) mrReader.getRecordReader();
   }
   
   @LimitedPrivate("hive")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
new file mode 100644
index 0000000..8a759f8
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.mapreduce.input.base.MRInputBase;
+import org.apache.tez.mapreduce.lib.MRInputUtils;
+import org.apache.tez.mapreduce.lib.MRReader;
+import org.apache.tez.mapreduce.lib.MRReaderMapReduce;
+import org.apache.tez.mapreduce.lib.MRReaderMapred;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+public class MultiMRInput extends MRInputBase {
+
+  private static final Log LOG = LogFactory.getLog(MultiMRInput.class);
+
+  @Override
+  public int getNumPhysicalInputs() {
+    return super.getNumPhysicalInputs();
+  }
+
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition condition = lock.newCondition();
+  private final AtomicInteger eventCount = new AtomicInteger(0);
+
+  private List<MRReader> readers = new LinkedList<MRReader>();
+
+  @Override
+  public List<Event> initialize() throws IOException {
+    super.initialize();
+    LOG.info("Using New mapreduce API: " + useNewApi + ", numPhysicalInputs: "
+        + getNumPhysicalInputs());
+    return null;
+  }
+
+  public Collection<KeyValueReader> getKeyValueReaders() throws InterruptedException, IOException {
+    lock.lock();
+    try {
+      while (eventCount.get() != getNumPhysicalInputs()) {
+        condition.await();
+      }
+    } finally {
+      lock.unlock();
+    }
+    return Collections
+        .unmodifiableCollection(Lists.transform(readers, new Function<MRReader, KeyValueReader>() {
+          @Override
+          public KeyValueReader apply(MRReader input) {
+            return input;
+          }
+        }));
+  }
+
+  @Override
+  public Reader getReader() throws Exception {
+    throw new UnsupportedOperationException("getReader not supported. use getKeyValueReaders");
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) throws Exception {
+    lock.lock();
+    try {
+      Preconditions.checkState(eventCount.get() + inputEvents.size() <= getNumPhysicalInputs(),
+          "Unexpected event. All physical sources already initialized");
+      for (Event event : inputEvents) {
+        MRReader reader = initFromEvent((RootInputDataInformationEvent) event);
+        readers.add(reader);
+        if (eventCount.incrementAndGet() == getNumPhysicalInputs()) {
+          getContext().inputIsReady();
+          condition.signal();
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private MRReader initFromEvent(RootInputDataInformationEvent event) throws IOException {
+    Preconditions.checkState(event != null, "Event must be specified");
+    LOG.info("Initializing Reader: " + eventCount.get());
+    MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+    Object split = null;
+    MRReader reader = null;
+    JobConf localJobConf = new JobConf(jobConf);
+    if (useNewApi) {
+      split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, localJobConf);
+      reader = new MRReaderMapReduce(localJobConf, (org.apache.hadoop.mapreduce.InputSplit) split,
+          getContext().getCounters(), inputRecordCounter, getContext().getApplicationId()
+          .getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext()
+          .getApplicationId().getId(), getContext().getTaskIndex(), getContext()
+          .getTaskAttemptNumber());
+      LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: "
+          + split);
+
+    } else {
+      split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
+      reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split,
+          getContext().getCounters(), inputRecordCounter);
+      LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: "
+          + split);
+    }
+    LOG.info("Initialized RecordReader from event");
+    return reader;
+  }
+
+  @Override
+  public List<Event> close() throws Exception {
+    for (MRReader reader : readers) {
+      reader.close();
+    }
+    return null;
+  }
+
+  @Override
+  public void start() throws Exception {
+    Preconditions.checkState(getNumPhysicalInputs() >= 1, "Expecting one or more physical inputs");
+  }
+}
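
MultiMRInput deliberately replaces the single blocking getReader() with getKeyValueReaders(), which waits until one RootInputDataInformationEvent per physical input has been handled and then returns one reader per split. A minimal consumption sketch from a processor's perspective; the inputs map and the input name "parts" are assumptions about the surrounding processor, only getKeyValueReaders() and KeyValueReader come from this patch:

    import java.util.Map;

    import org.apache.tez.mapreduce.input.MultiMRInput;
    import org.apache.tez.runtime.api.LogicalInput;
    import org.apache.tez.runtime.library.api.KeyValueReader;

    // Hypothetical fragment of a processor's run() body.
    void consumeSplits(Map<String, LogicalInput> inputs) throws Exception {
      MultiMRInput multiInput = (MultiMRInput) inputs.get("parts");
      // Blocks until all expected split events have arrived.
      for (KeyValueReader reader : multiInput.getKeyValueReaders()) {
        while (reader.next()) { // each reader iterates exactly one split
          Object key = reader.getCurrentKey();
          Object value = reader.getCurrentValue();
          // process one record of this split
        }
      }
    }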

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
new file mode 100644
index 0000000..a6a0d83
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input.base;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Reader;
+
+import java.io.IOException;
+import java.util.List;
+
+
+@InterfaceAudience.Private
+public abstract class MRInputBase extends AbstractLogicalInput {
+
+  protected JobConf jobConf;
+  protected TezCounter inputRecordCounter;
+
+  @Override
+  public Reader getReader() throws Exception {
+    return null;
+  }
+
+  @InterfaceAudience.Private
+  protected boolean useNewApi;
+
+  public List<Event> initialize() throws IOException {
+    getContext().requestInitialMemory(0l, null); // mandatory call
+    MRRuntimeProtos.MRInputUserPayloadProto mrUserPayload =
+        MRHelpers.parseMRInputPayload(getContext().getUserPayload());
+    Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
+        "Split information not expected in " + this.getClass().getName());
+    Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
+
+    this.jobConf = new JobConf(conf);
+    // Add tokens to the jobConf - in case they are accessed within the RR / IF
+    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
+
+    TaskAttemptID taskAttemptId = new TaskAttemptID(
+        new TaskID(
+            Long.toString(getContext().getApplicationId().getClusterTimestamp()),
+            getContext().getApplicationId().getId(), TaskType.MAP,
+            getContext().getTaskIndex()),
+        getContext().getTaskAttemptNumber());
+
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
+        taskAttemptId.toString());
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+        getContext().getDAGAttemptNumber());
+
+    this.inputRecordCounter = getContext().getCounters().findCounter(
+        TaskCounter.INPUT_RECORDS_PROCESSED);
+
+    useNewApi = this.jobConf.getUseNewMapper();
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
new file mode 100644
index 0000000..c2e33e1
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+
+/**
+ * Helper methods for InputFormat based Inputs. Private to Tez.
+ */
+@Private
+public class MRInputUtils {
+
+  private static final Log LOG = LogFactory.getLog(MRInputUtils.class);
+
+  public static TaskSplitMetaInfo[] readSplits(Configuration conf) throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
+    allTaskSplitMetaInfo = SplitMetaInfoReaderTez
+        .readSplitMetaInfo(conf, FileSystem.getLocal(conf));
+    return allTaskSplitMetaInfo;
+  }
+
+  public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
+      MRSplitProto splitProto, Configuration conf) throws IOException {
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    return MRHelpers.createNewFormatSplitFromUserPayload(
+        splitProto, serializationFactory);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk(
+      TaskSplitIndex splitMetaInfo, JobConf jobConf, TezCounter splitBytesCounter)
+      throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    long offset = splitMetaInfo.getStartOffset();
+
+    // Split information read from local filesystem.
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<org.apache.hadoop.mapreduce.InputSplit> cls;
+    try {
+      cls = (Class<org.apache.hadoop.mapreduce.InputSplit>) jobConf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className + " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) factory
+        .getDeserializer(cls);
+    deserializer.open(inFile);
+    org.apache.hadoop.mapreduce.InputSplit split = deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    if (splitBytesCounter != null) {
+      splitBytesCounter.increment(pos - offset);
+    }
+    inFile.close();
+    return split;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static InputSplit getOldSplitDetailsFromDisk(TaskSplitIndex splitMetaInfo,
+      JobConf jobConf, TezCounter splitBytesCounter) throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+    long offset = splitMetaInfo.getStartOffset();
+
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<org.apache.hadoop.mapred.InputSplit> cls;
+    try {
+      cls = (Class<org.apache.hadoop.mapred.InputSplit>) jobConf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className + " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapred.InputSplit>) factory
+        .getDeserializer(cls);
+    deserializer.open(inFile);
+    org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    if (splitBytesCounter != null) {
+      splitBytesCounter.increment(pos - offset);
+    }
+    inFile.close();
+    return split;
+  }
+  
+  @Private
+  public static InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto, Configuration conf)
+      throws IOException {
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    return MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
+  }
+}
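
Both getNewSplitDetailsFromDisk and getOldSplitDetailsFromDisk parse the record layout Hadoop's split-file writer produces: the split class name written as a Hadoop Text string, immediately followed by the split body serialized through the configured SerializationFactory. A writer-side sketch of that layout, shown only to make the format explicit; the actual writing happens on the Hadoop side, not in this patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.Serializer;
    import org.apache.hadoop.mapred.InputSplit;

    // Illustration of the on-disk record the readers above consume.
    @SuppressWarnings("unchecked")
    void writeOldApiSplit(FSDataOutputStream out, InputSplit split, Configuration conf)
        throws IOException {
      Text.writeString(out, split.getClass().getName()); // class name first
      SerializationFactory factory = new SerializationFactory(conf);
      Serializer<InputSplit> serializer =
          factory.getSerializer((Class<InputSplit>) split.getClass());
      serializer.open(out);
      serializer.serialize(split); // split body follows the class name
    }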

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
new file mode 100644
index 0000000..f8a5a5e
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+@Private
+public interface MRReader extends KeyValueReader {
+  void setSplit(Object split) throws IOException;
+  boolean isSetup();
+  float getProgress() throws IOException, InterruptedException;
+  void close() throws IOException;
+  Object getSplit();
+  Object getRecordReader();
+}
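
MRReader is the seam that lets MRInput and MultiMRInput stay agnostic of the mapred/mapreduce divide: both API flavors sit behind one KeyValueReader-derived interface, and the split can be supplied at construction time or attached later via setSplit() once an event delivers it. A sketch of the dispatch MRInput.initializeInternal performs; the constructor signatures are the ones used above, while jobConf, counters, the id fields, and handle() are assumed to be in scope:

    // Choose the reader implementation once, based on the configured API.
    MRReader reader = useNewApi
        ? new MRReaderMapReduce(jobConf, counters, inputRecordCounter,
            clusterTimestamp, vertexIndex, appId, taskIndex, taskAttemptNumber)
        : new MRReaderMapred(jobConf, counters, inputRecordCounter);
    // With splits delivered via events, the split is attached after construction.
    reader.setSplit(deserializedSplit);
    while (reader.next()) {
      handle(reader.getCurrentKey(), reader.getCurrentValue());
    }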

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
new file mode 100644
index 0000000..076f801
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+import com.google.common.base.Preconditions;
+
+public class MRReaderMapReduce implements MRReader {
+
+  private final TezCounter inputRecordCounter;
+
+  private final TaskAttemptContext taskAttemptContext;
+  @SuppressWarnings("rawtypes")
+  private final InputFormat inputFormat;
+  @SuppressWarnings("rawtypes")
+  private RecordReader recordReader;
+  private InputSplit inputSplit;
+
+  private boolean setupComplete = false;
+
+  public MRReaderMapReduce(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter,
+      long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber)
+      throws IOException {
+    this(jobConf, null, tezCounters, inputRecordCounter, clusterId, vertexIndex, appId, taskIndex,
+        taskAttemptNumber);
+  }
+
+  public MRReaderMapReduce(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters,
+      TezCounter inputRecordCounter, long clusterId, int vertexIndex, int appId, int taskIndex,
+      int taskAttemptNumber) throws IOException {
+    this.inputRecordCounter = inputRecordCounter;
+    this.taskAttemptContext = new TaskAttemptContextImpl(jobConf, tezCounters, clusterId,
+        vertexIndex, appId, taskIndex, taskAttemptNumber, true, null);
+
+    Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
+    try {
+      inputFormatClazz = taskAttemptContext.getInputFormatClass();
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to instantiate InputFormat class", e);
+    }
+    inputFormat = ReflectionUtils.newInstance(inputFormatClazz, jobConf);
+
+    if (inputSplit != null) {
+      this.inputSplit = inputSplit;
+      setupNewRecordReader();
+    }
+  }
+
+  @Override
+  public void setSplit(Object inputSplit) throws IOException {
+    this.inputSplit = (InputSplit) inputSplit;
+    setupNewRecordReader();
+  }
+
+  public boolean isSetup() {
+    return setupComplete;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return setupComplete ? recordReader.getProgress() : 0.0f;
+  }
+
+  public void close() throws IOException {
+    if (setupComplete) {
+      recordReader.close();
+    }
+  }
+
+  @Override
+  public Object getSplit() {
+    return inputSplit;
+  }
+
+  @Override
+  public Object getRecordReader() {
+    return recordReader;
+  }
+
+  @Override
+  public boolean next() throws IOException {
+    boolean hasNext;
+    try {
+      hasNext = recordReader.nextKeyValue();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while checking for next key-value", e);
+    }
+    if (hasNext) {
+      inputRecordCounter.increment(1);
+    }
+    return hasNext;
+  }
+
+  @Override
+  public Object getCurrentKey() throws IOException {
+    try {
+      return recordReader.getCurrentKey();
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while fetching next key", e);
+    }
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException {
+    try {
+      return recordReader.getCurrentValue();
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while fetching next value", e);
+    }
+  }
+
+  private void setupNewRecordReader() throws IOException {
+    Preconditions.checkNotNull(inputSplit, "Input split has not been set up yet");
+    try {
+      recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+      recordReader.initialize(inputSplit, taskAttemptContext);
+      setupComplete = true;
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while creating record reader", e);
+    }
+  }
+}
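
For reference, a construction sketch for the new-API reader, assuming a JobConf whose configured input format is a new-API InputFormat, a deserialized org.apache.hadoop.mapreduce.InputSplit, and Tez's TaskCounter.INPUT_RECORDS_PROCESSED as the record counter; the zero-valued IDs are placeholders for the task identity fields:

    // Build a reader bound to a single split; setup happens in the constructor
    // because the split is non-null.
    TezCounters counters = new TezCounters();
    TezCounter records = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
    MRReader reader = new MRReaderMapReduce(jobConf, split, counters, records,
        0L /* clusterId */, 0 /* vertexIndex */, 0 /* appId */, 0 /* taskIndex */,
        0 /* taskAttemptNumber */);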

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
new file mode 100644
index 0000000..e2800ce
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.mapreduce.input.MRInput;
+
+import com.google.common.base.Preconditions;
+
+public class MRReaderMapred implements MRReader {
+
+  private static final Log LOG = LogFactory.getLog(MRReaderMapred.class);
+
+  Object key;
+  Object value;
+
+  private final JobConf jobConf;
+  private final TezCounters tezCounters;
+  private final TezCounter inputRecordCounter;
+
+  @SuppressWarnings("rawtypes")
+  private final InputFormat inputFormat;
+  protected InputSplit inputSplit;
+  @SuppressWarnings("rawtypes")
+  protected RecordReader recordReader;
+  private Configuration incrementalConf;
+
+  private boolean setupComplete = false;
+
+  public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter)
+      throws IOException {
+    this(jobConf, null, tezCounters, inputRecordCounter);
+  }
+
+  public MRReaderMapred(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters,
+      TezCounter inputRecordCounter) throws IOException {
+    this.jobConf = jobConf;
+    this.tezCounters = tezCounters;
+    this.inputRecordCounter = inputRecordCounter;
+    inputFormat = this.jobConf.getInputFormat();
+    if (inputSplit != null) {
+      this.inputSplit = inputSplit;
+      setupOldRecordReader();
+    }
+  }
+
+  @Override
+  public void setSplit(Object inputSplit) throws IOException {
+    this.inputSplit = (InputSplit) inputSplit;
+    setupOldRecordReader();
+  }
+
+  @Override
+  public boolean isSetup() {
+    return setupComplete;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return setupComplete ? recordReader.getProgress() : 0.0f;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (setupComplete) {
+      recordReader.close();
+    }
+  }
+
+  @Override
+  public Object getSplit() {
+    return inputSplit;
+  }
+
+  @Override
+  public Object getRecordReader() {
+    return recordReader;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean next() throws IOException {
+    boolean hasNext = recordReader.next(key, value);
+    if (hasNext) {
+      inputRecordCounter.increment(1);
+    }
+    return hasNext;
+  }
+
+  @Override
+  public Object getCurrentKey() throws IOException {
+    return key;
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException {
+    return value;
+  }
+
+  /**
+   * {@link MRInput} sets some additional parameters, such as the split location, when using the
+   * new API. This method returns those additional updates, and should be used by Processors that
+   * use the old MapReduce API with {@link MRInput}.
+   * 
+   * @return the additional fields set by {@link MRInput}, or {@code null} if there are none
+   */
+  public Configuration getConfigUpdates() {
+    if (incrementalConf != null) {
+      return new Configuration(incrementalConf);
+    }
+    return null;
+  }
+
+  private void setupOldRecordReader() throws IOException {
+    Preconditions.checkNotNull(inputSplit, "Input split has not been set up yet");
+    recordReader = inputFormat.getRecordReader(inputSplit, this.jobConf, new MRReporter(
+        tezCounters, inputSplit));
+    setIncrementalConfigParams(inputSplit);
+    key = recordReader.createKey();
+    value = recordReader.createValue();
+    setupComplete = true;
+  }
+
+  private void setIncrementalConfigParams(InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      this.incrementalConf = new Configuration(false);
+
+      this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength()); // MAP_INPUT_PATH is misnamed in Hadoop: it maps to "mapreduce.map.input.length"
+    }
+    LOG.info("Processing split: " + inputSplit);
+  }
+}
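
A short sketch of consuming getConfigUpdates(), assuming a processor that keeps a task-local JobConf for old-API user code; Configuration is iterable over its key/value entries, so the per-split updates can be folded in directly:

    // Apply the per-split updates (map input file/start/length) to a local conf.
    Configuration updates = reader.getConfigUpdates();
    if (updates != null) {
      for (java.util.Map.Entry<String, String> entry : updates) {
        localJobConf.set(entry.getKey(), entry.getValue());
      }
    }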

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 247fcb9..987ca09 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -120,7 +119,9 @@ public class MROutput extends AbstractLogicalOutput {
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         getContext().getDAGAttemptNumber());
     TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
-        .createMockTaskAttemptID(getContext(), isMapperOutput);
+        .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
+            getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
+            getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput);
     jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
     jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
     jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
@@ -165,7 +166,7 @@ public class MROutput extends AbstractLogicalOutput {
 
       oldRecordWriter =
           oldOutputFormat.getRecordWriter(
-              fs, jobConf, finalName, new MRReporter(getContext()));
+              fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
     }
     initCommitter(jobConf, useNewApi);
 
@@ -218,7 +219,7 @@ public class MROutput extends AbstractLogicalOutput {
   }
 
   private TaskAttemptContext createTaskAttemptContext(TaskAttemptID attemptId) {
-    return new TaskAttemptContextImpl(this.jobConf, attemptId, getContext(),
+    return new TaskAttemptContextImpl(this.jobConf, attemptId, getContext().getCounters(),
         isMapperOutput, null);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
index 0869e32..c3f7bc9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
@@ -45,19 +45,19 @@ public class MRTaskReporter
 
   public MRTaskReporter(TezProcessorContext context) {
     this.context = context;
-    this.reporter = new MRReporter(context);
+    this.reporter = new MRReporter(context.getCounters());
     this.isProcessorContext = true;
   }
 
   public MRTaskReporter(TezOutputContext context) {
     this.context = context;
-    this.reporter = new MRReporter(context);
+    this.reporter = new MRReporter(context.getCounters());
     this.isProcessorContext = false;
   }
   
   public MRTaskReporter(TezInputContext context) {
     this.context= context;
-    this.reporter = new MRReporter(context);
+    this.reporter = new MRReporter(context.getCounters());
     this.isProcessorContext = false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index da13561..feabefd 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.lib.MRInputUtils;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -87,12 +87,12 @@ public class TestMRInputSplitDistributor {
     assertNotNull(diEvent2.getUserPayload());
 
     MRSplitProto event1Proto = MRSplitProto.parseFrom(diEvent1.getUserPayload());
-    InputSplit is1 = MRInput.getOldSplitDetailsFromEvent(event1Proto, new Configuration());
+    InputSplit is1 = MRInputUtils.getOldSplitDetailsFromEvent(event1Proto, new Configuration());
     assertTrue(is1 instanceof InputSplitForTest);
     assertEquals(1, ((InputSplitForTest) is1).identifier);
 
     MRSplitProto event2Proto = MRSplitProto.parseFrom(diEvent2.getUserPayload());
-    InputSplit is2 = MRInput.getOldSplitDetailsFromEvent(event2Proto, new Configuration());
+    InputSplit is2 = MRInputUtils.getOldSplitDetailsFromEvent(event2Proto, new Configuration());
     assertTrue(is2 instanceof InputSplitForTest);
     assertEquals(2, ((InputSplitForTest) is2).identifier);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
new file mode 100644
index 0000000..3aa5ddc
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMultiMRInput {
+
+  private static final Log LOG = LogFactory.getLog(TestMultiMRInput.class);
+
+  private static final JobConf defaultConf = new JobConf();
+  private static final String testTmpDir;
+  private static final Path TEST_ROOT_DIR;
+  private static FileSystem localFs;
+
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      testTmpDir = System.getProperty("test.build.data", "/tmp");
+      TEST_ROOT_DIR = new Path(testTmpDir, TestMultiMRInput.class.getSimpleName());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Before
+  public void setup() throws IOException {
+    LOG.info("Setup. Using test dir: " + TEST_ROOT_DIR);
+    localFs.delete(TEST_ROOT_DIR, true);
+    localFs.mkdirs(TEST_ROOT_DIR);
+  }
+
+  @Test(timeout = 5000)
+  public void testSingleSplit() throws Exception {
+
+    Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
+    JobConf jobConf = new JobConf(defaultConf);
+    jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, workDir);
+
+    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
+    builder.setInputFormatName(SequenceFileInputFormat.class.getName());
+    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
+    byte[] payload = builder.build().toByteArray();
+
+    TezInputContext inputContext = createTezInputContext(payload);
+
+    MultiMRInput input = new MultiMRInput();
+    input.setNumPhysicalInputs(1);
+    input.initialize(inputContext);
+    List<Event> eventList = new ArrayList<Event>();
+
+    String file1 = "file1";
+    LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
+        10);
+    SequenceFileInputFormat<LongWritable, Text> format =
+        new SequenceFileInputFormat<LongWritable, Text>();
+    InputSplit[] splits = format.getSplits(jobConf, 1);
+    assertEquals(1, splits.length);
+
+    MRSplitProto splitProto = MRHelpers.createSplitProto(splits[0]);
+    RootInputDataInformationEvent event = new RootInputDataInformationEvent(0,
+        splitProto.toByteArray());
+
+    eventList.clear();
+    eventList.add(event);
+    input.handleEvents(eventList);
+
+    int readerCount = 0;
+    for (KeyValueReader reader : input.getKeyValueReaders()) {
+      readerCount++;
+      while (reader.next()) {
+        if (data1.size() == 0) {
+          fail("Found more records than expected");
+        }
+        Object key = reader.getCurrentKey();
+        Object val = reader.getCurrentValue();
+        assertEquals(val, data1.remove(key));
+      }
+    }
+    assertEquals(1, readerCount);
+  }
+
+  @Test(timeout = 5000)
+  public void testMultipleSplits() throws Exception {
+
+    Path workDir = new Path(TEST_ROOT_DIR, "testMultipleSplits");
+    JobConf jobConf = new JobConf(defaultConf);
+    jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, workDir);
+
+    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
+    builder.setInputFormatName(SequenceFileInputFormat.class.getName());
+    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
+    byte[] payload = builder.build().toByteArray();
+
+    TezInputContext inputContext = createTezInputContext(payload);
+
+    MultiMRInput input = new MultiMRInput();
+    input.setNumPhysicalInputs(2);
+    input.initialize(inputContext);
+    List<Event> eventList = new ArrayList<Event>();
+
+    LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
+
+    String file1 = "file1";
+    LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
+        10);
+
+    String file2 = "file2";
+    LinkedHashMap<LongWritable, Text> data2 = createInputData(localFs, workDir, jobConf, file2, 10,
+        20);
+
+    data.putAll(data1);
+    data.putAll(data2);
+
+    SequenceFileInputFormat<LongWritable, Text> format =
+        new SequenceFileInputFormat<LongWritable, Text>();
+    InputSplit[] splits = format.getSplits(jobConf, 2);
+    assertEquals(2, splits.length);
+
+    MRSplitProto splitProto1 = MRHelpers.createSplitProto(splits[0]);
+    RootInputDataInformationEvent event1 = new RootInputDataInformationEvent(0,
+        splitProto1.toByteArray());
+
+    MRSplitProto splitProto2 = MRHelpers.createSplitProto(splits[1]);
+    RootInputDataInformationEvent event2 = new RootInputDataInformationEvent(0,
+        splitProto2.toByteArray());
+
+    eventList.clear();
+    eventList.add(event1);
+    eventList.add(event2);
+    input.handleEvents(eventList);
+
+    int readerCount = 0;
+    for (KeyValueReader reader : input.getKeyValueReaders()) {
+      readerCount++;
+      while (reader.next()) {
+        if (data.size() == 0) {
+          fail("Found more records than expected");
+        }
+        Object key = reader.getCurrentKey();
+        Object val = reader.getCurrentValue();
+        assertEquals(val, data.remove(key));
+      }
+    }
+    assertEquals(2, readerCount);
+  }
+
+  @Test(timeout = 5000)
+  public void testExtraEvents() throws Exception {
+    Path workDir = new Path(TEST_ROOT_DIR, "testExtraEvents");
+    JobConf jobConf = new JobConf(defaultConf);
+    jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, workDir);
+
+    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
+    builder.setInputFormatName(SequenceFileInputFormat.class.getName());
+    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
+    byte[] payload = builder.build().toByteArray();
+
+    TezInputContext inputContext = createTezInputContext(payload);
+
+    MultiMRInput input = new MultiMRInput();
+    input.setNumPhysicalInputs(1);
+    input.initialize(inputContext);
+    List<Event> eventList = new ArrayList<Event>();
+
+    String file1 = "file1";
+    createInputData(localFs, workDir, jobConf, file1, 0, 10);
+    SequenceFileInputFormat<LongWritable, Text> format =
+        new SequenceFileInputFormat<LongWritable, Text>();
+    InputSplit[] splits = format.getSplits(jobConf, 1);
+    assertEquals(1, splits.length);
+
+    MRSplitProto splitProto = MRHelpers.createSplitProto(splits[0]);
+    RootInputDataInformationEvent event1 = new RootInputDataInformationEvent(0,
+        splitProto.toByteArray());
+    RootInputDataInformationEvent event2 = new RootInputDataInformationEvent(1,
+        splitProto.toByteArray());
+
+    eventList.clear();
+    eventList.add(event1);
+    eventList.add(event2);
+    try {
+      input.handleEvents(eventList);
+      fail("Expecting Exception due to too many events");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(
+          "Unexpected event. All physical sources already initialized"));
+    }
+  }
+
+  private TezInputContext createTezInputContext(byte[] payload) {
+    ApplicationId applicationId = ApplicationId.newInstance(10000, 1);
+    TezCounters counters = new TezCounters();
+
+    TezInputContext inputContext = mock(TezInputContext.class);
+    doReturn(applicationId).when(inputContext).getApplicationId();
+    doReturn(counters).when(inputContext).getCounters();
+    doReturn(1).when(inputContext).getDAGAttemptNumber();
+    doReturn("dagName").when(inputContext).getDAGName();
+    doReturn(1).when(inputContext).getInputIndex();
+    doReturn("srcVertexName").when(inputContext).getSourceVertexName();
+    doReturn(1).when(inputContext).getTaskAttemptNumber();
+    doReturn(1).when(inputContext).getTaskIndex();
+    doReturn(1).when(inputContext).getTaskVertexIndex();
+    doReturn("taskVertexName").when(inputContext).getTaskVertexName();
+    doReturn(payload).when(inputContext).getUserPayload();
+    return inputContext;
+  }
+
+  public static LinkedHashMap<LongWritable, Text> createInputData(FileSystem fs, Path workDir,
+                                                                  JobConf job, String filename,
+                                                                  long startKey,
+                                                                  long endKey) throws IOException {
+    LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
+    Path file = new Path(workDir, filename);
+    LOG.info("Generating data at path: " + file);
+    // Write keys in the half-open range [startKey, endKey) to a SequenceFile
+    @SuppressWarnings("deprecation")
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, file, LongWritable.class,
+        Text.class);
+    try {
+      Random r = new Random(System.currentTimeMillis());
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (long i = startKey; i < numKeys; i++) {
+        key.set(i);
+        value.set(Integer.toString(r.nextInt(10000)));
+        data.put(new LongWritable(key.get()), new Text(value.toString()));
+        writer.append(key, value);
+        LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
+      }
+    } finally {
+      writer.close();
+    }
+    return data;
+  }
+}
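
The pattern these tests exercise is the intended processor-side usage of MultiMRInput: once the expected number of RootInputDataInformationEvents has been delivered, each split is exposed through its own reader. A condensed sketch, with the input instance assumed:

    // One KeyValueReader per split; iterate each to completion.
    for (KeyValueReader reader : multiMRInput.getKeyValueReaders()) {
      while (reader.next()) {
        Object key = reader.getCurrentKey();
        Object value = reader.getCurrentValue();
        // process(key, value)
      }
    }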

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index a8aec1f..ad56178 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -126,7 +126,7 @@ public class TestMapProcessor {
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         new InputDescriptor(MRInputLegacy.class.getName())
             .setUserPayload(MRHelpers.createMRInputPayload(job, null)),
-        0);
+        1);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
 
     LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5bd8e4fd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index bbe47d9..77e563a 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -138,7 +138,7 @@ public class TestReduceProcessor {
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         new InputDescriptor(MRInputLegacy.class.getName())
             .setUserPayload(MRHelpers.createMRInputPayload(mapConf, null)),
-        0);
+        1);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
     // Run a map
     LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,

