tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] git commit: TEZ-1446. Move the fetch code for local disk fetch from data movement event handlers to fetcher. Contributed by Prakash Ramachandran.
Date Fri, 22 Aug 2014 21:32:19 GMT
TEZ-1446. Move the fetch code for local disk fetch from data movement
event handlers to fetcher. Contributed by Prakash Ramachandran.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1651c1be
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1651c1be
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1651c1be

Branch: refs/heads/branch-0.5
Commit: 1651c1be55641975df44b7f201fa0c2b40e35c7b
Parents: f32db0e
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Aug 22 14:31:12 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Aug 22 14:32:06 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/common/counters/TaskCounter.java |   5 +
 .../java/org/apache/hadoop/io/FileChunk.java    |  97 +++++++++
 .../org/apache/hadoop/io/FileChunkPath.java     | 104 ----------
 .../library/common/shuffle/impl/Fetcher.java    | 132 ++++++++++--
 .../library/common/shuffle/impl/MapOutput.java  | 162 +++++++--------
 .../common/shuffle/impl/MergeManager.java       | 100 +++++-----
 .../library/common/shuffle/impl/Shuffle.java    |  12 +-
 .../shuffle/impl/ShuffleInputEventHandler.java  |  55 +----
 .../common/shuffle/impl/ShuffleScheduler.java   |   7 +-
 .../runtime/library/input/UnorderedKVInput.java |   2 +-
 .../library/shuffle/common/FetchedInput.java    |   1 +
 .../runtime/library/shuffle/common/Fetcher.java | 181 ++++++++++++++---
 .../shuffle/common/LocalDiskFetchedInput.java   |  19 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |  64 +-----
 .../shuffle/common/impl/ShuffleManager.java     |  24 ++-
 .../common/shuffle/impl/TestFetcher.java        | 161 +++++++++++++++
 .../impl/TestShuffleInputEventHandler.java      |   4 +-
 .../library/shuffle/common/TestFetcher.java     | 199 +++++++++++++++++++
 .../impl/TestShuffleInputEventHandlerImpl.java  |  12 +-
 19 files changed, 914 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index e89ed44..22d7f59 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -159,6 +159,11 @@ public enum TaskCounter {
   SHUFFLE_BYTES_TO_DISK,
 
   /**
+   * Number of bytes which were read directly from local disk
+   */
+  SHUFFLE_BYTES_DISK_DIRECT,
+
+  /**
    * Number of Memory to Disk merges performed during sort-merge.
    * Used by ShuffledMergedInput
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
new file mode 100644
index 0000000..a7eb90a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.fs.Path;
+
+@Private
+public class FileChunk implements Comparable<FileChunk> {
+
+  private final long offset;
+  private final long length;
+  private final boolean preserveAfterUse;
+  private final Path path;
+
+  public FileChunk(Path path, long offset, long length, boolean preserveAfterUse) {
+    this.path = path;
+    this.offset = offset;
+    this.length = length;
+    this.preserveAfterUse = preserveAfterUse;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || this.getClass() != o.getClass()) {
+      return false;
+    }
+
+    FileChunk that = (FileChunk)o;
+    return path.equals(that.path) && (offset == that.offset) && (length == that.length);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = path.hashCode();
+    result = 31 * result + (int) (offset ^ (offset >>> 32));
+    result = 31 * result + (int) (length ^ (length >>> 32));
+    return result;
+  }
+
+  @Override
+  public int compareTo(FileChunk that) {
+    int c = path.compareTo(that.path);
+    if (c != 0) {
+      return c;
+    }
+
+    long lc;
+    lc = offset - that.offset;
+    if (lc != 0) {
+      return lc < 0 ? -1 : 1;
+    }
+
+    lc = length - that.length;
+    if (lc != 0) {
+      return lc < 0 ? -1 : 1;
+    }
+
+    return 0;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public boolean preserveAfterUse() {
+    return preserveAfterUse;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunkPath.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunkPath.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunkPath.java
deleted file mode 100644
index 9104eb7..0000000
--- a/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunkPath.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.io;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.fs.Path;
-
-@Private
-public class FileChunkPath extends Path {
-
-  private long offset = -1;
-  private long size = -1;
-  private boolean hasOffset = false;
-
-  public FileChunkPath(String pathString, long offset, long length) throws IllegalArgumentException {
-    super(pathString);
-    this.offset = offset;
-    this.size = length;
-    this.hasOffset = true;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (o == null) {
-      return false;
-    }
-
-    boolean isPathEqual = super.equals(o);
-    if (!isPathEqual || !(o instanceof FileChunkPath)) {
-      return isPathEqual;
-    }
-
-    FileChunkPath that = (FileChunkPath) o;
-
-    if (this.offset != that.offset || this.size != that.size) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = super.hashCode();
-    result = 31 * result + (int) (offset ^ (offset >>> 32));
-    result = 31 * result + (int) (size ^ (size >>> 32));
-    return result;
-  }
-
-  @Override
-  public int compareTo(Object o) {
-    int c;
-
-    c = super.compareTo(o);
-    if (c != 0 || !(o instanceof FileChunkPath)) {
-      return c;
-    }
-
-    long lc;
-    FileChunkPath that = (FileChunkPath)o;
-
-    if ((lc = this.offset - that.offset) != 0) {
-      return lc > 0 ? 1 : -1;
-    }
-
-    if ((lc = this.size - that.size) != 0) {
-      return lc > 0 ? 1 : -1;
-    }
-
-    return 0;
-  }
-
-  public long getOffset() {
-    return offset;
-  }
-
-  public long getLength() {
-    return size;
-  }
-
-  public boolean hasOffset() {
-    return hasOffset;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index cb0f7bf..9b009ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -30,21 +31,33 @@ import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput.Type;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
 class Fetcher extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
+  private final Configuration conf;
+  private final boolean localDiskFetchEnabled;
+
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
@@ -63,7 +76,7 @@ class Fetcher extends Thread {
   private final String logIdentifier;
   private static int nextId = 0;
   private int currentPartition = -1;
-  
+
   // Decompression of map-outputs
   private final CompressionCodec codec;
   private final SecretKey jobTokenSecret;
@@ -81,11 +94,11 @@ class Fetcher extends Thread {
   HttpConnectionParams httpConnectionParams;
   
   public Fetcher(HttpConnectionParams httpConnectionParams,
-      ShuffleScheduler scheduler, MergeManager merger,
-      ShuffleClientMetrics metrics,
-      Shuffle shuffle, SecretKey jobTokenSecret,
-      boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec,
-      InputContext inputContext) throws IOException {
+                 ShuffleScheduler scheduler, MergeManager merger,
+                 ShuffleClientMetrics metrics,
+                 Shuffle shuffle, SecretKey jobTokenSecret,
+                 boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec,
+                 InputContext inputContext, Configuration conf, boolean localDiskFetchEnabled) throws IOException {
     setDaemon(true);
     this.scheduler = scheduler;
     this.merger = merger;
@@ -114,6 +127,9 @@ class Fetcher extends Thread {
     } else {
       this.codec = null;
     }
+    this.conf = conf;
+
+    this.localDiskFetchEnabled = localDiskFetchEnabled;
 
     this.logIdentifier = "fetcher [" + TezUtilsInternal
         .cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id;
@@ -134,8 +150,15 @@ class Fetcher extends Thread {
           host = scheduler.getHost();
           metrics.threadBusy();
 
-          // Shuffle
-          copyFromHost(host);
+          String hostPort = host.getHostIdentifier();
+          String hostname = hostPort.substring(0, hostPort.indexOf(":"));
+          if (localDiskFetchEnabled &&
+              hostname.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString()))) {
+            setupLocalDiskFetch(host);
+          } else {
+            // Shuffle
+            copyFromHost(host);
+          }
         } finally {
           cleanupCurrentConnection(false);
           if (host != null) {
@@ -290,7 +313,8 @@ class Fetcher extends Thread {
     }
   }
 
-  private void putBackRemainingMapOutputs(MapHost host) {
+  @VisibleForTesting
+  protected void putBackRemainingMapOutputs(MapHost host) {
     // Cycle through remaining MapOutputs
     boolean isFirst = true;
     InputAttemptIdentifier first = null;
@@ -359,7 +383,7 @@ class Fetcher extends Thread {
       
       // Get the location for the map output - either in-memory or on-disk
       try {
-        mapOutput = merger.reserve(srcAttemptId, decompressedLength, id);
+        mapOutput = merger.reserve(srcAttemptId, decompressedLength, compressedLength, id);
       } catch (IOException e) {
         // Kill the reduce attempt
         ioErrs.increment(1);
@@ -440,7 +464,7 @@ class Fetcher extends Thread {
    * @param decompressedLength
    * @param forReduce
    * @param remaining
-   * @param mapId
+   * @param srcAttemptId
    * @return true/false, based on if the verification succeeded or not
    */
   private boolean verifySanity(long compressedLength, long decompressedLength,
@@ -480,5 +504,89 @@ class Fetcher extends Thread {
       return null;
     }
   }
+
+  @VisibleForTesting
+  protected void setupLocalDiskFetch(MapHost host) throws InterruptedException {
+    // Get completed maps on 'host'
+    List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
+    currentPartition = host.getPartitionId();
+
+    // Sanity check to catch hosts with only 'OBSOLETE' maps,
+    // especially at the tail of large jobs
+    if (srcAttempts.size() == 0) {
+      return;
+    }
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch (local disk) from " + host + " for: "
+          + srcAttempts + ", partitionId: " + currentPartition);
+    }
+
+    // List of maps yet to be fetched
+    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
+
+    try {
+      final Iterator<InputAttemptIdentifier> iter = remaining.iterator();
+      while (iter.hasNext()) {
+        InputAttemptIdentifier srcAttemptId = iter.next();
+        MapOutput mapOutput = null;
+        try {
+          long startTime = System.currentTimeMillis();
+          Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
+
+          TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(),
+              currentPartition);
+
+          mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
+          long endTime = System.currentTimeMillis();
+          scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
+              indexRecord.getRawLength(), (endTime - startTime), mapOutput);
+          iter.remove();
+          metrics.successFetch();
+        } catch (IOException e) {
+          if (mapOutput != null) {
+            mapOutput.abort();
+          }
+          metrics.failedFetch();
+          ioErrs.increment(1);
+          scheduler.copyFailed(srcAttemptId, host, true, false);
+          LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
+              host.getHostIdentifier(), e);
+        }
+      }
+    } finally {
+      putBackRemainingMapOutputs(host);
+    }
+
+  }
+
+  @VisibleForTesting
+  protected Path getShuffleInputFileName(String pathComponent, String suffix)
+      throws IOException {
+    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    suffix = suffix != null ? suffix : "";
+
+    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
+        pathComponent + Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
+
+    return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
+  }
+
+  @VisibleForTesting
+  protected TezIndexRecord getIndexRecord(String pathComponent, int partitionId)
+      throws IOException {
+    Path indexFile = getShuffleInputFileName(pathComponent,
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    return spillRecord.getIndex(partitionId);
+  }
+
+  @VisibleForTesting
+  protected MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId,
+                                                     Path filename, TezIndexRecord indexRecord)
+      throws IOException {
+    return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename,
+        indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
index 2e792bf..389aae9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
@@ -26,10 +26,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.FileChunkPath;
+import org.apache.hadoop.io.FileChunk;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 
@@ -44,118 +43,96 @@ class MapOutput {
     DISK,
     DISK_DIRECT
   }
-  
-  private InputAttemptIdentifier attemptIdentifier;
+
   private final int id;
-  
+  private final Type type;
+  private InputAttemptIdentifier attemptIdentifier;
+  private final long size;
+
+  private final boolean primaryMapOutput;
   private final MergeManager merger;
 
-  private final long fileOffset;
-  private final long size;
-  
+  // MEMORY
   private final byte[] memory;
   private BoundedByteArrayOutputStream byteStream;
-  
+
+  // DISK
   private final FileSystem localFS;
   private final Path tmpOutputPath;
-  private final Path outputPath;
-  private final OutputStream disk; 
-  
-  private final Type type;
-  
-  private final boolean primaryMapOutput;
-  
-  MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, long size, 
-            Configuration conf, LocalDirAllocator localDirAllocator,
-            int fetcher, boolean primaryMapOutput, 
-            TezTaskOutputFiles mapOutputFile)
-         throws IOException {
+  private final FileChunk outputPath;
+  private OutputStream disk;
+
+  private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, MergeManager merger,
+                    long size, Path outputPath, long offset, boolean primaryMapOutput,
+                    FileSystem fs, Path tmpOutputPath) {
     this.id = ID.incrementAndGet();
+    this.type = type;
     this.attemptIdentifier = attemptIdentifier;
     this.merger = merger;
-
-    type = Type.DISK;
-
-    memory = null;
-    byteStream = null;
-
-    this.fileOffset = 0;
-    this.size = size;
-    
-    this.localFS = FileSystem.getLocal(conf);
-    outputPath =
-      mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getInputIndex(), size);
-    tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
-
-    disk = localFS.create(tmpOutputPath);
-    
     this.primaryMapOutput = primaryMapOutput;
-  }
 
-  MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, Configuration conf,
-            Path path, long offset, long size)
-      throws IOException {
-    this.id = ID.incrementAndGet();
-    this.attemptIdentifier = attemptIdentifier;
-    this.merger = merger;
+    this.localFS = fs;
+    this.size = size;
 
-    type = Type.DISK_DIRECT;
+    // Other type specific values
 
-    memory = null;
-    byteStream = null;
+    if (type == Type.MEMORY) {
+      // since we are passing an int from createMemoryMapOutput, it's safe to cast to int
+      this.byteStream = new BoundedByteArrayOutputStream((int)size);
+      this.memory = byteStream.getBuffer();
+    } else {
+      this.byteStream = null;
+      this.memory = null;
+    }
 
-    this.fileOffset = offset;
-    this.size = size;
+    this.tmpOutputPath = tmpOutputPath;
+    this.disk = null;
 
-    this.localFS = FileSystem.getLocal(conf);
-    outputPath = path;
-    tmpOutputPath = null;
+    if (type == Type.DISK || type == Type.DISK_DIRECT) {
+      boolean preserve = (type == Type.DISK_DIRECT); // type disk are temp files.
+      this.outputPath = new FileChunk(outputPath, offset, size, preserve);
+    } else {
+      this.outputPath = null;
+    }
 
-    disk = null;
-    this.primaryMapOutput = true;
   }
 
-  MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, int size, 
-            boolean primaryMapOutput) {
-    this.id = ID.incrementAndGet();
-    this.attemptIdentifier = attemptIdentifier;
-    this.merger = merger;
+  public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
+                                              MergeManager merger, long size, Configuration conf,
+                                              int fetcher, boolean primaryMapOutput,
+                                              TezTaskOutputFiles mapOutputFile) throws
+      IOException {
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path outputpath = mapOutputFile.getInputFileForWrite(
+        attemptIdentifier.getInputIdentifier().getInputIndex(), size);
+    Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));
+    long offset = 0;
 
-    type = Type.MEMORY;
-    byteStream = new BoundedByteArrayOutputStream(size);
-    memory = byteStream.getBuffer();
+    MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, merger, size, outputpath, offset,
+        primaryMapOutput, fs, tmpOuputPath);
+    mapOutput.disk = mapOutput.localFS.create(tmpOuputPath);
 
-    this.size = size;
-    this.fileOffset = -1;
-    
-    localFS = null;
-    disk = null;
-    outputPath = null;
-    tmpOutputPath = null;
-    
-    this.primaryMapOutput = primaryMapOutput;
+    return mapOutput;
   }
 
-  public MapOutput(InputAttemptIdentifier attemptIdentifier) {
-    this.id = ID.incrementAndGet();
-    this.attemptIdentifier = attemptIdentifier;
+  public static MapOutput createLocalDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
+                                                   MergeManager merger, Path path,  long offset,
+                                                   long size, boolean primaryMapOutput)  {
+    return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, merger, size, path, offset,
+        primaryMapOutput, null, null);
+  }
 
-    type = Type.WAIT;
-    merger = null;
-    memory = null;
-    byteStream = null;
-    
-    size = -1;
-    fileOffset = -1;
-    
-    localFS = null;
-    disk = null;
-    outputPath = null;
-    tmpOutputPath = null;
+  public static MapOutput createMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
+                                                MergeManager merger, int size,
+                                                boolean primaryMapOutput)  {
+    return new MapOutput(Type.MEMORY, attemptIdentifier, merger, size, null, -1, primaryMapOutput,
+        null, null);
+  }
 
-    this.primaryMapOutput = false;
+  public static MapOutput createWaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
+    return new MapOutput(Type.WAIT, attemptIdentifier, null, -1, null, -1, false, null, null);
   }
-  
+
   public boolean isPrimaryMapOutput() {
     return primaryMapOutput;
   }
@@ -173,7 +150,7 @@ class MapOutput {
     return id;
   }
 
-  public Path getOutputPath() {
+  public FileChunk getOutputPath() {
     return outputPath;
   }
 
@@ -205,11 +182,10 @@ class MapOutput {
     if (type == Type.MEMORY) {
       merger.closeInMemoryFile(this);
     } else if (type == Type.DISK) {
-      localFS.rename(tmpOutputPath, outputPath);
+      localFS.rename(tmpOutputPath, outputPath.getPath());
       merger.closeOnDiskFile(outputPath);
     } else if (type == Type.DISK_DIRECT) {
-      FileChunkPath fileChunkPath = new FileChunkPath(outputPath.toString(), fileOffset, size);
-      merger.closeOnDiskFile(fileChunkPath);
+      merger.closeOnDiskFile(outputPath);
     } else {
       throw new IOException("Cannot commit MapOutput of type WAIT!");
     }
@@ -252,9 +228,7 @@ class MapOutput {
         return -1;
       } else {
         return 1;
-      
       }
     }
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 6846463..7fd6125 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -32,12 +32,13 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.FileChunkPath;
+import org.apache.hadoop.io.FileChunk;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
@@ -88,7 +89,7 @@ public class MergeManager {
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
   private final InMemoryMerger inMemoryMerger;
   
-  private final Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+  private final Set<FileChunk> onDiskMapOutputs = new TreeSet<FileChunk>();
   private final OnDiskMerger onDiskMerger;
   
   private final long memoryLimit;
@@ -321,22 +322,22 @@ public class MergeManager {
   }
   
   private boolean canShuffleToMemory(long requestedSize) {
-    return (requestedSize < maxSingleShuffleLimit); 
+    return (requestedSize < maxSingleShuffleLimit);
   }
 
-  final private MapOutput stallShuffle = new MapOutput(null);
+  final private MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
 
   public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, 
                                              long requestedSize,
+                                             long compressedLength,
                                              int fetcher
                                              ) throws IOException {
     if (!canShuffleToMemory(requestedSize)) {
       LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + 
                " is greater than maxSingleShuffleLimit (" + 
                maxSingleShuffleLimit + ")");
-      return new MapOutput(srcAttemptIdentifier, this, requestedSize, conf, 
-                                localDirAllocator, fetcher, true,
-                                mapOutputFile);
+      return MapOutput.createDiskMapOutput(srcAttemptIdentifier, this, compressedLength, conf,
+          fetcher, true, mapOutputFile);
     }
     
     // Stall shuffle if we are above the memory limit
@@ -372,9 +373,10 @@ public class MergeManager {
    * Unconditional Reserve is used by the Memory-to-Memory thread
    */
   private synchronized MapOutput unconditionalReserve(
-      InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
+      InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) throws
+      IOException {
     usedMemory += requestedSize;
-    return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize, 
+    return MapOutput.createMemoryMapOutput(srcAttemptIdentifier, this, (int)requestedSize,
         primaryMapOutput);
   }
   
@@ -424,7 +426,7 @@ public class MergeManager {
              inMemoryMergedMapOutputs.size());
   }
   
-  public synchronized void closeOnDiskFile(Path file) {
+  public synchronized void closeOnDiskFile(FileChunk file) {
     onDiskMapOutputs.add(file);
 
     synchronized (onDiskMerger) {
@@ -460,7 +462,7 @@ public class MergeManager {
     inMemoryMergedMapOutputs.clear();
     memory.addAll(inMemoryMapOutputs);
     inMemoryMapOutputs.clear();
-    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
+    List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
     onDiskMapOutputs.clear();
     TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
     this.finalMergeComplete = true;
@@ -575,6 +577,7 @@ public class MergeManager {
           mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
 
       Writer writer = null;
+      long outFileLen = 0;
       try {
         writer =
             new Writer(conf, rfs, outputPath,
@@ -608,11 +611,12 @@ public class MergeManager {
         additionalBytesWritten.increment(writer.getCompressedLength());
         writer = null;
 
-        LOG.info(inputContext.getUniqueIdentifier() +  
+        outFileLen = localFS.getFileStatus(outputPath).getLen();
+        LOG.info(inputContext.getUniqueIdentifier() +
             " Merge of the " + noInMemorySegments +
             " files in-memory complete." +
-            " Local file is " + outputPath + " of size " + 
-            localFS.getFileStatus(outputPath).getLen());
+            " Local file is " + outputPath + " of size " +
+            outFileLen);
       } catch (IOException e) { 
         //make sure that we delete the ondisk file that we created 
         //earlier when we invoked cloneFileAttributes
@@ -625,7 +629,7 @@ public class MergeManager {
       }
 
       // Note the output of the merge
-      closeOnDiskFile(outputPath);
+      closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen, false));
     }
 
   }
@@ -633,7 +637,7 @@ public class MergeManager {
   /**
    * Merges multiple on-disk segments
    */
-  private class OnDiskMerger extends MergeThread<Path> {
+  private class OnDiskMerger extends MergeThread<FileChunk> {
 
     public OnDiskMerger(MergeManager manager) {
       super(manager, ioSortFactor, exceptionReporter);
@@ -643,7 +647,7 @@ public class MergeManager {
     }
     
     @Override
-    public void merge(List<Path> inputs) throws IOException {
+    public void merge(List<FileChunk> inputs) throws IOException {
       // sanity check
       if (inputs == null || inputs.isEmpty()) {
         LOG.info("No ondisk files to merge...");
@@ -661,21 +665,12 @@ public class MergeManager {
       List<Segment> inputSegments = new ArrayList<Segment>(inputs.size());
 
       // 1. Prepare the list of files to be merged.
-      for (Path file : inputs) {
-        long offset;
-        long size;
-        boolean preserve = false;
-
-        if (file instanceof FileChunkPath) {
-          size = ((FileChunkPath)file).getLength();
-          offset = ((FileChunkPath)file).getOffset();
-          approxOutputSize += size;
-          preserve = true;
-        } else {
-          size = localFS.getFileStatus(file).getLen();
-          offset = 0;
-          approxOutputSize += size;
-        }
+      for (FileChunk fileChunk : inputs) {
+        final long offset = fileChunk.getOffset();
+        final long size = fileChunk.getLength();
+        final boolean preserve = fileChunk.preserveAfterUse();
+        final Path file = fileChunk.getPath();
+        approxOutputSize += size;
         Segment segment = new Segment(conf, rfs, file, offset, size, codec, ifileReadAhead,
             ifileReadAheadLength, ifileBufferSize, preserve);
         inputSegments.add(segment);
@@ -687,17 +682,16 @@ public class MergeManager {
 
       // 2. Start the on-disk merge process
       Path outputPath = 
-        localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
+        localDirAllocator.getLocalPathForWrite(inputs.get(0).getPath().toString(),
             approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
       Writer writer = 
         new Writer(conf, rfs, outputPath, 
                         (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
                         (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                         codec, null, null);
-      TezRawKeyValueIterator iter  = null;
       Path tmpDir = new Path(inputContext.getUniqueIdentifier());
       try {
-        iter = TezMerger.merge(conf, rfs,
+        TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs,
             (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
             inputSegments,
@@ -717,14 +711,15 @@ public class MergeManager {
         throw e;
       }
 
-      closeOnDiskFile(outputPath);
+      final long outputLen = localFS.getFileStatus(outputPath).getLen();
+      closeOnDiskFile(new FileChunk(outputPath, 0, outputLen, false));
 
       LOG.info(inputContext.getUniqueIdentifier() +
           " Finished merging " + inputs.size() + 
           " map output files on disk of total-size " + 
           approxOutputSize + "." + 
           " Local output file is " + outputPath + " of size " +
-          localFS.getFileStatus(outputPath).getLen());
+          outputLen);
     }
   }
   
@@ -795,7 +790,7 @@ public class MergeManager {
 
   private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
                                        List<MapOutput> inMemoryMapOutputs,
-                                       List<Path> onDiskMapOutputs
+                                       List<FileChunk> onDiskMapOutputs
                                        ) throws IOException {
     LOG.info("finalMerge called with " + 
              inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
@@ -844,8 +839,6 @@ public class MergeManager {
             keyClass, valueClass, codec, null, null);
         try {
           TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
-          // add to list of final disk outputs.
-          onDiskMapOutputs.add(outputPath);
         } catch (IOException e) {
           if (null != outputPath) {
             try {
@@ -861,6 +854,11 @@ public class MergeManager {
             additionalBytesWritten.increment(writer.getCompressedLength());
           }
         }
+
+        final FileStatus fStatus = localFS.getFileStatus(outputPath);
+        // add to list of final disk outputs.
+        onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen(), false));
+
         LOG.info("Merged " + numMemDiskSegments + " segments, " +
                  inMemToDiskBytes + " bytes to disk to satisfy " +
                  "reduce memory limit");
@@ -876,26 +874,18 @@ public class MergeManager {
     // segments on disk
     List<Segment> diskSegments = new ArrayList<Segment>();
     long onDiskBytes = inMemToDiskBytes;
-    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
-    for (Path file : onDisk) {
-      long fileLength;
-      long fileOffset;
-      boolean preserve = false;
-      if (file instanceof FileChunkPath) {
-        FileChunkPath fileChunkPath = (FileChunkPath) file;
-        fileOffset = fileChunkPath.getOffset();
-        fileLength = fileChunkPath.getLength();
-        preserve = true;
-      } else {
-        fileOffset = 0;
-        fileLength = fs.getFileStatus(file).getLen();
-      }
+    FileChunk[] onDisk = onDiskMapOutputs.toArray(new FileChunk[onDiskMapOutputs.size()]);
+    for (FileChunk fileChunk : onDisk) {
+      final long fileLength = fileChunk.getLength();
       onDiskBytes += fileLength;
-      LOG.debug("Disk file: " + file + " Length is " + fileLength);
+      LOG.debug("Disk file: " + fileChunk.getPath() + " Length is " + fileLength);
 
+      final Path file = fileChunk.getPath();
       TezCounter counter =
           file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter;
 
+      final long fileOffset = fileChunk.getOffset();
+      final boolean preserve = fileChunk.preserveAfterUse();
       diskSegments.add(new Segment(job, fs, file, fileOffset, fileLength, codec, ifileReadAhead,
                                    ifileReadAheadLength, ifileBufferSize, preserve, counter));
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 5507608..8214a0c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -92,6 +92,7 @@ public class Shuffle implements ExceptionReporter {
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
   private final int numFetchers;
+  private final boolean localDiskFetchEnabled;
   
   private Throwable throwable = null;
   private String throwingThreadName = null;
@@ -167,6 +168,8 @@ public class Shuffle implements ExceptionReporter {
         inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
     TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
         TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
     TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
         TaskCounter.SHUFFLE_BYTES_TO_MEM);
     
@@ -186,6 +189,7 @@ public class Shuffle implements ExceptionReporter {
           reduceDataSizeDecompressed,
           failedShuffleCounter,
           bytesShuffedToDisk,
+          bytesShuffedToDiskDirect,
           bytesShuffedToMem);
 
     merger = new MergeManager(
@@ -206,8 +210,6 @@ public class Shuffle implements ExceptionReporter {
     eventHandler= new ShuffleInputEventHandler(
         inputContext,
         scheduler,
-        merger,
-        this.conf,
         sslShuffle);
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
@@ -220,7 +222,9 @@ public class Shuffle implements ExceptionReporter {
     numFetchers = Math.min(configuredNumFetchers, numInputs);
     LOG.info("Num fetchers being started: " + numFetchers);
     fetchers = Lists.newArrayListWithCapacity(numFetchers);
-    
+    localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+
     executor = MoreExecutors.listeningDecorator(rawExecutor);
     runShuffleCallable = new RunShuffleCallable();
   }
@@ -318,7 +322,7 @@ public class Shuffle implements ExceptionReporter {
         for (int i = 0; i < numFetchers; ++i) {
           Fetcher fetcher = new Fetcher(httpConnectionParams, scheduler, merger,
             metrics, Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
-            codec, inputContext);
+            codec, inputContext, conf, localDiskFetchEnabled);
           fetchers.add(fetcher);
           fetcher.start();
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index b664ea7..bc68da9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -27,23 +27,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
@@ -57,20 +48,13 @@ public class ShuffleInputEventHandler {
   private final InputContext inputContext;
 
   private int maxMapRuntime = 0;
-  private final MergeManager merger;
-  private final Configuration conf;
   private final boolean sslShuffle;
-  private final boolean doLocalFetch;
 
   public ShuffleInputEventHandler(InputContext inputContext,
-      ShuffleScheduler scheduler, MergeManager merger, Configuration conf, boolean sslShuffle) {
+      ShuffleScheduler scheduler, boolean sslShuffle) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
-    this.merger = merger;
-    this.conf = conf;
     this.sslShuffle = sslShuffle;
-    this.doLocalFetch = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
-        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
   }
 
   public void handleEvents(List<Event> events) throws IOException {
@@ -125,21 +109,10 @@ public class ShuffleInputEventHandler {
     InputAttemptIdentifier srcAttemptIdentifier =
         new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(),
             shufflePayload.getPathComponent());
-    if (doLocalFetch && shufflePayload.getHost().equals(System.getenv(
-        ApplicationConstants.Environment.NM_HOST.toString()))) {
-      LOG.info("SrcAttempt: [" + srcAttemptIdentifier +
-          "] fetching input data using direct local access");
-      Path filename = getShuffleInputFileName(shufflePayload.getPathComponent(), "");
-      TezIndexRecord indexRecord = getIndexRecord(dmEvent, shufflePayload);
-      MapOutput mapOut = new MapOutput(srcAttemptIdentifier, merger, conf, filename,
-          indexRecord.getStartOffset(), indexRecord.getPartLength()); //TODO: do we need length?
-      scheduler.copySucceeded(srcAttemptIdentifier, null, indexRecord.getPartLength(),
-          indexRecord.getRawLength(), 0, mapOut);
-    } else {
-      URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
-      scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
-          partitionId, baseUri.toString(), srcAttemptIdentifier);
-    }
+
+    URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+    scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
+        partitionId, baseUri.toString(), srcAttemptIdentifier);
   }
   
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
@@ -157,23 +130,5 @@ public class ShuffleInputEventHandler {
     return u;
   }
 
-  private Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
-    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-    suffix = suffix != null ? suffix : "";
-
-    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
-        Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
-
-    return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
-  }
-
-  private TezIndexRecord getIndexRecord(DataMovementEvent dme,
-                                              DataMovementEventPayloadProto shufflePayload)
-      throws IOException {
-    Path indexFile = getShuffleInputFileName(shufflePayload.getPathComponent(),
-        Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
-    return spillRecord.getIndex(dme.getSourceIndex());
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 86883dd..6cda4e2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -87,6 +87,7 @@ class ShuffleScheduler {
   private final TezCounter reduceBytesDecompressed;
   private final TezCounter failedShuffleCounter;
   private final TezCounter bytesShuffledToDisk;
+  private final TezCounter bytesShuffledToDiskDirect;
   private final TezCounter bytesShuffledToMem;
 
   private final long startTime;
@@ -111,6 +112,7 @@ class ShuffleScheduler {
                           TezCounter reduceBytesDecompressed,
                           TezCounter failedShuffleCounter,
                           TezCounter bytesShuffledToDisk,
+                          TezCounter bytesShuffledToDiskDirect,
                           TezCounter bytesShuffledToMem) {
     this.inputContext = inputContext;
     this.numInputs = numberOfInputs;
@@ -124,6 +126,7 @@ class ShuffleScheduler {
     this.reduceBytesDecompressed = reduceBytesDecompressed;
     this.failedShuffleCounter = failedShuffleCounter;
     this.bytesShuffledToDisk = bytesShuffledToDisk;
+    this.bytesShuffledToDiskDirect = bytesShuffledToDiskDirect;
     this.bytesShuffledToMem = bytesShuffledToMem;
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
@@ -144,7 +147,7 @@ class ShuffleScheduler {
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT));
     
     this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
-    
+
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
         + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@ -171,6 +174,8 @@ class ShuffleScheduler {
         output.commit();
         if (output.getType() == Type.DISK) {
           bytesShuffledToDisk.increment(bytesCompressed);
+        } else if (output.getType() == Type.DISK_DIRECT) {
+          bytesShuffledToDiskDirect.increment(bytesCompressed);
         } else {
           bytesShuffledToMem.increment(bytesCompressed);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index d8e2c2b..87caf4c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -137,7 +137,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
           ifileReadAhead, ifileReadAheadLength, codec, inputManager);
 
       this.inputEventHandler = new ShuffleInputEventHandlerImpl(getContext(), shuffleManager,
-          inputManager, codec, ifileReadAhead, ifileReadAheadLength, conf);
+          inputManager, codec, ifileReadAhead, ifileReadAheadLength);
 
       ////// End of Initial configuration
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
index 0bb765d..fde19b7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
@@ -33,6 +33,7 @@ public abstract class FetchedInput {
     WAIT, // TODO NEWTEZ Implement this, only if required.
     MEMORY,
     DISK,
+    DISK_DIRECT
   }
   
   protected static enum State {

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 7f53c1d..9cb8617 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -33,13 +33,23 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.crypto.SecretKey;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
 import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
 
@@ -54,6 +64,7 @@ public class Fetcher implements Callable<FetchResult> {
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
 
   private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
+  private final Configuration conf;
 
   // Configurable fields.
   private CompressionCodec codec;
@@ -89,15 +100,20 @@ public class Fetcher implements Callable<FetchResult> {
   private HttpConnection httpConnection;
   private HttpConnectionParams httpConnectionParams;
 
+  private final boolean localDiskFetchEnabled;
+
   private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
       FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
-      String srcNameTrimmed) {
+      String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled) {
     this.fetcherCallback = fetcherCallback;
     this.inputManager = inputManager;
     this.shuffleSecret = shuffleSecret;
     this.appId = appId;
     this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
     this.httpConnectionParams = params;
+    this.conf = conf;
+
+    this.localDiskFetchEnabled = localDiskFetchEnabled;
 
     this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
     this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
@@ -115,11 +131,40 @@ public class Fetcher implements Callable<FetchResult> {
 
     remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
 
+    HostFetchResult hostFetchResult;
+
+    if (localDiskFetchEnabled &&
+        host.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString()))) {
+      hostFetchResult = setupLocalDiskFetch();
+    } else {
+      hostFetchResult = doHttpFetch();
+    }
+
+    if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
+      LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
+      for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
+        fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
+      }
+    }
+
+    shutdown();
+
+    // Sanity check
+    if (hostFetchResult.failedInputs == null && !remaining.isEmpty()) {
+      throw new IOException("server didn't return all expected map outputs: "
+          + remaining.size() + " left.");
+    }
+
+    return hostFetchResult.fetchResult;
+  }
+
+  @VisibleForTesting
+  protected HostFetchResult doHttpFetch() {
     try {
       StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
-        port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
+          port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
       this.url = ShuffleUtils.constructInputURL(baseURI.toString(), srcAttempts,
-        httpConnectionParams.getKeepAlive());
+          httpConnectionParams.getKeepAlive());
 
       httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, shuffleSecret);
       httpConnection.connect();
@@ -127,21 +172,19 @@ public class Fetcher implements Callable<FetchResult> {
       // ioErrs.increment(1);
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
+      InputAttemptIdentifier[] failedFetches = null;
       if (isShutDown.get()) {
         LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
       } else {
-        for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
-            .hasNext();) {
-          fetcherCallback.fetchFailed(host, leftIter.next(), true);
-        }
+        failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
       }
-      return new FetchResult(host, port, partition, remaining);
+      return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedFetches, true);
     }
     if (isShutDown.get()) {
       // shutdown would have no effect if in the process of establishing the connection.
       shutdownInternal();
       LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
-      return new FetchResult(host, port, partition, remaining);
+      return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false);
     }
 
     try {
@@ -159,8 +202,8 @@ public class Fetcher implements Callable<FetchResult> {
         InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
         LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
             + " Informing ShuffleManager: ", e);
-        fetcherCallback.fetchFailed(host, firstAttempt, false);
-        return new FetchResult(host, port, partition, remaining);
+        return new HostFetchResult(new FetchResult(host, port, partition, remaining),
+            new InputAttemptIdentifier[] { firstAttempt }, false);
       }
     }
 
@@ -172,7 +215,7 @@ public class Fetcher implements Callable<FetchResult> {
       // shutdown would have no effect if in the process of establishing the connection.
       shutdownInternal();
       LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
-      return new FetchResult(host, port, partition, remaining);
+      return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false);
     }
     // After this point, closing the stream and connection, should cause a
     // SocketException,
@@ -187,23 +230,99 @@ public class Fetcher implements Callable<FetchResult> {
       failedInputs = fetchInputs(input);
     }
 
-    if (failedInputs != null && failedInputs.length > 0) {
-      LOG.warn("copyInputs failed for tasks " + Arrays.toString(failedInputs));
-      for (InputAttemptIdentifier left : failedInputs) {
-        fetcherCallback.fetchFailed(host, left, false);
+    return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,
+        false);
+  }
+
+  @VisibleForTesting
+  protected HostFetchResult setupLocalDiskFetch() {
+
+    Iterator<InputAttemptIdentifier> iterator = remaining.iterator();
+    while (iterator.hasNext()) {
+      InputAttemptIdentifier srcAttemptId = iterator.next();
+      //TODO: check for shutdown? - See TEZ-1480
+      long startTime = System.currentTimeMillis();
+
+      FetchedInput fetchedInput = null;
+      try {
+        TezIndexRecord idxRecord;
+        idxRecord = getTezIndexRecord(srcAttemptId);
+
+        fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
+            idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId,
+            getShuffleInputFileName(srcAttemptId.getPathComponent(), null), conf,
+            new FetchedInputCallback() {
+              @Override
+              public void fetchComplete(FetchedInput fetchedInput) {}
+
+              @Override
+              public void fetchFailed(FetchedInput fetchedInput) {}
+
+              @Override
+              public void freeResources(FetchedInput fetchedInput) {}
+            });
+        LOG.info("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
+            + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
+            + " to " + fetchedInput.getType());
+
+        long endTime = System.currentTimeMillis();
+        fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
+            idxRecord.getRawLength(), (endTime - startTime));
+        iterator.remove();
+      } catch (IOException e) {
+        LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + "(local fetch)",
+            e);
+        if (fetchedInput != null) {
+          try {
+            fetchedInput.abort();
+          } catch (IOException e1) {
+            LOG.info("Failed to cleanup fetchedInput " + fetchedInput);
+          }
+        }
       }
     }
 
-    shutdown();
-
-    // Sanity check
-    if (failedInputs == null && !remaining.isEmpty()) {
-      throw new IOException("server didn't return all expected map outputs: "
-          + remaining.size() + " left.");
+    InputAttemptIdentifier[] failedFetches = null;
+    if (remaining.size() > 0) {
+      failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
     }
+    return new HostFetchResult(new FetchResult(host, port, partition, remaining),
+        failedFetches, false);
+  }
+
+  @VisibleForTesting
+  protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws
+      IOException {
+    TezIndexRecord idxRecord;
+    Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    idxRecord = spillRecord.getIndex(partition);
+    return idxRecord;
+  }
+
+  @VisibleForTesting
+  protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
+    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    suffix = suffix != null ? suffix : "";
+
+    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
+        Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
 
-    return new FetchResult(host, port, partition, remaining);
+    return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
+  }
+
+  static class HostFetchResult {
+    private final FetchResult fetchResult;
+    private final InputAttemptIdentifier[] failedInputs;
+    private final boolean connectFailed;
 
+    public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] failedInputs,
+                           boolean connectFailed) {
+      this.fetchResult = fetchResult;
+      this.failedInputs = failedInputs;
+      this.connectFailed = connectFailed;
+    }
   }
 
   public void shutdown() {
@@ -299,10 +418,13 @@ public class Fetcher implements Callable<FetchResult> {
           input, (int) decompressedLength, (int) compressedLength, codec,
           ifileReadAhead, ifileReadAheadLength, LOG,
           fetchedInput.getInputAttemptIdentifier().toString());
-      } else {
+      } else if (fetchedInput.getType() == Type.DISK) {
         ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
           (host +":" +port), input, compressedLength, LOG,
           fetchedInput.getInputAttemptIdentifier().toString());
+      } else {
+        throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
+            fetchedInput);
       }
 
       // Inform the shuffle scheduler
@@ -348,8 +470,8 @@ public class Fetcher implements Callable<FetchResult> {
    * @param compressedLength
    * @param decompressedLength
    * @param fetchPartition
-   * @param remaining
-   * @param mapId
+   * @param srcAttemptId
+   * @param pathComponent
    * @return true/false, based on if the verification succeeded or not
    */
   private boolean verifySanity(long compressedLength, long decompressedLength,
@@ -400,10 +522,11 @@ public class Fetcher implements Callable<FetchResult> {
     private boolean workAssigned = false;
 
     public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params,
-        FetchedInputAllocator inputManager, ApplicationId appId,
-        SecretKey shuffleSecret, String srcNameTrimmed) {
+                          FetchedInputAllocator inputManager, ApplicationId appId,
+                          SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf,
+                          boolean localDiskFetchEnabled) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
-          shuffleSecret, srcNameTrimmed);
+          shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
     }
 
     public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java
index f41bd21..637fe04 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.commons.logging.Log;
@@ -43,7 +44,7 @@ public class LocalDiskFetchedInput extends FetchedInput {
                                InputAttemptIdentifier inputAttemptIdentifier, Path inputFile,
                                Configuration conf, FetchedInputCallback callbackHandler)
       throws IOException {
-    super(Type.DISK, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
+    super(Type.DISK_DIRECT, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
     this.startOffset = startOffset;
     this.inputFile = inputFile;
     localFS = FileSystem.getLocal(conf);
@@ -99,4 +100,20 @@ public class LocalDiskFetchedInput extends FetchedInput {
         ", id=" + id +
         ", state=" + state + "]";
   }
+
+  @VisibleForTesting
+  protected Path getInputFile() {
+    return inputFile;
+  }
+
+  @VisibleForTesting
+  protected long getStartOffset() {
+    return startOffset;
+  }
+
+  @VisibleForTesting
+  protected FileSystem getLocalFS() {
+    return localFS;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index d1b3362..9d621e8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -26,24 +26,15 @@ import java.util.List;
 import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.shuffle.common.*;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
@@ -62,23 +53,16 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
-  private final Configuration conf;
-  private final boolean doLocalFetch;
-  
-  
+
   public ShuffleInputEventHandlerImpl(InputContext inputContext,
                                       ShuffleManager shuffleManager,
                                       FetchedInputAllocator inputAllocator, CompressionCodec codec,
-                                      boolean ifileReadAhead, int ifileReadAheadLength,
-                                      Configuration conf) {
+                                      boolean ifileReadAhead, int ifileReadAheadLength) {
     this.shuffleManager = shuffleManager;
     this.inputAllocator = inputAllocator;
     this.codec = codec;
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
-    this.conf = conf;
-    this.doLocalFetch = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
-        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
   }
 
   @Override
@@ -135,16 +119,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
       shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
     } else {
-      if (doLocalFetch && shufflePayload.getHost().equals(System.getenv(
-          ApplicationConstants.Environment.NM_HOST.toString()))) {
-        LOG.info("SrcAttempt: [" + srcAttemptIdentifier +
-            "] fetching input data using direct local access");
-        FetchedInput fetchedInput = getLocalFetchDiskInput(dme, shufflePayload, srcAttemptIdentifier);
-        shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
-      } else {
-        shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
-                srcAttemptIdentifier, srcIndex);
-      }
+      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
+              srcAttemptIdentifier, srcIndex);
     }
 
   }
@@ -175,37 +151,5 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
   }
 
-  private Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
-    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-    suffix = suffix != null ? suffix : "";
-
-    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
-            Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
-
-    return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
-  }
-
-  private FetchedInput getLocalFetchDiskInput(DataMovementEvent dme,
-                                              DataMovementEventPayloadProto shufflePayload,
-                                              InputAttemptIdentifier srcAttemptIdentifier)
-      throws IOException {
-      String pathComponent = shufflePayload.getPathComponent();
-      Path indexFile = getShuffleInputFileName(pathComponent,
-          Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-      TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
-      TezIndexRecord idxRecord = spillRecord.getIndex(dme.getSourceIndex());
-      return new LocalDiskFetchedInput(idxRecord.getStartOffset(),idxRecord.getRawLength(),
-          idxRecord.getPartLength(), srcAttemptIdentifier, getShuffleInputFileName(pathComponent, ""), conf,
-          new FetchedInputCallback() {
-            @Override
-            public void fetchComplete(FetchedInput fetchedInput) {}
-
-            @Override
-            public void fetchFailed(FetchedInput fetchedInput) {}
-
-            @Override
-            public void freeResources(FetchedInput fetchedInput) {}
-          });
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index be43dc0..e28dcd1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -91,7 +91,7 @@ public class ShuffleManager implements FetcherCallback {
   private final ListeningExecutorService fetcherExecutor;
 
   private final ListeningExecutorService schedulerExecutor;
-  private final RunShuffleCallable schedulerCallable = new RunShuffleCallable();
+  private final RunShuffleCallable schedulerCallable;
   
   private final BlockingQueue<FetchedInput> completedInputs;
   private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
@@ -115,6 +115,7 @@ public class ShuffleManager implements FetcherCallback {
   // Parameters required by Fetchers
   private final SecretKey shuffleSecret;
   private final CompressionCodec codec;
+  private final boolean localDiskFetchEnabled;
   
   private final int ifileBufferSize;
   private final boolean ifileReadAhead;
@@ -130,6 +131,7 @@ public class ShuffleManager implements FetcherCallback {
   private final TezCounter decompressedDataSizeCounter;
   private final TezCounter bytesShuffledToDiskCounter;
   private final TezCounter bytesShuffledToMemCounter;
+  private final TezCounter bytesShuffledDirectDiskCounter;
   
   private volatile Throwable shuffleError;
   private final HttpConnectionParams httpConnectionParams;
@@ -148,12 +150,15 @@ public class ShuffleManager implements FetcherCallback {
     this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
     this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
     this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
+    this.bytesShuffledDirectDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
   
     this.ifileBufferSize = bufferSize;
     this.ifileReadAhead = ifileReadAheadEnabled;
     this.ifileReadAheadLength = ifileReadAheadLength;
     this.codec = codec;
     this.inputManager = inputAllocator;
+    this.localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
     
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
   
@@ -180,6 +185,7 @@ public class ShuffleManager implements FetcherCallback {
     ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
     this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
+    this.schedulerCallable = new RunShuffleCallable(conf);
     
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
@@ -207,6 +213,12 @@ public class ShuffleManager implements FetcherCallback {
   
   private class RunShuffleCallable implements Callable<Void> {
 
+    private final Configuration conf;
+
+    public RunShuffleCallable(Configuration conf) {
+      this.conf = conf;
+    }
+
     @Override
     public Void call() throws Exception {
       while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
@@ -252,7 +264,7 @@ public class ShuffleManager implements FetcherCallback {
               }
               if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
                 LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
-                Fetcher fetcher = constructFetcherForHost(inputHost);
+                Fetcher fetcher = constructFetcherForHost(inputHost, conf);
                 runningFetchers.add(fetcher);
                 if (isShutdown.get()) {
                   LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
@@ -284,10 +296,10 @@ public class ShuffleManager implements FetcherCallback {
     }
   }
   
-  private Fetcher constructFetcherForHost(InputHost inputHost) {
+  private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
     FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
       httpConnectionParams, inputManager, inputContext.getApplicationId(),
-      shuffleSecret, srcNameTrimmed);
+      shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);
     }
@@ -450,8 +462,10 @@ public class ShuffleManager implements FetcherCallback {
           bytesShuffledCounter.increment(fetchedBytes);
           if (fetchedInput.getType() == Type.MEMORY) {
             bytesShuffledToMemCounter.increment(fetchedBytes);
-          } else {
+          } else if (fetchedInput.getType() == Type.DISK) {
             bytesShuffledToDiskCounter.increment(fetchedBytes);
+          } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
+            bytesShuffledDirectDiskCounter.increment(fetchedBytes);
           }
           decompressedDataSizeCounter.increment(decompressedLength);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1651c1be/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestFetcher.java
new file mode 100644
index 0000000..522e152
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestFetcher.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestFetcher {
+
+  public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
+  public static final String HOST = "localhost";
+  public static final int PORT = 0;
+
+  @Test(timeout = 5000)
+  public void testSetupLocalDiskFetch() throws Exception {
+    Configuration conf = new TezConfiguration();
+    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+    MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    Shuffle shuffle = mock(Shuffle.class);
+    InputContext inputContext = mock(InputContext.class);
+    when(inputContext.getCounters()).thenReturn(new TezCounters());
+    when(inputContext.getSourceVertexName()).thenReturn("");
+
+    Fetcher fetcher = new Fetcher(null, scheduler, merger, metrics, shuffle, null,
+        false, 0, null, inputContext, conf, true);
+    Fetcher spyFetcher = spy(fetcher);
+
+    MapHost host = new MapHost(1, HOST + ":" + PORT,
+        "http://" + HOST + ":" +PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+    List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
+        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
+        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
+        new InputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2"),
+        new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3"),
+        new InputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4")
+    );
+    final int FIRST_FAILED_ATTEMPT_IDX = 2;
+    final int SECOND_FAILED_ATTEMPT_IDX = 4;
+    final int[] sucessfulAttemptsIndexes = {0, 1, 3};
+
+    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
+
+    doAnswer(new Answer<MapOutput>() {
+      @Override
+      public MapOutput answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        MapOutput mapOutput = mock(MapOutput.class);
+        doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType();
+        doReturn(args[0]).when(mapOutput).getAttemptIdentifier();
+        return mapOutput;
+      }
+    }).when(spyFetcher)
+        .getMapOutputForDirectDiskFetch(any(InputAttemptIdentifier.class), any(Path.class),
+            any(TezIndexRecord.class));
+
+    doAnswer(new Answer<Path>() {
+      @Override
+      public Path answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
+      }
+    }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());
+
+    doAnswer(new Answer<TezIndexRecord>() {
+      @Override
+      public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        String pathComponent = (String) args[0];
+        int len = pathComponent.length();
+        long p = Long.valueOf(pathComponent.substring(len - 1, len));
+        if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) {
+          throw new IOException("failing to simulate failure case");
+        }
+        // match with params for copySucceeded below.
+        return new TezIndexRecord(p * 10, p * 1000, p * 100);
+      }
+    }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId()));
+
+    doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
+        anyLong(), anyLong(), anyLong(), any(MapOutput.class));
+    doNothing().when(scheduler).putBackKnownMapOutput(host,
+        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
+    doNothing().when(scheduler).putBackKnownMapOutput(host,
+        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
+
+    spyFetcher.setupLocalDiskFetch(host);
+
+    // should have exactly 3 successes and 2 failures.
+    for (int i : sucessfulAttemptsIndexes) {
+      verifyCopySucceeded(scheduler, host, srcAttempts, i);
+    }
+    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false);
+    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false);
+
+    verify(metrics, times(3)).successFetch();
+    verify(metrics, times(2)).failedFetch();
+
+    verify(spyFetcher).putBackRemainingMapOutputs(host);
+    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
+    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
+  }
+
+  private void verifyCopySucceeded(ShuffleScheduler scheduler, MapHost host,
+                                   List<InputAttemptIdentifier> srcAttempts, long p) throws
+      IOException {
+    // need to verify filename, offsets, sizes wherever they are used.
+    InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int)p);
+    String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent();
+    ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
+    verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100),
+        eq(p * 1000), anyLong(), captureMapOutput.capture());
+
+    // Cannot use MapOutput.equals() here because it compares id, which is private, so compare manually.
+    MapOutput m = captureMapOutput.getAllValues().get(0);
+    Assert.assertTrue(m.getType().equals(MapOutput.Type.DISK_DIRECT) &&
+            m.getAttemptIdentifier().equals(srcAttemptToMatch));
+  }
+}


Mime
View raw message