Author: ddas
Date: Wed May 14 06:03:45 2008
New Revision: 656260
URL: http://svn.apache.org/viewvc?rev=656260&view=rev
Log:
HADOOP-3332. Reduces the amount of logging in Reducer's shuffle phase. Contributed by Devaraj Das.
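For readers skimming the diff below: the core of this patch is a simple time-gated logging guard. The following standalone Java sketch shows the pattern in isolation; MIN_LOG_TIME and lastOutputTime mirror names in the patch, while the class and method names are illustrative only, assuming commons-logging on the classpath as Hadoop uses.

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;

  public class ShuffleLogThrottle {
    private static final Log LOG = LogFactory.getLog(ShuffleLogThrottle.class);

    // One minute, matching the MIN_LOG_TIME constant added by the patch.
    private static final int MIN_LOG_TIME = 60000;

    private long lastOutputTime = 0;

    /** Returns true at most once per MIN_LOG_TIME milliseconds. */
    boolean shouldLogNow(long currentTime) {
      if (currentTime - lastOutputTime > MIN_LOG_TIME) {
        lastOutputTime = currentTime;
        return true;
      }
      return false;
    }

    /** One iteration of a fetch loop: status is logged on the heartbeat only. */
    void loopIteration(int neededOutputs, int numInFlight) {
      if (shouldLogNow(System.currentTimeMillis())) {
        LOG.info("Need another " + neededOutputs + " map output(s) where "
            + numInFlight + " is already in progress");
      }
    }
  }

The effect is that a tight shuffle loop emits one status line per minute instead of one per iteration.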
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656260&r1=656259&r2=656260&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 14 06:03:45 2008
@@ -120,6 +120,9 @@
HADOOP-3334. Move lease handling from FSNamesystem into a separate class.
(Tsz Wo (Nicholas), SZE via rangadi)
+ HADOOP-3332. Reduces the amount of logging in Reducer's shuffle phase.
+ (Devaraj Das)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=656260&r1=656259&r2=656260&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed May 14 06:03:45 2008
@@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
+import java.util.LinkedHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -613,6 +614,11 @@
* Initial backoff interval (milliseconds)
*/
private static final int BACKOFF_INIT = 4000;
+
+ /**
+ * The interval for logging in the shuffle
+ */
+ private static final int MIN_LOG_TIME = 60000;
/**
* This class contains the methods that should be used for metrics-reporting
@@ -989,7 +995,7 @@
sorter.setProgressable(getReporter(umbilical));
// hosts -> next contact time
- this.penaltyBox = new Hashtable<String, Long>();
+ this.penaltyBox = new LinkedHashMap<String, Long>();
// hostnames
this.uniqueHosts = new HashSet<String>();
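The Hashtable-to-LinkedHashMap switch above deserves a note. A plausible reading (not stated in the log message) is that the unsynchronized LinkedHashMap suffices because a single scheduling loop touches the map, and its insertion-order iteration makes the periodic penalized-hosts report (added further down in this patch) deterministic. A minimal sketch of a penalty box on those assumptions, including the lazy expiry the patch also adds; all names here are illustrative:

  import java.util.LinkedHashMap;
  import java.util.Map;

  public class PenaltyBox {
    // host -> earliest time (ms since epoch) the host may be contacted again.
    // LinkedHashMap: unsynchronized, and iterates in insertion order, so the
    // periodic report below is stable across heartbeats.
    private final Map<String, Long> penaltyBox = new LinkedHashMap<String, Long>();

    void penalize(String host, long untilMillis) {
      penaltyBox.put(host, untilMillis);
    }

    /** True if the host is still penalized; expired entries are removed lazily. */
    boolean isPenalized(String host, long currentTime) {
      Long penaltyEnd = penaltyBox.get(host);
      if (penaltyEnd == null) {
        return false;
      }
      if (currentTime < penaltyEnd.longValue()) {
        return true;
      }
      penaltyBox.remove(host); // penalty served; drop the entry
      return false;
    }

    /** Dump remaining penalties, mirroring the patch's heartbeat report. */
    void report(long currentTime) {
      for (Map.Entry<String, Long> e : penaltyBox.entrySet()) {
        System.out.println(e.getKey() + " Will be considered after: "
            + ((e.getValue() - currentTime) / 1000) + " seconds.");
      }
    }
  }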
@@ -1041,16 +1047,25 @@
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
- long lastProgressTime = System.currentTimeMillis();
+ long lastProgressTime = startTime;
+ long lastOutputTime = 0;
IntWritable fromEventId = new IntWritable(0);
try {
// loop until we get all required outputs
while (!neededOutputs.isEmpty() && mergeThrowable == null) {
- LOG.info(reduceTask.getTaskID() + " Need another "
+ currentTime = System.currentTimeMillis();
+ boolean logNow = false;
+ if (currentTime - lastOutputTime > MIN_LOG_TIME) {
+ lastOutputTime = currentTime;
+ logNow = true;
+ }
+ if (logNow) {
+ LOG.info(reduceTask.getTaskID() + " Need another "
+ neededOutputs.size() + " map output(s) where "
+ numInFlight + " is already in progress");
+ }
try {
// Put the hash entries for the failed fetches. Entries here
@@ -1065,16 +1080,29 @@
// used for the next call to getMapCompletionEvents
int currentNumKnownMaps = knownOutputs.size();
int currentNumObsoleteMapIds = obsoleteMapIds.size();
- getMapCompletionEvents(fromEventId, knownOutputs);
+ getMapCompletionEvents(fromEventId, knownOutputs);
- LOG.info(reduceTask.getTaskID() + ": " +
- "Got " + (knownOutputs.size()-currentNumKnownMaps) +
- " new map-outputs & " +
- (obsoleteMapIds.size()-currentNumObsoleteMapIds) +
- " obsolete map-outputs from tasktracker and " +
- retryFetches.size() + " map-outputs from previous failures"
- );
+ int numNewOutputs = knownOutputs.size()-currentNumKnownMaps;
+ if (numNewOutputs > 0 || logNow) {
+ LOG.info(reduceTask.getTaskID() + ": " +
+ "Got " + numNewOutputs +
+ " new map-outputs & number of known map outputs is " +
+ knownOutputs.size());
+ }
+
+ int numNewObsoleteMaps = obsoleteMapIds.size()-currentNumObsoleteMapIds;
+ if (numNewObsoleteMaps > 0) {
+ LOG.info(reduceTask.getTaskID() + ": " +
+ "Got " + numNewObsoleteMaps +
+ " obsolete map-outputs from tasktracker ");
+ }
+
+ if (retryFetches.size() > 0) {
+ LOG.info(reduceTask.getTaskID() + ": " +
+ "Got " + retryFetches.size() +
+ " map-outputs from previous failures");
+ }
// clear the "failed" fetches hashmap
retryFetches.clear();
}
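The hunk above replaces one unconditional status line with three conditional ones: new map outputs are logged when the count changed or on the heartbeat, while obsolete maps and retry fetches are logged only when non-zero. A self-contained sketch of that policy; the class and parameter names are hypothetical, and the messages mirror the patch:

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;

  public class CompletionEventLog {
    private static final Log LOG = LogFactory.getLog(CompletionEventLog.class);

    /** Log progress only when there is something new to say, or on the heartbeat. */
    void logCompletionEvents(String taskId, int numNewOutputs, int numKnownOutputs,
                             int numNewObsoleteMaps, int numRetryFetches,
                             boolean logNow) {
      if (numNewOutputs > 0 || logNow) {
        LOG.info(taskId + ": Got " + numNewOutputs
            + " new map-outputs & number of known map outputs is " + numKnownOutputs);
      }
      if (numNewObsoleteMaps > 0) {
        LOG.info(taskId + ": Got " + numNewObsoleteMaps
            + " obsolete map-outputs from tasktracker");
      }
      if (numRetryFetches > 0) {
        LOG.info(taskId + ": Got " + numRetryFetches
            + " map-outputs from previous failures");
      }
    }
  }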
@@ -1086,10 +1114,7 @@
// now walk through the cache and schedule what we can
int numKnown = knownOutputs.size(), numScheduled = 0;
- int numSlow = 0, numDups = 0;
-
- LOG.info(reduceTask.getTaskID() + " Got " + numKnown +
- " known map output location(s); scheduling...");
+ int numDups = 0;
synchronized (scheduledCopies) {
// Randomize the map output locations to prevent
@@ -1098,7 +1123,7 @@
Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
- currentTime = System.currentTimeMillis();
+
while (locIt.hasNext()) {
MapOutputLocation loc = locIt.next();
@@ -1112,8 +1137,12 @@
Long penaltyEnd = penaltyBox.get(loc.getHost());
boolean penalized = false, duplicate = false;
- if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
- penalized = true; numSlow++;
+ if (penaltyEnd != null) {
+ if (currentTime < penaltyEnd.longValue()) {
+ penalized = true;
+ } else {
+ penaltyBox.remove(loc.getHost());
+ }
}
if (uniqueHosts.contains(loc.getHost())) {
duplicate = true; numDups++;
@@ -1128,9 +1157,18 @@
}
scheduledCopies.notifyAll();
}
- LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
- " of " + numKnown + " known outputs (" + numSlow +
+ if (numScheduled > 0 || logNow) {
+ LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
+ " of " + numKnown + " known outputs (" + penaltyBox.size() +
" slow hosts and " + numDups + " dup hosts)");
+ }
+ if (penaltyBox.size() > 0 && logNow) {
+ LOG.info("Penalized(slow) Hosts: ");
+ for (String host : penaltyBox.keySet()) {
+ LOG.info(host + " Will be considered after: " +
+ ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
+ }
+ }
// Check if an on-disk merge can be done. This will help if there
// are no copies to be fetched but sufficient copies to be merged.
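Finally, the scheduling-summary hunk follows the same rule: log when something happened, or on the heartbeat, with the penalty-box dump riding the same heartbeat so slow hosts stay visible. A hedged sketch of that end-of-iteration reporting; penaltyBox is passed in as a plain Map since only its size and iteration order are needed here, and all names are illustrative:

  import java.util.Map;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;

  public class ScheduleReport {
    private static final Log LOG = LogFactory.getLog(ScheduleReport.class);

    void report(String taskId, int numScheduled, int numKnown, int numDups,
                Map<String, Long> penaltyBox, long currentTime, boolean logNow) {
      // Summary: only when copies were actually scheduled, or on the heartbeat.
      if (numScheduled > 0 || logNow) {
        LOG.info(taskId + " Scheduled " + numScheduled + " of " + numKnown
            + " known outputs (" + penaltyBox.size() + " slow hosts and "
            + numDups + " dup hosts)");
      }
      // Penalty-box dump, once a minute at most.
      if (penaltyBox.size() > 0 && logNow) {
        LOG.info("Penalized(slow) Hosts: ");
        for (Map.Entry<String, Long> e : penaltyBox.entrySet()) {
          LOG.info(e.getKey() + " Will be considered after: "
              + ((e.getValue() - currentTime) / 1000) + " seconds.");
        }
      }
    }
  }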