tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-624. Fix MROutput to support multiple outputs to the same output location (bikas)
Date Tue, 21 Jan 2014 00:34:25 GMT
Updated Branches:
  refs/heads/master 8fb1987c0 -> dec9181a9


TEZ-624. Fix MROutput to support multiple outputs to the same output location (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/dec9181a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/dec9181a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/dec9181a

Branch: refs/heads/master
Commit: dec9181a95da2139d65013256fdb547b9758dd36
Parents: 8fb1987
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Jan 20 16:34:13 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Jan 20 16:34:13 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/runtime/api/TezInputContext.java |  7 +++
 .../tez/runtime/api/TezOutputContext.java       |  7 +++
 .../apache/tez/runtime/api/TezTaskContext.java  |  9 +++-
 .../tez/mapreduce/examples/WordCount.java       |  2 +-
 .../apache/tez/mapreduce/client/YARNRunner.java | 10 ++++
 .../tez/mapreduce/hadoop/IDConverter.java       |  5 ++
 .../tez/mapreduce/hadoop/MRJobConfig.java       |  3 ++
 .../mapreduce/TaskAttemptContextImpl.java       | 52 +++++++++++++++++---
 .../apache/tez/mapreduce/output/MROutput.java   | 50 ++++++++++++-------
 .../org/apache/tez/mapreduce/TezTestUtils.java  | 12 -----
 .../processor/reduce/TestReduceProcessor.java   |  7 +--
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 28 +++++++----
 .../runtime/api/impl/TezInputContextImpl.java   |  9 +++-
 .../runtime/api/impl/TezOutputContextImpl.java  |  9 +++-
 .../runtime/api/impl/TezTaskContextImpl.java    |  4 ++
 .../output/TestOnFileUnorderedKVOutput.java     |  2 +-
 16 files changed, 160 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
index b07e92c..79731b5 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
@@ -28,5 +28,12 @@ public interface TezInputContext extends TezTaskContext {
    * @return Name of the Source Vertex
    */
   public String getSourceVertexName();
+  
+  /**
+   * Get the index of the input in the set of all inputs for the task. The 
+   * index will be consistent and valid only among the tasks of this vertex.
+   * @return index
+   */
+  public int getInputIndex();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-api/src/main/java/org/apache/tez/runtime/api/TezOutputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezOutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezOutputContext.java
index fda30ca..31511d4 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezOutputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezOutputContext.java
@@ -29,5 +29,12 @@ public interface TezOutputContext extends TezTaskContext {
    * @return Name of the Destination Vertex
    */
   public String getDestinationVertexName();
+  
+  /**
+   * Get the index of the output in the set of all outputs for the task. The 
+   * index will be consistent and valid only among the tasks of this vertex.
+   * @return index
+   */
+  public int getOutputIndex();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
index ef652b6..4221cde 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
@@ -48,7 +48,7 @@ public interface TezTaskContext {
   public int getDAGAttemptNumber();
 
   /**
-   * Get the index of this Task
+   * Get the index of this Task among the tasks of this vertex
    * @return Task Index
    */
   public int getTaskIndex();
@@ -70,6 +70,13 @@ public interface TezTaskContext {
    * @return Vertex Name
    */
   public String getTaskVertexName();
+  
+  /**
+   * Get the index of this task's vertex in the set of vertices in the DAG. This 
+   * is consistent and valid across all tasks/vertices in the same DAG.
+   * @return index
+   */
+  public int getTaskVertexIndex();
 
   public TezCounters getCounters();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 7646c65..523514d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -192,7 +192,7 @@ public class WordCount {
     finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
         TextOutputFormat.class.getName());
     finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
-    finalReduceConf.setBoolean("mapred.mapper.new-api", true);
+    finalReduceConf.setBoolean("mapred.mapper.new-api", false);
 
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
         mapStageConf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index bca96cf..90a4342 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -401,6 +401,16 @@ public class YARNRunner implements ClientProtocol {
 
     Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
         : MRHelpers.getReduceResource(stageConf);
+    
+    JobConf jobConf = new JobConf(stageConf);
+    String outputFilePrefix = null;
+    if (jobConf.getUseNewMapper()) {
+      outputFilePrefix = "part";
+    } else {
+      outputFilePrefix = isMap ? "part-m" : "part-r";
+    }
+    stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, outputFilePrefix);
+    
     byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
     Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
         setUserPayload(vertexUserPayload),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
index eabe007..a528098 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
@@ -43,6 +43,11 @@ public class IDConverter {
         taskid.getVertexID().getId() == 0 ? TaskType.MAP : TaskType.REDUCE,
         taskid.getId());
   }
+  
+  public static TaskID toMRTaskIdForOutput(TezTaskID taskid) {
+    return org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
+        .createMockTaskAttemptIDFromTezTaskId(taskid, (taskid.getVertexID().getId() == 0));
+  }
 
   public static TaskAttemptID toMRTaskAttemptId(
       TezTaskAttemptID taskAttemptId) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 285f94e..7a9cafb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -637,6 +637,9 @@ public interface MRJobConfig {
   @InterfaceAudience.Private
   static final String HS_DELEGATION_TOKEN_REQUIRED
       = "mapreduce.history.server.delegationtoken.required";
+  
+  public static final String MROUTPUT_FILE_NAME_PREFIX
+      = MR_TEZ_PREFIX + "mroutput.file-name.prefix";
 
   // MRR related config properties
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
index 2f6f90d..b3287b8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
@@ -23,8 +23,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.mapreduce.common.Utils;
 import org.apache.tez.runtime.api.TezTaskContext;
 
@@ -38,19 +41,52 @@ public class TaskAttemptContextImpl
 
   private final TezTaskContext taskContext;
   private final Reporter reporter;
+  
+  public static org.apache.hadoop.mapred.TaskAttemptID createMockTaskAttemptID(
+      TezTaskContext taskContext, boolean isMap) {
+    return new org.apache.hadoop.mapred.TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(String.valueOf(taskContext
+            .getApplicationId().getClusterTimestamp())
+            + String.valueOf(taskContext.getTaskVertexIndex()), taskContext
+            .getApplicationId().getId(),
+            isMap ? TaskType.MAP : TaskType.REDUCE, taskContext.getTaskIndex()),
+        taskContext.getTaskAttemptNumber());
+  }
+  
+  public static org.apache.hadoop.mapred.TaskAttemptID 
+    createMockTaskAttemptIDFromTezTaskAttemptId(TezTaskAttemptID tezTaId, boolean isMap) {
+    TezVertexID vId = tezTaId.getTaskID().getVertexID();
+    ApplicationId appId = vId.getDAGId().getApplicationId();
+    return new org.apache.hadoop.mapred.TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(String.valueOf(appId.getClusterTimestamp())
+            + String.valueOf(vId.getId()), appId.getId(),
+            isMap ? TaskType.MAP : TaskType.REDUCE, tezTaId.getTaskID().getId()),
+        tezTaId.getId());
+  }
+  
+  public static org.apache.hadoop.mapred.TaskID 
+    createMockTaskAttemptIDFromTezTaskId(TezTaskID tezTaId, boolean isMap) {
+    TezVertexID vId = tezTaId.getVertexID();
+    ApplicationId appId = vId.getDAGId().getApplicationId();
+    return new org.apache.hadoop.mapred.TaskID(String.valueOf(appId.getClusterTimestamp())
+            + String.valueOf(vId.getId()), appId.getId(),
+            isMap ? TaskType.MAP : TaskType.REDUCE, tezTaId.getId());
+  }
 
   // FIXME we need to use DAG Id but we are using App Id
-  public TaskAttemptContextImpl(Configuration conf,
+  public TaskAttemptContextImpl(Configuration conf, 
       TezTaskContext taskContext, boolean isMap, Reporter reporter) {
     // TODO NEWTEZ Can the jt Identifier string be taskContext.getUniqueId ?
-    this(conf, new TaskAttemptID(
-        new TaskID(String.valueOf(taskContext.getApplicationId()
-            .getClusterTimestamp()), taskContext.getApplicationId().getId(),
-            isMap ? TaskType.MAP : TaskType.REDUCE,
-            taskContext.getTaskIndex()),
-            taskContext.getTaskAttemptNumber()), taskContext, reporter);
+    this(conf, createMockTaskAttemptID(taskContext, isMap), taskContext, reporter);
   }
 
+  //FIXME we need to use DAG Id but we are using App Id
+   public TaskAttemptContextImpl(Configuration conf, TaskAttemptID attemptId, 
+       TezTaskContext taskContext, boolean isMap, Reporter reporter) {
+     // TODO NEWTEZ Can the jt Identifier string be taskContext.getUniqueId ?
+     this(conf, attemptId, taskContext, reporter);
+   }
+ 
   public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezTaskContext context, Reporter reporter) {
     super(conf, taId);
     this.taskContext = context;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index b554b6c..605efc8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -34,11 +34,9 @@ import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
@@ -59,12 +57,9 @@ public class MROutput implements LogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(MROutput.class);
 
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
+  private final NumberFormat taskNumberFormat = NumberFormat.getInstance();
+  private final NumberFormat nonTaskNumberFormat = NumberFormat.getInstance();
+  
   private TezOutputContext outputContext;
   private JobConf jobConf;
   boolean useNewApi;
@@ -95,6 +90,10 @@ public class MROutput implements LogicalOutput {
   public List<Event> initialize(TezOutputContext outputContext)
       throws IOException, InterruptedException {
     LOG.info("Initializing Simple Output");
+    taskNumberFormat.setMinimumIntegerDigits(5);
+    taskNumberFormat.setGroupingUsed(false);
+    nonTaskNumberFormat.setMinimumIntegerDigits(3);
+    nonTaskNumberFormat.setGroupingUsed(false);
     this.outputContext = outputContext;
     Configuration conf = TezUtils.createConfFromUserPayload(
         outputContext.getUserPayload());
@@ -104,18 +103,21 @@ public class MROutput implements LogicalOutput {
         false);
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         outputContext.getDAGAttemptNumber());
-    TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
-      outputContext.getApplicationId().getClusterTimestamp()),
-      outputContext.getApplicationId().getId(),
-      (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
-      outputContext.getTaskIndex()),
-      outputContext.getTaskAttemptNumber());
+    TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
+        .createMockTaskAttemptID(outputContext, isMapperOutput);
     jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
     jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
     jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
     jobConf.setInt(JobContext.TASK_PARTITION,
       taskAttemptId.getTaskID().getId());
     jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+    
+    if (useNewApi) {
+      // set the output part name to have a unique prefix
+      if (jobConf.get("mapreduce.output.basename") == null) {
+        jobConf.set("mapreduce.output.basename", getOutputFileNamePrefix());
+      }
+    }
 
     outputRecordCounter = outputContext.getCounters().findCounter(
         TaskCounter.MAP_OUTPUT_RECORDS);
@@ -123,7 +125,7 @@ public class MROutput implements LogicalOutput {
         FileOutputFormatCounter.BYTES_WRITTEN);
 
     if (useNewApi) {
-      newApiTaskAttemptContext = createTaskAttemptContext();
+      newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
       try {
         newOutputFormat =
             ReflectionUtils.newInstance(
@@ -233,8 +235,8 @@ public class MROutput implements LogicalOutput {
     }
   }
 
-  private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, outputContext,
+  private TaskAttemptContext createTaskAttemptContext(TaskAttemptID attemptId) {
+    return new TaskAttemptContextImpl(this.jobConf, attemptId, outputContext,
         isMapperOutput, null);
   }
 
@@ -246,9 +248,21 @@ public class MROutput implements LogicalOutput {
     }
     return bytesWritten;
   }
+  
+  private String getOutputFileNamePrefix() {
+    String prefix = jobConf.get(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX);
+    if (prefix == null) {
+      prefix = "part-v" + 
+          nonTaskNumberFormat.format(outputContext.getTaskVertexIndex()) +  
+          "-o" + nonTaskNumberFormat.format(outputContext.getOutputIndex());
+    }
+    return prefix;
+  }
 
   private String getOutputName() {
-    return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
+    // give a unique prefix to the output name
+    return getOutputFileNamePrefix() + 
+        "-" + taskNumberFormat.format(outputContext.getTaskIndex());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
index 10f2024..8a8b141 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
@@ -45,16 +45,4 @@ public class TezTestUtils {
             jobId), vertexId),
             taskId);
   }
-  
-  public static TezDAGID getMockJobId(int jobId) {
-    return TezDAGID.getInstance(
-        ApplicationId.newInstance(0, jobId), jobId);
-  }
-  
-  public static TezVertexID getMockVertexId(int jobId, int vId) {
-    return TezVertexID.getInstance(
-        TezDAGID.getInstance(
-            ApplicationId.newInstance(0, jobId), jobId),
-            vId);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 7462fba..abefcce 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -196,9 +196,10 @@ public class TestReduceProcessor {
     // Only a task commit happens, hence the data is still in the temporary directory.
     Path reduceOutputDir = new Path(new Path(workDir, "output"),
         "_temporary/0/" + IDConverter
-            .toMRTaskId(TezTestUtils.getMockTaskId(0, 1, 0)));
-    Path reduceOutputFile = new Path(reduceOutputDir, "part-00000");
-
+            .toMRTaskIdForOutput(TezTestUtils.getMockTaskId(0, 1, 0)));
+    
+    Path reduceOutputFile = new Path(reduceOutputDir, "part-v001-o000-00000");
+    
     SequenceFile.Reader reader = new SequenceFile.Reader(localFs,
         reduceOutputFile, reduceConf);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 979cda1..a624778 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -143,13 +143,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     
     int numTasks = 0;
     
+    int inputIndex = 0;
     for (InputSpec inputSpec : taskSpec.getInputs()) {
-      this.initializerCompletionService.submit(new InitializeInputCallable(inputSpec));
+      this.initializerCompletionService.submit(
+          new InitializeInputCallable(inputSpec, inputIndex++));
       numTasks++;
     }
     
+    int outputIndex = 0;
     for (OutputSpec outputSpec : taskSpec.getOutputs()) {
-      this.initializerCompletionService.submit(new InitializeOutputCallable(outputSpec));
+      this.initializerCompletionService.submit(
+          new InitializeOutputCallable(outputSpec, outputIndex++));
       numTasks++;
     }
     // Shutdown after all tasks complete.
@@ -238,9 +242,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private class InitializeInputCallable implements Callable<Void> {
 
     private final InputSpec inputSpec;
+    private final int inputIndex;
 
-    public InitializeInputCallable(InputSpec inputSpec) {
+    public InitializeInputCallable(InputSpec inputSpec, int inputIndex) {
       this.inputSpec = inputSpec;
+      this.inputIndex = inputIndex;
     }
 
     @Override
@@ -248,7 +254,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       LOG.info("Initializing Input using InputSpec: " + inputSpec);
       String edgeName = inputSpec.getSourceVertexName();
       LogicalInput input = createInput(inputSpec);
-      TezInputContext inputContext = createInputContext(inputSpec);
+      TezInputContext inputContext = createInputContext(inputSpec, inputIndex);
       inputsMap.put(edgeName, input);
       inputContextMap.put(edgeName, inputContext);
 
@@ -269,9 +275,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private class InitializeOutputCallable implements Callable<Void> {
 
     private final OutputSpec outputSpec;
+    private final int outputIndex;
 
-    public InitializeOutputCallable(OutputSpec outputSpec) {
+    public InitializeOutputCallable(OutputSpec outputSpec, int outputIndex) {
       this.outputSpec = outputSpec;
+      this.outputIndex = outputIndex;
     }
 
     @Override
@@ -279,7 +287,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       LOG.info("Initializing Output using OutputSpec: " + outputSpec);
       String edgeName = outputSpec.getDestinationVertexName();
       LogicalOutput output = createOutput(outputSpec);
-      TezOutputContext outputContext = createOutputContext(outputSpec);
+      TezOutputContext outputContext = createOutputContext(outputSpec, outputIndex);
       outputsMap.put(edgeName, output);
       outputContextMap.put(edgeName, outputContext);
 
@@ -307,11 +315,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         + processorDescriptor.getClassName());
   }
 
-  private TezInputContext createInputContext(InputSpec inputSpec) {
+  private TezInputContext createInputContext(InputSpec inputSpec, int inputIndex) {
     TezInputContext inputContext = new TezInputContextImpl(tezConf,
         appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
         inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
-        tezCounters,
+        tezCounters, inputIndex,
         inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
             .getProcessorDescriptor().getUserPayload() : inputSpec
             .getInputDescriptor().getUserPayload(), this,
@@ -319,11 +327,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return inputContext;
   }
 
-  private TezOutputContext createOutputContext(OutputSpec outputSpec) {
+  private TezOutputContext createOutputContext(OutputSpec outputSpec, int outputIndex) {
     TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
         appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
         outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(),
-        tezCounters,
+        tezCounters, outputIndex,
         outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
             .getProcessorDescriptor().getUserPayload() : outputSpec
             .getOutputDescriptor().getUserPayload(), this,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 69d1ade..9849665 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -38,18 +38,20 @@ public class TezInputContextImpl extends TezTaskContextImpl
   private final byte[] userPayload;
   private final String sourceVertexName;
   private final EventMetaData sourceInfo;
+  private final int inputIndex;
 
   @Private
   public TezInputContextImpl(Configuration conf, int appAttemptNumber,
       TezUmbilical tezUmbilical, String taskVertexName,
       String sourceVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters, byte[] userPayload,
+      TezCounters counters, int inputIndex, byte[] userPayload,
       RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv) {
     super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv);
     this.userPayload = userPayload;
+    this.inputIndex = inputIndex;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(
         EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
@@ -70,6 +72,11 @@ public class TezInputContextImpl extends TezTaskContextImpl
   public byte[] getUserPayload() {
     return userPayload;
   }
+  
+  @Override
+  public int getInputIndex() {
+    return inputIndex;
+  }
 
   @Override
   public String getSourceVertexName() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 86014df..5233db8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -38,12 +38,13 @@ public class TezOutputContextImpl extends TezTaskContextImpl
   private final byte[] userPayload;
   private final String destinationVertexName;
   private final EventMetaData sourceInfo;
+  private final int outputIndex;
 
   @Private
   public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
       TezUmbilical tezUmbilical, String taskVertexName,
       String destinationVertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      TezTaskAttemptID taskAttemptID, TezCounters counters, int outputIndex,
       byte[] userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv) {
@@ -51,6 +52,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv);
     this.userPayload = userPayload;
+    this.outputIndex = outputIndex;
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
         taskVertexName, destinationVertexName, taskAttemptID);
@@ -81,4 +83,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     super.signalFatalError(exception, message, sourceInfo);
   }
 
+  @Override
+  public int getOutputIndex() {
+    return outputIndex;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 03eb895..158c080 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -106,6 +106,10 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     return taskVertexName;
   }
 
+  @Override
+  public int getTaskVertexIndex() {
+    return taskAttemptID.getTaskID().getVertexID().getId();
+  }
 
   @Override
   public TezCounters getCounters() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/dec9181a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index eb9b548..b68be2d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -117,7 +117,7 @@ public class TestOnFileUnorderedKVOutput {
     
     TezOutputContext outputContext = new TezOutputContextImpl(conf,
         appAttemptNumber, tezUmbilical, taskVertexName, destinationVertexName,
-        taskAttemptID, counters, userPayload, runtimeTask,
+        taskAttemptID, counters, 0, userPayload, runtimeTask,
         null, auxEnv);
 
     List<Event> events = null;


Mime
View raw message