tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [11/23] git commit: TEZ-1029. Fetcher can fail to report input failed event upon connection error (bikas)
Date Fri, 20 Jun 2014 22:35:49 GMT
TEZ-1029. Fetcher can fail to report input failed event upon connection error (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/469ffe63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/469ffe63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/469ffe63

Branch: refs/heads/branch-0.4.1-incubating
Commit: 469ffe639a3b95a8380a473f32817a95d568f2f5
Parents: 2b3c144
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Apr 8 17:48:42 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 20 15:34:45 2014 -0700

----------------------------------------------------------------------
 .../tez/runtime/library/common/shuffle/impl/Fetcher.java  |  5 +++--
 .../library/common/shuffle/impl/ShuffleScheduler.java     | 10 ++++++----
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/469ffe63/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 0433098..ac2c5b7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -309,7 +309,8 @@ class Fetcher extends Thread {
 
       for(InputAttemptIdentifier left: remaining) {
         // Need to be handling temporary glitches .. 
-        scheduler.copyFailed(left, host, !connectSucceeded);
+        // Report read error to the AM to trigger source failure heuristics
+        scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded);
       }
 
       // Add back all remaining maps - which at this point is ALL MAPS the
@@ -335,7 +336,7 @@ class Fetcher extends Thread {
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
         for(InputAttemptIdentifier left: failedTasks) {
-          scheduler.copyFailed(left, host, true);
+          scheduler.copyFailed(left, host, true, false);
         }
       }
       

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/469ffe63/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index a5b79fb..b145a40 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -215,7 +215,8 @@ class ShuffleScheduler {
 
   public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
                                       MapHost host,
-                                      boolean readError) {
+                                      boolean readError,
+                                      boolean connectError) {
     host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(srcAttempt)) {
@@ -252,7 +253,7 @@ class ShuffleScheduler {
     }
 
     failedShuffleCounter.increment(1);
-    checkAndInformAM(failures, srcAttempt, readError);
+    checkAndInformAM(failures, srcAttempt, readError, connectError);
 
     checkReducerHealth();
     
@@ -271,8 +272,9 @@ class ShuffleScheduler {
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformAM(
-      int failures, InputAttemptIdentifier srcAttempt, boolean readError) {
-    if ((reportReadErrorImmediately && readError)
+      int failures, InputAttemptIdentifier srcAttempt, boolean readError,
+      boolean connectError) {
+    if ((reportReadErrorImmediately && (readError || connectError))
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
       LOG.info("Reporting fetch failure for InputIdentifier: " 
           + srcAttempt + " taskAttemptIdentifier: "


Mime
View raw message