tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [4/4] git commit: TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of TEZ-398). (sseth)
Date Wed, 11 Sep 2013 04:49:49 GMT
TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of
TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1cf7f197
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1cf7f197
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1cf7f197

Branch: refs/heads/TEZ-398
Commit: 1cf7f197dd71d5a01bb457bad1f3c79e93a3afbf
Parents: e5919fa
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Sep 10 21:49:10 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Sep 10 21:49:10 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/common/Constants.java   |   3 +
 .../java/org/apache/tez/common/IDUtils.java     |   3 +-
 .../org/apache/tez/common/TezJobConfig.java     |   1 +
 .../apache/tez/common/counters/TaskCounter.java |   1 +
 .../org/apache/tez/engine/api/Partitioner.java  |  35 --
 .../tez/engine/newapi/TezTaskContext.java       |  61 +++-
 .../org/apache/tez/engine/api/Partitioner.java  |  35 ++
 .../tez/engine/common/TezEngineUtils.java       |  39 +++
 .../tez/engine/common/ValuesIterator.java       | 192 +++++++++++
 .../common/localshuffle/LocalShuffle.java       |  52 ++-
 .../common/security/DelegationTokenRenewal.java | 318 -------------------
 .../common/shuffle/impl/EventFetcher.java       | 212 -------------
 .../tez/engine/common/shuffle/impl/Fetcher.java | 121 ++++---
 .../common/shuffle/impl/InMemoryReader.java     |   6 +-
 .../tez/engine/common/shuffle/impl/MapHost.java |  18 +-
 .../engine/common/shuffle/impl/MapOutput.java   |  28 +-
 .../common/shuffle/impl/MergeManager.java       | 172 +++++-----
 .../tez/engine/common/shuffle/impl/Shuffle.java | 281 ++++++++--------
 .../shuffle/impl/ShuffleClientMetrics.java      |  16 +-
 .../shuffle/impl/ShuffleInputEventHandler.java  | 132 ++++++++
 .../common/shuffle/impl/ShuffleScheduler.java   | 194 ++++++-----
 .../shuffle/impl/TaskAttemptIdentifier.java     |  95 ++++++
 .../common/shuffle/server/ShuffleHandler.java   |  25 +-
 .../engine/common/sort/impl/ExternalSorter.java | 176 +++++-----
 .../common/sort/impl/IFileOutputStream.java     |   2 -
 .../common/sort/impl/PipelinedSorter.java       |  78 ++---
 .../tez/engine/common/sort/impl/TezMerger.java  |   1 +
 .../common/sort/impl/dflt/DefaultSorter.java    | 104 +++---
 .../sort/impl/dflt/InMemoryShuffleSorter.java   |  24 +-
 .../sort/impl/dflt/SortBufferInputStream.java   |   2 +-
 .../newoutput/TezLocalTaskOutputFiles.java      | 234 ++++++++++++++
 .../task/local/newoutput/TezTaskOutput.java     | 156 +++++++++
 .../local/newoutput/TezTaskOutputFiles.java     | 232 ++++++++++++++
 .../common/task/local/output/TezTaskOutput.java |  11 +-
 .../engine/hadoop/compat/NullProgressable.java  |  33 ++
 .../tez/engine/lib/input/LocalMergedInput.java  |  68 ++--
 .../engine/lib/input/ShuffledMergedInput.java   | 184 +++++++----
 .../engine/lib/oldinput/LocalMergedInput.java   |  67 ++++
 .../lib/oldinput/OldShuffledMergedInput.java    |  74 +++++
 .../lib/oldoutput/OldInMemorySortedOutput.java  |  58 ++++
 .../oldoutput/OldLocalOnFileSorterOutput.java   |  38 +++
 .../lib/oldoutput/OldOnFileSortedOutput.java    |  62 ++++
 .../engine/lib/output/InMemorySortedOutput.java |  71 +++--
 .../lib/output/LocalOnFileSorterOutput.java     |  17 +-
 .../engine/lib/output/OnFileSortedOutput.java   | 103 ++++--
 .../org/apache/tez/engine/newapi/KVReader.java  |  52 ++-
 .../org/apache/tez/engine/newapi/KVWriter.java  |   2 +-
 .../engine/newapi/impl/TezInputContextImpl.java |   4 +-
 .../newapi/impl/TezOutputContextImpl.java       |   3 +
 .../newapi/impl/TezProcessorContextImpl.java    |   3 +
 .../engine/newapi/impl/TezTaskContextImpl.java  |  59 +++-
 .../tez/engine/shuffle/common/ShuffleUtils.java |  56 ++++
 .../tez/mapreduce/examples/MRRSleepJob.java     |   8 +-
 .../mapreduce/examples/OrderedWordCount.java    |   8 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  20 +-
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |   6 +-
 .../processor/reduce/ReduceProcessor.java       |  24 +-
 .../processor/map/TestMapProcessor.java         | 154 +++++----
 .../processor/reduce/TestReduceProcessor.java   |   6 +-
 .../org/apache/tez/mapreduce/YARNRunner.java    |   8 +-
 60 files changed, 2708 insertions(+), 1540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/Constants.java b/tez-common/src/main/java/org/apache/tez/common/Constants.java
index 9f1b20a..8ea2909 100644
--- a/tez-common/src/main/java/org/apache/tez/common/Constants.java
+++ b/tez-common/src/main/java/org/apache/tez/common/Constants.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 
 public class Constants {
 
+  // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
+  
   public static final String TEZ = "tez";
 
   public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
@@ -31,6 +33,7 @@ public class Constants {
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
   public static String MERGED_OUTPUT_PREFIX = ".merged";
   
+  // TODO NEWTEZ Remove this constant once the old code is removed.
   public static final String TEZ_ENGINE_TASK_ATTEMPT_ID = 
       "tez.engine.task.attempt.id";
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
index e94d939..1270e5a 100644
--- a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
@@ -56,7 +56,7 @@ public class IDUtils {
     }
     throw new IllegalArgumentException(exceptionMsg);
   }
-
+  
   /** Construct a TaskAttemptID object from given string 
    * @return constructed TaskAttemptID object or null if the given String is null
    * @throws IllegalArgumentException if the given string is malformed
@@ -89,5 +89,4 @@ public class IDUtils {
     }
     throw new IllegalArgumentException(exceptionMsg);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 5a847f1..7d8730e 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -157,6 +157,7 @@ public class TezJobConfig {
       "tez.engine.shuffle.use.in-memory";
   public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY = false;
 
+  // TODO NEWTEZ Remove these config parameters. Will be part of an event.
   @Private
   public static final String TEZ_ENGINE_SHUFFLE_PARTITION_RANGE = 
       "tez.engine.shuffle.partition-range";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 60ad1c9..b6fca27 100644
--- a/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public enum TaskCounter {
+  // TODO Eventually, rename counters to be non-MR specific and map them to MR equivalent.
   MAP_INPUT_RECORDS, 
   MAP_OUTPUT_RECORDS,
   MAP_SKIPPED_RECORDS,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
deleted file mode 100644
index cbef463..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-/**
- * {@link Partitioner} is used by the TEZ framework to partition 
- * output key/value pairs.
- */
-public interface Partitioner {
-  
-  /**
-   * Get partition for given key/value.
-   * @param key key
-   * @param value value
-   * @param numPartitions number of partitions
-   * @return
-   */
-  int getPartition(Object key, Object value, int numPartitions);
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index 4cc5668..341377a 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -18,8 +18,10 @@
 
 package org.apache.tez.engine.newapi;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 
 /**
@@ -28,8 +30,18 @@ import org.apache.tez.common.counters.TezCounters;
  */
 public interface TezTaskContext {
 
+  // TODO NEWTEZ
+  // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+  // on the ApplicationMaster when a thundering herd of reducers fetch events
+  // This should not be necessary after HADOOP-8942
 
   /**
+   * Get the {@link ApplicationId} for the running app
+   * @return the {@link ApplicationId}
+   */
+  public ApplicationId getApplicationId();
+  
+  /**
    * Get the index of this Task
    * @return Task Index
    */
@@ -42,12 +54,17 @@ public interface TezTaskContext {
   public int getAttemptNumber();
 
   /**
+   * Get the name of the DAG
+   * @return the DAG name
+   */
+  public String getDAGName();
+  
+  /**
    * Get the name of the Vertex in which the task is running
    * @return Vertex Name
    */
   public String getTaskVertexName();
 
-
   public TezCounters getCounters();
 
   /**
@@ -62,4 +79,46 @@ public interface TezTaskContext {
    */
   public byte[] getUserPayload();
 
+  /**
+   * Get the work diectories for the Input/Output/Processor
+   * @return an array of work dirs
+   */
+  public String[] getWorkDirs();
+  
+  /**
+   * Returns an identifier which is unique to the specific Input, Processor or
+   * Output
+   * 
+   * @return
+   */
+  public String getUniqueIdentifier();
+  
+  /**
+   * Report a fatal error to the framework. This will cause the entire task to
+   * fail and should not be used for reporting temporary or recoverable errors
+   * 
+   * @param exception an exception representing the error
+   */
+  public void fatalError(Throwable exception, String message);
+  
+  /**
+   * Returns meta-data for the specified service. As an example, when the MR
+   * ShuffleHandler is used - this would return the jobToken serialized as bytes
+   * 
+   * @param serviceName
+   *          the name of the service for which meta-data is required
+   * @return a ByteBuffer representing the meta-data
+   */
+  public ByteBuffer getServiceConsumerMetaData(String serviceName);
+
+  /**
+   * Return Provider meta-data for the specified service As an example, when the
+   * MR ShuffleHandler is used - this would return the shuffle port serialized
+   * as bytes
+   * 
+   * @param serviceName
+   *          the name of the service for which provider meta-data is required
+   * @return a ByteBuffer representing the meta-data
+   */
+  public ByteBuffer getServiceProviderMetaData(String serviceName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
new file mode 100644
index 0000000..cbef463
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+/**
+ * {@link Partitioner} is used by the TEZ framework to partition 
+ * output key/value pairs.
+ */
+public interface Partitioner {
+  
+  /**
+   * Get partition for given key/value.
+   * @param key key
+   * @param value value
+   * @param numPartitions number of partitions
+   * @return
+   */
+  int getPartition(Object key, Object value, int numPartitions);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
new file mode 100644
index 0000000..b3287c9
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+public class TezEngineUtils {
+
+  public static String getTaskIdentifier(String vertexName, int taskIndex) {
+    return String.format("%s_%06d", vertexName, taskIndex);
+  }
+
+  public static String getTaskAttemptIdentifier(int taskIndex,
+      int taskAttemptNumber) {
+    return String.format("%d_%d", taskIndex, taskAttemptNumber);
+  }
+
+  // TODO Maybe include a dag name in this.
+  public static String getTaskAttemptIdentifier(String vertexName,
+      int taskIndex, int taskAttemptNumber) {
+    return String.format("%s_%06d_%02d", vertexName, taskIndex,
+        taskAttemptNumber);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
new file mode 100644
index 0000000..a33d00b
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
@@ -0,0 +1,192 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iterates values while keys match in sorted input.
+ * 
+ * This class is not thread safe. Accessing methods from multiple threads will
+ * lead to corrupt data.
+ * 
+ */
+public class ValuesIterator<KEY,VALUE> {
+  protected TezRawKeyValueIterator in; //input iterator
+  private KEY key;               // current key
+  private KEY nextKey;
+  private VALUE value;             // current value
+  //private boolean hasNext;                      // more w/ this key
+  private boolean more;                         // more in file
+  private RawComparator<KEY> comparator;
+  private Deserializer<KEY> keyDeserializer;
+  private Deserializer<VALUE> valDeserializer;
+  private DataInputBuffer keyIn = new DataInputBuffer();
+  private DataInputBuffer valueIn = new DataInputBuffer();
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
+  
+  private int keyCtr = 0;
+  private boolean hasMoreValues; // For the current key.
+  private boolean isFirstRecord = true;
+  
+  public ValuesIterator (TezRawKeyValueIterator in, 
+                         RawComparator<KEY> comparator, 
+                         Class<KEY> keyClass,
+                         Class<VALUE> valClass, Configuration conf,
+                         TezCounter inputKeyCounter,
+                         TezCounter inputValueCounter)
+    throws IOException {
+    this.in = in;
+    this.comparator = comparator;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(keyIn);
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    this.valDeserializer.open(this.valueIn);
+  }
+
+  TezRawKeyValueIterator getRawIterator() { return in; }
+
+  /**
+   * Move to the next K-Vs pair
+   * @return true if another pair exists, otherwise false.
+   * @throws IOException 
+   */
+  public boolean moveToNext() throws IOException {
+    if (isFirstRecord) {
+      readNextKey();
+      key = nextKey;
+      nextKey = null;
+      hasMoreValues = more;
+      isFirstRecord = false;
+    } else {
+      nextKey();
+    }
+    return more;
+  }
+  
+  /** The current key. */
+  public KEY getKey() { 
+    return key; 
+  }
+  
+  public Iterable<VALUE> getValues() {
+    return new Iterable<VALUE>() {
+
+      @Override
+      public Iterator<VALUE> iterator() {
+        
+        return new Iterator<VALUE>() {
+
+          private final int keyNumber = keyCtr;
+          
+          @Override
+          public boolean hasNext() {
+            return hasMoreValues;
+          }
+
+          @Override
+          public VALUE next() {
+            if (!hasMoreValues) {
+              throw new NoSuchElementException("iterate past last value");
+            }
+            Preconditions
+                .checkState(
+                    keyNumber == keyCtr,
+                    "Cannot use values iterator on the previous K-V pair after moveToNext has been invoked to move to the next K-V pair");
+            
+            try {
+              readNextValue();
+              readNextKey();
+            } catch (IOException ie) {
+              throw new RuntimeException("problem advancing post rec#"+keyCtr, ie);
+            }
+            inputValueCounter.increment(1);
+            return value;
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException("Cannot remove elements");
+          }
+        };
+      }
+    };
+  }
+  
+  
+
+  /** Start processing next unique key. */
+  private void nextKey() throws IOException {
+    // read until we find a new key
+    while (hasMoreValues) { 
+      readNextKey();
+    }
+    if (more) {
+      inputKeyCounter.increment(1);
+      ++keyCtr;
+    }
+    
+    // move the next key to the current one
+    KEY tmpKey = key;
+    key = nextKey;
+    nextKey = tmpKey;
+    hasMoreValues = more;
+  }
+
+  /** 
+   * read the next key - which may be the same as the current key.
+   */
+  private void readNextKey() throws IOException {
+    more = in.next();
+    if (more) {      
+      DataInputBuffer nextKeyBytes = in.getKey();
+      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+      nextKey = keyDeserializer.deserialize(nextKey);
+      hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+    } else {
+      hasMoreValues = false;
+    }
+  }
+
+  /**
+   * Read the next value
+   * @throws IOException
+   */
+  private void readNextValue() throws IOException {
+    DataInputBuffer nextValueBytes = in.getValue();
+    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+    value = valDeserializer.deserialize(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
index e19e2c8..38b04d3 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
@@ -29,25 +29,24 @@ import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.newapi.TezInputContext;
 
 @SuppressWarnings({"rawtypes"})
 public class LocalShuffle {
 
-  private final TezEngineTaskContext taskContext;
-  private final RunningTaskContext runningTaskContext;
+  // TODO NEWTEZ This is broken.
+
+  private final TezInputContext inputContext;
   private final Configuration conf;
-  private final int tasksInDegree;
+  private final int numInputs;
 
   private final Class keyClass;
   private final Class valClass;
@@ -60,18 +59,15 @@ public class LocalShuffle {
   private final CompressionCodec codec;
   private final TezTaskOutput mapOutputFile;
 
-  public LocalShuffle(TezEngineTaskContext taskContext, 
-      RunningTaskContext runningTaskContext, 
-      Configuration conf,
-      TezTaskReporter reporter
-      ) throws IOException {
-    this.taskContext = taskContext;
-    this.runningTaskContext = runningTaskContext;
+  public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    this.inputContext = inputContext;
     this.conf = conf;
+    this.numInputs = numInputs;
+    
     this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
     this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
     this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
-
+    
     this.sortFactor =
         conf.getInt(
             TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
@@ -79,10 +75,9 @@ public class LocalShuffle {
     
     this.rfs = FileSystem.getLocal(conf).getRaw();
 
-    this.spilledRecordsCounter = 
-        reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+    this.spilledRecordsCounter = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
     
-    // compression
+ // compression
     if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass =
           ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -90,19 +85,16 @@ public class LocalShuffle {
     } else {
       this.codec = null;
     }
-
-    this.tasksInDegree = taskContext.getInputSpecList().get(0).getNumInputs();
-
+    
     // Always local
-    this.mapOutputFile = new TezLocalTaskOutputFiles();
-    this.mapOutputFile.setConf(conf);
-
+    this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
   }
+ 
   
   public TezRawKeyValueIterator run() throws IOException {
     // Copy is complete, obviously! 
-    this.runningTaskContext.getProgress().addPhase("copy").complete();
 
+    
     // Merge
     return TezMerger.merge(conf, rfs, 
         keyClass, valClass,
@@ -110,17 +102,17 @@ public class LocalShuffle {
         getMapFiles(),
         false, 
         sortFactor,
-        new Path(taskContext.getTaskAttemptId().toString()), 
+        new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken 
         comparator,
-        runningTaskContext.getTaskReporter(), spilledRecordsCounter, null, null);
+        null, spilledRecordsCounter, null, null);
   }
   
   private Path[] getMapFiles() 
   throws IOException {
     List<Path> fileList = new ArrayList<Path>();
       // for local jobs
-      for(int i = 0; i < tasksInDegree; ++i) {
-        fileList.add(mapOutputFile.getInputFile(i));
+      for(int i = 0; i < numInputs; ++i) {
+        //fileList.add(mapOutputFile.getInputFile(i));
       }
       
     return fileList.toArray(new Path[0]);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
deleted file mode 100644
index a3ac968..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class DelegationTokenRenewal {
-  private static final Log LOG = LogFactory.getLog(DelegationTokenRenewal.class);
-  public static final String SCHEME = "hdfs";
-  
-  /**
-   * class that is used for keeping tracks of DT to renew
-   *
-   */
-  private static class DelegationTokenToRenew {
-    public final Token<?> token;
-    public final ApplicationId jobId;
-    public final Configuration conf;
-    public long expirationDate;
-    public TimerTask timerTask;
-    
-    public DelegationTokenToRenew(
-        ApplicationId jId, Token<?> t, 
-        Configuration newConf, long newExpirationDate) {
-      token = t;
-      jobId = jId;
-      conf = newConf;
-      expirationDate = newExpirationDate;
-      timerTask = null;
-      if(token==null || jobId==null || conf==null) {
-        throw new IllegalArgumentException("invalid params for Renew Token" +
-            ";t="+token+";j="+jobId+";c="+conf);
-      }
-    }
-    public void setTimerTask(TimerTask tTask) {
-      timerTask = tTask;
-    }
-    @Override
-    public String toString() {
-      return token + ";exp="+expirationDate;
-    }
-    @Override
-    public boolean equals (Object obj) {
-      if (obj == this) {
-        return true;
-      } else if (obj == null || getClass() != obj.getClass()) {
-        return false;
-      } else {
-        return token.equals(((DelegationTokenToRenew)obj).token);
-      }
-    }
-    @Override
-    public int hashCode() {
-      return token.hashCode();
-    }
-  }
-  
-  // global single timer (daemon)
-  private static Timer renewalTimer = new Timer(true);
-  
-  //delegation token canceler thread
-  private static DelegationTokenCancelThread dtCancelThread =
-    new DelegationTokenCancelThread();
-  static {
-    dtCancelThread.start();
-  }
-
-  
-  //managing the list of tokens using Map
-  // jobId=>List<tokens>
-  private static Set<DelegationTokenToRenew> delegationTokens = 
-    Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
-  
-  private static class DelegationTokenCancelThread extends Thread {
-    private static class TokenWithConf {
-      Token<?> token;
-      Configuration conf;
-      TokenWithConf(Token<?> token, Configuration conf) {
-        this.token = token;
-        this.conf = conf;
-      }
-    }
-    private LinkedBlockingQueue<TokenWithConf> queue =  
-      new LinkedBlockingQueue<TokenWithConf>();
-     
-    public DelegationTokenCancelThread() {
-      super("Delegation Token Canceler");
-      setDaemon(true);
-    }
-    public void cancelToken(Token<?> token,  
-        Configuration conf) {
-      TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
-      while (!queue.offer(tokenWithConf)) {
-        LOG.warn("Unable to add token " + token + " for cancellation. " +
-        		 "Will retry..");
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
-    public void run() {
-      while (true) {
-        TokenWithConf tokenWithConf = null;
-        try {
-          tokenWithConf = queue.take();
-          final TokenWithConf current = tokenWithConf;
-          
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Canceling token " + tokenWithConf.token.getService());
-          }
-          // need to use doAs so that http can find the kerberos tgt
-          UserGroupInformation.getLoginUser().doAs(
-              new PrivilegedExceptionAction<Void>() {
-
-                @Override
-                public Void run() throws Exception {
-                  current.token.cancel(current.conf);
-                  return null;
-                }
-              });
-        } catch (IOException e) {
-          LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +  
-              StringUtils.stringifyException(e));
-        } catch (InterruptedException ie) {
-          return;
-        } catch (Throwable t) {
-          LOG.warn("Got exception " + StringUtils.stringifyException(t) + 
-                   ". Exiting..");
-          System.exit(-1);
-        }
-      }
-    }
-  }
-  //adding token
-  private static void addTokenToList(DelegationTokenToRenew t) {
-    delegationTokens.add(t);
-  }
-  
-  public static synchronized void registerDelegationTokensForRenewal(
-      ApplicationId jobId, Credentials ts, Configuration conf) throws IOException {
-    if(ts==null)
-      return; //nothing to add
-    
-    Collection <Token<?>> tokens = ts.getAllTokens();
-    long now = System.currentTimeMillis();
-
-    for (Token<?> t : tokens) {
-      // first renew happens immediately
-      if (t.isManaged()) {
-        DelegationTokenToRenew dtr = new DelegationTokenToRenew(jobId, t, conf,
-            now);
-
-        addTokenToList(dtr);
-
-        setTimerForTokenRenewal(dtr, true);
-        LOG.info("registering token for renewal for service =" + t.getService()
-            + " and jobID = " + jobId);
-      }
-    }
-  }
-    
-  /**
-   * Task - to renew a token
-   *
-   */
-  private static class RenewalTimerTask extends TimerTask {
-    private DelegationTokenToRenew dttr;
-    
-    RenewalTimerTask(DelegationTokenToRenew t) {  dttr = t;  }
-    
-    @Override
-    public void run() {
-      Token<?> token = dttr.token;
-      long newExpirationDate=0;
-      try {
-        // need to use doAs so that http can find the kerberos tgt
-        dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
-            new PrivilegedExceptionAction<Long>() {
-
-              @Override
-              public Long run() throws Exception {
-                return dttr.token.renew(dttr.conf);
-              }
-            });
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("renewing for:" + token.getService() + ";newED="
-              + dttr.expirationDate);
-        }
-        setTimerForTokenRenewal(dttr, false);// set the next one
-      } catch (Exception e) {
-        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
-        removeFailedDelegationToken(dttr);
-      }
-    }
-  }
-  
-  /**
-   * find the soonest expiring token and set it for renew
-   */
-  private static void setTimerForTokenRenewal(
-      DelegationTokenToRenew token, boolean firstTime) {
-      
-    // calculate timer time
-    long now = System.currentTimeMillis();
-    long renewIn;
-    if(firstTime) {
-      renewIn = now;
-    } else {
-      long expiresIn = (token.expirationDate - now); 
-      renewIn = now + expiresIn - expiresIn/10; // little before expiration
-    }
-    
-    // need to create new timer every time
-    TimerTask tTask = new RenewalTimerTask(token);
-    token.setTimerTask(tTask); // keep reference to the timer
-
-    renewalTimer.schedule(token.timerTask, new Date(renewIn));
-  }
-
-  /**
-   * removing all tokens renewals
-   */
-  static public void close() {
-    renewalTimer.cancel();
-    delegationTokens.clear();
-  }
-  
-  // cancel a token
-  private static void cancelToken(DelegationTokenToRenew t) {
-    dtCancelThread.cancelToken(t.token, t.conf);
-  }
-  
-  /**
-   * removing failed DT
-   * @param jobId
-   */
-  private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
-    ApplicationId jobId = t.jobId;
-    if (LOG.isDebugEnabled())
-      LOG.debug("removing failed delegation token for jobid=" + jobId + 
-          ";t=" + t.token.getService());
-    delegationTokens.remove(t);
-    // cancel the timer
-    if(t.timerTask!=null)
-      t.timerTask.cancel();
-  }
-  
-  /**
-   * removing DT for completed jobs
-   * @param jobId
-   */
-  public static void removeDelegationTokenRenewalForJob(ApplicationId jobId) {
-    synchronized (delegationTokens) {
-      Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
-      while(it.hasNext()) {
-        DelegationTokenToRenew dttr = it.next();
-        if (dttr.jobId.equals(jobId)) {
-          if (LOG.isDebugEnabled())
-            LOG.debug("removing delegation token for jobid=" + jobId + 
-                ";t=" + dttr.token.getService());
-
-          // cancel the timer
-          if(dttr.timerTask!=null)
-            dttr.timerTask.cancel();
-
-          // cancel the token
-          cancelToken(dttr);
-
-          it.remove();
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
deleted file mode 100644
index 51e05af..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
-
-class EventFetcher extends Thread {
-  private static final long SLEEP_TIME = 1000;
-  private static final int MAX_RETRIES = 10;
-  private static final int RETRY_PERIOD = 5000;
-  private static final Log LOG = LogFactory.getLog(EventFetcher.class);
-
-  private final TezTaskAttemptID reduce;
-  private final Master umbilical;
-  private final ShuffleScheduler scheduler;
-  private int fromEventIdx = 0;
-  private int maxEventsToFetch;
-  private Shuffle shuffle = null;
-  
-  private int maxMapRuntime = 0;
-
-  private volatile boolean stopped = false;
-  
-  public EventFetcher(TezTaskAttemptID reduce,
-                      Master umbilical,
-                      ShuffleScheduler scheduler,
-                      Shuffle shuffle,
-                      int maxEventsToFetch) {
-    setName("EventFetcher for fetching Map Completion Events");
-    setDaemon(true);    
-    this.reduce = reduce;
-    this.umbilical = umbilical;
-    this.scheduler = scheduler;
-    this.shuffle = shuffle;
-    this.maxEventsToFetch = maxEventsToFetch;
-  }
-
-  @Override
-  public void run() {
-    int failures = 0;
-    LOG.info(reduce + " Thread started: " + getName());
-    
-    try {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        try {
-          int numNewMaps = getMapCompletionEvents();
-          failures = 0;
-          if (numNewMaps > 0) {
-            LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
-          }
-          LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
-          if (!Thread.currentThread().isInterrupted()) {
-            Thread.sleep(SLEEP_TIME);
-          }
-        } catch (InterruptedException e) {
-          LOG.info("EventFetcher is interrupted.. Returning");
-          return;
-        } catch (IOException ie) {
-          LOG.info("Exception in getting events", ie);
-          // check to see whether to abort
-          if (++failures >= MAX_RETRIES) {
-            throw new IOException("too many failures downloading events", ie);
-          }
-          // sleep for a bit
-          if (!Thread.currentThread().isInterrupted()) {
-            Thread.sleep(RETRY_PERIOD);
-          }
-        }
-      }
-    } catch (InterruptedException e) {
-      return;
-    } catch (Throwable t) {
-      shuffle.reportException(t);
-      return;
-    }
-  }
-
-  public void shutDown() {
-    this.stopped = true;
-    interrupt();
-    try {
-      join(5000);
-    } catch(InterruptedException ie) {
-      LOG.warn("Got interrupted while joining " + getName(), ie);
-    }
-  }
-  
-  /** 
-   * Queries the {@link TaskTracker} for a set of map-completion events 
-   * from a given event ID.
-   * @throws IOException
-   */  
-  protected int getMapCompletionEvents() throws IOException {
-    
-    int numNewMaps = 0;
-    TezDependentTaskCompletionEvent events[] = null;
-
-    do {
-      TezTaskDependencyCompletionEventsUpdate update =
-          umbilical.getDependentTasksCompletionEvents(
-              fromEventIdx,
-              maxEventsToFetch,
-              reduce);
-      events = update.getDependentTaskCompletionEvents();
-      LOG.debug("Got " + events.length + " map completion events from " +
-               fromEventIdx);
-      // Check if the reset is required.
-      // Since there is no ordering of the task completion events at the
-      // reducer, the only option to sync with the new jobtracker is to reset
-      // the events index
-      if (update.shouldReset()) {
-        fromEventIdx = 0;
-        scheduler.resetKnownMaps();
-      }
-
-      // Update the last seen event ID
-      fromEventIdx += events.length;
-
-      // Process the TaskCompletionEvents:
-      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
-      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
-      //    fetching from those maps.
-      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
-      //    outputs at all.
-      for (TezDependentTaskCompletionEvent event : events) {
-        byte[] userPayload = event.getUserPayload();
-        if(userPayload != null) {
-          shuffle.updateUserPayload(userPayload);
-        }
-        switch (event.getStatus()) {
-        case SUCCEEDED:
-          addMapHosts(event);
-          numNewMaps ++;
-          int duration = event.getTaskRunTime();
-          if (duration > maxMapRuntime) {
-            maxMapRuntime = duration;
-            scheduler.informMaxMapRunTime(maxMapRuntime);
-          }
-          break;
-        case FAILED:
-        case KILLED:
-        case OBSOLETE:
-          scheduler.obsoleteMapOutput(event.getTaskAttemptID());
-          LOG.info("Ignoring obsolete output of " + event.getStatus() + 
-              " map-task: '" + event.getTaskAttemptID() + "'");
-          break;
-        case TIPFAILED:
-          scheduler.tipFailed(event.getTaskAttemptID().getTaskID());
-          LOG.info("Ignoring output of failed map TIP: '" +  
-              event.getTaskAttemptID() + "'");
-          break;
-        }
-      }
-    } while (events.length == maxEventsToFetch);
-
-    return numNewMaps;
-  }
-  
-  private void addMapHosts(TezDependentTaskCompletionEvent event) {
-    int reduceRange = shuffle.getReduceRange();
-    for(int i=0; i<reduceRange; ++i) {
-      int partitionId = reduce.getTaskID().getId()+i;
-      URI u = getBaseURI(event.getTaskTrackerHttp(), partitionId);
-      scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
-          partitionId,
-          u.toString(),
-          event.getTaskAttemptID());
-    }
-  }
-  
-  private URI getBaseURI(String url, int reduceId) {
-    StringBuffer baseUrl = new StringBuffer(url);
-    if (!url.endsWith("/")) {
-      baseUrl.append("/");
-    }
-    baseUrl.append("mapOutput?job=");
-    // TODO TEZ HACK to get shuffle working. ApplicationId vs JobId shuffle handler.
-    // FIXME dag or application or ???
-    String jobID = reduce.getTaskID().getVertexID().getDAGId().
-        getApplicationId().toString().replace("application", "job");
-
-    baseUrl.append(jobID);
-    baseUrl.append("&reduce=");
-    baseUrl.append(reduceId);
-    baseUrl.append("&map=");
-    URI u = URI.create(baseUrl.toString());
-    return u;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
index 0acceaf..86e5b56 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
@@ -43,17 +43,16 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.IDUtils;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.security.SecureShuffleUtils;
 import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.engine.common.sort.impl.IFileInputStream;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -63,8 +62,7 @@ class Fetcher extends Thread {
   
   /** Basic/unit connection timeout (in milliseconds) */
   private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-  
-  private final Progressable reporter;
+
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
@@ -99,27 +97,28 @@ class Fetcher extends Thread {
 
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
-      TezTaskReporter reporter, ShuffleClientMetrics metrics,
-      Shuffle shuffle, SecretKey jobTokenSecret) {
+      ShuffleClientMetrics metrics,
+      Shuffle shuffle, TezInputContext inputContext) throws IOException {
     this.job = job;
-    this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
     this.metrics = metrics;
     this.shuffle = shuffle;
     this.id = ++nextId;
-    this.jobTokenSecret = jobTokenSecret;
-    ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    this.jobTokenSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+    ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.IO_ERROR.toString());
-    wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_LENGTH.toString());
-    badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.BAD_ID.toString());
-    wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_MAP.toString());
-    connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.CONNECTION.toString());
-    wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_REDUCE.toString());
 
     if (ConfigUtils.isIntermediateInputCompressed(job)) {
@@ -156,6 +155,7 @@ class Fetcher extends Thread {
       }
     }
   }
+  
   public void run() {
     try {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
@@ -221,28 +221,28 @@ class Fetcher extends Thread {
   @VisibleForTesting
   protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
-    List<TezTaskAttemptID> maps = scheduler.getMapsForHost(host);
+    List<TaskAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
     
     // Sanity check to catch hosts with only 'OBSOLETE' maps, 
     // especially at the tail of large jobs
-    if (maps.size() == 0) {
+    if (srcAttempts.size() == 0) {
       return;
     }
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
-        + maps);
+        + srcAttempts);
     }
     
     // List of maps to be fetched yet
-    Set<TezTaskAttemptID> remaining = new HashSet<TezTaskAttemptID>(maps);
+    Set<TaskAttemptIdentifier> remaining = new HashSet<TaskAttemptIdentifier>(srcAttempts);
     
     // Construct the url and connect
     DataInputStream input;
     boolean connectSucceeded = false;
     
     try {
-      URL url = getMapOutputURL(host, maps);
+      URL url = getMapOutputURL(host, srcAttempts);
       HttpURLConnection connection = openConnection(url);
       
       // generate hash of the url
@@ -294,19 +294,19 @@ class Fetcher extends Thread {
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
       if (!connectSucceeded) {
-        for(TezTaskAttemptID left: remaining) {
+        for(TaskAttemptIdentifier left: remaining) {
           scheduler.copyFailed(left, host, connectSucceeded);
         }
       } else {
         // If we got a read error at this stage, it implies there was a problem
         // with the first map, typically lost map. So, penalize only that map
         // and add the rest
-        TezTaskAttemptID firstMap = maps.get(0);
+        TaskAttemptIdentifier firstMap = srcAttempts.get(0);
         scheduler.copyFailed(firstMap, host, connectSucceeded);
       }
       
       // Add back all the remaining maps, WITHOUT marking them as failed
-      for(TezTaskAttemptID left: remaining) {
+      for(TaskAttemptIdentifier left: remaining) {
         scheduler.putBackKnownMapOutput(host, left);
       }
       
@@ -318,14 +318,14 @@ class Fetcher extends Thread {
       // On any error, faildTasks is not null and we exit
       // after putting back the remaining maps to the 
       // yet_to_be_fetched list and marking the failed tasks.
-      TezTaskAttemptID[] failedTasks = null;
+      TaskAttemptIdentifier[] failedTasks = null;
       while (!remaining.isEmpty() && failedTasks == null) {
         failedTasks = copyMapOutput(host, input, remaining);
       }
       
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
-        for(TezTaskAttemptID left: failedTasks) {
+        for(TaskAttemptIdentifier left: failedTasks) {
           scheduler.copyFailed(left, host, true);
         }
       }
@@ -338,19 +338,19 @@ class Fetcher extends Thread {
             + remaining.size() + " left.");
       }
     } finally {
-      for (TezTaskAttemptID left : remaining) {
+      for (TaskAttemptIdentifier left : remaining) {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
   }
   
-  private static TezTaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TezTaskAttemptID[0];
+  private static TaskAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptIdentifier[0];
   
-  private TezTaskAttemptID[] copyMapOutput(MapHost host,
+  private TaskAttemptIdentifier[] copyMapOutput(MapHost host,
                                 DataInputStream input,
-                                Set<TezTaskAttemptID> remaining) {
+                                Set<TaskAttemptIdentifier> remaining) {
     MapOutput mapOutput = null;
-    TezTaskAttemptID mapId = null;
+    TaskAttemptIdentifier srcAttemptId = null;
     long decompressedLength = -1;
     long compressedLength = -1;
     
@@ -361,7 +361,8 @@ class Fetcher extends Thread {
       try {
         ShuffleHeader header = new ShuffleHeader();
         header.readFields(input);
-        mapId = IDUtils.toTaskAttemptId(header.mapId);
+        String pathComponent = header.mapId;
+        srcAttemptId = scheduler.getIdentifierForPathComponent(pathComponent);
         compressedLength = header.compressedLength;
         decompressedLength = header.uncompressedLength;
         forReduce = header.forReduce;
@@ -369,23 +370,23 @@ class Fetcher extends Thread {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
         //Don't know which one was bad, so consider all of them as bad
-        return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+        return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
       }
 
  
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
-          remaining, mapId)) {
-        return new TezTaskAttemptID[] {mapId};
+          remaining, srcAttemptId)) {
+        return new TaskAttemptIdentifier[] {srcAttemptId};
       }
       
       if(LOG.isDebugEnabled()) {
-        LOG.debug("header: " + mapId + ", len: " + compressedLength + 
+        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
             ", decomp len: " + decompressedLength);
       }
       
       // Get the location for the map output - either in-memory or on-disk
-      mapOutput = merger.reserve(mapId, decompressedLength, id);
+      mapOutput = merger.reserve(srcAttemptId, decompressedLength, id);
       
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
@@ -396,7 +397,7 @@ class Fetcher extends Thread {
       
       // Go!
       LOG.info("fetcher#" + id + " about to shuffle output of map " + 
-               mapOutput.getMapId() + " decomp: " +
+               mapOutput.getAttemptIdentifier() + " decomp: " +
                decompressedLength + " len: " + compressedLength + " to " +
                mapOutput.getType());
       if (mapOutput.getType() == Type.MEMORY) {
@@ -408,32 +409,32 @@ class Fetcher extends Thread {
       
       // Inform the shuffle scheduler
       long endTime = System.currentTimeMillis();
-      scheduler.copySucceeded(mapId, host, compressedLength, 
+      scheduler.copySucceeded(srcAttemptId, host, compressedLength, 
                               endTime - startTime, mapOutput);
       // Note successful shuffle
-      remaining.remove(mapId);
+      remaining.remove(srcAttemptId);
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
       ioErrs.increment(1);
-      if (mapId == null || mapOutput == null) {
+      if (srcAttemptId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" + 
-                 mapId + " decomp: " + 
+                 srcAttemptId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
-        if(mapId == null) {
-          return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+        if(srcAttemptId == null) {
+          return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
         } else {
-          return new TezTaskAttemptID[] {mapId};
+          return new TaskAttemptIdentifier[] {srcAttemptId};
         }
       }
       
-      LOG.warn("Failed to shuffle output of " + mapId + 
+      LOG.warn("Failed to shuffle output of " + srcAttemptId + 
                " from " + host.getHostName(), ioe); 
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
       metrics.failedFetch();
-      return new TezTaskAttemptID[] {mapId};
+      return new TaskAttemptIdentifier[] {srcAttemptId};
     }
 
   }
@@ -448,11 +449,11 @@ class Fetcher extends Thread {
    * @return true/false, based on if the verification succeeded or not
    */
   private boolean verifySanity(long compressedLength, long decompressedLength,
-      int forReduce, Set<TezTaskAttemptID> remaining, TezTaskAttemptID mapId) {
+      int forReduce, Set<TaskAttemptIdentifier> remaining, TaskAttemptIdentifier srcAttemptId) {
     if (compressedLength < 0 || decompressedLength < 0) {
       wrongLengthErrs.increment(1);
       LOG.warn(getName() + " invalid lengths in map output header: id: " +
-               mapId + " len: " + compressedLength + ", decomp len: " + 
+          srcAttemptId + " len: " + compressedLength + ", decomp len: " + 
                decompressedLength);
       return false;
     }
@@ -462,15 +463,15 @@ class Fetcher extends Thread {
     if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
       wrongReduceErrs.increment(1);
       LOG.warn(getName() + " data for the wrong reduce map: " +
-               mapId + " len: " + compressedLength + " decomp len: " +
+               srcAttemptId + " len: " + compressedLength + " decomp len: " +
                decompressedLength + " for reduce " + forReduce);
       return false;
     }
 
     // Sanity check
-    if (!remaining.contains(mapId)) {
+    if (!remaining.contains(srcAttemptId)) {
       wrongMapErrs.increment(1);
-      LOG.warn("Invalid map-output! Received output for " + mapId);
+      LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
       return false;
     }
     
@@ -485,17 +486,17 @@ class Fetcher extends Thread {
    * @return
    * @throws MalformedURLException
    */
-  private URL getMapOutputURL(MapHost host, List<TezTaskAttemptID> maps
+  private URL getMapOutputURL(MapHost host, List<TaskAttemptIdentifier> srcAttempts
                               )  throws MalformedURLException {
     // Get the base url
     StringBuffer url = new StringBuffer(host.getBaseUrl());
     
     boolean first = true;
-    for (TezTaskAttemptID mapId : maps) {
+    for (TaskAttemptIdentifier mapId : srcAttempts) {
       if (!first) {
         url.append(",");
       }
-      url.append(mapId);
+      url.append(mapId.getPathComponent());
       first = false;
     }
    
@@ -566,9 +567,8 @@ class Fetcher extends Thread {
     try {
       IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
       metrics.inputBytes(shuffleData.length);
-      reporter.progress();
       LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
-               mapOutput.getMapId());
+               mapOutput.getAttemptIdentifier());
     } catch (IOException ioe) {      
       // Close the streams
       IOUtils.cleanup(LOG, input);
@@ -593,17 +593,16 @@ class Fetcher extends Thread {
         int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " + 
-                                mapOutput.getMapId());
+                                mapOutput.getAttemptIdentifier());
         }
         output.write(buf, 0, n);
         bytesLeft -= n;
         metrics.inputBytes(n);
-        reporter.progress();
       }
 
       LOG.info("Read " + (compressedLength - bytesLeft) + 
                " bytes from map-output for " +
-               mapOutput.getMapId());
+               mapOutput.getAttemptIdentifier());
 
       output.close();
     } catch (IOException ioe) {
@@ -617,7 +616,7 @@ class Fetcher extends Thread {
     // Sanity check
     if (bytesLeft != 0) {
       throw new IOException("Incomplete map output received for " +
-                            mapOutput.getMapId() + " from " +
+                            mapOutput.getAttemptIdentifier() + " from " +
                             host.getHostName() + " (" + 
                             bytesLeft + " bytes missing of " + 
                             compressedLength + ")"

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
index 7cea558..d10ebaa 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
@@ -25,8 +25,6 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.sort.impl.IFile.Reader;
 
@@ -36,14 +34,14 @@ import org.apache.tez.engine.common.sort.impl.IFile.Reader;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class InMemoryReader extends Reader {
-  private final TezTaskAttemptID taskAttemptId;
+  private final TaskAttemptIdentifier taskAttemptId;
   private final MergeManager merger;
   DataInputBuffer memDataIn = new DataInputBuffer();
   private int start;
   private int length;
   private int prevKeyPos;
 
-  public InMemoryReader(MergeManager merger, TezTaskAttemptID taskAttemptId,
+  public InMemoryReader(MergeManager merger, TaskAttemptIdentifier taskAttemptId,
                         byte[] data, int start, int length)
   throws IOException {
     super(null, null, length - start, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
index 24f7635..cd644de 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
@@ -20,9 +20,6 @@ package org.apache.tez.engine.common.shuffle.impl;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
 class MapHost {
   
   public static enum State {
@@ -37,7 +34,8 @@ class MapHost {
   private final int partitionId;
   private final String baseUrl;
   private final String identifier;
-  private List<TezTaskAttemptID> maps = new ArrayList<TezTaskAttemptID>();
+  // Tracks attempt IDs
+  private List<TaskAttemptIdentifier> maps = new ArrayList<TaskAttemptIdentifier>();
   
   public MapHost(int partitionId, String hostName, String baseUrl) {
     this.partitionId = partitionId;
@@ -70,16 +68,16 @@ class MapHost {
     return baseUrl;
   }
 
-  public synchronized void addKnownMap(TezTaskAttemptID mapId) {
-    maps.add(mapId);
+  public synchronized void addKnownMap(TaskAttemptIdentifier srcAttempt) {
+    maps.add(srcAttempt);
     if (state == State.IDLE) {
       state = State.PENDING;
     }
   }
-  
-  public synchronized List<TezTaskAttemptID> getAndClearKnownMaps() {
-    List<TezTaskAttemptID> currentKnownMaps = maps;
-    maps = new ArrayList<TezTaskAttemptID>();
+
+  public synchronized List<TaskAttemptIdentifier> getAndClearKnownMaps() {
+    List<TaskAttemptIdentifier> currentKnownMaps = maps;
+    maps = new ArrayList<TaskAttemptIdentifier>();
     return currentKnownMaps;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
index 272709e..f0b48fd 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+
 
 class MapOutput {
   private static final Log LOG = LogFactory.getLog(MapOutput.class);
@@ -42,10 +42,10 @@ class MapOutput {
     DISK
   }
   
+  private TaskAttemptIdentifier attemptIdentifier;
   private final int id;
   
   private final MergeManager merger;
-  private final TezTaskAttemptID mapId;
   
   private final long size;
   
@@ -61,13 +61,13 @@ class MapOutput {
   
   private final boolean primaryMapOutput;
   
-  MapOutput(TezTaskAttemptID mapId, MergeManager merger, long size, 
+  MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, long size, 
             Configuration conf, LocalDirAllocator localDirAllocator,
             int fetcher, boolean primaryMapOutput, 
             TezTaskOutputFiles mapOutputFile)
          throws IOException {
     this.id = ID.incrementAndGet();
-    this.mapId = mapId;
+    this.attemptIdentifier = attemptIdentifier;
     this.merger = merger;
 
     type = Type.DISK;
@@ -79,7 +79,7 @@ class MapOutput {
     
     this.localFS = FileSystem.getLocal(conf);
     outputPath =
-      mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
+      mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getTaskIndex(), size);
     tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
 
     disk = localFS.create(tmpOutputPath);
@@ -87,10 +87,10 @@ class MapOutput {
     this.primaryMapOutput = primaryMapOutput;
   }
   
-  MapOutput(TezTaskAttemptID mapId, MergeManager merger, int size, 
+  MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, int size, 
             boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
-    this.mapId = mapId;
+    this.attemptIdentifier = attemptIdentifier;
     this.merger = merger;
 
     type = Type.MEMORY;
@@ -107,10 +107,10 @@ class MapOutput {
     this.primaryMapOutput = primaryMapOutput;
   }
 
-  public MapOutput(TezTaskAttemptID mapId) {
+  public MapOutput(TaskAttemptIdentifier attemptIdentifier) {
     this.id = ID.incrementAndGet();
-    this.mapId = mapId;
-    
+    this.attemptIdentifier = attemptIdentifier;
+
     type = Type.WAIT;
     merger = null;
     memory = null;
@@ -159,8 +159,8 @@ class MapOutput {
     return disk;
   }
 
-  public TezTaskAttemptID getMapId() {
-    return mapId;
+  public TaskAttemptIdentifier getAttemptIdentifier() {
+    return this.attemptIdentifier;
   }
 
   public Type getType() {
@@ -198,7 +198,7 @@ class MapOutput {
   }
   
   public String toString() {
-    return "MapOutput(" + mapId + ", " + type + ")";
+    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
   }
   
   public static class MapOutputComparator 


Mime
View raw message