Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 90651 invoked from network); 4 Mar 2011 05:24:40 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 05:24:40 -0000 Received: (qmail 50083 invoked by uid 500); 4 Mar 2011 05:24:40 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 50042 invoked by uid 500); 4 Mar 2011 05:24:39 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 49972 invoked by uid 99); 4 Mar 2011 05:24:38 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 05:24:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 05:24:35 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 56C902388A32; Fri, 4 Mar 2011 05:24:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077806 - /hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Date: Fri, 04 Mar 2011 05:24:14 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304052414.56C902388A32@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Fri Mar 4 05:24:13 2011 New Revision: 1077806 URL: http://svn.apache.org/viewvc?rev=1077806&view=rev Log: commit d72894e09d7e58bc2cd3766c500a66e7aa0c3d0f Author: Arun C Murthy Date: Tue Mar 1 16:36:55 2011 -0800 Tuning out-of-band heartbeats, introduced a new config: mapreduce.tasktracker.outofband.heartbeat.damper. Modified: hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Modified: hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077806&r1=1077805&r2=1077806&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 05:24:13 2011 @@ -45,6 +45,7 @@ import java.util.Vector; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import javax.crypto.SecretKey; @@ -272,9 +273,13 @@ public class TaskTracker implements MRCo static final String TT_OUTOFBAND_HEARBEAT = "mapreduce.tasktracker.outofband.heartbeat"; private volatile boolean oobHeartbeatOnTaskCompletion; + static final String TT_OUTOFBAND_HEARTBEAT_DAMPER = + "mapreduce.tasktracker.outofband.heartbeat.damper"; + static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000; + private volatile int oobHeartbeatDamper; // Track number of completed tasks to send an out-of-band heartbeat - private IntWritable finishedCount = new IntWritable(0); + private AtomicInteger finishedCount = new AtomicInteger(0); private MapEventsFetcherThread mapEventsFetcher; final int workerThreads; @@ -298,6 +303,7 @@ public class TaskTracker implements MRCo * the minimum interval between jobtracker polls */ private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN; + /** * Number of maptask completion events locations to poll for at one time */ @@ -735,6 +741,10 @@ public class TaskTracker implements MRCo oobHeartbeatOnTaskCompletion = fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false); + + oobHeartbeatDamper = + fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, + DEFAULT_OOB_HEARTBEAT_DAMPER); } private void createInstrumentation() { @@ -1401,25 +1411,39 @@ public class TaskTracker implements MRCo return recentMapEvents; } + private long getHeartbeatInterval(int numFinishedTasks) { + return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1)); + } + /** * Main service loop. Will stay in this loop forever. */ State offerService() throws Exception { - long lastHeartbeat = 0; + long lastHeartbeat = System.currentTimeMillis(); while (running && !shuttingDown) { try { long now = System.currentTimeMillis(); - - long waitTime = heartbeatInterval - (now - lastHeartbeat); - if (waitTime > 0) { + + // accelerate to account for multiple finished tasks up-front + long remaining = + (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now; + while (remaining > 0) { // sleeps for the wait time or - // until there are empty slots to schedule tasks + // until there are *enough* empty slots to schedule tasks synchronized (finishedCount) { - if (finishedCount.get() == 0) { - finishedCount.wait(waitTime); + finishedCount.wait(remaining); + + // Recompute + now = System.currentTimeMillis(); + remaining = + (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now; + + if (remaining <= 0) { + // Reset count + finishedCount.set(0); + break; } - finishedCount.set(0); } } @@ -2187,8 +2211,7 @@ public class TaskTracker implements MRCo private void notifyTTAboutTaskCompletion() { if (oobHeartbeatOnTaskCompletion) { synchronized (finishedCount) { - int value = finishedCount.get(); - finishedCount.set(value+1); + finishedCount.incrementAndGet(); finishedCount.notify(); } }