hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r811569 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java
Date Fri, 04 Sep 2009 23:34:20 GMT
Author: dhruba
Date: Fri Sep  4 23:34:20 2009
New Revision: 811569

URL: http://svn.apache.org/viewvc?rev=811569&view=rev
Log:
MAPREDUCE-936. Allow a load difference for fairshare scheduler.
(Zheng Shao via dhruba)


Added:
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=811569&r1=811568&r2=811569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep  4 23:34:20 2009
@@ -283,6 +283,9 @@
     MAPREDUCE-318. Modularizes the shuffle code. (Jothi Padmanabhan and 
     Arun Murthy via ddas)
 
+    MAPREDUCE-936. Allow a load difference for fairshare scheduler.
+    (Zheng Shao via dhruba)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=811569&r1=811568&r2=811569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Fri Sep  4 23:34:20 2009
@@ -18,12 +18,22 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * A {@link LoadManager} for use by the {@link FairScheduler} that allocates
  * tasks evenly across nodes up to their per-node maximum, using the default
  * load management algorithm in Hadoop.
  */
 public class CapBasedLoadManager extends LoadManager {
+  
+  float maxDiff = 0.0f;
+  
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    maxDiff = conf.getFloat("mapred.fairscheduler.load.max.diff", 0.0f);
+  }
+  
   /**
    * Determine how many tasks of a given type we want to run on a TaskTracker. 
    * This cap is chosen based on how many tasks of that type are outstanding in
@@ -32,7 +42,7 @@
    * machines sent out heartbeats earliest.
    */
   int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) {
-    double load = ((double)totalRunnableTasks) / totalSlots;
+    double load = maxDiff + ((double)totalRunnableTasks) / totalSlots;
     return (int) Math.ceil(localMaxTasks * Math.min(1.0, load));
   }
 

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java?rev=811569&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java Fri Sep  4 23:34:20 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercise the canAssignMap and canAssignReduce methods in 
+ * CapBasedLoadManager.
+ */
+public class TestCapBasedLoadManager extends TestCase {
+  
+  /**
+   * Returns a running MapTaskStatus.
+   */
+  private TaskStatus getRunningMapTaskStatus() {
+    TaskStatus ts = new MapTaskStatus();
+    ts.setRunState(State.RUNNING);
+    return ts;
+  }
+
+  /**
+   * Returns a running ReduceTaskStatus.
+   */
+  private TaskStatus getRunningReduceTaskStatus() {
+    TaskStatus ts = new ReduceTaskStatus();
+    ts.setRunState(State.RUNNING);
+    return ts;
+  }
+  
+  /**
+   * Returns a TaskTrackerStatus with the specified statistics. 
+   * @param mapCap        The capacity of map tasks 
+   * @param reduceCap     The capacity of reduce tasks
+   * @param runningMap    The number of running map tasks
+   * @param runningReduce The number of running reduce tasks
+   */
+  private TaskTrackerStatus getTaskTrackerStatus(int mapCap, int reduceCap, 
+      int runningMap, int runningReduce) {
+    List<TaskStatus> ts = new ArrayList<TaskStatus>();
+    for (int i = 0; i < runningMap; i++) {
+      ts.add(getRunningMapTaskStatus());
+    }
+    for (int i = 0; i < runningReduce; i++) {
+      ts.add(getRunningReduceTaskStatus());
+    }
+    TaskTrackerStatus tracker = new TaskTrackerStatus("tracker", 
+        "tracker_host", 1234, ts, 0, mapCap, reduceCap);
+    return tracker;
+  }
+
+  /**
+   * A single test of canAssignMap.
+   */
+  private void oneTestCanAssignMap(float maxDiff, int mapCap, int runningMap,
+      int totalMapSlots, int totalRunnableMap, boolean expected) {
+    
+    CapBasedLoadManager manager = new CapBasedLoadManager();
+    Configuration conf = new Configuration();
+    conf.setFloat("mapred.fairscheduler.load.max.diff", maxDiff);
+    manager.setConf(conf);
+    
+    TaskTrackerStatus ts = getTaskTrackerStatus(mapCap, 1, runningMap, 1);
+    
+    assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableMap=" 
+        + totalRunnableMap + " and totalMapSlots=" + totalMapSlots
+        + ", a tracker with runningMap=" + runningMap + " and mapCap="
+        + mapCap + " should " + (expected ? "" : "not ")
+        + "be able to take more Maps.",
+        expected,
+        manager.canAssignMap(ts, totalRunnableMap, totalMapSlots)
+        );
+  }
+  
+  
+  /** 
+   * Test canAssignMap method.
+   */
+  public void testCanAssignMap() {
+    oneTestCanAssignMap(0.0f, 5, 0, 50, 1, true);
+    oneTestCanAssignMap(0.0f, 5, 1, 50, 10, false);
+    oneTestCanAssignMap(0.2f, 5, 1, 50, 10, true);
+    oneTestCanAssignMap(0.0f, 5, 1, 50, 11, true);
+    oneTestCanAssignMap(0.0f, 5, 2, 50, 11, false);
+    oneTestCanAssignMap(0.3f, 5, 2, 50, 6, true);
+    oneTestCanAssignMap(1.0f, 5, 5, 50, 50, false);
+  }
+  
+  
+  /**
+   * A single test of canAssignReduce.
+   */
+  private void oneTestCanAssignReduce(float maxDiff, int ReduceCap,
+      int runningReduce, int totalReduceSlots, int totalRunnableReduce,
+      boolean expected) {
+    
+    CapBasedLoadManager manager = new CapBasedLoadManager();
+    Configuration conf = new Configuration();
+    conf.setFloat("mapred.fairscheduler.load.max.diff", maxDiff);
+    manager.setConf(conf);
+    
+    TaskTrackerStatus ts = getTaskTrackerStatus(1, ReduceCap, 1,
+        runningReduce);
+    
+    assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableReduce=" 
+        + totalRunnableReduce + " and totalReduceSlots=" + totalReduceSlots
+        + ", a tracker with runningReduce=" + runningReduce
+        + " and ReduceCap=" + ReduceCap + " should "
+        + (expected ? "" : "not ") + "be able to take more Reduces.",
+        expected,
+        manager.canAssignReduce(ts, totalRunnableReduce, totalReduceSlots)
+        );
+  }
+    
+  /** 
+   * Test canAssignReduce method.
+   */
+  public void testCanAssignReduce() {
+    oneTestCanAssignReduce(0.0f, 5, 0, 50, 1, true);
+    oneTestCanAssignReduce(0.0f, 5, 1, 50, 10, false);
+    oneTestCanAssignReduce(0.2f, 5, 1, 50, 10, true);
+    oneTestCanAssignReduce(0.0f, 5, 1, 50, 11, true);
+    oneTestCanAssignReduce(0.0f, 5, 2, 50, 11, false);
+    oneTestCanAssignReduce(0.3f, 5, 2, 50, 6, true);
+    oneTestCanAssignReduce(1.0f, 5, 5, 50, 50, false);
+  }
+  
+}



Mime
View raw message