hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r679769 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Date Fri, 25 Jul 2008 12:05:51 GMT
Author: ddas
Date: Fri Jul 25 05:05:51 2008
New Revision: 679769

URL: http://svn.apache.org/viewvc?rev=679769&view=rev
Log:
HADOOP-3327. Treats connection and read timeouts differently in the shuffle, and makes the
backoff logic dependent on the type of timeout. Contributed by Jothi Padmanabhan.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=679769&r1=679768&r2=679769&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 25 05:05:51 2008
@@ -103,6 +103,10 @@
     HADOOP-3756. Minor. Remove unused dfs.client.buffer.dir from 
     hadoop-default.xml. (rangadi)
 
+    HADOOP-3327. Treats connection and read timeouts differently in the 
+    shuffle and the backoff logic is dependent on the type of timeout.
+    (Jothi Padmanabhan via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=679769&r1=679768&r2=679769&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Jul 25 05:05:51 2008
@@ -71,6 +71,8 @@
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.net.ConnTimeoutException;
+import org.apache.hadoop.net.ReadTimeoutException;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -91,6 +93,12 @@
   private int numMaps;
   private ReduceCopier reduceCopier;
 
+  private static enum CopyOutputErrorType {
+	  NO_ERROR,
+	  CONNECTION_ERROR,
+	  READ_ERROR
+  }
+
   private CompressionCodec codec;
 
 
@@ -99,6 +107,7 @@
     setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
   }
 
+
   private Progress copyPhase = getProgress().addPhase("copy");
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
@@ -519,9 +528,15 @@
     Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>(); 
     
     /**
-     * A map of taskId -> no. of failed fetches
+     * A map of taskId -> no. of failed fetches in connect
+     */
+    Map<TaskAttemptID, Integer> mapTaskToConnectFailedFetchesMap = 
+      new HashMap<TaskAttemptID, Integer>();    
+
+    /**
+     * A map of taskId -> no. of failed fetches in read
      */
-    Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap = 
+    Map<TaskAttemptID, Integer> mapTaskToReadFailedFetchesMap = 
       new HashMap<TaskAttemptID, Integer>();    
 
     /**
@@ -541,6 +556,7 @@
       Collections.synchronizedList(new LinkedList<MapOutput>());
     
 
+
     /**
      * This class contains the methods that should be used for metrics-reporting
      * the specific metrics for shuffle. This class actually reports the
@@ -603,7 +619,8 @@
     /** Represents the result of an attempt to copy a map output */
     private class CopyResult {
       
-      // the map output location against which a copy attempt was made
+
+    	// the map output location against which a copy attempt was made
       private final MapOutputLocation loc;
       
       // the size of the file copied, -1 if the transfer failed
@@ -611,10 +628,14 @@
       
       //a flag signifying whether a copy result is obsolete
       private static final int OBSOLETE = -2;
-      
-      CopyResult(MapOutputLocation loc, long size) {
+
+      CopyOutputErrorType errorType;
+
+      CopyResult(MapOutputLocation loc, long size, 
+    		     CopyOutputErrorType errorType) {
         this.loc = loc;
         this.size = size;
+        this.errorType = errorType;
       }
       
       public boolean getSuccess() { return size >= 0; }
@@ -623,6 +644,9 @@
       }
       public long getSize() { return size; }
       public String getHost() { return loc.getHost(); }
+      public CopyOutputErrorType getErrorType() {
+    	  return ((size < 0) ? errorType: CopyOutputErrorType.NO_ERROR); 
+      }
       public MapOutputLocation getLocation() { return loc; }
     }
     
@@ -875,19 +899,6 @@
       }
       
       /**
-       * Fail the current file that we are fetching
-       * @return were we currently fetching?
-       */
-      public synchronized boolean fail() {
-        if (currentLocation != null) {
-          finish(-1);
-          return true;
-        } else {
-          return false;
-        }
-      }
-      
-      /**
        * Get the current map output location.
        */
       public synchronized MapOutputLocation getLocation() {
@@ -898,11 +909,12 @@
         currentLocation = loc;
       }
       
-      private synchronized void finish(long size) {
+      private synchronized void finish(long size, 
+    		                           CopyOutputErrorType errorType) {
         if (currentLocation != null) {
           LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
           synchronized (copyResults) {
-            copyResults.add(new CopyResult(currentLocation, size));
+            copyResults.add(new CopyResult(currentLocation, size, errorType));
             copyResults.notify();
           }
           currentLocation = null;
@@ -918,6 +930,7 @@
           try {
             MapOutputLocation loc = null;
             long size = -1;
+            CopyOutputErrorType errorType = CopyOutputErrorType.NO_ERROR;
             
             synchronized (scheduledCopies) {
               while (scheduledCopies.isEmpty()) {
@@ -939,9 +952,18 @@
               
               // Reset 
               size = -1;
+
+              // Identify the error type 
+              if (e.getClass() == ConnTimeoutException.class) {
+                errorType = CopyOutputErrorType.CONNECTION_ERROR;
+              }
+              else if (e.getClass() == ReadTimeoutException.class) {
+                errorType = CopyOutputErrorType.READ_ERROR;
+              }
+
             } finally {
               shuffleClientMetrics.threadFree();
-              finish(size);
+              finish(size, errorType);
             }
           } catch (InterruptedException e) { 
             return; // ALL DONE
@@ -1124,26 +1146,17 @@
         connection.setReadTimeout(readTimeout);
         // set the connect timeout to the unit-connect-timeout
         connection.setConnectTimeout(unit);
-        while (true) {
-          try {
-            return connection.getInputStream();
-          } catch (IOException ioe) {
-            // update the total remaining connect-timeout
-            connectionTimeout -= unit;
 
-            // throw an exception if we have waited for timeout amount of time
-            // note that the updated value if timeout is used here
-            if (connectionTimeout == 0) {
-              throw ioe;
-            }
+        try {
+          connection.connect();
+        } catch (IOException ioe) {
+          throw new ConnTimeoutException("Connection Timed out");
+        }
 
-            // reset the connect timeout for the last try
-            if (connectionTimeout < unit) {
-              unit = connectionTimeout;
-              // reset the connect time out for the final connect
-              connection.setConnectTimeout(unit);
-            }
-          }
+        try {
+          return connection.getInputStream();
+        } catch (IOException ioe) {
+          throw new ReadTimeoutException("Read Timed out");
         }
       }
 
@@ -1699,33 +1712,61 @@
                        cr.getHost());
             } else {
               retryFetches.add(cr.getLocation());
+
+              CopyOutputErrorType errorType = cr.getErrorType();
               
               // note the failed-fetch
               TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
               TaskID mapId = cr.getLocation().getTaskId();
               
               totalFailures++;
-              Integer noFailedFetches = 
-                mapTaskToFailedFetchesMap.get(mapTaskId);
-              noFailedFetches = 
-                (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
-              mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
-              LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
-                       noFailedFetches + " from " + mapTaskId);
-              
-              // did the fetch fail too many times?
-              // using a hybrid technique for notifying the jobtracker.
-              //   a. the first notification is sent after max-retries 
-              //   b. subsequent notifications are sent after 2 retries.   
-              if ((noFailedFetches >= maxFetchRetriesPerMap) 
-                  && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+
+              Integer noFailedFetches = 0;
+
+              Integer noReadFailedFetches = 
+                mapTaskToReadFailedFetchesMap.get(mapTaskId);
+
+              if (noReadFailedFetches == null) noReadFailedFetches = 0;
+
+              Integer noConnectFailedFetches = 
+                mapTaskToConnectFailedFetchesMap.get(mapTaskId);
+
+              if (noConnectFailedFetches == null) noConnectFailedFetches = 0;
+
+              if (errorType == CopyOutputErrorType.READ_ERROR) {
+                noReadFailedFetches ++;
+                mapTaskToReadFailedFetchesMap.put (mapTaskId, 
+                                                   noReadFailedFetches);
                 synchronized (ReduceTask.this) {
                   taskStatus.addFetchFailedMap(mapTaskId);
                   LOG.info("Failed to fetch map-output from " + mapTaskId + 
-                           " even after MAX_FETCH_RETRIES_PER_MAP retries... "
-                           + " reporting to the JobTracker");
+                           " Got a Read Time out," + 
+                           " reporting to the JobTracker");
+                }
+              } else if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
+                noConnectFailedFetches ++;
+                mapTaskToConnectFailedFetchesMap.put (
+                  mapTaskId, noConnectFailedFetches);
+
+                LOG.info("Task " + getTaskID() + ": Failed fetch #"  
+                  + noConnectFailedFetches + " from " + mapTaskId); 
+
+                if ((noConnectFailedFetches >= maxFetchRetriesPerMap) &&     
        
+                   ((noConnectFailedFetches - maxFetchRetriesPerMap) % 2)      
+                    == 0) {               
+                  synchronized (ReduceTask.this) {       
+                    taskStatus.addFetchFailedMap(mapTaskId);
+                    LOG.info("Failed to fetch map-output from " + mapTaskId     
+                             + " even after MAX_FETCH_RETRIES_PER_MAP"
+                             + " (connect) retries... "     
+                             + " reporting to the JobTracker");            
+                  }
                 }
               }
+
+              noFailedFetches = noConnectFailedFetches + 
+                                noReadFailedFetches;
+
               // note unique failed-fetch maps
               if (noFailedFetches == maxFetchRetriesPerMap) {
                 fetchFailedMaps.add(mapId);
@@ -1774,22 +1815,32 @@
                 }
               }
                 
-              // back off exponentially until num_retries <= max_retries
-              // back off by max_backoff/2 on subsequent failed attempts
-              currentTime = System.currentTimeMillis();
-              int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
+              if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
+                // back off exponentially until num_retries <= max_retries
+                // back off by max_backoff/2 on subsequent failed attempts
+                currentTime = System.currentTimeMillis();
+                int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
                                    ? BACKOFF_INIT 
                                      * (1 << (noFailedFetches - 1)) 
                                    : (this.maxBackoff * 1000 / 2);
-              penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
-              LOG.warn(reduceTask.getTaskID() + " adding host " +
+                penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
+                LOG.warn(reduceTask.getTaskID() + " adding host " +
                        cr.getHost() + " to penalty box, next contact in " +
                        (currentBackOff/1000) + " seconds");
-            }
+              } else if (errorType == CopyOutputErrorType.READ_ERROR) {
+                int backOff = Math.max(maxMapRuntime/2,
+                                    (this.maxBackoff * 1000));    
+                penaltyBox.put(cr.getHost(), currentTime + backOff);
+                LOG.warn(reduceTask.getTaskID() + " adding host " +
+                       cr.getHost() + " to penalty box, next contact in " + 
+                       (backOff/1000) + " seconds");  
+              }  
+
+            } // Fetch Failure
             uniqueHosts.remove(cr.getHost());
             numInFlight--;
-          }
-        }
+          } // while (numInFlight > 0)
+        } // while (copiedMaps < numMaps)
         
         // all done, inform the copiers to exit
         synchronized (copiers) {



Mime
View raw message