hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r733895 - in /hadoop/core/trunk/src/contrib/fairscheduler/src: java/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
Date Mon, 12 Jan 2009 20:42:34 GMT
Author: matei
Date: Mon Jan 12 12:42:29 2009
New Revision: 733895

URL: http://svn.apache.org/viewvc?rev=733895&view=rev
Log:
HADOOP-4943. Fair share scheduler does not utilize all slots if the task trackers are configured
heterogeneously.

Contributed by Zheng Shao.


Modified:
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=733895&r1=733894&r2=733895&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Mon Jan 12 12:42:29 2009
@@ -31,24 +31,22 @@
    * out uniformly across the nodes rather than being clumped up on whichever
    * machines sent out heartbeats earliest.
    */
-  int getCap(TaskTrackerStatus tracker,
-      int totalRunnableTasks, int localMaxTasks) {
-    int numTaskTrackers = taskTrackerManager.taskTrackers().size();
-    return Math.min(localMaxTasks,
-        (int) Math.ceil((double) totalRunnableTasks / numTaskTrackers));
+  int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) {
+    double load = ((double)totalRunnableTasks) / totalSlots;
+    return (int) Math.ceil(localMaxTasks * Math.min(1.0, load));
   }
 
   @Override
   public boolean canAssignMap(TaskTrackerStatus tracker,
-      int totalRunnableMaps) {
-    return tracker.countMapTasks() < getCap(tracker, totalRunnableMaps,
-        tracker.getMaxMapTasks());
+      int totalRunnableMaps, int totalMapSlots) {
+    return tracker.countMapTasks() < getCap(totalRunnableMaps,
+        tracker.getMaxMapTasks(), totalMapSlots);
   }
 
   @Override
   public boolean canAssignReduce(TaskTrackerStatus tracker,
-      int totalRunnableReduces) {
-    return tracker.countReduceTasks() < getCap(tracker, totalRunnableReduces,
-        tracker.getMaxReduceTasks());
+      int totalRunnableReduces, int totalReduceSlots) {
+    return tracker.countReduceTasks() < getCap(totalRunnableReduces,
+        tracker.getMaxReduceTasks(), totalReduceSlots);
   }
 }

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=733895&r1=733894&r2=733895&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Mon Jan 12 12:42:29 2009
@@ -232,14 +232,20 @@
       runnableMaps += runnableTasks(job, TaskType.MAP);
       runnableReduces += runnableTasks(job, TaskType.REDUCE);
     }
+
+    // Compute total map/reduce slots
+    // In the future we can precompute this if the Scheduler becomes a 
+    // listener of tracker join/leave events.
+    int totalMapSlots = getTotalSlots(TaskType.MAP);
+    int totalReduceSlots = getTotalSlots(TaskType.REDUCE);
     
     // Scan to see whether any job needs to run a map, then a reduce
     ArrayList<Task> tasks = new ArrayList<Task>();
     TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
     for (TaskType taskType: types) {
       boolean canAssign = (taskType == TaskType.MAP) ? 
-          loadMgr.canAssignMap(tracker, runnableMaps) :
-          loadMgr.canAssignReduce(tracker, runnableReduces);
+          loadMgr.canAssignMap(tracker, runnableMaps, totalMapSlots) :
+          loadMgr.canAssignReduce(tracker, runnableReduces, totalReduceSlots);
       if (canAssign) {
         // Figure out the jobs that need this type of task
         List<JobInProgress> candidates = new ArrayList<JobInProgress>();

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=733895&r1=733894&r2=733895&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Mon Jan 12 12:42:29 2009
@@ -63,17 +63,19 @@
    * Can a given {@link TaskTracker} run another map task?
    * @param tracker The machine we wish to run a new map on
    * @param totalRunnableMaps Set of running jobs in the cluster
+   * @param totalMapSlots The total number of map slots in the cluster
    * @return true if another map can be launched on <code>tracker</code>
    */
   public abstract boolean canAssignMap(TaskTrackerStatus tracker,
-      int totalRunnableMaps);
+      int totalRunnableMaps, int totalMapSlots);
 
   /**
    * Can a given {@link TaskTracker} run another reduce task?
    * @param tracker The machine we wish to run a new map on
-   * @param totalReducesNeeded Set of running jobs in the cluster
+   * @param totalRunnableReduces Set of running jobs in the cluster
+   * @param totalReduceSlots The total number of reduce slots in the cluster
    * @return true if another reduce can be launched on <code>tracker</code>
    */
   public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
-      int totalRunnableReduces);
+      int totalRunnableReduces, int totalReduceSlots);
 }

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=733895&r1=733894&r2=733895&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Mon Jan 12 12:42:29 2009
@@ -1150,6 +1150,24 @@
     assertEquals(0.28,  info4.mapFairShare, 0.01);
     assertEquals(0.28,  info4.reduceFairShare, 0.01);
   }
+
+  /**
+   * Tests that max-running-tasks per node are set by assigning load
+   * equally accross the cluster in CapBasedLoadManager.
+   */
+  public void testCapBasedLoadManager() {
+    CapBasedLoadManager loadMgr = new CapBasedLoadManager();
+    // Arguments to getCap: totalRunnableTasks, nodeCap, totalSlots
+    // Desired behavior: return ceil(nodeCap * min(1, runnableTasks/totalSlots))
+    assertEquals(1, loadMgr.getCap(1, 1, 100));
+    assertEquals(1, loadMgr.getCap(1, 2, 100));
+    assertEquals(1, loadMgr.getCap(1, 10, 100));
+    assertEquals(1, loadMgr.getCap(200, 1, 100));
+    assertEquals(1, loadMgr.getCap(1, 5, 100));
+    assertEquals(3, loadMgr.getCap(50, 5, 100));
+    assertEquals(5, loadMgr.getCap(100, 5, 100));
+    assertEquals(5, loadMgr.getCap(200, 5, 100));
+  }
   
   private void advanceTime(long time) {
     clock.advance(time);



Mime
View raw message