From core-commits-return-4796-apmail-hadoop-core-commits-archive=hadoop.apache.org@hadoop.apache.org Wed May 14 13:04:13 2008 Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 29177 invoked from network); 14 May 2008 13:04:11 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 14 May 2008 13:04:11 -0000 Received: (qmail 29709 invoked by uid 500); 14 May 2008 13:04:12 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 29688 invoked by uid 500); 14 May 2008 13:04:12 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 29672 invoked by uid 99); 14 May 2008 13:04:12 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 May 2008 06:04:12 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 May 2008 13:03:25 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 376D72388999; Wed, 14 May 2008 06:03:46 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r656260 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java Date: Wed, 14 May 2008 13:03:46 -0000 To: core-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080514130346.376D72388999@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ddas Date: Wed May 14 06:03:45 2008 New Revision: 656260 URL: http://svn.apache.org/viewvc?rev=656260&view=rev Log: HADOOP-3332. Reduces the amount of logging in Reducer's shuffle phase. Contributed by Devaraj Das. Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656260&r1=656259&r2=656260&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Wed May 14 06:03:45 2008 @@ -120,6 +120,9 @@ HADOOP-3334. Move lease handling from FSNamesystem into a seperate class. (Tsz Wo (Nicholas), SZE via rangadi) + HADOOP-3332. Reduces the amount of logging in Reducer's shuffle phase. + (Devaraj Das) + OPTIMIZATIONS HADOOP-3274. The default constructor of BytesWritable creates empty Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=656260&r1=656259&r2=656260&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed May 14 06:03:45 2008 @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Hashtable; +import java.util.LinkedHashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -613,6 +614,11 @@ * Initial backoff interval (milliseconds) */ private static final int BACKOFF_INIT = 4000; + + /** + * The interval for logging in the shuffle + */ + private static final int MIN_LOG_TIME = 60000; /** * This class contains the methods that should be used for metrics-reporting @@ -989,7 +995,7 @@ sorter.setProgressable(getReporter(umbilical)); // hosts -> next contact time - this.penaltyBox = new Hashtable(); + this.penaltyBox = new LinkedHashMap(); // hostnames this.uniqueHosts = new HashSet(); @@ -1041,16 +1047,25 @@ // start the clock for bandwidth measurement long startTime = System.currentTimeMillis(); long currentTime = startTime; - long lastProgressTime = System.currentTimeMillis(); + long lastProgressTime = startTime; + long lastOutputTime = 0; IntWritable fromEventId = new IntWritable(0); try { // loop until we get all required outputs while (!neededOutputs.isEmpty() && mergeThrowable == null) { - LOG.info(reduceTask.getTaskID() + " Need another " + currentTime = System.currentTimeMillis(); + boolean logNow = false; + if (currentTime - lastOutputTime > MIN_LOG_TIME) { + lastOutputTime = currentTime; + logNow = true; + } + if (logNow) { + LOG.info(reduceTask.getTaskID() + " Need another " + neededOutputs.size() + " map output(s) where " + numInFlight + " is already in progress"); + } try { // Put the hash entries for the failed fetches. Entries here @@ -1065,16 +1080,29 @@ // used for the next call to getMapCompletionEvents int currentNumKnownMaps = knownOutputs.size(); int currentNumObsoleteMapIds = obsoleteMapIds.size(); - getMapCompletionEvents(fromEventId, knownOutputs); + getMapCompletionEvents(fromEventId, knownOutputs); - LOG.info(reduceTask.getTaskID() + ": " + - "Got " + (knownOutputs.size()-currentNumKnownMaps) + - " new map-outputs & " + - (obsoleteMapIds.size()-currentNumObsoleteMapIds) + - " obsolete map-outputs from tasktracker and " + - retryFetches.size() + " map-outputs from previous failures" - ); + int numNewOutputs = knownOutputs.size()-currentNumKnownMaps; + if (numNewOutputs > 0 || logNow) { + LOG.info(reduceTask.getTaskID() + ": " + + "Got " + numNewOutputs + + " new map-outputs & number of known map outputs is " + + knownOutputs.size()); + } + + int numNewObsoleteMaps = obsoleteMapIds.size()-currentNumObsoleteMapIds; + if (numNewObsoleteMaps > 0) { + LOG.info(reduceTask.getTaskID() + ": " + + "Got " + numNewObsoleteMaps + + " obsolete map-outputs from tasktracker "); + } + + if (retryFetches.size() > 0) { + LOG.info(reduceTask.getTaskID() + ": " + + "Got " + retryFetches.size() + + " map-outputs from previous failures"); + } // clear the "failed" fetches hashmap retryFetches.clear(); } @@ -1086,10 +1114,7 @@ // now walk through the cache and schedule what we can int numKnown = knownOutputs.size(), numScheduled = 0; - int numSlow = 0, numDups = 0; - - LOG.info(reduceTask.getTaskID() + " Got " + numKnown + - " known map output location(s); scheduling..."); + int numDups = 0; synchronized (scheduledCopies) { // Randomize the map output locations to prevent @@ -1098,7 +1123,7 @@ Iterator locIt = knownOutputs.iterator(); - currentTime = System.currentTimeMillis(); + while (locIt.hasNext()) { MapOutputLocation loc = locIt.next(); @@ -1112,8 +1137,12 @@ Long penaltyEnd = penaltyBox.get(loc.getHost()); boolean penalized = false, duplicate = false; - if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) { - penalized = true; numSlow++; + if (penaltyEnd != null) { + if (currentTime < penaltyEnd.longValue()) { + penalized = true; + } else { + penaltyBox.remove(loc.getHost()); + } } if (uniqueHosts.contains(loc.getHost())) { duplicate = true; numDups++; @@ -1128,9 +1157,18 @@ } scheduledCopies.notifyAll(); } - LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled + - " of " + numKnown + " known outputs (" + numSlow + + if (numScheduled > 0 || logNow) { + LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled + + " of " + numKnown + " known outputs (" + penaltyBox.size() + " slow hosts and " + numDups + " dup hosts)"); + } + if (penaltyBox.size() > 0 && logNow) { + LOG.info("Penalized(slow) Hosts: "); + for (String host : penaltyBox.keySet()) { + LOG.info(host + " Will be considered after: " + + ((penaltyBox.get(host) - currentTime)/1000) + " seconds."); + } + } // Check if a on-disk merge can be done. This will help if there // are no copies to be fetched but sufficient copies to be merged.