Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 94335 invoked from network); 4 Mar 2011 03:30:25 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:30:25 -0000 Received: (qmail 19844 invoked by uid 500); 4 Mar 2011 03:30:24 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 19794 invoked by uid 500); 4 Mar 2011 03:30:24 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 19787 invoked by uid 99); 4 Mar 2011 03:30:24 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:30:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:30:22 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E73D02388C16; Fri, 4 Mar 2011 03:30:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077009 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/ Date: Fri, 04 Mar 2011 03:30:02 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304033002.E73D02388C16@eris.apache.org> Author: omalley Date: Fri Mar 4 03:30:02 2011 New Revision: 1077009 URL: http://svn.apache.org/viewvc?rev=1077009&view=rev Log: commit 1f2a5df1b2a40a3aebca212904165f3fd6d9f85c Author: Arun C Murthy Date: Mon Sep 28 14:28:46 2009 -0700 
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band heartbeat on task-completion for better job-latency. Contributed by Arun C. Murthy Configuration changes: add mapreduce.tasktracker.outofband.heartbeat from: https://issues.apache.org/jira/secure/attachment/12420718/MAPREDUCE-270_yhadoop20.patch +++ b/YAHOO-CHANGES.txt +60. MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band + heartbeat on task-completion for better job-latency. Contributed by + Arun C. Murthy + Configuration changes: + add mapreduce.tasktracker.outofband.heartbeat + Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077009&r1=1077008&r2=1077009&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 03:30:02 2011 @@ -223,6 +223,14 @@ + mapreduce.tasktracker.outofband.heartbeat + false + Expert: Set this to true to let the tasktracker send an + out-of-band heartbeat on task-completion for better latency. 
+ + + + mapred.jobtracker.restart.recover false "true" to enable (job) recovery upon restart, @@ -611,7 +619,7 @@ mapred.heartbeats.in.second 100 Expert: Approximate number of heart-beats that could arrive - JobTracker in a second. Assuming each RPC can be processed + at JobTracker in a second. Assuming each RPC can be processed in 10msec, the default value is made 100 RPCs in a second. Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077009&r1=1077008&r2=1077009&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:30:02 2011 @@ -133,9 +133,21 @@ public class JobTracker implements MRCon // The maximum number of blacklists for a tracker after which the // tracker could be blacklisted across all jobs private int MAX_BLACKLISTS_PER_TRACKER = 4; + // Approximate number of heartbeats that could arrive JobTracker // in a second - private int NUM_HEARTBEATS_IN_SECOND = 100; + static final String JT_HEARTBEATS_IN_SECOND = "mapred.heartbeats.in.second"; + private int NUM_HEARTBEATS_IN_SECOND; + private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100; + private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1; + + // Scaling factor for heartbeats, used for testing only + static final String JT_HEARTBEATS_SCALING_FACTOR = + "mapreduce.jobtracker.heartbeats.scaling.factor"; + private float HEARTBEATS_SCALING_FACTOR; + private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f; + private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f; + public static enum State { INITIALIZING, RUNNING } State state = 
State.INITIALIZING; private static final int FS_ACCESS_RETRY_PERIOD = 10000; @@ -1908,8 +1920,19 @@ public class JobTracker implements MRCon MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100); MAX_BLACKLISTS_PER_TRACKER = conf.getInt("mapred.max.tracker.blacklists", 4); + NUM_HEARTBEATS_IN_SECOND = - conf.getInt("mapred.heartbeats.in.second", 100); + conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND); + if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) { + NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND; + } + + HEARTBEATS_SCALING_FACTOR = + conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR, + DEFAULT_HEARTBEATS_SCALING_FACTOR); + if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) { + HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR; + } //This configuration is there solely for tuning purposes and //once this feature has been tested in real clusters and an appropriate @@ -2979,15 +3002,16 @@ public class JobTracker implements MRCon /** * Calculates next heartbeat interval using cluster size. - * Heartbeat interval is incremented 1second for every 50 nodes. + * Heartbeat interval is incremented by 1 second for every 100 nodes by default. * @return next heartbeat interval. 
*/ public int getNextHeartbeatInterval() { // get the no of task trackers int clusterSize = getClusterStatus().getTaskTrackers(); int heartbeatInterval = Math.max( - (int)(1000 * Math.ceil((double)clusterSize / - NUM_HEARTBEATS_IN_SECOND)), + (int)(1000 * HEARTBEATS_SCALING_FACTOR * + Math.ceil((double)clusterSize / + NUM_HEARTBEATS_IN_SECOND)), HEARTBEAT_INTERVAL_MIN) ; return heartbeatInterval; } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077009&r1=1077008&r2=1077009&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:30:02 2011 @@ -538,7 +538,12 @@ abstract class TaskRunner extends Thread }catch(IOException ie){ LOG.warn("Error releasing caches : Cache files might not have been cleaned up"); } - tip.reportTaskFinished(); + + // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with + // *false* since the task has either + // a) SUCCEEDED - which means commit has been done + // b) FAILED - which means we do not need to commit + tip.reportTaskFinished(false); } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077009&r1=1077008&r2=1077009&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ 
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:30:02 2011 @@ -205,6 +205,16 @@ public class TaskTracker private int maxMapSlots; private int maxReduceSlots; private int failures; + + // Performance-related config knob to send an out-of-band heartbeat + // on task completion + static final String TT_OUTOFBAND_HEARBEAT = + "mapreduce.tasktracker.outofband.heartbeat"; + private volatile boolean oobHeartbeatOnTaskCompletion; + + // Track number of completed tasks to send an out-of-band heartbeat + private IntWritable finishedCount = new IntWritable(0); + private MapEventsFetcherThread mapEventsFetcher; int workerThreads; private CleanupQueue directoryCleanupThread; @@ -565,6 +575,9 @@ public class TaskTracker if (shouldStartHealthMonitor(this.fConf)) { startHealthMonitor(this.fConf); } + + oobHeartbeatOnTaskCompletion = + fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false); } public static Class getInstrumentationClass( @@ -1041,8 +1054,14 @@ public class TaskTracker long waitTime = heartbeatInterval - (now - lastHeartbeat); if (waitTime > 0) { - // sleeps for the wait time - Thread.sleep(waitTime); + // sleeps for the wait time or + // until there are empty slots to schedule tasks + synchronized (finishedCount) { + if (finishedCount.get() == 0) { + finishedCount.wait(waitTime); + } + finishedCount.set(0); + } } // If the TaskTracker is just starting up: @@ -1758,6 +1777,19 @@ public class TaskTracker } } + /** + * Notify the tasktracker to send an out-of-band heartbeat. + */ + private void notifyTTAboutTaskCompletion() { + if (oobHeartbeatOnTaskCompletion) { + synchronized (finishedCount) { + int value = finishedCount.get(); + finishedCount.set(value+1); + finishedCount.notify(); + } + } + } + /** * The server retry loop. * This while-loop attempts to connect to the JobTracker. 
It only @@ -2100,9 +2132,21 @@ public class TaskTracker return wasKilled; } - void reportTaskFinished() { - taskFinished(); - releaseSlot(); + /** + * A task is reporting in as 'done'. + * + * We need to notify the tasktracker to send an out-of-band heartbeat. + * If the commit isn't pending, we need to finalize the task + * and release the slot it occupied. + * + * @param commitPending is the task-commit pending? + */ + void reportTaskFinished(boolean commitPending) { + if (!commitPending) { + taskFinished(); + releaseSlot(); + } + notifyTTAboutTaskCompletion(); } /* State changes: @@ -2403,6 +2447,7 @@ public class TaskTracker } removeFromMemoryManager(task.getTaskID()); releaseSlot(); + notifyTTAboutTaskCompletion(); } private synchronized void releaseSlot() { @@ -2714,9 +2759,7 @@ public class TaskTracker tip = tasks.get(taskid); } if (tip != null) { - if (!commitPending) { - tip.reportTaskFinished(); - } + tip.reportTaskFinished(commitPending); } else { LOG.warn("Unknown child task finished: "+taskid+". 
Ignored."); } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java?rev=1077009&r1=1077008&r2=1077009&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java Fri Mar 4 03:30:02 2011 @@ -21,6 +21,9 @@ import java.io.IOException; import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.JobConf; public class TestMapredHeartbeat extends TestCase { @@ -42,7 +45,7 @@ public class TestMapredHeartbeat extends // test configured heartbeat interval taskTrackers = 5; - conf.setInt("mapred.heartbeats.in.second", 1); + conf.setInt(JobTracker.JT_HEARTBEATS_IN_SECOND, 1); mr = new MiniMRCluster(taskTrackers, "file:///", 3, null, null, conf); jc = new JobClient(mr.createJobConf()); @@ -55,7 +58,7 @@ public class TestMapredHeartbeat extends // test configured heartbeat interval is capped with min value taskTrackers = 5; - conf.setInt("mapred.heartbeats.in.second", 10); + conf.setInt(JobTracker.JT_HEARTBEATS_IN_SECOND, 10); mr = new MiniMRCluster(taskTrackers, "file:///", 3, null, null, conf); jc = new JobClient(mr.createJobConf()); @@ -68,6 +71,37 @@ public class TestMapredHeartbeat extends if (mr != null) { mr.shutdown(); } } } + + public void testOutOfBandHeartbeats() throws Exception { + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + try { + Configuration conf = new Configuration(); + dfs = new MiniDFSCluster(conf, 4, true, null); + + int taskTrackers = 1; + 
JobConf jobConf = new JobConf(); + jobConf.setFloat(JobTracker.JT_HEARTBEATS_SCALING_FACTOR, 30.0f); + jobConf.setBoolean(TaskTracker.TT_OUTOFBAND_HEARBEAT, true); + mr = new MiniMRCluster(taskTrackers, + dfs.getFileSystem().getUri().toString(), 3, + null, null, jobConf); + long start = System.currentTimeMillis(); + TestMiniMRDFSSort.runRandomWriter(mr.createJobConf(), new Path("rw")); + long end = System.currentTimeMillis(); + + final int expectedRuntimeSecs = 120; + final int runTimeSecs = (int)((end-start) / 1000); + System.err.println("Runtime is " + runTimeSecs); + assertEquals("Actual runtime " + runTimeSecs + "s not less than expected " + + "runtime of " + expectedRuntimeSecs + "s!", + true, (runTimeSecs <= 120)); + } finally { + if (mr != null) { mr.shutdown(); } + if (dfs != null) { dfs.shutdown(); } + } + } + } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=1077009&r1=1077008&r2=1077009&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Fri Mar 4 03:30:02 2011 @@ -71,7 +71,7 @@ public class TestMiniMRDFSSort extends T return setup; } - private static void runRandomWriter(JobConf job, Path sortInput) + public static void runRandomWriter(JobConf job, Path sortInput) throws Exception { // Scale down the default settings for RandomWriter for the test-case // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP