hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r943372 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Date Wed, 12 May 2010 05:10:45 GMT
Author: dhruba
Date: Wed May 12 05:10:44 2010
New Revision: 943372

URL: http://svn.apache.org/viewvc?rev=943372&view=rev
Log:
MAPREDUCE-1761. FairScheduler allows separate configuration of node
and rack locality wait time (Scott Chen via dhruba)


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=943372&r1=943371&r2=943372&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed May 12 05:10:44 2010
@@ -17,6 +17,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1680. Add a metric recording JobTracker heartbeats processed.
     (Dick King via cdouglas)
 
+    MAPREDUCE-1761. FairScheduler allows separate configuration of node
+    and rack locality wait time (Scott Chen via dhruba)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=943372&r1=943371&r2=943372&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed May 12 05:10:44 2010
@@ -73,7 +73,8 @@ public class FairScheduler extends TaskS
   protected boolean assignMultiple; // Simultaneously assign map and reduce?
   protected int mapAssignCap = -1;    // Max maps to launch per heartbeat
   protected int reduceAssignCap = -1; // Max reduces to launch per heartbeat
-  protected long localityDelay;       // Time to wait for node and rack locality
+  protected long nodeLocalityDelay;   // Time to wait for node locality
+  protected long rackLocalityDelay;   // Time to wait for rack locality
   protected boolean autoComputeLocalityDelay = false; // Compute locality delay
                                                       // from heartbeat interval
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
@@ -184,10 +185,16 @@ public class FairScheduler extends TaskS
           "mapred.fairscheduler.preemption", false);
       onlyLogPreemption = conf.getBoolean(
           "mapred.fairscheduler.preemption.only.log", false);
-      localityDelay = conf.getLong(
+      long defaultDelay = conf.getLong(
           "mapred.fairscheduler.locality.delay", -1);
-      if (localityDelay == -1)
+      nodeLocalityDelay = conf.getLong(
+          "mapred.fairscheduler.locality.delay.node", defaultDelay);
+      rackLocalityDelay = conf.getLong(
+          "mapred.fairscheduler.locality.delay.rack", defaultDelay);
+      if (defaultDelay == -1 && 
+          (nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
         autoComputeLocalityDelay = true; // Compute from heartbeat interval
+      }
       initialized = true;
       running = true;
       lastUpdateTime = clock.getTime();
@@ -490,9 +497,10 @@ public class FairScheduler extends TaskS
    * If the job has no locality information (e.g. it does not use HDFS), this 
    * method returns LocalityLevel.ANY, allowing tasks at any level.
    * Otherwise, the job can only launch tasks at its current locality level
-   * or lower, unless it has waited at least localityDelay milliseconds
-   * (in which case it can go one level beyond) or 2 * localityDelay millis
-   * (in which case it can go to any level).
+   * or lower, unless it has waited at least nodeLocalityDelay or
+   * rackLocalityDelay milliseconds depends on the current level. If it
+   * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
+   * it can go to any level.
    */
   protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
       long currentTime) {
@@ -519,14 +527,15 @@ public class FairScheduler extends TaskS
     // In the common case, compute locality level based on time waited
     switch(info.lastMapLocalityLevel) {
     case NODE: // Last task launched was node-local
-      if (info.timeWaitedForLocalMap >= 2 * localityDelay)
+      if (info.timeWaitedForLocalMap >=
+          nodeLocalityDelay + rackLocalityDelay)
         return LocalityLevel.ANY;
-      else if (info.timeWaitedForLocalMap >= localityDelay)
+      else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
         return LocalityLevel.RACK;
       else
         return LocalityLevel.NODE;
     case RACK: // Last task launched was rack-local
-      if (info.timeWaitedForLocalMap >= localityDelay)
+      if (info.timeWaitedForLocalMap >= rackLocalityDelay)
         return LocalityLevel.ANY;
       else
         return LocalityLevel.RACK;
@@ -549,8 +558,9 @@ public class FairScheduler extends TaskS
     // This will also lock the JT, so do it outside of a fair scheduler lock.
     if (autoComputeLocalityDelay) {
       JobTracker jobTracker = (JobTracker) taskTrackerManager;
-      localityDelay = Math.min(MAX_AUTOCOMPUTED_LOCALITY_DELAY,
+      nodeLocalityDelay = Math.min(MAX_AUTOCOMPUTED_LOCALITY_DELAY,
           (long) (1.5 * jobTracker.getNextHeartbeatInterval()));
+      rackLocalityDelay = nodeLocalityDelay;
     }
     
     // Got clusterStatus hence acquiring scheduler lock now.

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=943372&r1=943371&r2=943372&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Wed May 12 05:10:44 2010
@@ -507,7 +507,8 @@ public class TestFairScheduler extends T
     conf.setBoolean("mapred.fairscheduler.assignmultiple", assignMultiple);
     // Manually set locality delay because we aren't using a JobTracker so
     // we can't auto-compute it from the heartbeat interval.
-    conf.setLong("mapred.fairscheduler.locality.delay", 10000);
+    conf.setLong("mapred.fairscheduler.locality.delay.node", 5000);
+    conf.setLong("mapred.fairscheduler.locality.delay.rack", 10000);
     taskTrackerManager = new FakeTaskTrackerManager(numRacks, numNodesPerRack);
     clock = new FakeClock();
     scheduler = new FairScheduler(clock, true);
@@ -2221,8 +2222,8 @@ public class TestFairScheduler extends T
     checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
                            "attempt_test_0002_m_000005_0 on tt3");
     
-    // Advance time by 11 seconds to put us past the 10-second locality delay
-    advanceTime(11000);
+    // Advance time by 6 seconds to put us past the 5-second node locality delay
+    advanceTime(6000);
     
     // Finish some tasks on each node
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000000_0");
@@ -2297,8 +2298,8 @@ public class TestFairScheduler extends T
     assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt3")));
     
-    // Advance time by 11 seconds to put us past the 10-sec node locality delay
-    advanceTime(11000);
+    // Advance time by 6 seconds to put us past the 5-sec node locality delay
+    advanceTime(6000);
 
     // Check that nothing is assigned on trackers 1-2; the job would assign
     // a task on tracker 3 (rack1.node2) so we skip that one 
@@ -2315,7 +2316,7 @@ public class TestFairScheduler extends T
 
     // Check that delay scheduling info is properly set
     assertEquals(info1.lastMapLocalityLevel, LocalityLevel.NODE);
-    assertEquals(info1.timeWaitedForLocalMap, 11200);
+    assertEquals(info1.timeWaitedForLocalMap, 6200);
     assertEquals(info1.skippedAtLastHeartbeat, true);
     
     // Advance time by 11 seconds to put us past the 10-sec rack locality delay



Mime
View raw message