tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1590. Fetchers should not report failures after the Processor on the task completes. (sseth)
Date Fri, 24 Oct 2014 01:24:54 GMT
Repository: tez
Updated Branches:
  refs/heads/master 2266cccf9 -> b8b317237


TEZ-1590. Fetchers should not report failures after the Processor on the
task completes. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b8b31723
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b8b31723
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b8b31723

Branch: refs/heads/master
Commit: b8b317237d7f28444d955922ef0d37e2e8b373a0
Parents: 2266ccc
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Oct 23 18:24:40 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Oct 23 18:24:40 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../runtime/library/common/shuffle/Fetcher.java | 120 +++++++++++++------
 .../orderedgrouped/FetcherOrderedGrouped.java   |  96 ++++++++++-----
 .../shuffle/orderedgrouped/MergeThread.java     |   1 +
 .../common/shuffle/orderedgrouped/Shuffle.java  |   5 +-
 .../orderedgrouped/ShuffleScheduler.java        |  11 +-
 6 files changed, 164 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b8b31723/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a4e19a..ae6bc31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -62,6 +62,7 @@ ALL CHANGES:
   TEZ-1688. Add applicationId as a primary filter for all Timeline data for easier export.
   TEZ-1141. DAGStatus.Progress should include number of failed and killed attempts.
   TEZ-1424. Fixes to DAG text representation in debug mode.
+  TEZ-1590. Fetchers should not report failures after the Processor on the task completes.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b8b31723/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 9775986..2dea4d3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -191,9 +191,14 @@ public class Fetcher implements Callable<FetchResult> {
     }
 
     if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length
> 0) {
-      LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
-      for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
-        fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
+      if (!isShutDown.get()) {
+        LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
+        for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
+          fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
+        }
+      } else {
+        LOG.info("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length
+
+            " inputs since the fetcher has already been stopped");
       }
     }
 
@@ -405,7 +410,9 @@ public class Fetcher implements Callable<FetchResult> {
       // indirectly penalizing the host
       InputAttemptIdentifier[] failedFetches = null;
       if (isShutDown.get()) {
-        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+        LOG.info(
+            "Not reporting fetch failure during connection establishment, since an Exception
was caught after shutdown." +
+                e.getClass().getName() + ", Message: " + e.getMessage());
       } else {
         failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
       }
@@ -428,7 +435,9 @@ public class Fetcher implements Callable<FetchResult> {
       // with the first map, typically lost map. So, penalize only that map
       // and add the rest
       if (isShutDown.get()) {
-        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+        LOG.info(
+            "Not reporting fetch failure during connection establishment, since an Exception
was caught after shutdown." +
+                e.getClass().getName() + ", Message: " + e.getMessage());
       } else {
         InputAttemptIdentifier firstAttempt = attempts.get(0);
         LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
@@ -467,12 +476,22 @@ public class Fetcher implements Callable<FetchResult> {
     // yet_to_be_fetched list and marking the failed tasks.
     InputAttemptIdentifier[] failedInputs = null;
     while (!remaining.isEmpty() && failedInputs == null) {
+      if (isShutDown.get()) {
+        shutdownInternal(true);
+        LOG.info("Fetcher already shutdown. Aborting queued fetches for " + remaining.size()
+ " inputs");
+        return new HostFetchResult(new FetchResult(host, port, partition, remaining), null,
+            false);
+      }
       try {
         failedInputs = fetchInputs(input, callback);
       } catch (FetcherReadTimeoutException e) {
         //clean up connection
         shutdownInternal(true);
-
+        if (isShutDown.get()) {
+          LOG.info("Fetcher already shutdown. Aborting reconnection and queued fetches for
" + remaining.size() + " inputs");
+          return new HostFetchResult(new FetchResult(host, port, partition, remaining), null,
+              false);
+        }
         // Connect again.
         connectionsWithRetryResult = setupConnection(
             new LinkedList<InputAttemptIdentifier>(remaining));
@@ -482,6 +501,11 @@ public class Fetcher implements Callable<FetchResult> {
       }
     }
 
+    if (isShutDown.get() && failedInputs != null && failedInputs.length >
0) {
+      LOG.info("Fetcher already shutdown. Not reporting fetch failures for: " +
+          (failedInputs == null ? 0 : failedInputs.length) + " failed inputs");
+      failedInputs = null;
+    }
     return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,
         false);
   }
@@ -496,8 +520,11 @@ public class Fetcher implements Callable<FetchResult> {
 
     Iterator<InputAttemptIdentifier> iterator = remaining.iterator();
     while (iterator.hasNext()) {
+      if (isShutDown.get()) {
+        LOG.info("Already shutdown. Skipping fetch for " + remaining.size() + " inputs");
+        break;
+      }
       InputAttemptIdentifier srcAttemptId = iterator.next();
-      //TODO: check for shutdown? - See TEZ-1480
       long startTime = System.currentTimeMillis();
 
       FetchedInput fetchedInput = null;
@@ -528,21 +555,26 @@ public class Fetcher implements Callable<FetchResult> {
             idxRecord.getRawLength(), (endTime - startTime));
         iterator.remove();
       } catch (IOException e) {
+        cleanupFetchedInput(fetchedInput);
+        if (isShutDown.get()) {
+          LOG.info(
+              "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId + " from
host " +
+                  host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+          break;
+        }
         LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + "(local
fetch)",
             e);
-        if (fetchedInput != null) {
-          try {
-            fetchedInput.abort();
-          } catch (IOException e1) {
-            LOG.info("Failed to cleanup fetchedInput " + fetchedInput);
-          }
-        }
       }
     }
 
     InputAttemptIdentifier[] failedFetches = null;
     if (failMissing && remaining.size() > 0) {
-      failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+      if (isShutDown.get()) {
+        LOG.info("Already shutdown, not reporting fetch failures for: " + remaining.size()
+
+            " remaining inputs");
+      } else {
+        failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+      }
     } else {
       // nothing needs to be done to requeue remaining entries
     }
@@ -642,20 +674,30 @@ public class Fetcher implements Callable<FetchResult> {
         responsePartition = header.getPartition();
       } catch (IllegalArgumentException e) {
         // badIdErrs.increment(1);
-        LOG.warn("Invalid src id ", e);
-        // Don't know which one was bad, so consider all of them as bad
-        return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+        if (!isShutDown.get()) {
+          LOG.warn("Invalid src id ", e);
+          // Don't know which one was bad, so consider all of them as bad
+          return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+        } else {
+          LOG.info("Already shutdown. Ignoring badId error with message: " + e.getMessage());
+          return null;
+        }
       }
 
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength,
           responsePartition, srcAttemptId, pathComponent)) {
-        if (srcAttemptId == null) {
-          LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
-          srcAttemptId = getNextRemainingAttempt();
+        if (!isShutDown.get()) {
+          if (srcAttemptId == null) {
+            LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
+            srcAttemptId = getNextRemainingAttempt();
+          }
+          assert (srcAttemptId != null);
+          return new InputAttemptIdentifier[]{srcAttemptId};
+        } else {
+          LOG.info("Already shutdown. Ignoring verification failure.");
+          return null;
         }
-        assert(srcAttemptId != null);
-        return new InputAttemptIdentifier[] { srcAttemptId };
       }
 
       if (LOG.isDebugEnabled()) {
@@ -672,7 +714,7 @@ public class Fetcher implements Callable<FetchResult> {
         fetchedInput = inputManager.allocate(decompressedLength,
             compressedLength, srcAttemptId);
       }
-      // TODO NEWTEZ No concept of WAIT at the moment.
+      // No concept of WAIT at the moment.
       // // Check if we can shuffle *now* ...
       // if (fetchedInput.getType() == FetchedInput.WAIT) {
       // LOG.info("fetcher#" + id +
@@ -722,23 +764,23 @@ public class Fetcher implements Callable<FetchResult> {
       // metrics.successFetch();
       return null;
     } catch (IOException ioe) {
+      if (isShutDown.get()) {
+        cleanupFetchedInput(fetchedInput);
+        LOG.info("Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName()
+
+            ", Message: " + ioe.getMessage());
+        return null;
+      }
       if (shouldRetry(srcAttemptId, ioe)) {
         //release mem/file handles
-        if (fetchedInput != null) {
-          try {
-            fetchedInput.abort();
-          } catch (IOException e) {
-            LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
-          }
-        }
+        cleanupFetchedInput(fetchedInput);
         throw new FetcherReadTimeoutException(ioe);
       }
-      // ZZZ Add some shutdown code here
-      // ZZZ Make sure any assigned memory inputs are aborted
       // ioErrs.increment(1);
       if (srcAttemptId == null || fetchedInput == null) {
         LOG.info("fetcher" + " failed to read map header" + srcAttemptId
             + " decomp: " + decompressedLength + ", " + compressedLength, ioe);
+        // Cleanup the fetchedInput before returning.
+        cleanupFetchedInput(fetchedInput);
         if (srcAttemptId == null) {
           return remaining
               .toArray(new InputAttemptIdentifier[remaining.size()]);
@@ -749,14 +791,20 @@ public class Fetcher implements Callable<FetchResult> {
       LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host,
           ioe);
 
-      // Inform the shuffle-scheduler
+      // Cleanup the fetchedInput
+      cleanupFetchedInput(fetchedInput);
+      // metrics.failedFetch();
+      return new InputAttemptIdentifier[] { srcAttemptId };
+    }
+  }
+
+  private void cleanupFetchedInput(FetchedInput fetchedInput) {
+    if (fetchedInput != null) {
       try {
         fetchedInput.abort();
       } catch (IOException e) {
         LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
       }
-      // metrics.failedFetch();
-      return new InputAttemptIdentifier[] { srcAttemptId };
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b8b31723/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 96f98cf..911f4f4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -182,6 +182,7 @@ class FetcherOrderedGrouped extends Thread {
       return;
     } catch (Throwable t) {
       shuffle.reportException(t);
+      // Shuffle knows how to deal with failures post shutdown via the onFailure hook
     }
   }
 
@@ -271,22 +272,32 @@ class FetcherOrderedGrouped extends Thread {
         } catch (FetcherReadTimeoutException e) {
           // Setup connection again if disconnected
           cleanupCurrentConnection(true);
-
+          if (stopped) {
+            LOG.info("Not re-establishing connection since Fetcher has been stopped");
+            return;
+          }
           // Connect with retry
           if (!setupConnection(host, new LinkedList<InputAttemptIdentifier>(remaining)))
{
             if (stopped) {
               cleanupCurrentConnection(true);
+              LOG.info("Not reporting connection re-establishment failure since fetcher is
stopped");
+              return;
             }
             failedTasks = new InputAttemptIdentifier[] {getNextRemainingAttempt()};
             break;
           }
         }
       }
-      
-      if(failedTasks != null && failedTasks.length > 0) {
-        LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
-        for(InputAttemptIdentifier left: failedTasks) {
-          scheduler.copyFailed(left, host, true, false);
+
+      if (failedTasks != null && failedTasks.length > 0) {
+        if (stopped) {
+          LOG.info("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks)
+
+              " since Fetcher has been stopped");
+        } else {
+          LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
+          for (InputAttemptIdentifier left : failedTasks) {
+            scheduler.copyFailed(left, host, true, false);
+          }
         }
       }
 
@@ -365,7 +376,7 @@ class FetcherOrderedGrouped extends Thread {
   }
 
   private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
-  
+
   protected InputAttemptIdentifier[] copyMapOutput(MapHost host,
                                 DataInputStream input) throws FetcherReadTimeoutException
{
     MapOutput mapOutput = null;
@@ -382,8 +393,15 @@ class FetcherOrderedGrouped extends Thread {
         // TODO Review: Multiple header reads in case of status WAIT ? 
         header.readFields(input);
         if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
-          throw new IllegalArgumentException(
-              "Invalid header received: " + header.mapId + " partition: " + header.forReduce);
+          if (!stopped) {
+            badIdErrs.increment(1);
+            LOG.warn("Invalid map id: " + header.mapId + ", expected to start with " +
+                InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce);
+            return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
+          } else {
+            LOG.info("Already shutdown. Ignoring invalid map id error");
+            return EMPTY_ATTEMPT_ID_ARRAY;
+          }
         }
         srcAttemptId = 
             scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
@@ -391,22 +409,33 @@ class FetcherOrderedGrouped extends Thread {
         decompressedLength = header.uncompressedLength;
         forReduce = header.forReduce;
       } catch (IllegalArgumentException e) {
-        badIdErrs.increment(1);
-        LOG.warn("Invalid map id ", e);
-        // Don't know which one was bad, so consider this one bad and dont read
-        // the remaining because we dont know where to start reading from. YARN-1773
-        return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
+        if (!stopped) {
+          badIdErrs.increment(1);
+          LOG.warn("Invalid map id ", e);
+          // Don't know which one was bad, so consider this one bad and dont read
+          // the remaining because we dont know where to start reading from. YARN-1773
+          return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
+        } else {
+          LOG.info("Already shutdown. Ignoring invalid map id error. Exception: " +
+              e.getClass().getName() + ", Message: " + e.getMessage());
+          return EMPTY_ATTEMPT_ID_ARRAY;
+        }
       }
 
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, srcAttemptId)) {
-        if (srcAttemptId == null) {
-          LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
-          srcAttemptId = getNextRemainingAttempt();
+        if (!stopped) {
+          if (srcAttemptId == null) {
+            LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
+            srcAttemptId = getNextRemainingAttempt();
+          }
+          assert (srcAttemptId != null);
+          return new InputAttemptIdentifier[]{srcAttemptId};
+        } else {
+          LOG.info("Already stopped. Ignoring verification failure.");
+          return EMPTY_ATTEMPT_ID_ARRAY;
         }
-        assert(srcAttemptId != null);
-        return new InputAttemptIdentifier[] {srcAttemptId};
       }
       
       if(LOG.isDebugEnabled()) {
@@ -418,9 +447,13 @@ class FetcherOrderedGrouped extends Thread {
       try {
         mapOutput = merger.reserve(srcAttemptId, decompressedLength, compressedLength, id);
       } catch (IOException e) {
-        // Kill the reduce attempt
-        ioErrs.increment(1);
-        scheduler.reportLocalError(e);
+        if (!stopped) {
+          // Kill the reduce attempt
+          ioErrs.increment(1);
+          scheduler.reportLocalError(e);
+        } else {
+          LOG.info("Already stopped. Ignoring error from merger.reserve");
+        }
         return EMPTY_ATTEMPT_ID_ARRAY;
       }
       
@@ -601,6 +634,10 @@ class FetcherOrderedGrouped extends Thread {
     try {
       final Iterator<InputAttemptIdentifier> iter = remaining.iterator();
       while (iter.hasNext()) {
+        // Avoid fetching more if already stopped
+        if (stopped) {
+          return;
+        }
         InputAttemptIdentifier srcAttemptId = iter.next();
         MapOutput mapOutput = null;
         try {
@@ -620,11 +657,16 @@ class FetcherOrderedGrouped extends Thread {
           if (mapOutput != null) {
             mapOutput.abort();
           }
-          metrics.failedFetch();
-          ioErrs.increment(1);
-          scheduler.copyFailed(srcAttemptId, host, true, false);
-          LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
-              host.getHostIdentifier(), e);
+          if (!stopped) {
+            metrics.failedFetch();
+            ioErrs.increment(1);
+            scheduler.copyFailed(srcAttemptId, host, true, false);
+            LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
+                host.getHostIdentifier(), e);
+          } else {
+            LOG.info("Ignoring fetch error during local disk copy since fetcher has already
been stopped");
+            return;
+          }
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/tez/blob/b8b31723/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
index bc6ea9a..7e720b4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
@@ -88,6 +88,7 @@ abstract class MergeThread<T> extends Thread {
         // Merge
         merge(inputs);
       } catch (InterruptedException ie) {
+        // Meant to handle a shutdown of the entire fetch/merge process
         return;
       } catch(Throwable t) {
         reporter.reportException(t);

http://git-wip-us.apache.org/repos/asf/tez/blob/b8b31723/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 5e4f668..e4641fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -245,7 +245,6 @@ public class Shuffle implements ExceptionReporter {
    * @throws IOException 
    * @throws InputAlreadyClosedException 
    */
-  // ZZZ Deal with these methods.
   public boolean isInputReady() throws IOException, InterruptedException, TezException {
     if (isShutDown.get()) {
       throw new InputAlreadyClosedException();
@@ -277,7 +276,6 @@ public class Shuffle implements ExceptionReporter {
    * @throws IOException
    * @throws InterruptedException
    */
-  // ZZZ Deal with these methods.
   public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException,
       TezException {
     Preconditions.checkState(runShuffleFuture != null,
@@ -314,6 +312,7 @@ public class Shuffle implements ExceptionReporter {
     }
   }
 
+  // Not handling any shutdown logic here. That's handled by the callback from this invocation.
   private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
     @Override
     public TezRawKeyValueIterator call() throws IOException, InterruptedException {
@@ -440,6 +439,7 @@ public class Shuffle implements ExceptionReporter {
 
   @Private
   public synchronized void reportException(Throwable t) {
+    // RunShuffleCallable onFailure deals with ignoring errors on shutdown.
     if (throwable == null) {
       throwable = t;
       throwingThreadName = Thread.currentThread().getName();
@@ -472,7 +472,6 @@ public class Shuffle implements ExceptionReporter {
 
     @Override
     public void onFailure(Throwable t) {
-      // ZZZ Handle failures during shutdown.
       if (isShutDown.get()) {
         LOG.info("Already shutdown. Ignoring error: ",  t);
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/b8b31723/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 6fec954..9cd8c64 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -61,13 +61,12 @@ class ShuffleScheduler {
   private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
   private static final long INITIAL_PENALTY = 2000l; // 2 seconds
   private static final float PENALTY_GROWTH_RATE = 1.3f;
-  
-  // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
+
   private boolean[] finishedMaps;
   private final int numInputs;
   private int remainingMaps;
   private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
-  //TODO NEWTEZ Clean this and other maps at some point
+  //TODO Clean this and other maps at some point
   private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String,
InputAttemptIdentifier>(); 
   private Set<MapHost> pendingHosts = new HashSet<MapHost>();
   private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
@@ -283,6 +282,7 @@ class ShuffleScheduler {
                 inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
                 srcAttempt.getAttemptNumber()));
       ioe.fillInStackTrace();
+      // Shuffle knows how to deal with failures post shutdown via the onFailure hook
       shuffle.reportException(ioe);
     }
 
@@ -299,6 +299,7 @@ class ShuffleScheduler {
 
   public void reportLocalError(IOException ioe) {
     LOG.error("Shuffle failed : caused by local error", ioe);
+    // Shuffle knows how to deal with failures post shutdown via the onFailure hook
     shuffle.reportException(ioe);
   }
 
@@ -372,6 +373,7 @@ class ShuffleScheduler {
           + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
           + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
       String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
+      // Shuffle knows how to deal with failures post shutdown via the onFailure hook
       shuffle.reportException(new IOException(errorMsg));
     }
 
@@ -590,15 +592,16 @@ class ShuffleScheduler {
           }
         }
       } catch (InterruptedException ie) {
+        // This handles shutdown of the entire fetch / merge process.
         return;
       } catch (Throwable t) {
+        // Shuffle knows how to deal with failures post shutdown via the onFailure hook
         shuffle.reportException(t);
       }
     }
   }
   
   public void close() throws InterruptedException {
-    /// ZZZ need to interrupt setlf ?
     referee.interrupt();
     referee.join();
   }


Mime
View raw message