hadoop-common-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1197285 - in /hadoop/common/branches/branch-0.20-security: CHANGES.txt src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Date: Thu, 03 Nov 2011 19:40:30 GMT
Author: todd
Date: Thu Nov  3 19:40:29 2011
New Revision: 1197285

URL: http://svn.apache.org/viewvc?rev=1197285&view=rev
Log:
MAPREDUCE-3287. Fix a busy loop in ReduceTask that would cause 100% cpu utilization during
the fetch phase. Contributed by Todd Lipcon.
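
For context, here is a minimal, self-contained sketch of the synchronization pattern the patch introduces. The class, method, and field names below are illustrative only and are not taken from ReduceTask.java: a single lock guards both the queue of copy results and a counter of fetched map-completion events, so the scheduling thread can block on that lock (with a bounded timeout) rather than returning immediately and spinning when there is nothing new to schedule.

import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Minimal sketch of the locking pattern in this patch. Names here
 * (EventAwareScheduler, getResult, eventsFetched) are illustrative,
 * not from the Hadoop source.
 */
public class EventAwareScheduler {
  // One lock guards both the result queue and the event counter, so a
  // waiter can be woken either by a finished copy or by new events.
  private final Object resultsOrEventsLock = new Object();
  private final Queue<String> copyResults = new ArrayDeque<>();
  private int eventsFetched = 0;

  /** Producer side: a new batch of map-completion events was fetched. */
  public void onNewEvents(int count) {
    synchronized (resultsOrEventsLock) {
      eventsFetched += count;
      resultsOrEventsLock.notifyAll();  // wake a scheduler parked in getResult()
    }
  }

  /** Producer side: a copier thread finished fetching one output. */
  public void onCopyFinished(String result) {
    synchronized (resultsOrEventsLock) {
      copyResults.add(result);
      resultsOrEventsLock.notifyAll();
    }
  }

  /** Snapshot of the event counter, read under the lock, as the patched
   *  scheduling loop does at the start of each pass. */
  public int getEventsFetched() {
    synchronized (resultsOrEventsLock) {
      return eventsFetched;
    }
  }

  /**
   * Consumer side, mirroring the shape of the patched getCopyResult().
   * busyEnough is a plain flag here to keep the sketch small; the real
   * code computes it from the number of in-flight copies.
   */
  public String getResult(boolean busyEnough, int eventsAtStartOfScheduling)
      throws InterruptedException {
    boolean waitedForNewEvents = false;
    synchronized (resultsOrEventsLock) {
      while (copyResults.isEmpty()) {
        if (busyEnough) {
          // All fetchers are busy: nothing to do until one reports back.
          resultsOrEventsLock.wait();
        } else if (eventsFetched == eventsAtStartOfScheduling
                   && !waitedForNewEvents) {
          // Not busy, but nothing new to schedule either: park for up to
          // 2 seconds instead of returning null and letting the caller spin.
          waitedForNewEvents = true;
          resultsOrEventsLock.wait(2000);
        } else {
          // Either new events arrived or we already waited once: hand
          // control back so the caller can schedule more fetches.
          return null;
        }
      }
      return copyResults.poll();
    }
  }
}

The 2-second bound mirrors the conservative choice spelled out in the patch comments for a stable branch; an untimed wait would also serve once the producer side reliably calls notifyAll() for both new events and finished copies.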

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1197285&r1=1197284&r2=1197285&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Thu Nov  3 19:40:29 2011
@@ -56,6 +56,10 @@ Release 0.20.206.0 - unreleased
     HDFS-2379. Allow block reports to proceed without holding FSDataset lock.
     (todd)
 
+    MAPREDUCE-3287. Fix a busy loop in ReduceTask that would cause 100%
+    cpu utilization during the fetch phase. (todd)
+
+
 Release 0.20.205.1 - unreleased
 
   IMPROVEMENTS

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1197285&r1=1197284&r2=1197285&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Nov  3 19:40:29 2011
@@ -685,6 +685,10 @@ class ReduceTask extends Task {
      */
     private List<CopyResult> copyResults;
     
+    int numEventsFetched = 0;
+    private Object copyResultsOrNewEventsLock = new Object();
+
+    
     /**
      *  the number of outputs to copy in parallel
      */
@@ -1294,9 +1298,9 @@ class ReduceTask extends Task {
       private synchronized void finish(long size, CopyOutputErrorType error) {
         if (currentLocation != null) {
           LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
-          synchronized (copyResults) {
+          synchronized (copyResultsOrNewEventsLock) {
             copyResults.add(new CopyResult(currentLocation, size, error));
-            copyResults.notify();
+            copyResultsOrNewEventsLock.notifyAll();
           }
           currentLocation = null;
         }
@@ -2024,6 +2028,10 @@ class ReduceTask extends Task {
       
         // loop until we get all required outputs
         while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
+          int numEventsAtStartOfScheduling;
+          synchronized (copyResultsOrNewEventsLock) {
+            numEventsAtStartOfScheduling = numEventsFetched;
+          }
           
           currentTime = System.currentTimeMillis();
           boolean logNow = false;
@@ -2181,7 +2189,7 @@ class ReduceTask extends Task {
             //So, when getCopyResult returns null, we can be sure that
             //we aren't busy enough and we should go and get more mapcompletion
             //events from the tasktracker
-            CopyResult cr = getCopyResult(numInFlight);
+            CopyResult cr = getCopyResult(numInFlight, numEventsAtStartOfScheduling);
 
             if (cr == null) {
               break;
@@ -2552,14 +2560,29 @@ class ReduceTask extends Task {
       }
     }
 
-    private CopyResult getCopyResult(int numInFlight) {  
-      synchronized (copyResults) {
+    private CopyResult getCopyResult(int numInFlight, int numEventsAtStartOfScheduling) {
+      boolean waitedForNewEvents = false;
+      
+      synchronized (copyResultsOrNewEventsLock) {
         while (copyResults.isEmpty()) {
           try {
             //The idea is that if we have scheduled enough, we can wait until
-            //we hear from one of the copiers.
+            // we hear from one of the copiers, or until there are new
+            // map events ready to be scheduled
             if (busyEnough(numInFlight)) {
-              copyResults.wait();
+              // All of the fetcher threads are busy. So, no sense trying
+              // to schedule more until one finishes.
+              copyResultsOrNewEventsLock.wait();
+            } else if (numEventsFetched == numEventsAtStartOfScheduling &&
+                       !waitedForNewEvents) {
+              // no sense trying to schedule more, since there are no
+              // new events to even try to schedule.
+              // We could handle this with a normal wait() without a timeout,
+              // but since this code is being introduced in a stable branch,
+              // we want to be very conservative. A 2-second wait is enough
+              // to prevent the busy-loop experienced before.
+              waitedForNewEvents = true;
+              copyResultsOrNewEventsLock.wait(2000);
             } else {
               return null;
             }
@@ -2808,6 +2831,12 @@ class ReduceTask extends Task {
         do {
           try {
             int numNewMaps = getMapCompletionEvents();
+            if (numNewMaps > 0) {
+              synchronized (copyResultsOrNewEventsLock) {
+                numEventsFetched += numNewMaps;
+                copyResultsOrNewEventsLock.notifyAll();
+              }
+            }
             if (LOG.isDebugEnabled()) {
               if (numNewMaps > 0) {
                 LOG.debug(reduceTask.getTaskID() + ": " +  



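For illustration only, a hypothetical driver for the EventAwareScheduler sketch above: the background thread stands in for the tasktracker poll that delivers map-completion events and for a copier reporting a finished output, while the loop stands in for the scheduling pass in ReduceTask. With the timed wait, each pass parks for up to two seconds rather than burning CPU in a tight loop of null returns, which is the behavior MAPREDUCE-3287 fixes.

/** Hypothetical driver for the EventAwareScheduler sketch above. */
public class SchedulerDemo {
  public static void main(String[] args) throws Exception {
    EventAwareScheduler sched = new EventAwareScheduler();

    // Stand-in for the event poller and a copier thread: new events show
    // up after ~3 seconds, and a copy finishes ~2 seconds after that.
    new Thread(() -> {
      try {
        Thread.sleep(3000);
        sched.onNewEvents(5);
        Thread.sleep(2000);
        sched.onCopyFinished("attempt_000001_m_000001_0 output");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }).start();

    // Scheduling loop: snapshot the event counter at the start of each
    // pass, then either pick up a finished copy or park in getResult().
    while (true) {
      int eventsAtStart = sched.getEventsFetched();
      String result = sched.getResult(false /* not busy enough */, eventsAtStart);
      if (result != null) {
        System.out.println("copy finished: " + result);
        break;
      }
      System.out.println("no copy result yet; would schedule new fetches here");
    }
  }
}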