tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1343. Bypass the Fetcher and read directly from the local filesystem if source task ran on the same host. Contributed by Prakash Ramachandran.
Date Sat, 02 Aug 2014 22:22:10 GMT
Repository: tez
Updated Branches:
  refs/heads/master d3191bd39 -> 57d342cc3


TEZ-1343. Bypass the Fetcher and read directly from the local filesystem
if source task ran on the same host. Contributed by Prakash
Ramachandran.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57d342cc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57d342cc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57d342cc

Branch: refs/heads/master
Commit: 57d342cc32600956956f04fda3408f0a8e617a37
Parents: d3191bd
Author: Siddharth Seth <sseth@apache.org>
Authored: Sat Aug 2 15:21:30 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Sat Aug 2 15:21:30 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   3 +-
 .../app/launcher/LocalContainerLauncher.java    |  12 ++-
 .../org/apache/hadoop/io/FileChunkPath.java     |  96 +++++++++++++++++
 .../library/api/TezRuntimeConfiguration.java    |  10 ++
 .../library/common/shuffle/impl/Fetcher.java    |   5 +-
 .../library/common/shuffle/impl/MapOutput.java  |  41 +++++++-
 .../common/shuffle/impl/MergeManager.java       |  76 +++++++++-----
 .../library/common/shuffle/impl/Shuffle.java    |  14 ++-
 .../shuffle/impl/ShuffleInputEventHandler.java  |  65 ++++++++++--
 .../common/shuffle/impl/ShuffleScheduler.java   |   2 +-
 .../library/input/ShuffledMergedInput.java      |   3 +-
 .../library/input/ShuffledUnorderedKVInput.java |   3 +-
 .../shuffle/common/LocalDiskFetchedInput.java   | 102 +++++++++++++++++++
 .../impl/ShuffleInputEventHandlerImpl.java      |  74 +++++++++++---
 .../impl/TestShuffleInputEventHandler.java      |   9 +-
 .../impl/TestShuffleInputEventHandlerImpl.java  |  13 ++-
 16 files changed, 463 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index bfb974b..132e5fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -807,7 +808,7 @@ public class DAGAppMaster extends AbstractService {
   }
 
   protected ContainerLauncher
-      createContainerLauncher(final AppContext context) {
+      createContainerLauncher(final AppContext context) throws UnknownHostException {
     if(isLocal){
       return new LocalContainerLauncher(context, taskAttemptListener);
     } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 5a33a88..c24608e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -21,6 +21,9 @@ package org.apache.tez.dag.app.launcher;
 
 import java.io.IOException;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -38,6 +41,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -62,6 +66,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.utils.EnvironmentUpdateUtils;
 import org.apache.tez.runtime.task.TezChild;
 
 
@@ -95,10 +100,15 @@ public class LocalContainerLauncher extends AbstractService implements
 
 
   public LocalContainerLauncher(AppContext context,
-                                TaskAttemptListener taskAttemptListener) {
+                                TaskAttemptListener taskAttemptListener) throws
+      UnknownHostException {
     super(LocalContainerLauncher.class.getName());
     this.context = context;
     this.taskAttemptListener = taskAttemptListener;
+    EnvironmentUpdateUtils.put("NM_AUX_SERVICE_mapreduce_shuffle",
+        Base64.encodeBase64String(ByteBuffer.allocate(4).putInt(0).array()));
+    EnvironmentUpdateUtils.put(Environment.NM_HOST.toString(),
+        InetAddress.getLocalHost().getHostName());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunkPath.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunkPath.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunkPath.java
new file mode 100644
index 0000000..dddcdf5
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunkPath.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.net.URI;
+
+import org.apache.hadoop.fs.Path;
+
+public class FileChunkPath extends Path {
+
+  private long offset = -1;
+  private long size = -1;
+  private boolean hasOffset = false;
+
+  public FileChunkPath(String pathString, long offset, long length) throws IllegalArgumentException {
+    super(pathString);
+    this.offset = offset;
+    this.size = length;
+    this.hasOffset = true;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+      return false;
+    }
+
+    FileChunkPath that = (FileChunkPath) o;
+
+    if (offset != that.offset || size != that.size) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    result = 31 * result + (int) (offset ^ (offset >>> 32));
+    result = 31 * result + (int) (size ^ (size >>> 32));
+    return result;
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    FileChunkPath that = (FileChunkPath)o;
+    int c;
+
+    if ((c = super.compareTo(o)) != 0) {
+      return c;
+    }
+
+    long lc;
+    if ((lc = this.offset - that.offset) != 0) {
+      return lc > 0 ? 1 : -1;
+    }
+
+    if ((lc = this.size - that.size) != 0) {
+      return lc > 0 ? 1 : -1;
+    }
+
+    return 0;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getLength() {
+    return size;
+  }
+
+  public boolean hasOffset() {
+    return hasOffset;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 99faf3d..6fdffa3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -266,6 +266,15 @@ public class TezRuntimeConfiguration {
       TEZ_RUNTIME_PREFIX + "broadcast.data-via-events.max-size";
   public static final int TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 200 << 10; // 200KB
 
+  /**
+   * If the shuffle input is on the local host, bypass the HTTP fetch and access the files directly.
+   */
+  public static final String TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH = TEZ_RUNTIME_PREFIX + "optimize.local.fetch";
+
+  /**
+   * Bypassing the HTTP fetch for local inputs is not enabled by default until unit tests are in place.
+   */
+  public static final boolean TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT = false;
 
   // TODO TEZ-1233 - allow this property to be set per vertex
   // TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs
@@ -319,6 +328,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_MAX_SIZE);
     tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 5d5f58e..e5913e5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -383,9 +383,12 @@ class Fetcher extends Thread {
         ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
           (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
           ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
-      } else {
+      } else if (mapOutput.getType() == Type.DISK) {
         ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
           input, compressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
+      } else {
+        throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
+            mapOutput.getType());
       }
       
       // Inform the shuffle scheduler

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
index ef741ae..2e792bf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.FileChunkPath;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 
@@ -40,14 +41,16 @@ class MapOutput {
   public static enum Type {
     WAIT,
     MEMORY,
-    DISK
+    DISK,
+    DISK_DIRECT
   }
   
   private InputAttemptIdentifier attemptIdentifier;
   private final int id;
   
   private final MergeManager merger;
-  
+
+  private final long fileOffset;
   private final long size;
   
   private final byte[] memory;
@@ -76,6 +79,7 @@ class MapOutput {
     memory = null;
     byteStream = null;
 
+    this.fileOffset = 0;
     this.size = size;
     
     this.localFS = FileSystem.getLocal(conf);
@@ -87,7 +91,30 @@ class MapOutput {
     
     this.primaryMapOutput = primaryMapOutput;
   }
-  
+
+  MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, Configuration conf,
+            Path path, long offset, long size)
+      throws IOException {
+    this.id = ID.incrementAndGet();
+    this.attemptIdentifier = attemptIdentifier;
+    this.merger = merger;
+
+    type = Type.DISK_DIRECT;
+
+    memory = null;
+    byteStream = null;
+
+    this.fileOffset = offset;
+    this.size = size;
+
+    this.localFS = FileSystem.getLocal(conf);
+    outputPath = path;
+    tmpOutputPath = null;
+
+    disk = null;
+    this.primaryMapOutput = true;
+  }
+
   MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, int size, 
             boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
@@ -99,6 +126,7 @@ class MapOutput {
     memory = byteStream.getBuffer();
 
     this.size = size;
+    this.fileOffset = -1;
     
     localFS = null;
     disk = null;
@@ -118,6 +146,7 @@ class MapOutput {
     byteStream = null;
     
     size = -1;
+    fileOffset = -1;
     
     localFS = null;
     disk = null;
@@ -125,7 +154,7 @@ class MapOutput {
     tmpOutputPath = null;
 
     this.primaryMapOutput = false;
-}
+  }
   
   public boolean isPrimaryMapOutput() {
     return primaryMapOutput;
@@ -178,6 +207,9 @@ class MapOutput {
     } else if (type == Type.DISK) {
       localFS.rename(tmpOutputPath, outputPath);
       merger.closeOnDiskFile(outputPath);
+    } else if (type == Type.DISK_DIRECT) {
+      FileChunkPath fileChunkPath = new FileChunkPath(outputPath.toString(), fileOffset, size);
+      merger.closeOnDiskFile(fileChunkPath);
     } else {
       throw new IOException("Cannot commit MapOutput of type WAIT!");
     }
@@ -192,6 +224,7 @@ class MapOutput {
       } catch (IOException ie) {
         LOG.info("failure to clean up " + tmpOutputPath, ie);
       }
+    } else if (type == Type.DISK_DIRECT) { //nothing to do.
     } else {
       throw new IllegalArgumentException
                    ("Cannot commit MapOutput with of type WAIT!");

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 0038bb4..17880e1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.FileChunkPath;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
@@ -425,9 +426,9 @@ public class MergeManager {
   
   public synchronized void closeOnDiskFile(Path file) {
     onDiskMapOutputs.add(file);
-    
+
     synchronized (onDiskMerger) {
-      if (!onDiskMerger.isInProgress() && 
+      if (!onDiskMerger.isInProgress() &&
           onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
         onDiskMerger.startMerge(onDiskMapOutputs);
       }
@@ -631,7 +632,7 @@ public class MergeManager {
    * Merges multiple on-disk segments
    */
   private class OnDiskMerger extends MergeThread<Path> {
-    
+
     public OnDiskMerger(MergeManager manager) {
       super(manager, ioSortFactor, exceptionReporter);
       setName("DiskToDiskMerger [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
@@ -653,10 +654,28 @@ public class MergeManager {
       
       LOG.info("OnDiskMerger: We have  " + inputs.size() + 
                " map outputs on disk. Triggering merge...");
-      
-      // 1. Prepare the list of files to be merged. 
+
+      List<Segment> inputSegments = new ArrayList<Segment>(inputs.size());
+
+      // 1. Prepare the list of files to be merged.
       for (Path file : inputs) {
-        approxOutputSize += localFS.getFileStatus(file).getLen();
+        long offset;
+        long size;
+        boolean preserve = false;
+
+        if (file instanceof FileChunkPath) {
+          size = ((FileChunkPath)file).getLength();
+          offset = ((FileChunkPath)file).getOffset();
+          approxOutputSize += size;
+          preserve = true;
+        } else {
+          size = localFS.getFileStatus(file).getLen();
+          offset = 0;
+          approxOutputSize += size;
+        }
+        Segment segment = new Segment(conf, rfs, file, offset, size, codec, ifileReadAhead,
+            ifileReadAheadLength, ifileBufferSize, preserve);
+        inputSegments.add(segment);
       }
 
       // add the checksum length
@@ -676,13 +695,13 @@ public class MergeManager {
       Path tmpDir = new Path(inputContext.getUniqueIdentifier());
       try {
         iter = TezMerger.merge(conf, rfs,
-                            (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
-                            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                            codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize,
-                            inputs.toArray(new Path[inputs.size()]), true, ioSortFactor, tmpDir, 
-                            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), 
-                            nullProgressable, spilledRecordsCounter, null, 
-                            mergedMapOutputsCounter, null);
+            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+            inputSegments,
+            ioSortFactor, tmpDir,
+            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
+            nullProgressable, true, spilledRecordsCounter, null,
+            mergedMapOutputsCounter, null);
 
         // TODO Maybe differentiate between data written because of Merges and
         // the finalMerge (i.e. final mem available may be different from
@@ -856,15 +875,26 @@ public class MergeManager {
     long onDiskBytes = inMemToDiskBytes;
     Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
     for (Path file : onDisk) {
-      onDiskBytes += fs.getFileStatus(file).getLen();
-      LOG.debug("Disk file: " + file + " Length is " + 
-          fs.getFileStatus(file).getLen());
-      diskSegments.add(new Segment(job, fs, file, codec, ifileReadAhead,
-                                   ifileReadAheadLength, ifileBufferSize, false,
-                                         (file.toString().endsWith(
-                                             Constants.MERGED_OUTPUT_PREFIX) ?
-                                          null : mergedMapOutputsCounter)
-                                        ));
+      long fileLength;
+      long fileOffset;
+      boolean preserve = false;
+      if (file instanceof FileChunkPath) {
+        FileChunkPath fileChunkPath = (FileChunkPath) file;
+        fileOffset = fileChunkPath.getOffset();
+        fileLength = fileChunkPath.getLength();
+        preserve = true;
+      } else {
+        fileOffset = 0;
+        fileLength = fs.getFileStatus(file).getLen();
+      }
+      onDiskBytes += fileLength;
+      LOG.debug("Disk file: " + file + " Length is " + fileLength);
+
+      TezCounter counter =
+          file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter;
+
+      diskSegments.add(new Segment(job, fs, file, fileOffset, fileLength, codec, ifileReadAhead,
+                                   ifileReadAheadLength, ifileBufferSize, preserve, counter));
     }
     LOG.info("Merging " + onDisk.length + " files, " +
              onDiskBytes + " bytes from disk");
@@ -897,7 +927,7 @@ public class MergeManager {
       }
       finalSegments.add(new Segment(
             new RawKVIteratorReader(diskMerge, onDiskBytes), true));
-    } 
+    }
     // This is doing nothing but creating an iterator over the segments.
     return TezMerger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index aff293c..78806a4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -186,10 +186,7 @@ public class Shuffle implements ExceptionReporter {
           failedShuffleCounter,
           bytesShuffedToDisk,
           bytesShuffedToMem);
-    eventHandler= new ShuffleInputEventHandler(
-      inputContext,
-      scheduler,
-      sslShuffle);
+
     merger = new MergeManager(
           this.conf,
           localFS,
@@ -204,6 +201,13 @@ public class Shuffle implements ExceptionReporter {
           codec,
           ifileReadAhead,
           ifileReadAheadLength);
+
+    eventHandler= new ShuffleInputEventHandler(
+        inputContext,
+        scheduler,
+        merger,
+        this.conf,
+        sslShuffle);
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());
@@ -220,7 +224,7 @@ public class Shuffle implements ExceptionReporter {
     runShuffleCallable = new RunShuffleCallable();
   }
 
-  public void handleEvents(List<Event> events) {
+  public void handleEvents(List<Event> events) throws IOException {
     if (!isShutDown.get()) {
       eventHandler.handleEvents(events);
     } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 6fdf65e..23616e0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -26,14 +26,23 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
@@ -47,23 +56,30 @@ public class ShuffleInputEventHandler {
   private final TezInputContext inputContext;
 
   private int maxMapRuntime = 0;
+  private final MergeManager merger;
+  private final Configuration conf;
   private final boolean sslShuffle;
+  private final boolean doLocalFetch;
 
   public ShuffleInputEventHandler(TezInputContext inputContext,
-      ShuffleScheduler scheduler, boolean sslShuffle) {
+      ShuffleScheduler scheduler, MergeManager merger, Configuration conf, boolean sslShuffle) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
+    this.merger = merger;
+    this.conf = conf;
     this.sslShuffle = sslShuffle;
+    this.doLocalFetch = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
   }
 
-  public void handleEvents(List<Event> events) {
+  public void handleEvents(List<Event> events) throws IOException {
     for (Event event : events) {
       handleEvent(event);
     }
   }
   
   
-  private void handleEvent(Event event) {
+  private void handleEvent(Event event) throws IOException {
     if (event instanceof DataMovementEvent) {
       processDataMovementEvent((DataMovementEvent) event);      
     } else if (event instanceof InputFailedEvent) {
@@ -71,7 +87,7 @@ public class ShuffleInputEventHandler {
     }
   }
 
-  private void processDataMovementEvent(DataMovementEvent dmEvent) {
+  private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException {
     DataMovementEventPayloadProto shufflePayload;
     try {
       shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
@@ -104,11 +120,25 @@ public class ShuffleInputEventHandler {
                 "the empty partition to succeeded", e);
       }
     }
-    URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+
     InputAttemptIdentifier srcAttemptIdentifier =
-        new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
-    scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
-        partitionId, baseUri.toString(), srcAttemptIdentifier);
+        new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(),
+            shufflePayload.getPathComponent());
+    if (doLocalFetch && shufflePayload.getHost().equals(System.getenv(
+        ApplicationConstants.Environment.NM_HOST.toString()))) {
+      LOG.info("SrcAttempt: [" + srcAttemptIdentifier +
+          "] fetching input data using direct local access");
+      Path filename = getShuffleInputFileName(shufflePayload.getPathComponent(), "");
+      TezIndexRecord indexRecord = getIndexRecord(dmEvent, shufflePayload);
+      MapOutput mapOut = new MapOutput(srcAttemptIdentifier, merger, conf, filename,
+          indexRecord.getStartOffset(), indexRecord.getPartLength()); //TODO: do we need length?
+      scheduler.copySucceeded(srcAttemptIdentifier, null, indexRecord.getPartLength(),
+          indexRecord.getRawLength(), 0, mapOut);
+    } else {
+      URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+      scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
+          partitionId, baseUri.toString(), srcAttemptIdentifier);
+    }
   }
   
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
@@ -125,5 +155,24 @@ public class ShuffleInputEventHandler {
     URI u = URI.create(sb.toString());
     return u;
   }
+
+  private Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
+    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    suffix = suffix != null ? suffix : "";
+
+    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
+        Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
+
+    return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
+  }
+
+  private TezIndexRecord getIndexRecord(DataMovementEvent dme,
+                                              DataMovementEventPayloadProto shufflePayload)
+      throws IOException {
+    Path indexFile = getShuffleInputFileName(shufflePayload.getPathComponent(),
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    return spillRecord.getIndex(dme.getSourceIndex());
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 3e2fa7a..db322f7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -88,7 +88,7 @@ class ShuffleScheduler {
   private final TezCounter failedShuffleCounter;
   private final TezCounter bytesShuffledToDisk;
   private final TezCounter bytesShuffledToMem;
-  
+
   private final long startTime;
   private long lastProgressTime;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 01b9de7..cad67cd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -236,7 +236,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
   }
 
   @Override
-  public void handleEvents(List<Event> inputEvents) {
+  public void handleEvents(List<Event> inputEvents) throws IOException {
     synchronized (this) {
       if (getNumPhysicalInputs() == 0) {
         throw new RuntimeException("No input events expected as numInputs is 0");
@@ -327,6 +327,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
     confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_MAX_KEYS);
     confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_GROUP_NAME_MAX_KEYS);
     confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_NAME_MAX_KEYS);

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index ff076ca..30c1d7b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -131,7 +131,7 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
           ifileReadAhead, ifileReadAheadLength, codec, inputManager);
 
       this.inputEventHandler = new ShuffleInputEventHandlerImpl(getContext(), shuffleManager,
-          inputManager, codec, ifileReadAhead, ifileReadAheadLength);
+          inputManager, codec, ifileReadAhead, ifileReadAheadLength, conf);
 
       ////// End of Initial configuration
 
@@ -240,6 +240,7 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
     confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_MAX_KEYS);
     confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_GROUP_NAME_MAX_KEYS);
     confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_NAME_MAX_KEYS);

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java
new file mode 100644
index 0000000..f41bd21
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/LocalDiskFetchedInput.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * A {@link FetchedInput} backed by a byte range of a file that already exists on the local
+ * file system, so the data is read in place instead of being copied by a fetcher.
+ *
+ * <p>The range starts at {@code startOffset} and is {@code compressedSize} bytes long. This
+ * input is read-only: {@link #getOutputStream()} always fails.
+ */
+public class LocalDiskFetchedInput extends FetchedInput {
+  private static final Log LOG = LogFactory.getLog(LocalDiskFetchedInput.class);
+
+  private final Path inputFile;
+  private final FileSystem localFS;
+  private final long startOffset;
+
+  /**
+   * @param startOffset offset within {@code inputFile} at which this input's data begins
+   * @param actualSize size passed through to {@link FetchedInput} as the actual size
+   * @param compressedSize number of bytes to expose from {@code inputFile}, starting at
+   *                       {@code startOffset}
+   * @param inputAttemptIdentifier identifier of the source attempt that produced the data
+   * @param inputFile local file that contains the data
+   * @param conf configuration used to obtain the local file system
+   * @param callbackHandler callback handed to {@link FetchedInput}
+   * @throws IOException if the local file system cannot be obtained
+   */
+  public LocalDiskFetchedInput(long startOffset, long actualSize, long compressedSize,
+                               InputAttemptIdentifier inputAttemptIdentifier, Path inputFile,
+                               Configuration conf, FetchedInputCallback callbackHandler)
+      throws IOException {
+    super(Type.DISK, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
+    this.startOffset = startOffset;
+    this.inputFile = inputFile;
+    this.localFS = FileSystem.getLocal(conf);
+  }
+
+  /**
+   * Not supported: the data already exists on local disk and is never written through this
+   * object.
+   *
+   * @throws IOException always
+   */
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    throw new IOException("Output Stream is not supported for " + this.toString());
+  }
+
+  /**
+   * Opens the backing file and returns a stream positioned at {@code startOffset} that yields
+   * at most {@code compressedSize} bytes.
+   *
+   * @throws IOException if the file cannot be opened or positioned
+   */
+  @Override
+  public InputStream getInputStream() throws IOException {
+    FSDataInputStream inputStream = localFS.open(inputFile);
+    boolean positioned = false;
+    try {
+      inputStream.seek(startOffset);
+      positioned = true;
+    } finally {
+      if (!positioned) {
+        // The caller never sees the stream when seek() fails, so release the handle here.
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close " + inputFile + " after a failed seek", e);
+        }
+      }
+    }
+    return new BoundedInputStream(inputStream, compressedSize);
+  }
+
+  /** Marks a pending input as committed and reports the completed fetch. */
+  @Override
+  public void commit() {
+    if (state == State.PENDING) {
+      state = State.COMMITTED;
+      notifyFetchComplete();
+    }
+  }
+
+  /** Marks a pending input as aborted and reports the failed fetch. */
+  @Override
+  public void abort() {
+    if (state == State.PENDING) {
+      state = State.ABORTED;
+      notifyFetchFailure();
+    }
+  }
+
+  /**
+   * Releases a committed input. Must only be called once the input has been committed or
+   * aborted; the backing file itself is not touched here.
+   */
+  @Override
+  public void free() {
+    Preconditions.checkState(
+        state == State.COMMITTED || state == State.ABORTED,
+        "FetchedInput can only be freed after it is committed or aborted");
+    if (state == State.COMMITTED) { // ABORTED would have already called cleanup
+      state = State.FREED;
+      notifyFreedResource();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "LocalDiskFetchedInput [inputFile path =" + inputFile +
+        ", offset=" + startOffset +
+        ", actualSize=" + actualSize +
+        ", compressedSize=" + compressedSize +
+        ", inputAttemptIdentifier=" + inputAttemptIdentifier +
+        ", type=" + type +
+        ", id=" + id +
+        ", state=" + state + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 83d9502..d93608c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -25,21 +25,25 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.shuffle.common.*;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
 
@@ -57,17 +61,23 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
+  private final Configuration conf;
+  private final boolean doLocalFetch;
   
   
   public ShuffleInputEventHandlerImpl(TezInputContext inputContext,
-      ShuffleManager shuffleManager,
-      FetchedInputAllocator inputAllocator, CompressionCodec codec,
-      boolean ifileReadAhead, int ifileReadAheadLength) {
+                                      ShuffleManager shuffleManager,
+                                      FetchedInputAllocator inputAllocator, CompressionCodec codec,
+                                      boolean ifileReadAhead, int ifileReadAheadLength,
+                                      Configuration conf) {
     this.shuffleManager = shuffleManager;
     this.inputAllocator = inputAllocator;
     this.codec = codec;
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
+    this.conf = conf;
+    this.doLocalFetch = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
   }
 
   @Override
@@ -123,12 +133,20 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
       shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
     } else {
-      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
-          srcAttemptIdentifier, srcIndex);
+      if (doLocalFetch && shufflePayload.getHost().equals(System.getenv(
+          ApplicationConstants.Environment.NM_HOST.toString()))) {
+        LOG.info("SrcAttempt: [" + srcAttemptIdentifier +
+            "] fetching input data using direct local access");
+        FetchedInput fetchedInput = getLocalFetchDiskInput(dme, shufflePayload, srcAttemptIdentifier);
+        shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
+      } else {
+        shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
+                srcAttemptIdentifier, srcIndex);
+      }
     }
 
   }
-  
+
   private void moveDataToFetchedInput(DataProto dataProto,
       FetchedInput fetchedInput, String hostIdentifier) throws IOException {
     switch (fetchedInput.getType()) {
@@ -155,5 +173,60 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
   }
 
+  /**
+   * Resolves a task output file of the given source output within the configured local dirs.
+   *
+   * @param pathComponent path component that identifies the source output directory
+   * @param suffix suffix appended to the output file name; {@code null} is treated as empty
+   * @return a local path from which the requested file can be read
+   * @throws IOException propagated from {@code LocalDirAllocator#getLocalPathToRead}
+   */
+  private Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
+    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    suffix = suffix != null ? suffix : "";
+
+    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
+        Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
+
+    return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
+  }
+
+  /**
+   * Builds a {@link FetchedInput} that reads the source attempt's data directly from the local
+   * task output file, using the offset and lengths recorded in the output's index file.
+   *
+   * @param dme event whose source index selects the record in the index file
+   * @param shufflePayload payload carrying the path component of the source output
+   * @param srcAttemptIdentifier identifier of the source attempt
+   * @return an input backed by the local output file
+   * @throws IOException propagated from locating the files or loading the index
+   */
+  private FetchedInput getLocalFetchDiskInput(DataMovementEvent dme,
+                                              DataMovementEventPayloadProto shufflePayload,
+                                              InputAttemptIdentifier srcAttemptIdentifier)
+      throws IOException {
+    String pathComponent = shufflePayload.getPathComponent();
+    Path indexFile = getShuffleInputFileName(pathComponent,
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    TezIndexRecord idxRecord = spillRecord.getIndex(dme.getSourceIndex());
+    Path dataFile = getShuffleInputFileName(pathComponent, "");
+
+    // The callbacks are intentionally empty: nothing is done here when a local input completes,
+    // fails or is freed.
+    FetchedInputCallback noOpCallback = new FetchedInputCallback() {
+      @Override
+      public void fetchComplete(FetchedInput fetchedInput) {}
+
+      @Override
+      public void fetchFailed(FetchedInput fetchedInput) {}
+
+      @Override
+      public void freeResources(FetchedInput fetchedInput) {}
+    };
+
+    return new LocalDiskFetchedInput(idxRecord.getStartOffset(), idxRecord.getRawLength(),
+        idxRecord.getPartLength(), srcAttemptIdentifier, dataFile, conf, noOpCallback);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
index f921567..bc1e2b0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
@@ -1,6 +1,7 @@
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
@@ -79,11 +80,13 @@ public class TestShuffleInputEventHandler {
   public void setup() throws Exception {
     TezInputContext inputContext = createTezInputContext();
     scheduler = mock(ShuffleScheduler.class);
-    handler = new ShuffleInputEventHandler(inputContext, scheduler, false);
+    Configuration conf = mock(Configuration.class);
+    MergeManager merger = mock(MergeManager.class);
+    handler = new ShuffleInputEventHandler(inputContext, scheduler, merger,conf, false);
   }
 
   @Test
-  public void basicTest() {
+  public void basicTest() throws IOException {
     List<Event> events = new LinkedList<Event>();
     int srcIdx = 0;
     int targetIdx = 1;
@@ -100,7 +103,7 @@ public class TestShuffleInputEventHandler {
   }
 
   @Test
-  public void testFailedEvent() {
+  public void testFailedEvent() throws IOException {
     List<Event> events = new LinkedList<Event>();
     int targetIdx = 1;
     InputFailedEvent failedEvent = new InputFailedEvent(targetIdx, 0);

http://git-wip-us.apache.org/repos/asf/tez/blob/57d342cc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
index 2aa7396..1ca9aad 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
@@ -27,6 +27,7 @@ import java.util.BitSet;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.Event;
@@ -49,10 +50,11 @@ public class TestShuffleInputEventHandlerImpl {
   public void testSimple() throws IOException {
     TezInputContext inputContext = mock(TezInputContext.class);
     ShuffleManager shuffleManager = mock(ShuffleManager.class);
+    Configuration conf = mock(Configuration.class);
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, conf);
 
     int taskIndex = 1;
     Event dme = createDataMovementEvent(0, taskIndex, null);
@@ -71,10 +73,11 @@ public class TestShuffleInputEventHandlerImpl {
   public void testCurrentPartitionEmpty() throws IOException {
     TezInputContext inputContext = mock(TezInputContext.class);
     ShuffleManager shuffleManager = mock(ShuffleManager.class);
+    Configuration conf = mock(Configuration.class);
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, conf);
 
     int taskIndex = 1;
     Event dme = createDataMovementEvent(0, taskIndex, createEmptyPartitionByteString(0));
@@ -92,10 +95,11 @@ public class TestShuffleInputEventHandlerImpl {
   public void testOtherPartitionEmpty() throws IOException {
     TezInputContext inputContext = mock(TezInputContext.class);
     ShuffleManager shuffleManager = mock(ShuffleManager.class);
+    Configuration conf = mock(Configuration.class);
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, conf);
 
     int taskIndex = 1;
     Event dme = createDataMovementEvent(0, taskIndex, createEmptyPartitionByteString(1));
@@ -112,10 +116,11 @@ public class TestShuffleInputEventHandlerImpl {
   public void testMultipleEvents1() throws IOException {
     TezInputContext inputContext = mock(TezInputContext.class);
     ShuffleManager shuffleManager = mock(ShuffleManager.class);
+    Configuration conf = mock(Configuration.class);
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, conf);
 
     int taskIndex1 = 1;
     Event dme1 = createDataMovementEvent(0, taskIndex1, createEmptyPartitionByteString(0));


Mime
View raw message