hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1136706 - in /hadoop/common/branches/branch-0.20-security-204: CHANGES.txt src/mapred/mapred-default.xml src/mapred/org/apache/hadoop/mapred/ReduceTask.java src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
Date Thu, 16 Jun 2011 22:07:17 GMT
Author: omalley
Date: Thu Jun 16 22:07:17 2011
New Revision: 1136706

URL: http://svn.apache.org/viewvc?rev=1136706&view=rev
Log:
MAPREDUCE-2524. Port reduce failure reporting semantics from trunk, to
fail faulty maps more aggressively. (Thomas Graves via cdouglas)

Added:
    hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
      - copied unchanged from r1131299, hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
Modified:
    hadoop/common/branches/branch-0.20-security-204/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-0.20-security-204/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1136706&r1=1136705&r2=1136706&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Thu Jun 16 22:07:17 2011
@@ -86,6 +86,9 @@ Release 0.20.204.0 - unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-2524. Port reduce failure reporting semantics from trunk, to
+    fail faulty maps more aggressively. (Thomas Graves via cdouglas)
+
     MAPREDUCE-2529. Add support for regex-based shuffle metric counting
     exceptions. (Thomas Graves via cdouglas)
 

Propchange: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 16 22:07:17 2011
@@ -1,5 +1,5 @@
 /hadoop/common/branches/branch-0.20/CHANGES.txt:826138,826568,829987,831184,833001,880632,898713,909245,909723,960946,1044225
-/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1131737,1134140
+/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1131299,1131737,1134140
 /hadoop/common/branches/branch-0.20-security-203/CHANGES.txt:1096071,1097012-1099333,1102071,1128115
 /hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1133133,1133274,1133282
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226

Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/mapred-default.xml?rev=1136706&r1=1136705&r2=1136706&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/mapred-default.xml Thu Jun 16 22:07:17 2011
@@ -310,12 +310,11 @@
 </property>
 
 <property>
-  <name>mapred.reduce.copy.backoff</name>
-  <value>300</value>
-  <description>The maximum amount of time (in seconds) a reducer spends on 
-  fetching one map output before declaring it as failed.
-  </description>
-</property>
+  <name>mapreduce.reduce.shuffle.maxfetchfailures</name>
+  <value>10</value>
+  <description>The maximum number of times a reducer tries to
+  fetch a map output before it reports it.
+</description></property>
 
 <property>
   <name>mapreduce.reduce.shuffle.connect.timeout</name>

Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1136706&r1=1136705&r2=1136706&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Jun 16 22:07:17 2011
@@ -696,11 +696,6 @@ class ReduceTask extends Task {
      */
     private int maxInFlight;
     
-    /**
-     * the amount of time spent on fetching one map output before considering 
-     * it as failed and notifying the jobtracker about it.
-     */
-    private int maxBackoff;
     
     /**
      * busy hosts from which copies are being backed off
@@ -802,11 +797,31 @@ class ReduceTask extends Task {
     private int maxMapRuntime;
     
     /**
-     * Maximum number of fetch-retries per-map.
+     * Maximum number of fetch-retries per-map before reporting it.
      */
-    private volatile int maxFetchRetriesPerMap;
+    private int maxFetchFailuresBeforeReporting;
     
     /**
+     * Maximum number of fetch failures before reducer aborts.
+     */
+    private final int abortFailureLimit;
+
+    /**
+     * Initial penalty time in ms for a fetch failure.
+     */
+    private static final long INITIAL_PENALTY = 10000;
+
+    /**
+     * Penalty growth rate for each fetch failure.
+     */
+    private static final float PENALTY_GROWTH_RATE = 1.3f;
+
+    /**
+     * Default limit for maximum number of fetch failures before reporting.
+     */
+    private final static int REPORT_FAILURE_LIMIT = 10;
+
+    /**
      * Combiner runner, if a combiner is needed
      */
     private CombinerRunner combinerRunner;
@@ -1906,7 +1921,6 @@ class ReduceTask extends Task {
       this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxInFlight = 4 * numCopiers;
-      this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
       Counters.Counter combineInputCounter = 
         reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
       this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
@@ -1918,18 +1932,12 @@ class ReduceTask extends Task {
       }
       
       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
-      // the exponential backoff formula
-      //    backoff (t) = init * base^(t-1)
-      // so for max retries we get
-      //    backoff(1) + .... + backoff(max_fetch_retries) ~ max
-      // solving which we get
-      //    max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
-      // for the default value of max = 300 (5min) we get max_fetch_retries = 6
-      // the order is 4,8,16,32,64,128. sum of which is 252 sec = 4.2 min
-      
-      // optimizing for the base 2
-      this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
-             getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
+      
+      this.abortFailureLimit = Math.max(30, numMaps / 10);
+
+      this.maxFetchFailuresBeforeReporting = conf.getInt(
+          "mapreduce.reduce.shuffle.maxfetchfailures", REPORT_FAILURE_LIMIT);
+
       this.maxFailedUniqueFetches = Math.min(numMaps, 
                                              this.maxFailedUniqueFetches);
       this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
@@ -2219,44 +2227,19 @@ class ReduceTask extends Task {
               LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
                        noFailedFetches + " from " + mapTaskId);
 
-              // half the number of max fetch retries per map during 
-              // the end of shuffle
-              int fetchRetriesPerMap = maxFetchRetriesPerMap;
-              int pendingCopies = numMaps - numCopied;
-              
-              // The check noFailedFetches != maxFetchRetriesPerMap is
-              // required to make sure of the notification in case of a
-              // corner case : 
-              // when noFailedFetches reached maxFetchRetriesPerMap and 
-              // reducer reached the end of shuffle, then we may miss sending
-              // a notification if the difference between 
-              // noFailedFetches and fetchRetriesPerMap is not divisible by 2 
-              if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
-                  noFailedFetches != maxFetchRetriesPerMap) {
-                fetchRetriesPerMap = fetchRetriesPerMap >> 1;
+              if (noFailedFetches >= abortFailureLimit) {
+                LOG.fatal(noFailedFetches + " failures downloading "
+                          + getTaskID() + ".");
+                umbilical.shuffleError(getTaskID(),
+                                 "Exceeded the abort failure limit;"
+                                 + " bailing-out.", jvmContext);
               }
               
-              // did the fetch fail too many times?
-              // using a hybrid technique for notifying the jobtracker.
-              //   a. the first notification is sent after max-retries 
-              //   b. subsequent notifications are sent after 2 retries.   
-              //   c. send notification immediately if it is a read error and 
-              //       "mapreduce.reduce.shuffle.notify.readerror" set true.   
-              if ((reportReadErrorImmediately && cr.getError().equals(
-                  CopyOutputErrorType.READ_ERROR)) ||
-                 ((noFailedFetches >= fetchRetriesPerMap) 
-                  && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
-                synchronized (ReduceTask.this) {
-                  taskStatus.addFetchFailedMap(mapTaskId);
-                  reporter.progress();
-                  LOG.info("Failed to fetch map-output from " + mapTaskId + 
-                           " even after MAX_FETCH_RETRIES_PER_MAP retries... "
-                           + " or it is a read error, "
-                           + " reporting to the JobTracker");
-                }
-              }
+              checkAndInformJobTracker(noFailedFetches, mapTaskId,
+                  cr.getError().equals(CopyOutputErrorType.READ_ERROR));
+
               // note unique failed-fetch maps
-              if (noFailedFetches == maxFetchRetriesPerMap) {
+              if (noFailedFetches == maxFetchFailuresBeforeReporting) {
                 fetchFailedMaps.add(mapId);
                   
                 // did we have too many unique failed-fetch maps?
@@ -2302,26 +2285,12 @@ class ReduceTask extends Task {
                                          "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
                                          + " bailing-out.", jvmContext);
                 }
+
               }
                 
-              // back off exponentially until num_retries <= max_retries
-              // back off by max_backoff/2 on subsequent failed attempts
               currentTime = System.currentTimeMillis();
-              int currentBackOff = noFailedFetches <= fetchRetriesPerMap 
-                                   ? BACKOFF_INIT 
-                                     * (1 << (noFailedFetches - 1)) 
-                                   : (this.maxBackoff * 1000 / 2);
-              // If it is read error,
-              //    back off for maxMapRuntime/2
-              //    during end of shuffle, 
-              //      backoff for min(maxMapRuntime/2, currentBackOff) 
-              if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
-                int backOff = maxMapRuntime >> 1;
-                if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
-                  backOff = Math.min(backOff, currentBackOff); 
-                } 
-                currentBackOff = backOff;
-              }
+              long currentBackOff = (long)(INITIAL_PENALTY *
+                  Math.pow(PENALTY_GROWTH_RATE, noFailedFetches));
 
               penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
               LOG.warn(reduceTask.getTaskID() + " adding host " +
@@ -2386,6 +2355,26 @@ class ReduceTask extends Task {
         return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
     }
     
+    // Notify the JobTracker
+    // after every read error, if 'reportReadErrorImmediately' is true or
+    // after every 'maxFetchFailuresBeforeReporting' failures
+    protected void checkAndInformJobTracker(
+        int failures, TaskAttemptID mapId, boolean readError) {
+      if ((reportReadErrorImmediately && readError)
+          || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+        synchronized (ReduceTask.this) {
+          taskStatus.addFetchFailedMap(mapId);
+          reporter.progress();
+          LOG.info("Failed to fetch map-output from " + mapId +
+                   " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+                   + " or it is a read error, "
+                   + " reporting to the JobTracker");
+        }
+      }
+    }
+
+
+
     private long createInMemorySegments(
         List<Segment<K, V>> inMemorySegments, long leaveBytes)
         throws IOException {
@@ -2887,13 +2876,6 @@ class ReduceTask extends Task {
               URI u = URI.create(event.getTaskTrackerHttp());
               String host = u.getHost();
               TaskAttemptID taskId = event.getTaskAttemptId();
-              int duration = event.getTaskRunTime();
-              if (duration > maxMapRuntime) {
-                maxMapRuntime = duration; 
-                // adjust max-fetch-retries based on max-map-run-time
-                maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
-                  getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
-              }
               URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
                                       "/mapOutput?job=" + taskId.getJobID() +
                                       "&map=" + taskId + 



Mime
View raw message