tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] tez git commit: TEZ-2454. Change FetcherOrderedGroup to work as Callables instead of blocking threads. (sseth)
Date Wed, 20 May 2015 17:49:02 GMT
TEZ-2454. Change FetcherOrderedGroup to work as Callables instead of
blocking threads. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/70a465df
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/70a465df
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/70a465df

Branch: refs/heads/master
Commit: 70a465dfb73ca3ce97caf1c1427fe6324c1e073f
Parents: a9048bb
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed May 20 10:48:43 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed May 20 10:48:43 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 tez-runtime-library/findbugs-exclude.xml        |   5 +
 .../library/common/shuffle/HttpConnection.java  |   8 +-
 .../common/shuffle/impl/ShuffleManager.java     |   2 +
 .../FetchedInputAllocatorOrderedGrouped.java    |  36 ++
 .../orderedgrouped/FetcherOrderedGrouped.java   | 163 ++++---
 .../shuffle/orderedgrouped/MapOutput.java       |  26 +-
 .../shuffle/orderedgrouped/MergeManager.java    |  12 +-
 .../common/shuffle/orderedgrouped/Shuffle.java  | 176 ++------
 .../orderedgrouped/ShuffleScheduler.java        | 426 +++++++++++++++----
 .../shuffle/orderedgrouped/TestFetcher.java     | 110 ++++-
 ...tShuffleInputEventHandlerOrderedGrouped.java |  51 ++-
 .../orderedgrouped/TestShuffleScheduler.java    | 279 ++++++++++++
 13 files changed, 911 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7781a9c..32118e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2454. Change FetcherOrderedGroup to work as Callables instead of blocking threads.
   TEZ-2466. tez-history-parser breaks hadoop 2.2 compatability.
   TEZ-2463. Update site for 0.7.0 release
   TEZ-2461. tez-history-parser compile fails with hadoop-2.4.

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index aa1c7a2..489e243 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -104,5 +104,10 @@
     <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
   </Match>
 
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler"/>
+    <Method name="close" params="" returns="void"/>
+    <Bug pattern="NN_NAKED_NOTIFY"/>
+  </Match>
 
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index ad6ed19..7827f0a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -59,10 +59,10 @@ public class HttpConnection {
   private static SSLFactory sslFactory;
 
   @VisibleForTesting
-  protected HttpURLConnection connection;
-  private DataInputStream input;
+  protected volatile HttpURLConnection connection;
+  private volatile DataInputStream input;
 
-  private boolean connectionSucceeed;
+  private volatile boolean connectionSucceeed;
   private volatile boolean cleanup;
 
   private final JobTokenSecretManager jobTokenSecretMgr;
@@ -276,6 +276,7 @@ public class HttpConnection {
       if (input != null) {
         LOG.info("Closing input on " + logIdentifier);
         input.close();
+        input = null;
       }
       if (httpConnParams.keepAlive && connectionSucceeed) {
         // Refer:
@@ -287,6 +288,7 @@ public class HttpConnection {
           LOG.debug("Closing connection on " + logIdentifier);
         }
         connection.disconnect();
+        connection = null;
       }
     } catch (IOException e) {
       if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index ac7caca..f354920 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -321,6 +321,7 @@ public class ShuffleManager implements FetcherCallback {
               } catch (InterruptedException e) {
                 if (isShutdown.get()) {
                   LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  Thread.currentThread().interrupt();
                   break;
                 } else {
                   throw e;
@@ -335,6 +336,7 @@ public class ShuffleManager implements FetcherCallback {
                 runningFetchers.add(fetcher);
                 if (isShutdown.get()) {
                   LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  break;
                 }
                 ListenableFuture<FetchResult> future = fetcherExecutor
                     .submit(fetcher);

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
new file mode 100644
index 0000000..ec1f8eb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.FileChunk;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+public interface FetchedInputAllocatorOrderedGrouped {
+
+  // TODO TEZ-912 Consolidate this with FetchedInputAllocator.
+  public MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier,
+                                        long requestedSize,
+                                        long compressedLength,
+                                        int fetcherId) throws IOException;
+
+  void closeInMemoryFile(MapOutput mapOutput);
+
+  void closeOnDiskFile(FileChunk file);
+
+  void unreserve(long bytes);
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 60f1c98..0248f13 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -19,25 +19,24 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.net.HttpURLConnection;
 import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.tez.common.CallableWithNdc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -50,81 +49,87 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
-class FetcherOrderedGrouped extends Thread {
+class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   
   private static final Logger LOG = LoggerFactory.getLogger(FetcherOrderedGrouped.class);
+
+  private static final AtomicInteger nextId = new AtomicInteger(0);
+
   private final Configuration conf;
   private final boolean localDiskFetchEnabled;
 
-  private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
-                                    CONNECTION, WRONG_REDUCE}
-  
-  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
   private final TezCounter connectionErrs;
   private final TezCounter ioErrs;
   private final TezCounter wrongLengthErrs;
   private final TezCounter badIdErrs;
   private final TezCounter wrongMapErrs;
   private final TezCounter wrongReduceErrs;
-  private final MergeManager merger;
+  private final FetchedInputAllocatorOrderedGrouped allocator;
   private final ShuffleScheduler scheduler;
   private final ShuffleClientMetrics metrics;
   private final Shuffle shuffle;
   private final int id;
   private final String logIdentifier;
   private final String localShuffleHostPort;
-  private static int nextId = 0;
-  private int currentPartition = -1;
+  private final MapHost mapHost;
+
+  private final int currentPartition;
 
   // Decompression of map-outputs
   private final CompressionCodec codec;
   private final JobTokenSecretManager jobTokenSecretManager;
 
+  final HttpConnectionParams httpConnectionParams;
+
   @VisibleForTesting
   volatile boolean stopped = false;
-  
+
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
   private LinkedList<InputAttemptIdentifier> remaining;
 
-  volatile HttpURLConnection connection;
   volatile DataInputStream input;
 
-  HttpConnection httpConnection;
-  HttpConnectionParams httpConnectionParams;
+  volatile HttpConnection httpConnection;
+
 
   // Initiative value is 0, which means it hasn't retried yet.
   private long retryStartTime = 0;
-  
+
   public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams,
-                               ShuffleScheduler scheduler, MergeManager merger,
+                               ShuffleScheduler scheduler,
+                               FetchedInputAllocatorOrderedGrouped allocator,
                                ShuffleClientMetrics metrics,
                                Shuffle shuffle, JobTokenSecretManager jobTokenSecretMgr,
                                boolean ifileReadAhead, int ifileReadAheadLength,
                                CompressionCodec codec,
-                               InputContext inputContext, Configuration conf,
+                               Configuration conf,
                                boolean localDiskFetchEnabled,
                                String localHostname,
-                               int shufflePort) throws IOException {
-    setDaemon(true);
+                               int shufflePort,
+                               String srcNameTrimmed,
+                               MapHost mapHost,
+                               TezCounter ioErrsCounter,
+                               TezCounter wrongLengthErrsCounter,
+                               TezCounter badIdErrsCounter,
+                               TezCounter wrongMapErrsCounter,
+                               TezCounter connectionErrsCounter,
+                               TezCounter wrongReduceErrsCounter) {
     this.scheduler = scheduler;
-    this.merger = merger;
+    this.allocator = allocator;
     this.metrics = metrics;
     this.shuffle = shuffle;
-    this.id = ++nextId;
+    this.mapHost = mapHost;
+    this.currentPartition = this.mapHost.getPartitionId();
+    this.id = nextId.incrementAndGet();
     this.jobTokenSecretManager = jobTokenSecretMgr;
-    ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.IO_ERROR.toString());
-    wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_LENGTH.toString());
-    badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.BAD_ID.toString());
-    wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_MAP.toString());
-    connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.CONNECTION.toString());
-    wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_REDUCE.toString());
+
+    this.ioErrs = ioErrsCounter;
+    this.wrongLengthErrs = wrongLengthErrsCounter;
+    this.badIdErrs = badIdErrsCounter;
+    this.wrongMapErrs = wrongMapErrsCounter;
+    this.connectionErrs = connectionErrsCounter;
+    this.wrongReduceErrs = wrongReduceErrsCounter;
 
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
@@ -139,73 +144,54 @@ class FetcherOrderedGrouped extends Thread {
 
     this.localDiskFetchEnabled = localDiskFetchEnabled;
 
-    this.logIdentifier = "fetcher [" + TezUtilsInternal
-        .cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id;
-    setName(logIdentifier);
-    setDaemon(true);
-  }  
+    this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id;
+  }
 
   @VisibleForTesting
   protected void fetchNext() throws InterruptedException, IOException {
-    MapHost host = null;
     try {
-      // If merge is on, block
-      merger.waitForInMemoryMerge();
-
-      // In case usedMemory > memorylimit, wait until some memory is released
-      merger.waitForShuffleToMergeMemory();
-
-      // Get a host to shuffle from
-      host = scheduler.getHost();
       metrics.threadBusy();
 
-      String hostPort = host.getHostIdentifier();
+      String hostPort = mapHost.getHostIdentifier();
       if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) {
-        setupLocalDiskFetch(host);
+        setupLocalDiskFetch(mapHost);
       } else {
         // Shuffle
-        copyFromHost(host);
+        copyFromHost(mapHost);
       }
     } finally {
       cleanupCurrentConnection(false);
-      if (host != null) {
-        scheduler.freeHost(host);
-        metrics.threadFree();
-      }
+      scheduler.freeHost(mapHost);
+      metrics.threadFree();
     }
   }
 
-  public void run() {
+  @Override
+  public Void callInternal() {
     try {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        remaining = null; // Safety.
-        fetchNext();
-      }
+      remaining = null; // Safety.
+      fetchNext();
     } catch (InterruptedException ie) {
       //TODO: might not be respected when fetcher is in progress / server is busy.  TEZ-711
       //Set the status back
       Thread.currentThread().interrupt();
-      return;
+      return null;
     } catch (Throwable t) {
       shuffle.reportException(t);
       // Shuffle knows how to deal with failures post shutdown via the onFailure hook
     }
+    return null;
   }
 
-  public void shutDown() throws InterruptedException {
-    this.stopped = true;
-    interrupt();
-    cleanupCurrentConnection(true);
-    try {
-      join(5000);
-    } catch (InterruptedException ie) {
-      //Reset the status
-      Thread.currentThread().interrupt();
-      LOG.warn("Got interrupt while joining " + getName());
+  public void shutDown() {
+    if (!stopped) {
+      stopped = true;
+      // An interrupt will come in while shutting down the thread.
+      cleanupCurrentConnection(false);
     }
   }
 
-  private Object cleanupLock = new Object();
+  private final Object cleanupLock = new Object();
   private void cleanupCurrentConnection(boolean disconnect) {
     // Synchronizing on cleanupLock to ensure we don't run into a parallel close
     // Can't synchronize on the main class itself since that would cause the
@@ -214,6 +200,7 @@ class FetcherOrderedGrouped extends Thread {
       try {
         if (httpConnection != null) {
           httpConnection.cleanup(disconnect);
+          httpConnection = null;
         }
       } catch (IOException e) {
         if (LOG.isDebugEnabled()) {
@@ -237,8 +224,7 @@ class FetcherOrderedGrouped extends Thread {
     retryStartTime = 0;
     // Get completed maps on 'host'
     List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-    currentPartition = host.getPartitionId();
-    
+
     // Sanity check to catch hosts with only 'OBSOLETE' maps, 
     // especially at the tail of large jobs
     if (srcAttempts.size() == 0) {
@@ -254,18 +240,16 @@ class FetcherOrderedGrouped extends Thread {
     remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
     
     // Construct the url and connect
-    if (!setupConnection(host, srcAttempts)) {
-      if (stopped) {
-        cleanupCurrentConnection(true);
-      }
-      // Add back all remaining maps - which at this point is ALL MAPS the
-      // Fetcher was started with. The Scheduler takes care of retries,
-      // reporting too many failures etc.
-      putBackRemainingMapOutputs(host);
-      return;
-    }
 
     try {
+      if (!setupConnection(host, srcAttempts)) {
+        if (stopped) {
+          cleanupCurrentConnection(true);
+        }
+        // Maps will be added back in the finally block in case of failure.
+        return;
+      }
+
       // Loop through available map-outputs and fetch them
       // On any error, faildTasks is not null and we exit
       // after putting back the remaining maps to the 
@@ -453,7 +437,7 @@ class FetcherOrderedGrouped extends Thread {
 
       // Get the location for the map output - either in-memory or on-disk
       try {
-        mapOutput = merger.reserve(srcAttemptId, decompressedLength, compressedLength, id);
+        mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id);
       } catch (IOException e) {
         if (!stopped) {
           // Kill the reduce attempt
@@ -493,7 +477,7 @@ class FetcherOrderedGrouped extends Thread {
       // Reset retryStartTime as map task make progress if retried before.
       retryStartTime = 0;
 
-      scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, 
+      scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
                               endTime - startTime, mapOutput);
       // Note successful shuffle
       remaining.remove(srcAttemptId);
@@ -584,7 +568,7 @@ class FetcherOrderedGrouped extends Thread {
       int forReduce, List<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
     if (compressedLength < 0 || decompressedLength < 0) {
       wrongLengthErrs.increment(1);
-      LOG.warn(getName() + " invalid lengths in map output header: id: " +
+      LOG.warn(logIdentifier + " invalid lengths in map output header: id: " +
           srcAttemptId + " len: " + compressedLength + ", decomp len: " + 
                decompressedLength);
       return false;
@@ -594,7 +578,7 @@ class FetcherOrderedGrouped extends Thread {
     // URI
     if (forReduce != currentPartition) {
       wrongReduceErrs.increment(1);
-      LOG.warn(getName() + " data for the wrong partition map: " + srcAttemptId + " len: "
+      LOG.warn(logIdentifier + " data for the wrong partition map: " + srcAttemptId + " len: "
           + compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce
           + ", expected partition: " + currentPartition);
       return false;
@@ -622,7 +606,6 @@ class FetcherOrderedGrouped extends Thread {
   protected void setupLocalDiskFetch(MapHost host) throws InterruptedException {
     // Get completed maps on 'host'
     List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-    currentPartition = host.getPartitionId();
 
     // Sanity check to catch hosts with only 'OBSOLETE' maps,
     // especially at the tail of large jobs
@@ -708,7 +691,7 @@ class FetcherOrderedGrouped extends Thread {
   protected MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId,
                                                      Path filename, TezIndexRecord indexRecord)
       throws IOException {
-    return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename,
+    return MapOutput.createLocalDiskMapOutput(srcAttemptId, allocator, filename,
         indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index 55c80aa..29bf799 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -50,7 +50,7 @@ class MapOutput {
   private final long size;
 
   private final boolean primaryMapOutput;
-  private final MergeManager merger;
+  private final FetchedInputAllocatorOrderedGrouped callback;
 
   // MEMORY
   private final byte[] memory;
@@ -62,13 +62,13 @@ class MapOutput {
   private final FileChunk outputPath;
   private OutputStream disk;
 
-  private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, MergeManager merger,
+  private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
                     long size, Path outputPath, long offset, boolean primaryMapOutput,
                     FileSystem fs, Path tmpOutputPath) {
     this.id = ID.incrementAndGet();
     this.type = type;
     this.attemptIdentifier = attemptIdentifier;
-    this.merger = merger;
+    this.callback = callback;
     this.primaryMapOutput = primaryMapOutput;
 
     this.localFS = fs;
@@ -101,7 +101,7 @@ class MapOutput {
   }
 
   public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
-                                              MergeManager merger, long size, Configuration conf,
+                                              FetchedInputAllocatorOrderedGrouped callback, long size, Configuration conf,
                                               int fetcher, boolean primaryMapOutput,
                                               TezTaskOutputFiles mapOutputFile) throws
       IOException {
@@ -113,7 +113,7 @@ class MapOutput {
     Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));
     long offset = 0;
 
-    MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, merger, size, outputpath, offset,
+    MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, callback, size, outputpath, offset,
         primaryMapOutput, fs, tmpOuputPath);
     mapOutput.disk = mapOutput.localFS.create(tmpOuputPath);
 
@@ -121,16 +121,16 @@ class MapOutput {
   }
 
   public static MapOutput createLocalDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
-                                                   MergeManager merger, Path path,  long offset,
+                                                   FetchedInputAllocatorOrderedGrouped callback, Path path,  long offset,
                                                    long size, boolean primaryMapOutput)  {
-    return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, merger, size, path, offset,
+    return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, callback, size, path, offset,
         primaryMapOutput, null, null);
   }
 
   public static MapOutput createMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
-                                                MergeManager merger, int size,
+                                                FetchedInputAllocatorOrderedGrouped callback, int size,
                                                 boolean primaryMapOutput)  {
-    return new MapOutput(Type.MEMORY, attemptIdentifier, merger, size, null, -1, primaryMapOutput,
+    return new MapOutput(Type.MEMORY, attemptIdentifier, callback, size, null, -1, primaryMapOutput,
         null, null);
   }
 
@@ -185,12 +185,12 @@ class MapOutput {
 
   public void commit() throws IOException {
     if (type == Type.MEMORY) {
-      merger.closeInMemoryFile(this);
+      callback.closeInMemoryFile(this);
     } else if (type == Type.DISK) {
       localFS.rename(tmpOutputPath, outputPath.getPath());
-      merger.closeOnDiskFile(outputPath);
+      callback.closeOnDiskFile(outputPath);
     } else if (type == Type.DISK_DIRECT) {
-      merger.closeOnDiskFile(outputPath);
+      callback.closeOnDiskFile(outputPath);
     } else {
       throw new IOException("Cannot commit MapOutput of type WAIT!");
     }
@@ -198,7 +198,7 @@ class MapOutput {
   
   public void abort() {
     if (type == Type.MEMORY) {
-      merger.unreserve(memory.length);
+      callback.unreserve(memory.length);
     } else if (type == Type.DISK) {
       try {
         localFS.delete(tmpOutputPath, false);

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 5a35f2f..0536bc0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -74,7 +74,7 @@ import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 @SuppressWarnings(value={"rawtypes"})
-public class MergeManager {
+public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   
   private static final Logger LOG = LoggerFactory.getLogger(MergeManager.class);
 
@@ -373,6 +373,7 @@ public class MergeManager {
 
   final private MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
 
+  @Override
   public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, 
                                              long requestedSize,
                                              long compressedLength,
@@ -429,8 +430,9 @@ public class MergeManager {
     return MapOutput.createMemoryMapOutput(srcAttemptIdentifier, this, (int)requestedSize,
         primaryMapOutput);
   }
-  
-  synchronized void unreserve(long size) {
+
+  @Override
+  public synchronized void unreserve(long size) {
     commitMemory -= size;
     usedMemory -= size;
     if (LOG.isDebugEnabled()) {
@@ -440,6 +442,7 @@ public class MergeManager {
     notifyAll();
   }
 
+  @Override
   public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
     inMemoryMapOutputs.add(mapOutput);
     LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
@@ -483,7 +486,8 @@ public class MergeManager {
              ", inMemoryMergedMapOutputs.size() -> " + 
              inMemoryMergedMapOutputs.size());
   }
-  
+
+  @Override
   public synchronized void closeOnDiskFile(FileChunk file) {
     //including only path & offset for valdiations.
     for (FileChunk fileChunk : onDiskMapOutputs) {

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index cb12a63..20e7f5b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -19,7 +19,6 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -27,7 +26,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import javax.crypto.SecretKey;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,15 +37,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
@@ -57,11 +52,8 @@ import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -83,22 +75,14 @@ public class Shuffle implements ExceptionReporter {
   private final Configuration conf;
   private final InputContext inputContext;
   
-  private final ShuffleClientMetrics metrics;
-
   private final ShuffleInputEventHandlerOrderedGrouped eventHandler;
   private final ShuffleScheduler scheduler;
   private final MergeManager merger;
 
-  private final SecretKey jobTokenSecret;
-  private final JobTokenSecretManager jobTokenSecretMgr;
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
-  private final int numFetchers;
-  private final boolean localDiskFetchEnabled;
-  private final String localHostname;
-  private final int shufflePort;
-  
+
   private AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
   private String throwingThreadName = null;
 
@@ -108,9 +92,6 @@ public class Shuffle implements ExceptionReporter {
   
   private final String srcNameTrimmed;
   
-  private final List<FetcherOrderedGrouped> fetchers;
-  private final HttpConnectionParams httpConnectionParams;
-  
   private AtomicBoolean isShutDown = new AtomicBoolean(false);
   private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
   private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
@@ -124,19 +105,10 @@ public class Shuffle implements ExceptionReporter {
       long initialMemoryAvailable) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
-    this.httpConnectionParams =
-        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
-    this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
-        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
-        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
-    
+
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
     
-    this.jobTokenSecret = ShuffleUtils
-        .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
-    this.jobTokenSecretMgr = new JobTokenSecretManager(jobTokenSecret);
-    
+
     if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass =
           ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -161,33 +133,14 @@ public class Shuffle implements ExceptionReporter {
     LocalDirAllocator localDirAllocator = 
         new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
 
-    this.localHostname = inputContext.getExecutionContext().getHostName();
-    final ByteBuffer shuffleMetadata =
-        inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
-    this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
-
     // TODO TEZ Get rid of Map / Reduce references.
-    TezCounter shuffledInputsCounter = 
-        inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
-    TezCounter reduceShuffleBytes =
-        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
-    TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
-    TezCounter failedShuffleCounter =
-        inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
-    TezCounter spilledRecordsCounter = 
+    TezCounter spilledRecordsCounter =
         inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
     TezCounter reduceCombineInputCounter =
         inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
     TezCounter mergedMapOutputsCounter =
         inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
-    TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_TO_DISK);
-    TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
-    TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_TO_MEM);
-    
+
     LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
         + (codec == null ? "None" : codec.getClass().getName()) + 
         "ifileReadAhead: " + ifileReadAhead);
@@ -195,36 +148,38 @@ public class Shuffle implements ExceptionReporter {
     boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
       TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
     startTime = System.currentTimeMillis();
+    merger = new MergeManager(
+        this.conf,
+        localFS,
+        localDirAllocator,
+        inputContext,
+        combiner,
+        spilledRecordsCounter,
+        reduceCombineInputCounter,
+        mergedMapOutputsCounter,
+        this,
+        initialMemoryAvailable,
+        codec,
+        ifileReadAhead,
+        ifileReadAheadLength);
+
     scheduler = new ShuffleScheduler(
           this.inputContext,
           this.conf,
           numInputs,
           this,
-          shuffledInputsCounter,
-          reduceShuffleBytes,
-          reduceDataSizeDecompressed,
-          failedShuffleCounter,
-          bytesShuffedToDisk,
-          bytesShuffedToDiskDirect,
-          bytesShuffedToMem,
-          startTime);
+          merger,
+          merger,
+          startTime,
+          codec,
+          ifileReadAhead,
+          ifileReadAheadLength,
+          srcNameTrimmed);
+
     this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME);
     this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
 
-    merger = new MergeManager(
-          this.conf,
-          localFS,
-          localDirAllocator,
-          inputContext,
-          combiner,
-          spilledRecordsCounter,
-          reduceCombineInputCounter,
-          mergedMapOutputsCounter,
-          this,
-          initialMemoryAvailable,
-          codec,
-          ifileReadAhead,
-          ifileReadAheadLength);
+
 
     eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
         inputContext,
@@ -234,16 +189,6 @@ public class Shuffle implements ExceptionReporter {
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());
 
-    int configuredNumFetchers = 
-        conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
-    numFetchers = Math.min(configuredNumFetchers, numInputs);
-    LOG.info("Num fetchers being started: " + numFetchers);
-    fetchers = Lists.newArrayListWithCapacity(numFetchers);
-    localDiskFetchEnabled = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
-        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
 
     executor = MoreExecutors.listeningDecorator(rawExecutor);
     runShuffleCallable = new RunShuffleCallable();
@@ -338,36 +283,16 @@ public class Shuffle implements ExceptionReporter {
     @Override
     protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedException {
 
-      synchronized (this) {
-        synchronized (fetchers) {
-          for (int i = 0; i < numFetchers; ++i) {
-            if (!isShutDown.get()) {
-              FetcherOrderedGrouped
-                fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
-                metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength,
-                codec, inputContext, conf, localDiskFetchEnabled, localHostname, shufflePort);
-              fetchers.add(fetcher);
-              fetcher.start();
-            }
-          }
+      if (!isShutDown.get()) {
+        try {
+          scheduler.start();
+        } catch (Throwable e) {
+          throw new ShuffleError("Error during shuffle", e);
         }
       }
 
-      
-      while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
-        synchronized (Shuffle.this) {
-          if (throwable.get() != null) {
-            throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                   throwable.get());
-          }
-        }
-      }
       shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
 
-      // Stop the map-output fetcher threads
-      LOG.info("Cleaning up fetchers");
-      cleanupFetchers(false);
-      
       // stop the scheduler
       cleanupShuffleScheduler(false);
 
@@ -395,38 +320,6 @@ public class Shuffle implements ExceptionReporter {
       return kvIter;
     }
   }
-  
-  private synchronized void cleanupFetchers(boolean ignoreErrors) throws InterruptedException {
-    // Stop the fetcher threads
-    InterruptedException ie = null;
-    if (!fetchersClosed.getAndSet(true)) {
-      synchronized (fetchers) {
-        for (FetcherOrderedGrouped fetcher : fetchers) {
-          try {
-            fetcher.shutDown();
-            LOG.info("Shutdown.." + fetcher.getName());
-          } catch (InterruptedException e) {
-            if (ignoreErrors) {
-              LOG.info("Interrupted while shutting down fetchers. Ignoring.");
-            } else {
-              if (ie != null) {
-                ie = e;
-              } else {
-                LOG.warn(
-                    "Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown "
-                        + e);
-              }
-            }
-          }
-        }
-        fetchers.clear();
-      }
-      // throw only the first exception while attempting to shutdown.
-      if (ie != null) {
-        throw ie;
-      }
-    }
-  }
 
   private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException {
 
@@ -469,7 +362,6 @@ public class Shuffle implements ExceptionReporter {
 
   private void cleanupIgnoreErrors() {
     try {
-      cleanupFetchers(true);
       cleanupShuffleScheduler(true);
       cleanupMerger(true);
     } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index c54b005..85c3a30 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -17,10 +17,13 @@
  */
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
+import javax.crypto.SecretKey;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -32,13 +35,29 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -59,32 +78,42 @@ import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Ty
 import com.google.common.collect.Lists;
 
 class ShuffleScheduler {
-  static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
-    protected Long initialValue() {
-      return 0L;
-    }
-  };
+
+  @VisibleForTesting
+  enum ShuffleErrors {
+    IO_ERROR,
+    WRONG_LENGTH,
+    BAD_ID,
+    WRONG_MAP,
+    CONNECTION,
+    WRONG_REDUCE
+  }
+  @VisibleForTesting
+  final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+  private final AtomicLong shuffleStart = new AtomicLong(0);
 
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
   private static final long INITIAL_PENALTY = 2000l; // 2 seconds
   private static final float PENALTY_GROWTH_RATE = 1.3f;
 
-  private boolean[] finishedMaps;
+  private final BitSet finishedMaps;
   private final int numInputs;
-  private int remainingMaps;
   private int numFetchedSpills;
-  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+  @VisibleForTesting
+  final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
   //TODO Clean this and other maps at some point
-  private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
+  private final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
 
   //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
   // enabled in source.
   @VisibleForTesting
   final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
 
-  private Set<MapHost> pendingHosts = new HashSet<MapHost>();
-  private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
-  
+  private final Set<MapHost> pendingHosts = new HashSet<MapHost>();
+  private final Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
+
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   private final Random random = new Random(System.currentTimeMillis());
   private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
   private final Referee referee;
@@ -93,7 +122,6 @@ class ShuffleScheduler {
   private final Map<String,IntWritable> hostFailures = 
     new HashMap<String,IntWritable>();
   private final InputContext inputContext;
-  private final Shuffle shuffle;
   private final TezCounter shuffledInputsCounter;
   private final TezCounter skippedInputCounter;
   private final TezCounter reduceShuffleBytes;
@@ -105,13 +133,42 @@ class ShuffleScheduler {
   private final TezCounter firstEventReceived;
   private final TezCounter lastEventReceived;
 
+  private final String srcNameTrimmed;
+  private final AtomicInteger remainingMaps;
   private final long startTime;
   private long lastProgressTime;
 
-  private int maxTaskOutputAtOnce;
-  private int maxFetchFailuresBeforeReporting;
-  private boolean reportReadErrorImmediately = true; 
-  private int maxFailedUniqueFetches = 5;
+  private final int numFetchers;
+  private final Set<FetcherOrderedGrouped> runningFetchers =
+      Collections.newSetFromMap(new ConcurrentHashMap<FetcherOrderedGrouped, Boolean>());
+
+  private final ListeningExecutorService fetcherExecutor;
+
+  private final HttpConnection.HttpConnectionParams httpConnectionParams;
+  private final FetchedInputAllocatorOrderedGrouped allocator;
+  private final ShuffleClientMetrics shuffleMetrics;
+  private final Shuffle shuffle;
+  private final MergeManager mergeManager;
+  private final JobTokenSecretManager jobTokenSecretManager;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final CompressionCodec codec;
+  private final Configuration conf;
+  private final boolean localDiskFetchEnabled;
+  private final String localHostname;
+  private final int shufflePort;
+
+  private final TezCounter ioErrsCounter;
+  private final TezCounter wrongLengthErrsCounter;
+  private final TezCounter badIdErrsCounter;
+  private final TezCounter wrongMapErrsCounter;
+  private final TezCounter connectionErrsCounter;
+  private final TezCounter wrongReduceErrsCounter;
+
+  private final int maxTaskOutputAtOnce;
+  private final int maxFetchFailuresBeforeReporting;
+  private final boolean reportReadErrorImmediately;
+  private final int maxFailedUniqueFetches;
   private final int abortFailureLimit;
   private int maxMapRuntime = 0;
 
@@ -122,32 +179,88 @@ class ShuffleScheduler {
                           Configuration conf,
                           int numberOfInputs,
                           Shuffle shuffle,
-                          TezCounter shuffledInputsCounter,
-                          TezCounter reduceShuffleBytes,
-                          TezCounter reduceBytesDecompressed,
-                          TezCounter failedShuffleCounter,
-                          TezCounter bytesShuffledToDisk,
-                          TezCounter bytesShuffledToDiskDirect,
-                          TezCounter bytesShuffledToMem, long startTime) {
+                          MergeManager mergeManager,
+                          FetchedInputAllocatorOrderedGrouped allocator,
+                          long startTime,
+                          CompressionCodec codec,
+                          boolean ifileReadAhead,
+                          int ifileReadAheadLength,
+                          String srcNameTrimmed) throws IOException {
     this.inputContext = inputContext;
+    this.conf = conf;
+    this.shuffle = shuffle;
+    this.allocator = allocator;
+    this.mergeManager = mergeManager;
     this.numInputs = numberOfInputs;
     abortFailureLimit = Math.max(30, numberOfInputs / 10);
-    remainingMaps = numberOfInputs;
-    finishedMaps = new boolean[remainingMaps]; // default init to false
+    remainingMaps = new AtomicInteger(numberOfInputs);
+    finishedMaps = new BitSet(numberOfInputs);
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+    this.srcNameTrimmed = srcNameTrimmed;
+    this.codec = codec;
+    int configuredNumFetchers =
+        conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
+    numFetchers = Math.min(configuredNumFetchers, numInputs);
+    LOG.info("Num fetchers determined to be: " + numFetchers);
+
+    localDiskFetchEnabled = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+    this.localHostname = inputContext.getExecutionContext().getHostName();
+    final ByteBuffer shuffleMetadata =
+        inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
+
     this.referee = new Referee();
-    this.shuffle = shuffle;
-    this.shuffledInputsCounter = shuffledInputsCounter;
-    this.reduceShuffleBytes = reduceShuffleBytes;
-    this.reduceBytesDecompressed = reduceBytesDecompressed;
-    this.failedShuffleCounter = failedShuffleCounter;
-    this.bytesShuffledToDisk = bytesShuffledToDisk;
-    this.bytesShuffledToDiskDirect = bytesShuffledToDiskDirect;
-    this.bytesShuffledToMem = bytesShuffledToMem;
+    // Counters used by the ShuffleScheduler
+    this.shuffledInputsCounter = inputContext.getCounters().findCounter(
+        TaskCounter.NUM_SHUFFLED_INPUTS);
+    this.reduceShuffleBytes = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+    this.reduceBytesDecompressed = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+    this.failedShuffleCounter = inputContext.getCounters().findCounter(
+        TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+    this.bytesShuffledToDisk = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    this.bytesShuffledToDiskDirect =  inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
+    this.bytesShuffledToMem = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
+
+    // Counters used by Fetchers
+    ioErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.IO_ERROR.toString());
+    wrongLengthErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_LENGTH.toString());
+    badIdErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.BAD_ID.toString());
+    wrongMapErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_MAP.toString());
+    connectionErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.CONNECTION.toString());
+    wrongReduceErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_REDUCE.toString());
+
     this.startTime = startTime;
     this.lastProgressTime = startTime;
 
-    this.maxFailedUniqueFetches = Math.min(numberOfInputs,
-        this.maxFailedUniqueFetches);
+    this.httpConnectionParams =
+        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+    this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
+        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
+        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
+    SecretKey jobTokenSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+    this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
+
+    ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
+    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+
+    this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
     referee.start();
     this.maxFetchFailuresBeforeReporting = 
         conf.getInt(
@@ -175,6 +288,30 @@ class ShuffleScheduler {
         + ", maxMapRuntime=" + maxMapRuntime);
   }
 
+  public void start() throws Exception {
+    ShuffleSchedulerCallable schedulerCallable = new ShuffleSchedulerCallable();
+    schedulerCallable.call();
+  }
+
+  public void close() throws InterruptedException {
+    if (!isShutdown.getAndSet(true)) {
+
+      // Interrupt the waiting Scheduler thread.
+      synchronized (this) {
+        notifyAll();
+      }
+
+      // Interrupt the fetchers.
+      for (FetcherOrderedGrouped fetcher : runningFetchers) {
+        fetcher.shutDown();
+      }
+
+      // Kill the Referee thread.
+      referee.interrupt();
+      referee.join();
+    }
+  }
+
   protected synchronized  void updateEventReceivedTime() {
     long relativeTime = System.currentTimeMillis() - startTime;
     if (firstEventReceived.getValue() == 0) {
@@ -264,7 +401,7 @@ class ShuffleScheduler {
        * we retrieve all spill details to claim success.
        */
       if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
-        remainingMaps = remainingMaps - 1;
+        remainingMaps.decrementAndGet();
         setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
         numFetchedSpills++;
       } else {
@@ -292,7 +429,7 @@ class ShuffleScheduler {
 
         //check if we downloaded all spills pertaining to this InputAttemptIdentifier
         if (eventInfo.isDone()) {
-          remainingMaps = remainingMaps - 1;
+          remainingMaps.decrementAndGet();
           setInputFinished(inputIdentifier.getInputIndex());
           shuffleInfoEventsMap.remove(inputIdentifier);
           if (LOG.isTraceEnabled()) {
@@ -306,9 +443,9 @@ class ShuffleScheduler {
         }
       }
 
-      if (remainingMaps == 0) {
+      if (remainingMaps.get() == 0) {
+        notifyAll(); // Notify the getHost() method.
         LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
-        notifyAll();
       }
 
       // update the status
@@ -365,11 +502,11 @@ class ShuffleScheduler {
 
   private void logProgress() {
     double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
-    int inputsDone = numInputs - remainingMaps;
+    int inputsDone = numInputs - remainingMaps.get();
     long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
 
     double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills +  ") of " + numInputs +
+    LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + ") of " + numInputs +
         ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
@@ -422,7 +559,7 @@ class ShuffleScheduler {
     long delay = (long) (INITIAL_PENALTY *
         Math.pow(PENALTY_GROWTH_RATE, failures));
     
-    penalties.add(new Penalty(host, delay));    
+    penalties.add(new Penalty(host, delay));
   }
 
   public void reportLocalError(IOException ioe) {
@@ -461,7 +598,7 @@ class ShuffleScheduler {
     final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
 
     long totalFailures = failedShuffleCounter.getValue();
-    int doneMaps = numInputs - remainingMaps;
+    int doneMaps = numInputs - remainingMaps.get();
     
     boolean reducerHealthy =
       (((float)totalFailures / (totalFailures + doneMaps))
@@ -559,28 +696,33 @@ class ShuffleScheduler {
   }
 
   public synchronized MapHost getHost() throws InterruptedException {
-      while(pendingHosts.isEmpty()) {
-        wait();
-      }
-      
+    while (pendingHosts.isEmpty() && remainingMaps.get() > 0) {
+      LOG.info("PendingHosts=" + pendingHosts);
+      wait();
+    }
+
+    if (!pendingHosts.isEmpty()) {
+
       MapHost host = null;
       Iterator<MapHost> iter = pendingHosts.iterator();
       int numToPick = random.nextInt(pendingHosts.size());
-      for (int i=0; i <= numToPick; ++i) {
+      for (int i = 0; i <= numToPick; ++i) {
         host = iter.next();
       }
-      
-      pendingHosts.remove(host);     
+
+      pendingHosts.remove(host);
       host.markBusy();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
             " to " + Thread.currentThread().getName());
       }
       shuffleStart.set(System.currentTimeMillis());
-      
       return host;
+    } else {
+      return null;
+    }
   }
-  
+
   public InputAttemptIdentifier getIdentifierForFetchedOutput(
       String path, int reduceId) {
     return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
@@ -680,8 +822,8 @@ class ShuffleScheduler {
         notifyAll();
       }
     }
-    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
-             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
+    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
+        (System.currentTimeMillis() - shuffleStart.get()) + "ms");
   }
 
   public synchronized void resetKnownMaps() {
@@ -693,28 +835,13 @@ class ShuffleScheduler {
 
   /**
    * Utility method to check if the Shuffle data fetch is complete.
-   * @return
+   * @return true if complete
    */
   public synchronized boolean isDone() {
-    return remainingMaps == 0;
+    return remainingMaps.get() == 0;
   }
 
   /**
-   * Wait until the shuffle finishes or until the timeout.
-   * @param millis maximum wait time
-   * @return true if the shuffle is done
-   * @throws InterruptedException
-   */
-  public synchronized boolean waitUntilDone(int millis
-                                            ) throws InterruptedException {
-    if (remainingMaps > 0) {
-      wait(millis);
-      return remainingMaps == 0;
-    }
-    return true;
-  }
-  
-  /**
    * A structure that records the penalty for a host.
    */
   private static class Penalty implements Delayed {
@@ -754,7 +881,7 @@ class ShuffleScheduler {
 
     public void run() {
       try {
-        while (true) {
+        while (!isShutdown.get()) {
           // take the first host that has an expired penalty
           MapHost host = penalties.take().host;
           synchronized (ShuffleScheduler.this) {
@@ -767,7 +894,6 @@ class ShuffleScheduler {
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
         // This handles shutdown of the entire fetch / merge process.
-        return;
       } catch (Throwable t) {
         // Shuffle knows how to deal with failures post shutdown via the onFailure hook
         shuffle.reportException(t);
@@ -775,11 +901,6 @@ class ShuffleScheduler {
     }
   }
   
-  public void close() throws InterruptedException {
-    referee.interrupt();
-    referee.join();
-  }
-
   public synchronized void informMaxMapRunTime(int duration) {
     if (duration > maxMapRuntime) {
       maxMapRuntime = duration;
@@ -788,13 +909,154 @@ class ShuffleScheduler {
   
   void setInputFinished(int inputIndex) {
     synchronized(finishedMaps) {
-      finishedMaps[inputIndex] = true;
+      finishedMaps.set(inputIndex, true);
     }
   }
   
   boolean isInputFinished(int inputIndex) {
     synchronized (finishedMaps) {
-      return finishedMaps[inputIndex];      
+      return finishedMaps.get(inputIndex);
+    }
+  }
+
+  private class ShuffleSchedulerCallable extends CallableWithNdc<Void> {
+
+
+    @Override
+    protected Void callInternal() throws InterruptedException {
+      outer:
+      while (!isShutdown.get() && remainingMaps.get() > 0) {
+        synchronized (ShuffleScheduler.this) {
+          if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
+            if (remainingMaps.get() > 0) {
+              try {
+                ShuffleScheduler.this.wait();
+              } catch (InterruptedException e) {
+                if (isShutdown.get()) {
+                  LOG.info(
+                      "Interrupted while waiting for fetchers to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+                  Thread.currentThread().interrupt();
+                  break;
+                } else {
+                  throw e;
+                }
+              }
+            }
+          }
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NumCompletedInputs: {}", (numInputs - remainingMaps.get()));
+        }
+
+        // Ensure there's memory available before scheduling the next Fetcher.
+        try {
+          // If merge is on, block
+          mergeManager.waitForInMemoryMerge();
+          // In case usedMemory > memorylimit, wait until some memory is released
+          mergeManager.waitForShuffleToMergeMemory();
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info(
+                "Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+            Thread.currentThread().interrupt();
+            break;
+          } else {
+            throw e;
+          }
+        }
+
+        if (!isShutdown.get() && remainingMaps.get() > 0) {
+          synchronized (ShuffleScheduler.this) {
+            int numFetchersToRun = numFetchers - runningFetchers.size();
+            int count = 0;
+            while (count < numFetchersToRun && !isShutdown.get() && remainingMaps.get() > 0) {
+              MapHost mapHost;
+              try {
+                mapHost = getHost();  // Leads to a wait.
+              } catch (InterruptedException e) {
+                if (isShutdown.get()) {
+                  LOG.info(
+                      "Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+                  Thread.currentThread().interrupt();
+                  break;
+                } else {
+                  throw e;
+                }
+              }
+              if (mapHost == null) {
+                break; // Check for the exit condition.
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Processing pending host: " + mapHost.toString());
+              }
+              if (!isShutdown.get()) {
+                count++;
+                LOG.info("Scheduling fetch for inputHost: {}", mapHost.getIdentifier());
+                FetcherOrderedGrouped fetcherOrderedGrouped = constructFetcherForHost(mapHost);
+                runningFetchers.add(fetcherOrderedGrouped);
+                ListenableFuture<Void> future = fetcherExecutor.submit(fetcherOrderedGrouped);
+                Futures.addCallback(future, new FetchFutureCallback(fetcherOrderedGrouped));
+              }
+            }
+          }
+        }
+      }
+      LOG.info("Shutting down FetchScheduler for input: {}, wasInterrupted={}", srcNameTrimmed, Thread.currentThread().isInterrupted());
+      if (!fetcherExecutor.isShutdown()) {
+        fetcherExecutor.shutdownNow();
+      }
+      return null;
+    }
+  }
+
+  @VisibleForTesting
+  FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
+    return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator,
+        shuffleMetrics, shuffle, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
+        codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
+        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
+        connectionErrsCounter, wrongReduceErrsCounter);
+  }
+
+  private class FetchFutureCallback implements FutureCallback<Void> {
+
+    private final FetcherOrderedGrouped fetcherOrderedGrouped;
+
+    public FetchFutureCallback(
+        FetcherOrderedGrouped fetcherOrderedGrouped) {
+      this.fetcherOrderedGrouped = fetcherOrderedGrouped;
+    }
+
+    private void doBookKeepingForFetcherComplete() {
+      synchronized (ShuffleScheduler.this) {
+        runningFetchers.remove(fetcherOrderedGrouped);
+        ShuffleScheduler.this.notifyAll();
+      }
+    }
+
+
+
+    @Override
+    public void onSuccess(Void result) {
+      fetcherOrderedGrouped.shutDown();
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring fetch complete");
+      } else {
+        doBookKeepingForFetcherComplete();
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      fetcherOrderedGrouped.shutDown();
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring fetch complete");
+      } else {
+        LOG.error("Fetcher failed with error", t);
+        shuffle.reportException(t);
+        doBookKeepingForFetcherComplete();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index c33905f..f77e9a6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -40,11 +40,12 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.hadoop.io.DataOutputBuffer;
+import com.google.common.collect.Lists;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -73,8 +74,59 @@ public class TestFetcher {
   public static final String HOST = "localhost";
   public static final int PORT = 65;
 
+  private TezCounters tezCounters = new TezCounters();
+  private TezCounter ioErrsCounter = tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+      ShuffleScheduler.ShuffleErrors.IO_ERROR.toString());
+  private TezCounter wrongLengthErrsCounter =
+      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+          ShuffleScheduler.ShuffleErrors.WRONG_LENGTH.toString());
+  private TezCounter badIdErrsCounter =
+      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+          ShuffleScheduler.ShuffleErrors.BAD_ID.toString());
+  private TezCounter wrongMapErrsCounter =
+      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+          ShuffleScheduler.ShuffleErrors.WRONG_MAP.toString());
+  private TezCounter connectionErrsCounter =
+      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+          ShuffleScheduler.ShuffleErrors.CONNECTION.toString());
+  private TezCounter wrongReduceErrsCounter =
+      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+          ShuffleScheduler.ShuffleErrors.WRONG_REDUCE.toString());
+
   static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class);
 
+  @Test (timeout = 5000)
+  public void testInputsReturnedOnConnectionException() throws Exception {
+    Configuration conf = new TezConfiguration();
+    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+    MergeManager merger = mock(MergeManager.class);
+
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    Shuffle shuffle = mock(Shuffle.class);
+
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+    doReturn("src vertex").when(inputContext).getSourceVertexName();
+
+    MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt");
+    mapHost.addKnownMap(inputAttemptIdentifier);
+    List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier);
+    doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
+
+    FetcherOrderedGrouped fetcher =
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+            null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter,
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+
+    fetcher.call();
+    verify(scheduler).getMapsForHost(mapHost);
+    verify(scheduler).freeHost(mapHost);
+    verify(scheduler).putBackKnownMapOutput(mapHost, inputAttemptIdentifier);
+  }
+
+
   @Test(timeout = 5000)
   public void testLocalFetchModeSetting1() throws Exception {
     Configuration conf = new TezConfiguration();
@@ -90,14 +142,15 @@ public class TestFetcher {
     final boolean ENABLE_LOCAL_FETCH = true;
     final boolean DISABLE_LOCAL_FETCH = false;
     MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
-    FetcherOrderedGrouped
-        fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
-        false, 0, null, inputContext, conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+    FetcherOrderedGrouped fetcher =
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+            null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter,
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
 
     // when local mode is enabled and host and port matches use local fetch
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
-    doReturn(mapHost).when(scheduler).getHost();
 
     spyFetcher.fetchNext();
 
@@ -105,10 +158,14 @@ public class TestFetcher {
     verify(spyFetcher, never()).copyFromHost(any(MapHost.class));
 
     // if hostname does not match use http
-    spyFetcher = spy(fetcher);
     mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl");
+    fetcher =
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+            null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter,
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+    spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
-    doReturn(mapHost).when(scheduler).getHost();
 
     spyFetcher.fetchNext();
 
@@ -116,10 +173,14 @@ public class TestFetcher {
     verify(spyFetcher, times(1)).copyFromHost(mapHost);
 
     // if port does not match use http
-    spyFetcher = spy(fetcher);
     mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl");
+    fetcher =
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+            null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter,
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+    spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
-    doReturn(mapHost).when(scheduler).getHost();
 
     spyFetcher.fetchNext();
 
@@ -128,11 +189,12 @@ public class TestFetcher {
 
     //if local fetch is not enabled
     mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
-    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
-        false, 0, null, inputContext, conf, DISABLE_LOCAL_FETCH, HOST, PORT);
+    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+        null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+        wrongLengthErrsCounter, badIdErrsCounter,
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
-    doReturn(mapHost).when(scheduler).getHost();
 
     spyFetcher.fetchNext();
 
@@ -151,13 +213,14 @@ public class TestFetcher {
     when(inputContext.getCounters()).thenReturn(new TezCounters());
     when(inputContext.getSourceVertexName()).thenReturn("");
 
-    FetcherOrderedGrouped
-        fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
-        false, 0, null, inputContext, conf, true, HOST, PORT);
-    FetcherOrderedGrouped spyFetcher = spy(fetcher);
-
     MapHost host = new MapHost(1, HOST + ":" + PORT,
         "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+        null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+    FetcherOrderedGrouped spyFetcher = spy(fetcher);
+
+
     List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
         new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
         new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
@@ -294,13 +357,14 @@ public class TestFetcher {
 
     HttpConnection.HttpConnectionParams httpConnectionParams =
         ShuffleUtils.constructHttpShuffleConnectionParams(conf);
-    FetcherOrderedGrouped mockFetcher =
-        new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, null,
-            false, 0, null, inputContext, conf, false, HOST, PORT);
-    final FetcherOrderedGrouped fetcher = spy(mockFetcher);
-
     final MapHost host = new MapHost(1, HOST + ":" + PORT,
         "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+    FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+        null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+    final FetcherOrderedGrouped fetcher = spy(mockFetcher);
+
+
     final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
         new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
         new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),

http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index eed9fd8..78d214c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -1,18 +1,21 @@
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
-import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -20,6 +23,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -27,7 +31,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -62,12 +66,20 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
   private ShuffleScheduler realScheduler;
   private MergeManager mergeManager;
 
-  private InputContext createTezInputContext() {
+  private InputContext createTezInputContext() throws IOException {
     ApplicationId applicationId = ApplicationId.newInstance(1, 1);
     InputContext inputContext = mock(InputContext.class);
     doReturn(applicationId).when(inputContext).getApplicationId();
     doReturn("sourceVertex").when(inputContext).getSourceVertexName();
     when(inputContext.getCounters()).thenReturn(new TezCounters());
+    ExecutionContext executionContext = new ExecutionContextImpl("localhost");
+    doReturn(executionContext).when(inputContext).getExecutionContext();
+    ByteBuffer shuffleBuffer = ByteBuffer.allocate(4).putInt(0, 4);
+    doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString());
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(new JobTokenIdentifier(new Text("text")),
+        new JobTokenSecretManager());
+    ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
+    doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
     return inputContext;
   }
 
@@ -129,33 +141,18 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
   private void setupScheduler(int numInputs) throws Exception {
     InputContext inputContext = createTezInputContext();
     Configuration config = new Configuration();
-    TezCounter shuffledInputsCounter =
-        inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
-    TezCounter reduceShuffleBytes =
-        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
-    TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
-    TezCounter failedShuffleCounter =
-        inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
-    TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_TO_DISK);
-    TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
-    TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_TO_MEM);
     realScheduler = new ShuffleScheduler(
         inputContext,
         config,
         numInputs,
         mock(Shuffle.class),
-        shuffledInputsCounter,
-        reduceShuffleBytes,
-        reduceDataSizeDecompressed,
-        failedShuffleCounter,
-        bytesShuffedToDisk,
-        bytesShuffedToDiskDirect,
-        bytesShuffedToMem,
-        System.currentTimeMillis());
+        mock(MergeManager.class),
+        mock(MergeManager.class),
+        System.currentTimeMillis(),
+        null,
+        false,
+        0,
+        "src vertex");
     scheduler = spy(realScheduler);
     handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false);
     mergeManager = mock(MergeManager.class);


Mime
View raw message