Author: todd
Date: Thu Nov 3 19:40:29 2011
New Revision: 1197285
URL: http://svn.apache.org/viewvc?rev=1197285&view=rev
Log:
MAPREDUCE-3287. Fix a busy loop in ReduceTask that would cause 100% cpu utilization during
the fetch phase. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1197285&r1=1197284&r2=1197285&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Thu Nov 3 19:40:29 2011
@@ -56,6 +56,10 @@ Release 0.20.206.0 - unreleased
HDFS-2379. Allow block reports to proceed without holding FSDataset lock.
(todd)
+ MAPREDUCE-3287. Fix a busy loop in ReduceTask that would cause 100%
+ cpu utilization during the fetch phase. (todd)
+
+
Release 0.20.205.1 - unreleased
IMPROVEMENTS
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1197285&r1=1197284&r2=1197285&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Nov 3 19:40:29 2011
@@ -685,6 +685,10 @@ class ReduceTask extends Task {
*/
private List<CopyResult> copyResults;
+ int numEventsFetched = 0;
+ private Object copyResultsOrNewEventsLock = new Object();
+
+
/**
* the number of outputs to copy in parallel
*/
@@ -1294,9 +1298,9 @@ class ReduceTask extends Task {
private synchronized void finish(long size, CopyOutputErrorType error) {
if (currentLocation != null) {
LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
- synchronized (copyResults) {
+ synchronized (copyResultsOrNewEventsLock) {
copyResults.add(new CopyResult(currentLocation, size, error));
- copyResults.notify();
+ copyResultsOrNewEventsLock.notifyAll();
}
currentLocation = null;
}
@@ -2024,6 +2028,10 @@ class ReduceTask extends Task {
// loop until we get all required outputs
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
+ int numEventsAtStartOfScheduling;
+ synchronized (copyResultsOrNewEventsLock) {
+ numEventsAtStartOfScheduling = numEventsFetched;
+ }
currentTime = System.currentTimeMillis();
boolean logNow = false;
@@ -2181,7 +2189,7 @@ class ReduceTask extends Task {
//So, when getCopyResult returns null, we can be sure that
//we aren't busy enough and we should go and get more mapcompletion
//events from the tasktracker
- CopyResult cr = getCopyResult(numInFlight);
+ CopyResult cr = getCopyResult(numInFlight, numEventsAtStartOfScheduling);
if (cr == null) {
break;
@@ -2552,14 +2560,29 @@ class ReduceTask extends Task {
}
}
- private CopyResult getCopyResult(int numInFlight) {
- synchronized (copyResults) {
+ private CopyResult getCopyResult(int numInFlight, int numEventsAtStartOfScheduling) {
+ boolean waitedForNewEvents = false;
+
+ synchronized (copyResultsOrNewEventsLock) {
while (copyResults.isEmpty()) {
try {
//The idea is that if we have scheduled enough, we can wait until
- //we hear from one of the copiers.
+ // we hear from one of the copiers, or until there are new
+ // map events ready to be scheduled
if (busyEnough(numInFlight)) {
- copyResults.wait();
+ // All of the fetcher threads are busy. So, no sense trying
+ // to schedule more until one finishes.
+ copyResultsOrNewEventsLock.wait();
+ } else if (numEventsFetched == numEventsAtStartOfScheduling &&
+ !waitedForNewEvents) {
+ // no sense trying to schedule more, since there are no
+ // new events to even try to schedule.
+ // We could handle this with a normal wait() without a timeout,
+ // but since this code is being introduced in a stable branch,
+ // we want to be very conservative. A 2-second wait is enough
+ // to prevent the busy-loop experienced before.
+ waitedForNewEvents = true;
+ copyResultsOrNewEventsLock.wait(2000);
} else {
return null;
}
@@ -2808,6 +2831,12 @@ class ReduceTask extends Task {
do {
try {
int numNewMaps = getMapCompletionEvents();
+ if (numNewMaps > 0) {
+ synchronized (copyResultsOrNewEventsLock) {
+ numEventsFetched += numNewMaps;
+ copyResultsOrNewEventsLock.notifyAll();
+ }
+ }
if (LOG.isDebugEnabled()) {
if (numNewMaps > 0) {
LOG.debug(reduceTask.getTaskID() + ": " +
|