tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [3/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of TEZ-398). (sseth)
Date Wed, 11 Sep 2013 04:49:48 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index 16ded35..a5401fa 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -25,10 +25,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,26 +41,21 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.combine.CombineInput;
-import org.apache.tez.engine.common.combine.CombineOutput;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.hadoop.compat.NullProgressable;
+import org.apache.tez.engine.newapi.Processor;
+import org.apache.tez.engine.newapi.TezInputContext;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -66,15 +63,15 @@ import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
 public class MergeManager {
   
   private static final Log LOG = LogFactory.getLog(MergeManager.class);
-  
-  private final TezTaskAttemptID taskAttemptId;
-  
+
   private final Configuration conf;
   private final FileSystem localFS;
   private final FileSystem rfs;
   private final LocalDirAllocator localDirAllocator;
   
   private final  TezTaskOutputFiles mapOutputFile;
+  private final Progressable nullProgressable = new NullProgressable();
+  private final Processor combineProcessor = null; // TODO NEWTEZ Fix CombineProcessor  
   
   Set<MapOutput> inMemoryMergedMapOutputs = 
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
@@ -97,13 +94,15 @@ public class MergeManager {
   
   private final int ioSortFactor;
 
-  private final TezTaskReporter reporter;
   private final ExceptionReporter exceptionReporter;
   
+  private final TezInputContext inputContext;
+  
   /**
    * Combiner processor to run during in-memory merge, if defined.
    */
-  private final Processor combineProcessor;
+  // TODO NEWTEZ Fix Combiner
+  //private final Processor combineProcessor;
 
   private final TezCounter spilledRecordsCounter;
 
@@ -113,31 +112,28 @@ public class MergeManager {
   
   private final CompressionCodec codec;
   
-  private final Progress mergePhase;
+  private volatile boolean finalMergeComplete = false;
 
-  public MergeManager(TezTaskAttemptID taskAttemptId, 
-                      Configuration conf, 
+  public MergeManager(Configuration conf, 
                       FileSystem localFS,
                       LocalDirAllocator localDirAllocator,  
-                      TezTaskReporter reporter,
+                      TezInputContext inputContext,
                       Processor combineProcessor,
                       TezCounter spilledRecordsCounter,
                       TezCounter reduceCombineInputCounter,
                       TezCounter mergedMapOutputsCounter,
-                      ExceptionReporter exceptionReporter,
-                      Progress mergePhase) {
-    this.taskAttemptId = taskAttemptId;
+                      ExceptionReporter exceptionReporter) {
+    // TODO NEWTEZ Change to include Combiner
+    this.inputContext = inputContext;
     this.conf = conf;
     this.localDirAllocator = localDirAllocator;
     this.exceptionReporter = exceptionReporter;
     
-    this.reporter = reporter;
-    this.combineProcessor = combineProcessor;
+    //this.combineProcessor = combineProcessor;
     this.reduceCombineInputCounter = reduceCombineInputCounter;
     this.spilledRecordsCounter = spilledRecordsCounter;
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = new TezTaskOutputFiles();
-    this.mapOutputFile.setConf(conf);
+    this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
     
     this.localFS = localFS;
     this.rfs = ((LocalFileSystem)localFS).getRaw();
@@ -224,13 +220,6 @@ public class MergeManager {
     
     this.onDiskMerger = new OnDiskMerger(this);
     this.onDiskMerger.start();
-    
-    this.mergePhase = mergePhase;
-  }
-  
-
-  TezTaskAttemptID getReduceId() {
-    return taskAttemptId;
   }
 
   public void waitForInMemoryMerge() throws InterruptedException {
@@ -240,18 +229,18 @@ public class MergeManager {
   private boolean canShuffleToMemory(long requestedSize) {
     return (requestedSize < maxSingleShuffleLimit); 
   }
-  
+
   final private MapOutput stallShuffle = new MapOutput(null);
 
-  public synchronized MapOutput reserve(TezTaskAttemptID mapId, 
+  public synchronized MapOutput reserve(TaskAttemptIdentifier srcAttemptIdentifier, 
                                              long requestedSize,
                                              int fetcher
                                              ) throws IOException {
     if (!canShuffleToMemory(requestedSize)) {
-      LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
+      LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + 
                " is greater than maxSingleShuffleLimit (" + 
                maxSingleShuffleLimit + ")");
-      return new MapOutput(mapId, this, requestedSize, conf, 
+      return new MapOutput(srcAttemptIdentifier, this, requestedSize, conf, 
                                 localDirAllocator, fetcher, true,
                                 mapOutputFile);
     }
@@ -272,17 +261,17 @@ public class MergeManager {
     // all the stalled threads
     
     if (usedMemory > memoryLimit) {
-      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+      LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
           + ") is greater than memoryLimit (" + memoryLimit + ")." + 
           " CommitMemory is (" + commitMemory + ")"); 
       return stallShuffle;
     }
     
     // Allow the in-memory shuffle to progress
-    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+    LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
         + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
         + "CommitMemory is (" + commitMemory + ")"); 
-    return unconditionalReserve(mapId, requestedSize, true);
+    return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
   }
   
   /**
@@ -290,9 +279,9 @@ public class MergeManager {
    * @return
    */
   private synchronized MapOutput unconditionalReserve(
-      TezTaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
+      TaskAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
     usedMemory += requestedSize;
-    return new MapOutput(mapId, this, (int)requestedSize, 
+    return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize, 
         primaryMapOutput);
   }
   
@@ -349,6 +338,18 @@ public class MergeManager {
       }
     }
   }
+
+  /**
+   * Should <b>only</b> be used after the Shuffle phase is complete, otherwise can
+   * return an invalid state since a merge may not be in progress due to
+   * inadequate inputs
+   * 
+   * @return true if the merge process is complete, otherwise false
+   */
+  @Private
+  public boolean isMergeComplete() {
+    return finalMergeComplete;
+  }
   
   public TezRawKeyValueIterator close() throws Throwable {
     // Wait for on-going merges to complete
@@ -362,28 +363,32 @@ public class MergeManager {
       new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
     memory.addAll(inMemoryMapOutputs);
     List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
-    return finalMerge(conf, rfs, memory, disk);
+    TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
+    this.finalMergeComplete = true;
+    return kvIter;
   }
    
   void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
   throws IOException, InterruptedException {
 
-    CombineInput combineIn = new CombineInput(kvIter);
-    combineIn.initialize(conf, reporter);
+    // TODO NEWTEZ Fix CombineProcessor
     
-    CombineOutput combineOut = new CombineOutput(writer);
-    combineOut.initialize(conf, reporter);
-
-    try {
-      combineProcessor.process(new Input[] {combineIn},
-          new Output[] {combineOut});
-    } catch (IOException ioe) {
-      try {
-        combineProcessor.close();
-      } catch (IOException ignoredException) {}
-
-      throw ioe;
-    }
+//    CombineInput combineIn = new CombineInput(kvIter);
+//    combineIn.initialize(conf, reporter);
+//    
+//    CombineOutput combineOut = new CombineOutput(writer);
+//    combineOut.initialize(conf, reporter);
+//
+//    try {
+//      combineProcessor.process(new Input[] {combineIn},
+//          new Output[] {combineOut});
+//    } catch (IOException ioe) {
+//      try {
+//        combineProcessor.close();
+//      } catch (IOException ignoredException) {}
+//
+//      throw ioe;
+//    }
   
   }
 
@@ -404,7 +409,7 @@ public class MergeManager {
         return;
       }
 
-      TezTaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
+      TaskAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
       long mergeOutputSize = 
         createInMemorySegments(inputs, inMemorySegments, 0);
@@ -424,13 +429,13 @@ public class MergeManager {
                        ConfigUtils.getIntermediateInputKeyClass(conf),
                        ConfigUtils.getIntermediateInputValueClass(conf),
                        inMemorySegments, inMemorySegments.size(),
-                       new Path(taskAttemptId.toString()),
+                       new Path(inputContext.getUniqueIdentifier()),
                        (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-                       reporter, null, null, null);
-      TezMerger.writeFile(rIter, writer, reporter, conf);
+                       nullProgressable, null, null, null);
+      TezMerger.writeFile(rIter, writer, nullProgressable, conf);
       writer.close();
 
-      LOG.info(taskAttemptId +  
+      LOG.info(inputContext.getUniqueIdentifier() +  
                " Memory-to-Memory merge of the " + noInMemorySegments +
                " files in-memory complete.");
 
@@ -463,8 +468,7 @@ public class MergeManager {
       //in the merge method)
 
       //figure out the mapId 
-      TezTaskAttemptID mapId = inputs.get(0).getMapId();
-      TezTaskID mapTaskId = mapId.getTaskID();
+      TaskAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
       long mergeOutputSize = 
@@ -472,7 +476,7 @@ public class MergeManager {
       int noInMemorySegments = inMemorySegments.size();
 
       Path outputPath = 
-        mapOutputFile.getInputFileForWrite(mapTaskId,
+        mapOutputFile.getInputFileForWrite(srcTaskIdentifier.getTaskIndex(),
                                            mergeOutputSize).suffix(
                                                Constants.MERGED_OUTPUT_PREFIX);
 
@@ -492,19 +496,19 @@ public class MergeManager {
             (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
             inMemorySegments, inMemorySegments.size(),
-            new Path(taskAttemptId.toString()),
+            new Path(inputContext.getUniqueIdentifier()),
             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-            reporter, spilledRecordsCounter, null, null);
+            nullProgressable, spilledRecordsCounter, null, null);
 
         if (null == combineProcessor) {
-          TezMerger.writeFile(rIter, writer, reporter, conf);
+          TezMerger.writeFile(rIter, writer, nullProgressable, conf);
         } else {
           runCombineProcessor(rIter, writer);
         }
         writer.close();
         writer = null;
 
-        LOG.info(taskAttemptId +  
+        LOG.info(inputContext.getUniqueIdentifier() +  
             " Merge of the " + noInMemorySegments +
             " files in-memory complete." +
             " Local file is " + outputPath + " of size " + 
@@ -568,7 +572,7 @@ public class MergeManager {
                         (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                         codec, null);
       TezRawKeyValueIterator iter  = null;
-      Path tmpDir = new Path(taskAttemptId.toString());
+      Path tmpDir = new Path(inputContext.getUniqueIdentifier());
       try {
         iter = TezMerger.merge(conf, rfs,
                             (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
@@ -576,10 +580,10 @@ public class MergeManager {
                             codec, inputs.toArray(new Path[inputs.size()]), 
                             true, ioSortFactor, tmpDir, 
                             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), 
-                            reporter, spilledRecordsCounter, null, 
+                            nullProgressable, spilledRecordsCounter, null, 
                             mergedMapOutputsCounter, null);
 
-        TezMerger.writeFile(iter, writer, reporter, conf);
+        TezMerger.writeFile(iter, writer, nullProgressable, conf);
         writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
@@ -588,7 +592,7 @@ public class MergeManager {
 
       closeOnDiskFile(outputPath);
 
-      LOG.info(taskAttemptId +
+      LOG.info(inputContext.getUniqueIdentifier() +
           " Finished merging " + inputs.size() + 
           " map output files on disk of total-size " + 
           approxOutputSize + "." + 
@@ -615,7 +619,7 @@ public class MergeManager {
       totalSize += size;
       fullSize -= size;
       IFile.Reader reader = new InMemoryReader(MergeManager.this, 
-                                                   mo.getMapId(),
+                                                   mo.getAttemptIdentifier(),
                                                    data, 0, (int)size);
       inMemorySegments.add(new Segment(reader, true, 
                                             (mo.isPrimaryMapOutput() ? 
@@ -683,7 +687,7 @@ public class MergeManager {
     // merge config params
     Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
     Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
-    final Path tmpDir = new Path(taskAttemptId.toString());
+    final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
     final RawComparator comparator =
       (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
 
@@ -692,7 +696,7 @@ public class MergeManager {
     long inMemToDiskBytes = 0;
     boolean mergePhaseFinished = false;
     if (inMemoryMapOutputs.size() > 0) {
-      TezTaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
+      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getTaskIndex();
       inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
                                                 memDiskSegments,
                                                 maxInMemReduce);
@@ -710,17 +714,16 @@ public class MergeManager {
         mergePhaseFinished = true;
         // must spill to disk, but can't retain in-mem for intermediate merge
         final Path outputPath = 
-          mapOutputFile.getInputFileForWrite(mapId,
+          mapOutputFile.getInputFileForWrite(srcTaskId,
                                              inMemToDiskBytes).suffix(
                                                  Constants.MERGED_OUTPUT_PREFIX);
         final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs,
             keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
-            mergePhase);
+            tmpDir, comparator, nullProgressable, spilledRecordsCounter, null, null);
         final Writer writer = new Writer(job, fs, outputPath,
             keyClass, valueClass, codec, null);
         try {
-          TezMerger.writeFile(rIter, writer, reporter, job);
+          TezMerger.writeFile(rIter, writer, nullProgressable, job);
           // add to list of final disk outputs.
           onDiskMapOutputs.add(outputPath);
         } catch (IOException e) {
@@ -784,13 +787,10 @@ public class MergeManager {
       final int numInMemSegments = memDiskSegments.size();
       diskSegments.addAll(0, memDiskSegments);
       memDiskSegments.clear();
-      // Pass mergePhase only if there is a going to be intermediate
-      // merges. See comment where mergePhaseFinished is being set
-      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
       TezRawKeyValueIterator diskMerge = TezMerger.merge(
           job, fs, keyClass, valueClass, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
-          reporter, false, spilledRecordsCounter, null, thisPhase);
+          nullProgressable, false, spilledRecordsCounter, null, null);
       diskSegments.clear();
       if (0 == finalSegments.size()) {
         return diskMerge;
@@ -800,7 +800,7 @@ public class MergeManager {
     }
     return TezMerger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
-                 comparator, reporter, spilledRecordsCounter, null,
+                 comparator, nullProgressable, spilledRecordsCounter, null,
                  null);
   
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 69dd036..9dd213e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -18,6 +18,10 @@
 package org.apache.tez.engine.common.shuffle.impl;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -27,18 +31,16 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.util.Progress;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.common.TezTaskStatus;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+
+import com.google.common.base.Preconditions;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -46,160 +48,176 @@ public class Shuffle implements ExceptionReporter {
   
   private static final Log LOG = LogFactory.getLog(Shuffle.class);
   private static final int PROGRESS_FREQUENCY = 2000;
-  private static final int MAX_EVENTS_TO_FETCH = 10000;
-  private static final int MIN_EVENTS_TO_FETCH = 100;
-  private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
-
-  private final TezEngineTaskContext taskContext;
-  private final RunningTaskContext runningTaskContext;
+  
   private final Configuration conf;
-  private final TezTaskReporter reporter;
+  private final TezInputContext inputContext;
   private final ShuffleClientMetrics metrics;
-  
+
+  private final ShuffleInputEventHandler eventHandler;
   private final ShuffleScheduler scheduler;
   private final MergeManager merger;
   private Throwable throwable = null;
   private String throwingThreadName = null;
-  private final Progress copyPhase;
-  private final Progress mergePhase;
-  private final int tasksInDegree;
+  private final int numInputs;
   private final AtomicInteger reduceStartId;
   private AtomicInteger reduceRange = new AtomicInteger(
       TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
   
-  public Shuffle(TezEngineTaskContext taskContext,
-                 RunningTaskContext runningTaskContext,
-                 Configuration conf,
-                 int tasksInDegree,
-                 TezTaskReporter reporter,
-                 Processor combineProcessor
-                 ) throws IOException {
-    this.taskContext = taskContext;
-    this.runningTaskContext = runningTaskContext;
+  private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
+
+  public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    // TODO NEWTEZ Handle Combiner
+    this.inputContext = inputContext;
     this.conf = conf;
-    this.reporter = reporter;
-    this.metrics = 
-        new ShuffleClientMetrics(
-            taskContext.getTaskAttemptId(), this.conf, 
-            this.taskContext.getUser(), this.taskContext.getJobName());
-    this.tasksInDegree = tasksInDegree;
+    this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
+        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
+        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
+            
+    this.numInputs = numInputs;
     
     FileSystem localFS = FileSystem.getLocal(this.conf);
     LocalDirAllocator localDirAllocator = 
         new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-    
-    copyPhase = this.runningTaskContext.getProgress().addPhase("copy");
-    mergePhase = this.runningTaskContext.getProgress().addPhase("merge");
 
     // TODO TEZ Get rid of Map / Reduce references.
     TezCounter shuffledMapsCounter = 
-        reporter.getCounter(TaskCounter.SHUFFLED_MAPS);
+        inputContext.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
     TezCounter reduceShuffleBytes =
-        reporter.getCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
+        inputContext.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
     TezCounter failedShuffleCounter =
-        reporter.getCounter(TaskCounter.FAILED_SHUFFLE);
+        inputContext.getCounters().findCounter(TaskCounter.FAILED_SHUFFLE);
     TezCounter spilledRecordsCounter = 
-        reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
     TezCounter reduceCombineInputCounter =
-        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
     TezCounter mergedMapOutputsCounter =
-        reporter.getCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
     
-    reduceStartId = new AtomicInteger( 
-        taskContext.getTaskAttemptId().getTaskID().getId()); 
+    reduceStartId = new AtomicInteger(inputContext.getTaskIndex());
     LOG.info("Shuffle assigned reduce start id: " + reduceStartId.get()
         + " with default reduce range: " + reduceRange.get());
 
-    scheduler = 
-      new ShuffleScheduler(this.conf, tasksInDegree,
-                                runningTaskContext.getStatus(), 
-                                this, copyPhase, 
-                                shuffledMapsCounter, 
-                                reduceShuffleBytes, 
-                                failedShuffleCounter);
-    merger = new MergeManager(this.taskContext.getTaskAttemptId(), 
-                                    this.conf, localFS, 
-                                    localDirAllocator, reporter, 
-                                    combineProcessor, 
-                                    spilledRecordsCounter, 
-                                    reduceCombineInputCounter, 
-                                    mergedMapOutputsCounter, 
-                                    this, mergePhase);
+    scheduler = new ShuffleScheduler(
+          this.inputContext,
+          this.conf,
+          this.numInputs,
+          this,
+          shuffledMapsCounter,
+          reduceShuffleBytes,
+          failedShuffleCounter);
+    eventHandler= new ShuffleInputEventHandler(
+          inputContext,
+          this,
+          scheduler);
+    merger = new MergeManager(
+          this.conf,
+          localFS,
+          localDirAllocator,
+          inputContext,
+          null, // TODO NEWTEZ Fix Combiner
+          spilledRecordsCounter,
+          reduceCombineInputCounter,
+          mergedMapOutputsCounter,
+          this);
   }
 
-  public TezRawKeyValueIterator run() throws IOException, InterruptedException {
-    // Scale the maximum events we fetch per RPC call to mitigate OOM issues
-    // on the ApplicationMaster when a thundering herd of reducers fetch events
-    // TODO: This should not be necessary after HADOOP-8942
-    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
-        MAX_RPC_OUTSTANDING_EVENTS / tasksInDegree);
-    int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
+  public void handleEvents(List<Event> events) {
+    eventHandler.handleEvents(events);
+  }
+  
+  /**
+   * Indicates whether the Shuffle and Merge processing is complete.
+   * @return false if not complete, true if complete or if an error occurred.
+   */
+  public boolean isInputReady() {
+    if (runShuffleFuture == null) {
+      return false;
+    }
+    return runShuffleFuture.isDone();
+    //return scheduler.isDone() && merger.isMergeComplete();
+  }
 
-    // Start the map-completion events fetcher thread
-    final EventFetcher eventFetcher = 
-      new EventFetcher(taskContext.getTaskAttemptId(), reporter, scheduler, this,
-          maxEventsToFetch);
-    eventFetcher.start();
-    
-    // Start the map-output fetcher threads
-    final int numFetchers = 
-        conf.getInt(
-            TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
-    Fetcher[] fetchers = new Fetcher[numFetchers];
-    for (int i=0; i < numFetchers; ++i) {
-      fetchers[i] = new Fetcher(conf, scheduler,
-          merger, reporter, metrics, this,
-          runningTaskContext.getJobTokenSecret());
-      fetchers[i].start();
+  /**
+   * Waits for the Shuffle and Merge to complete, and returns an iterator over the input.
+   * @return an iterator over the fetched input.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException {
+    Preconditions.checkState(runShuffleFuture != null,
+        "waitForInput can only be called after run");
+    TezRawKeyValueIterator kvIter;
+    try {
+      kvIter = runShuffleFuture.get();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
+      } else {
+        throw new TezUncheckedException(
+            "Unexpected exception type while running Shuffle and Merge", cause);
+      }
     }
-    
-    // Wait for shuffle to complete successfully
-    while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
-      reporter.progress();
+    return kvIter;
+  }
+
+  public void run() {
+    RunShuffleCallable runShuffle = new RunShuffleCallable();
+    runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
+    new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
+  }
+  
+  private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
+    @Override
+    public TezRawKeyValueIterator call() throws IOException, InterruptedException {
+      final int numFetchers = 
+          conf.getInt(
+              TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
+              TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+      Fetcher[] fetchers = new Fetcher[numFetchers];
+      for (int i = 0; i < numFetchers; ++i) {
+        fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, inputContext);
+        fetchers[i].start();
+      }
       
-      synchronized (this) {
-        if (throwable != null) {
-          throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                 throwable);
+      while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
+        synchronized (this) {
+          if (throwable != null) {
+            throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                                   throwable);
+          }
         }
       }
-    }
-
-    // Stop the event-fetcher thread
-    eventFetcher.shutDown();
-    
-    // Stop the map-output fetcher threads
-    for (Fetcher fetcher : fetchers) {
-      fetcher.shutDown();
-    }
-    fetchers = null;
-    
-    // stop the scheduler
-    scheduler.close();
+      
+      // Stop the map-output fetcher threads
+      for (Fetcher fetcher : fetchers) {
+        fetcher.shutDown();
+      }
+      fetchers = null;
+      
+      // stop the scheduler
+      scheduler.close();
 
-    copyPhase.complete(); // copy is already complete
-    runningTaskContext.getStatus().setPhase(TezTaskStatus.Phase.SORT);
-    
-    runningTaskContext.statusUpdate();
-    
-    // Finish the on-going merges...
-    TezRawKeyValueIterator kvIter = null;
-    try {
-      kvIter = merger.close();
-    } catch (Throwable e) {
-      throw new ShuffleError("Error while doing final merge " , e);
-    }
 
-    // Sanity check
-    synchronized (this) {
-      if (throwable != null) {
-        throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                               throwable);
+      // Finish the on-going merges...
+      TezRawKeyValueIterator kvIter = null;
+      try {
+        kvIter = merger.close();
+      } catch (Throwable e) {
+        throw new ShuffleError("Error while doing final merge " , e);
+      }
+      
+      // Sanity check
+      synchronized (Shuffle.this) {
+        if (throwable != null) {
+          throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                                 throwable);
+        }
       }
+      return kvIter;
     }
-    
-    return kvIter;
   }
   
   public int getReduceStartId() {
@@ -229,19 +247,8 @@ public class Shuffle implements ExceptionReporter {
       super(msg, t);
     }
   }
-  
-  public void updateUserPayload(byte[] userPayload) throws IOException {
-    if(userPayload == null) {
-      return;
-    }
-    Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
-    int reduceRange = conf.getInt(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE,
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
-    setReduceRange(reduceRange);
-  }
-  
-  private void setReduceRange(int range) {
+
+  public void setPartitionRange(int range) {
     if (range == reduceRange.get()) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
index 34b26c4..850dbeb 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.common.TezEngineUtils;
 
 class ShuffleClientMetrics implements Updater {
 
@@ -35,10 +35,10 @@ class ShuffleClientMetrics implements Updater {
   private int numThreadsBusy = 0;
   private final int numCopiers;
   
-  ShuffleClientMetrics(TezTaskAttemptID reduceId, Configuration jobConf, 
-      String user, String jobName) {
+  ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf, 
+      String user) {
     this.numCopiers = 
-        jobConf.getInt(
+        conf.getInt(
             TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
             TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
 
@@ -46,12 +46,10 @@ class ShuffleClientMetrics implements Updater {
     this.shuffleMetrics = 
       MetricsUtil.createRecord(metricsContext, "shuffleInput");
     this.shuffleMetrics.setTag("user", user);
-    this.shuffleMetrics.setTag("jobName", jobName);
-    this.shuffleMetrics.setTag("jobId", 
-        reduceId.getTaskID().getVertexID().getDAGId().toString());
-    this.shuffleMetrics.setTag("taskId", reduceId.toString());
+    this.shuffleMetrics.setTag("dagName", dagName);
+    this.shuffleMetrics.setTag("taskId", TezEngineUtils.getTaskIdentifier(vertexName, taskIndex));
     this.shuffleMetrics.setTag("sessionId", 
-        jobConf.get(
+        conf.get(
             TezJobConfig.TEZ_ENGINE_METRICS_SESSION_ID, 
             TezJobConfig.DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID));
     metricsContext.registerUpdater(this);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
new file mode 100644
index 0000000..012103f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.InputInformationEventPayloadProto;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputInformationEvent;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class ShuffleInputEventHandler {
+  
+  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
+
+  private final ShuffleScheduler scheduler;
+  private final TezInputContext inputContext;
+  private final Shuffle shuffle;
+
+  private int maxMapRuntime = 0;
+  private boolean shuffleRangeSet = false;
+  
+  public ShuffleInputEventHandler(TezInputContext inputContext,
+      Shuffle shuffle, ShuffleScheduler scheduler) {
+    this.inputContext = inputContext;
+    this.shuffle = shuffle;
+    this.scheduler = scheduler;
+  }
+
+  public void handleEvents(List<Event> events) {
+    for (Event event : events) {
+      handleEvent(event);
+    }
+  }
+  
+  
+  private void handleEvent(Event event) {
+    if (event instanceof InputInformationEvent) {
+      processInputInformationEvent((InputInformationEvent) event);
+    }
+    else if (event instanceof DataMovementEvent) {
+      processDataMovementEvent((DataMovementEvent) event);      
+    } else if (event instanceof InputFailedEvent) {
+      processTaskFailedEvent((InputFailedEvent) event);
+    }
+  }
+
+  private void processInputInformationEvent(InputInformationEvent iiEvent) {
+    InputInformationEventPayloadProto inputInfoPayload;
+    try {
+      inputInfoPayload = InputInformationEventPayloadProto.parseFrom(iiEvent.getUserPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse InputInformationEvent payload", e);
+    }
+    int partitionRange = inputInfoPayload.getPartitionRange();
+    shuffle.setPartitionRange(partitionRange);
+    this.shuffleRangeSet = true;
+  }
+
+  private void processDataMovementEvent(DataMovementEvent dmEvent) {
+    Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
+    DataMovementEventPayloadProto shufflePayload;
+    try {
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+    } 
+    int partitionId = dmEvent.getSourceIndex();
+    URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+
+    TaskAttemptIdentifier srcAttemptIdentifier = new TaskAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+    scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
+    
+    // TODO NEWTEZ See if this duration hack can be removed.
+    int duration = shufflePayload.getRunDuration();
+    if (duration > maxMapRuntime) {
+      maxMapRuntime = duration;
+      scheduler.informMaxMapRunTime(maxMapRuntime);
+    }
+  }
+  
+  private void processTaskFailedEvent(InputFailedEvent ifEvent) {
+    TaskAttemptIdentifier taIdentifier = new TaskAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
+    scheduler.obsoleteMapOutput(taIdentifier);
+    LOG.info("Obsoleting output of src-task: " + taIdentifier);
+  }
+
+  // TODO NEWTEZ Handle encrypted shuffle
+  private URI getBaseURI(String host, int port, int partitionId) {
+    StringBuilder sb = new StringBuilder("http://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(String.valueOf(port));
+    sb.append("/");
+    
+    sb.append("mapOutput?job=");
+    // Required to use the existing ShuffleHandler
+    sb.append(inputContext.getApplicationId().toString().replace("application", "job"));
+    
+    sb.append("&reduce=");
+    sb.append(partitionId);
+    sb.append("&map=");
+    URI u = URI.create(sb.toString());
+    return u;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
index 6bd18ef..964533d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
@@ -36,12 +38,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.util.Progress;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskStatus;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+
+import com.google.common.collect.Lists;
 
 class ShuffleScheduler {
   static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
@@ -55,24 +59,26 @@ class ShuffleScheduler {
   private static final long INITIAL_PENALTY = 10000;
   private static final float PENALTY_GROWTH_RATE = 1.3f;
   
-  private final Map<TezTaskID, MutableInt> finishedMaps;
-  private final int tasksInDegree;
+  // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
+  private final Map<Integer, MutableInt> finishedMaps;
+  private final int numInputs;
   private int remainingMaps;
-  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+  private Map<TaskAttemptIdentifier, MapHost> mapLocations = new HashMap<TaskAttemptIdentifier, MapHost>();
+  //TODO NEWTEZ Clean this and other maps at some point
+  private ConcurrentMap<String, TaskAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, TaskAttemptIdentifier>(); 
   private Set<MapHost> pendingHosts = new HashSet<MapHost>();
-  private Set<TezTaskAttemptID> obsoleteMaps = new HashSet<TezTaskAttemptID>();
+  private Set<TaskAttemptIdentifier> obsoleteMaps = new HashSet<TaskAttemptIdentifier>();
   
   private final Random random = new Random(System.currentTimeMillis());
   private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
   private final Referee referee = new Referee();
-  private final Map<TezTaskAttemptID,IntWritable> failureCounts =
-    new HashMap<TezTaskAttemptID,IntWritable>();
+  private final Map<TaskAttemptIdentifier, IntWritable> failureCounts =
+    new HashMap<TaskAttemptIdentifier,IntWritable>(); 
   private final Map<String,IntWritable> hostFailures = 
     new HashMap<String,IntWritable>();
-  private final TezTaskStatus status;
+  private final TezInputContext inputContext;
   private final Shuffle shuffle;
   private final int abortFailureLimit;
-  private final Progress progress;
   private final TezCounter shuffledMapsCounter;
   private final TezCounter reduceShuffleBytes;
   private final TezCounter failedShuffleCounter;
@@ -89,26 +95,25 @@ class ShuffleScheduler {
 
   private boolean reportReadErrorImmediately = true;
   
-  public ShuffleScheduler(Configuration conf,
+  public ShuffleScheduler(TezInputContext inputContext,
+                          Configuration conf,
                           int tasksInDegree,
-                          TezTaskStatus status,
                           Shuffle shuffle,
-                          Progress progress,
                           TezCounter shuffledMapsCounter,
                           TezCounter reduceShuffleBytes,
                           TezCounter failedShuffleCounter) {
-    this.tasksInDegree = tasksInDegree;
+    this.inputContext = inputContext;
+    this.numInputs = tasksInDegree;
     abortFailureLimit = Math.max(30, tasksInDegree / 10);
     remainingMaps = tasksInDegree;
-    finishedMaps = new HashMap<TezTaskID, MutableInt>(remainingMaps);
+  //TODO NEWTEZ May need to be a string or a more usable construct if attempting to fetch from multiple inputs. Define a taskId / taskAttemptId pair
+    finishedMaps = new HashMap<Integer, MutableInt>(remainingMaps);
     this.shuffle = shuffle;
-    this.status = status;
-    this.progress = progress;
     this.shuffledMapsCounter = shuffledMapsCounter;
     this.reduceShuffleBytes = reduceShuffleBytes;
     this.failedShuffleCounter = failedShuffleCounter;
     this.startTime = System.currentTimeMillis();
-    lastProgressTime = startTime;
+    this.lastProgressTime = startTime;
     referee.start();
     this.maxFailedUniqueFetches = Math.min(tasksInDegree,
         this.maxFailedUniqueFetches);
@@ -122,19 +127,19 @@ class ShuffleScheduler {
             TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
   }
 
-  public synchronized void copySucceeded(TezTaskAttemptID mapId, 
+  public synchronized void copySucceeded(TaskAttemptIdentifier srcAttemptIdentifier, 
                                          MapHost host,
                                          long bytes,
-                                         long millis,
+                                         long milis,
                                          MapOutput output
                                          ) throws IOException {
-    failureCounts.remove(mapId);
+    // failureCounts is keyed by TaskAttemptIdentifier; a String key would never match an entry.
+    failureCounts.remove(srcAttemptIdentifier);
     hostFailures.remove(host.getHostName());
-    TezTaskID taskId = mapId.getTaskID();
     
-    if (!isFinishedTaskTrue(taskId)) {
+    if (!isFinishedTaskTrue(srcAttemptIdentifier.getTaskIndex())) {
       output.commit();
-      if(incrementTaskCopyAndCheckCompletion(taskId)) {
+      if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getTaskIndex())) {
         shuffledMapsCounter.increment(1);
         if (--remainingMaps == 0) {
           notifyAll();
@@ -142,38 +147,40 @@ class ShuffleScheduler {
       }
 
       // update the status
+      lastProgressTime = System.currentTimeMillis();
       totalBytesShuffledTillNow += bytes;
-      updateStatus();
+      logProgress();
       reduceShuffleBytes.increment(bytes);
-      lastProgressTime = System.currentTimeMillis();
-      LOG.debug("map " + mapId + " done " + status.getStateString());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("src task: "
+            + TezEngineUtils.getTaskAttemptIdentifier(
+                inputContext.getSourceVertexName(), srcAttemptIdentifier.getTaskIndex(),
+                srcAttemptIdentifier.getAttemptNumber()) + " done");
+      }
     }
   }
-  
-  private void updateStatus() {
+
+  private void logProgress() {
     float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
-    int mapsDone = tasksInDegree - remainingMaps;
+    int mapsDone = numInputs - remainingMaps;
     long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
 
     float transferRate = mbs / secsSinceStart;
-    progress.set((float) mapsDone / tasksInDegree);
-    String statusString = mapsDone + " / " + tasksInDegree + " copied.";
-    status.setStateString(statusString);
-
-    progress.setStatus("copy(" + mapsDone + " of " + tasksInDegree + " at "
+    LOG.info("copy(" + mapsDone + " of " + numInputs + " at "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
 
-  public synchronized void copyFailed(TezTaskAttemptID mapId, MapHost host,
+  public synchronized void copyFailed(TaskAttemptIdentifier srcAttempt,
+                                      MapHost host,
                                       boolean readError) {
     host.penalize();
     int failures = 1;
-    if (failureCounts.containsKey(mapId)) {
-      IntWritable x = failureCounts.get(mapId);
+    if (failureCounts.containsKey(srcAttempt)) {
+      IntWritable x = failureCounts.get(srcAttempt);
       x.set(x.get() + 1);
       failures = x.get();
     } else {
-      failureCounts.put(mapId, new IntWritable(1));      
+      failureCounts.put(srcAttempt, new IntWritable(1));      
     }
     String hostname = host.getHostName();
     if (hostFailures.containsKey(hostname)) {
@@ -184,13 +191,17 @@ class ShuffleScheduler {
     }
     if (failures >= abortFailureLimit) {
       try {
-        throw new IOException(failures + " failures downloading " + mapId);
+        throw new IOException(failures
+            + " failures downloading "
+            + TezEngineUtils.getTaskAttemptIdentifier(
+                inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+                srcAttempt.getAttemptNumber()));
       } catch (IOException ie) {
         shuffle.reportException(ie);
       }
     }
     
-    checkAndInformJobTracker(failures, mapId, readError);
+    checkAndInformJobTracker(failures, srcAttempt, readError);
 
     checkReducerHealth();
     
@@ -206,11 +217,23 @@ class ShuffleScheduler {
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
-      int failures, TezTaskAttemptID mapId, boolean readError) {
+      int failures, TaskAttemptIdentifier srcAttempt, boolean readError) {
     if ((reportReadErrorImmediately && readError)
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
-      status.addFailedDependency(mapId);
+      LOG.info("Reporting fetch failure for "
+          + TezEngineUtils.getTaskAttemptIdentifier(
+              inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+              srcAttempt.getAttemptNumber()) + " to jobtracker.");
+
+      List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+      failedEvents.add(new InputReadErrorEvent("Fetch failure for "
+          + TezEngineUtils.getTaskAttemptIdentifier(
+              inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+              srcAttempt.getAttemptNumber()), srcAttempt
+          .getTaskIndex(), srcAttempt.getAttemptNumber()));
+
+      inputContext.sendEvents(failedEvents);      
+      // The read error is sent as an event; this replaces status.addFailedDependency(mapId).
     }
   }
     
@@ -220,7 +243,7 @@ class ShuffleScheduler {
     final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
 
     long totalFailures = failedShuffleCounter.getValue();
-    int doneMaps = tasksInDegree - remainingMaps;
+    int doneMaps = numInputs - remainingMaps;
     
     boolean reducerHealthy =
       (((float)totalFailures / (totalFailures + doneMaps))
@@ -228,7 +251,7 @@ class ShuffleScheduler {
     
     // check if the reducer has progressed enough
     boolean reducerProgressedEnough =
-      (((float)doneMaps / tasksInDegree)
+      (((float)doneMaps / numInputs)
           >= MIN_REQUIRED_PROGRESS_PERCENT);
 
     // check if the reducer is stalled for a long time
@@ -252,7 +275,7 @@ class ShuffleScheduler {
 
     // kill if not healthy and has insufficient progress
     if ((failureCounts.size() >= maxFailedUniqueFetches ||
-        failureCounts.size() == (tasksInDegree - doneMaps))
+        failureCounts.size() == (numInputs - doneMaps))
         && !reducerHealthy
         && (!reducerProgressedEnough || reducerStalled)) {
       LOG.fatal("Shuffle failed with too many fetch failures " +
@@ -263,28 +286,29 @@ class ShuffleScheduler {
 
   }
   
-  public synchronized void tipFailed(TezTaskID taskId) {
-    if (!isFinishedTaskTrue(taskId)) {
-      setFinishedTaskTrue(taskId);
+  public synchronized void tipFailed(int srcTaskIndex) {
+    if (!isFinishedTaskTrue(srcTaskIndex)) {
+      setFinishedTaskTrue(srcTaskIndex);
       if (--remainingMaps == 0) {
         notifyAll();
       }
-      updateStatus();
+      logProgress();
     }
   }
   
   public synchronized void addKnownMapOutput(String hostName,
                                              int partitionId,
                                              String hostUrl,
-                                             TezTaskAttemptID mapId) {
+                                             TaskAttemptIdentifier srcAttempt) {
     String identifier = MapHost.createIdentifier(hostName, partitionId);
     MapHost host = mapLocations.get(identifier);
     if (host == null) {
       host = new MapHost(partitionId, hostName, hostUrl);
       assert identifier.equals(host.getIdentifier());
-      mapLocations.put(identifier, host);
+      mapLocations.put(srcAttempt, host);
     }
-    host.addKnownMap(mapId);
+    host.addKnownMap(srcAttempt);
+    pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);
 
     // Mark the host as pending
     if (host.getState() == MapHost.State.PENDING) {
@@ -293,13 +317,14 @@ class ShuffleScheduler {
     }
   }
   
-  public synchronized void obsoleteMapOutput(TezTaskAttemptID mapId) {
-    obsoleteMaps.add(mapId);
+  public synchronized void obsoleteMapOutput(TaskAttemptIdentifier srcAttempt) {
+    // The incoming srcAttempt does not contain a path component.
+    obsoleteMaps.add(srcAttempt);
   }
   
-  public synchronized void putBackKnownMapOutput(MapHost host, 
-                                                 TezTaskAttemptID mapId) {
-    host.addKnownMap(mapId);
+  public synchronized void putBackKnownMapOutput(MapHost host,
+                                                 TaskAttemptIdentifier srcAttempt) {
+    host.addKnownMap(srcAttempt);
   }
 
   public synchronized MapHost getHost() throws InterruptedException {
@@ -324,16 +349,20 @@ class ShuffleScheduler {
       return host;
   }
   
-  public synchronized List<TezTaskAttemptID> getMapsForHost(MapHost host) {
-    List<TezTaskAttemptID> list = host.getAndClearKnownMaps();
-    Iterator<TezTaskAttemptID> itr = list.iterator();
-    List<TezTaskAttemptID> result = new ArrayList<TezTaskAttemptID>();
+  public TaskAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
+    return pathToIdentifierMap.get(pathComponent);
+  }
+  
+  public synchronized List<TaskAttemptIdentifier> getMapsForHost(MapHost host) {
+    List<TaskAttemptIdentifier> list = host.getAndClearKnownMaps();
+    Iterator<TaskAttemptIdentifier> itr = list.iterator();
+    List<TaskAttemptIdentifier> result = new ArrayList<TaskAttemptIdentifier>();
     int includedMaps = 0;
     int totalSize = list.size();
     // find the maps that we still need, up to the limit
     while (itr.hasNext()) {
-      TezTaskAttemptID id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskID())) {
+      TaskAttemptIdentifier id = itr.next();
+      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
         result.add(id);
         if (++includedMaps >= MAX_MAPS_AT_ONCE) {
           break;
@@ -342,8 +371,8 @@ class ShuffleScheduler {
     }
     // put back the maps left after the limit
     while (itr.hasNext()) {
-      TezTaskAttemptID id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskID())) {
+      TaskAttemptIdentifier id = itr.next();
+      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
         host.addKnownMap(id);
       }
     }
@@ -367,8 +396,17 @@ class ShuffleScheduler {
     mapLocations.clear();
     obsoleteMaps.clear();
     pendingHosts.clear();
+    pathToIdentifierMap.clear();
   }
-  
+
+  /**
+   * Utility method to check if the Shuffle data fetch is complete.
+   * @return true if remainingMaps has reached zero, false otherwise
+   */
+  public synchronized boolean isDone() {
+    return remainingMaps == 0;
+  }
+
   /**
    * Wait until the shuffle finishes or until the timeout.
    * @param millis maximum wait time
@@ -448,27 +486,27 @@ class ShuffleScheduler {
     }
   }
   
-  void setFinishedTaskTrue(TezTaskID taskId) {
+  void setFinishedTaskTrue(int srcTaskIndex) {
     synchronized(finishedMaps) {
-      finishedMaps.put(taskId, new MutableInt(shuffle.getReduceRange()));
+      finishedMaps.put(srcTaskIndex, new MutableInt(shuffle.getReduceRange()));
     }
   }
   
-  boolean incrementTaskCopyAndCheckCompletion(TezTaskID mapTaskId) {
+  boolean incrementTaskCopyAndCheckCompletion(int srcTaskIndex) {
     synchronized(finishedMaps) {
-      MutableInt result = finishedMaps.get(mapTaskId);
+      MutableInt result = finishedMaps.get(srcTaskIndex);
       if(result == null) {
         result = new MutableInt(0);
-        finishedMaps.put(mapTaskId, result);
+        finishedMaps.put(srcTaskIndex, result);
       }
       result.increment();
-      return isFinishedTaskTrue(mapTaskId);
+      return isFinishedTaskTrue(srcTaskIndex);
     }
   }
   
-  boolean isFinishedTaskTrue(TezTaskID taskId) {
+  boolean isFinishedTaskTrue(int srcTaskIndex) {
     synchronized (finishedMaps) {
-      MutableInt result = finishedMaps.get(taskId);
+      MutableInt result = finishedMaps.get(srcTaskIndex);
       if(result == null) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
new file mode 100644
index 0000000..f77166d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.shuffle.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Container for a task number and an attempt number for the task.
+ */
+@Private
+public class TaskAttemptIdentifier {
+
+  private final int taskIndex;
+  private final int attemptNumber;
+  // Location hint only; deliberately excluded from equals() and hashCode().
+  private final String pathComponent;
+
+  /** Creates an identifier without a path component. */
+  public TaskAttemptIdentifier(int taskIndex, int attemptNumber) {
+    this(taskIndex, attemptNumber, null);
+  }
+
+  /** Creates an identifier that also carries a path component. */
+  public TaskAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
+    this.taskIndex = taskIndex;
+    this.attemptNumber = attemptNumber;
+    this.pathComponent = pathComponent;
+  }
+
+  public int getTaskIndex() {
+    return taskIndex;
+  }
+
+  public int getAttemptNumber() {
+    return attemptNumber;
+  }
+
+  /** @return the path component, or null if none was supplied at construction. */
+  public String getPathComponent() {
+    return pathComponent;
+  }
+
+  /**
+   * Hashes taskIndex and attemptNumber only, in line with {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + attemptNumber;
+    result = prime * result + taskIndex;
+    return result;
+  }
+
+  /**
+   * Two identifiers are equal when taskIndex and attemptNumber match. The
+   * pathComponent is ignored: identifiers built from an InputFailedEvent carry
+   * none, yet must match the identifier registered for the same attempt (for
+   * example when checking a set of obsolete attempts).
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    TaskAttemptIdentifier other = (TaskAttemptIdentifier) obj;
+    return taskIndex == other.taskIndex && attemptNumber == other.attemptNumber;
+  }
+
+  @Override
+  public String toString() {
+    return "TaskAttemptIdentifier [taskIndex=" + taskIndex + ", attemptNumber="
+        + attemptNumber + ", pathComponent=" + pathComponent + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
index 0befaa8..f61670e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
@@ -63,13 +63,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tez.common.RunningTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
 import org.apache.tez.engine.common.security.SecureShuffleUtils;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
 import org.apache.tez.engine.common.sort.impl.ExternalSorter;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -245,9 +246,9 @@ public class ShuffleHandler extends AuxiliaryService {
     userRsrc.remove(appId.toString());
   }
 
-  public synchronized void init(Configuration conf, RunningTaskContext task) {
+  public void initialize(TezOutputContext outputContext, Configuration conf) throws IOException {
     this.init(new Configuration(conf));
-    tokenSecret = task.getJobTokenSecret();
+    tokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(outputContext.getServiceConsumerMetaData(MAPREDUCE_SHUFFLE_SERVICEID));
   }
 
   @Override
@@ -441,14 +442,16 @@ public class ShuffleHandler extends AuxiliaryService {
       for (String mapId : mapIds) {
         try {
           // TODO: Error handling - validate mapId via TezTaskAttemptId.forName
-          if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
-            String errorMessage =
-                "Illegal shuffle request mapId: " + mapId
-                    + " while actual mapId is " + sorter.getTaskAttemptId(); 
-            LOG.warn(errorMessage);
-            sendError(ctx, errorMessage, BAD_REQUEST);
-            return;
-          }
+          
+          // TODO NEWTEZ Fix this. The sorter no longer exposes a TaskAttemptId, so the mapId validation below is disabled until a replacement check exists.
+//          if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
+//            String errorMessage =
+//                "Illegal shuffle request mapId: " + mapId
+//                    + " while actual mapId is " + sorter.getTaskAttemptId(); 
+//            LOG.warn(errorMessage);
+//            sendError(ctx, errorMessage, BAD_REQUEST);
+//            return;
+//          }
 
           lastMap =
             sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index b90682e..4df1c01 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -21,6 +21,7 @@ package org.apache.tez.engine.common.sort.impl;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Constructor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,47 +37,41 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.util.IndexedSorter;
-import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.Constants;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.api.Partitioner;
 import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.combine.CombineInput;
-import org.apache.tez.engine.common.combine.CombineOutput;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.hadoop.compat.NullProgressable;
+import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public abstract class ExternalSorter {
 
   private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
 
-  public abstract void close() throws IOException, InterruptedException;
+  public abstract void close() throws IOException;
 
-  public abstract void flush() throws IOException, InterruptedException;
+  public abstract void flush() throws IOException;
 
-  public abstract void write(Object key, Object value) throws IOException,
-      InterruptedException;
+  public abstract void write(Object key, Object value) throws IOException;
 
+  protected Progressable nullProgressable = new NullProgressable();
+  protected TezOutputContext outputContext;
   protected Processor combineProcessor;
   protected Partitioner partitioner;
-  protected TezEngineTaskContext task;
-  protected RunningTaskContext runningTaskContext;
-  protected Configuration job;
+  protected Configuration conf;
   protected FileSystem rfs;
   protected TezTaskOutput mapOutputFile;
   protected int partitions;
@@ -92,69 +87,68 @@ public abstract class ExternalSorter {
   // Compression for map-outputs
   protected CompressionCodec codec;
 
+  // TODO NEWTEZ Setup CombineProcessor
+  // TODO NEWTEZ Setup Partitioner in SimpleOutput
+
   // Counters
+  // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
   protected TezCounter mapOutputByteCounter;
   protected TezCounter mapOutputRecordCounter;
   protected TezCounter fileOutputByteCounter;
   protected TezCounter spilledRecordsCounter;
-  protected Progress sortPhase;
 
-  public void initialize(Configuration conf, Master master)
-      throws IOException, InterruptedException {
+  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+    this.outputContext = outputContext;
+    this.conf = conf;
+    this.partitions = numOutputs;
 
-    this.job = conf;
-    LOG.info("TEZ_ENGINE_TASK_ATTEMPT_ID: " +
-        job.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
-
-    partitions = task.getOutputSpecList().get(0).getNumOutputs();
-//    partitions =
-//        job.getInt(
-//            TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE,
-//            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
-    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
+    rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
     // sorter
-    sorter = ReflectionUtils.newInstance(job.getClass(
+    sorter = ReflectionUtils.newInstance(this.conf.getClass(
         TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS, QuickSort.class,
-        IndexedSorter.class), job);
+        IndexedSorter.class), this.conf);
 
-    comparator = ConfigUtils.getIntermediateOutputKeyComparator(job);
+    comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf);
 
     // k/v serialization
-    keyClass = ConfigUtils.getIntermediateOutputKeyClass(job);
-    valClass = ConfigUtils.getIntermediateOutputValueClass(job);
-    serializationFactory = new SerializationFactory(job);
+    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+    serializationFactory = new SerializationFactory(this.conf);
     keySerializer = serializationFactory.getSerializer(keyClass);
     valSerializer = serializationFactory.getSerializer(valClass);
 
     //    counters
     mapOutputByteCounter =
-        runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
+        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES);
     mapOutputRecordCounter =
-      runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
     fileOutputByteCounter =
-        runningTaskContext.getTaskReporter().
-            getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
+        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
     spilledRecordsCounter =
-        runningTaskContext.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
+        outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
     // compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(job)) {
+    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
       Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateOutputCompressorClass(job, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, job);
+          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, this.conf);
     } else {
       codec = null;
     }
 
     // Task outputs
-    mapOutputFile =
-        (TezTaskOutput) ReflectionUtils.newInstance(
-            conf.getClass(
-                Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
-                TezTaskOutputFiles.class), conf);
-
-    // sortPhase
-    sortPhase  = runningTaskContext.getProgress().addPhase("sort");
+    mapOutputFile = instantiateTaskOutputManager(this.conf, outputContext);
+  }
+
+  // TODO NEWTEZ Add a dedicated interface (not Processor) for the combiner, which MR tasks can initialize and set.
+  // Alternatively, add a config key holding a class name, which is easy to instantiate.
+  public void setCombiner(Processor combineProcessor) {
+    this.combineProcessor = combineProcessor;
+  }
+  
+  // TODO NEWTEZ Setup a config value for the Partitioner class, from where it can be initialized.
+  public void setPartitioner(Partitioner partitioner) {
+    this.partitioner = partitioner;
   }
 
   /**
@@ -168,42 +162,33 @@ public abstract class ExternalSorter {
     }
   }
 
-  public void setTask(RunningTaskContext task) {
-    this.runningTaskContext = task;
-    this.combineProcessor = task.getCombineProcessor();
-    this.partitioner = task.getPartitioner();
-  }
-
-  public TezTaskAttemptID getTaskAttemptId() {
-    return task.getTaskAttemptId();
-  }
-
   @Private
   public TezTaskOutput getMapOutput() {
     return mapOutputFile;
   }
 
   protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
-      Writer writer) throws IOException, InterruptedException {
-
-    CombineInput combineIn = new CombineInput(kvIter);
-    combineIn.initialize(job, runningTaskContext.getTaskReporter());
-
-    CombineOutput combineOut = new CombineOutput(writer);
-    combineOut.initialize(job, runningTaskContext.getTaskReporter());
-
-    try {
-      combineProcessor.process(new Input[] {combineIn},
-          new Output[] {combineOut});
-    } catch (IOException ioe) {
-      try {
-        combineProcessor.close();
-      } catch (IOException ignored) {}
-
-      // Do not close output here as the sorter should close the combine output
-
-      throw ioe;
-    }
+      Writer writer) throws IOException {
+
+    // TODO NEWTEZ Fix Combiner.
+//    CombineInput combineIn = new CombineInput(kvIter);
+//    combineIn.initialize(job, runningTaskContext.getTaskReporter());
+//
+//    CombineOutput combineOut = new CombineOutput(writer);
+//    combineOut.initialize(job, runningTaskContext.getTaskReporter());
+//
+//    try {
+//      combineProcessor.process(new Input[] {combineIn},
+//          new Output[] {combineOut});
+//    } catch (IOException ioe) {
+//      try {
+//        combineProcessor.close();
+//      } catch (IOException ignored) {}
+//
+//      // Do not close output here as the sorter should close the combine output
+//
+//      throw ioe;
+//    }
 
   }
 
@@ -228,10 +213,6 @@ public abstract class ExternalSorter {
     }
   }
 
-  public ExternalSorter(TezEngineTaskContext tezEngineTask) {
-    this.task = tezEngineTask;
-  }
-
   public InputStream getSortedStream(int partition) {
     throw new UnsupportedOperationException("getSortedStream isn't supported!");
   }
@@ -243,4 +224,23 @@ public abstract class ExternalSorter {
   public OutputContext getOutputContext() {
     return null;
   }
+  
+  
+
+  private TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
+    Class<?> clazz = conf.getClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        TezTaskOutputFiles.class);
+    try {
+      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
+      ctor.setAccessible(true);
+      TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
+      return instance;
+    } catch (Exception e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate configured TezOutputFileManager: "
+              + conf.get(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+                  TezTaskOutputFiles.class.getName()), e);
+    }
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
index 00b8958..3b39900 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
@@ -22,8 +22,6 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.util.DataChecksum;


Mime
View raw message