hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r731530 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Date Mon, 05 Jan 2009 11:35:25 GMT
Author: yhemanth
Date: Mon Jan  5 03:35:22 2009
New Revision: 731530

URL: http://svn.apache.org/viewvc?rev=731530&view=rev
Log:
HADOOP-4979. Fix capacity scheduler to block cluster for failed high RAM requirements across
task types. Contributed by Vivek Ratan.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=731530&r1=731529&r2=731530&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jan  5 03:35:22 2009
@@ -527,6 +527,9 @@
 
     HADOOP-4949. Fix native compilation. (Chris Douglas via rangadi)
 
+    HADOOP-4979. Fix capacity scheduler to block cluster for failed high
+    RAM requirements across task types. (Vivek Ratan via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=731530&r1=731529&r2=731530&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Mon Jan  5 03:35:22 2009
@@ -843,7 +843,8 @@
           TaskLookUpStatus.NO_TASK_IN_JOB, msg);
     }
 
-    private List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+    // don't return null
+    private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
       Task t = null;
 
       /* 
@@ -867,7 +868,8 @@
           // Queues are sorted so that ones without capacities
           // come towards the end. Hence, we can simply return
           // from here without considering any further queues.
-          return null;
+          return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
+              null);
         }
 
         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -878,10 +880,9 @@
         }
 
         if (lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
-          t = tlr.getTask();
           // we have a task. Update reclaimed resource info
           updateReclaimedResources(qsi);
-          return Collections.singletonList(t);
+          return tlr;
         }
         
         if (lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
@@ -891,13 +892,14 @@
             LOG.warn(msg);
             LOG.warn("Returning nothing to the Tasktracker "
                 + taskTracker.trackerName);
-            return null;
+            return tlr;
           }
         }
       }
 
       // nothing to give
-      return null;
+      return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
+          null);
     }
     
     private void printQSIs() {
@@ -1307,7 +1309,7 @@
   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
       throws IOException {
     
-    List<Task> tasks = null;
+    TaskLookupResult tlr;
     /* 
      * If TT has Map and Reduce slot free, we need to figure out whether to
      * give it a Map or Reduce task.
@@ -1324,22 +1326,54 @@
     int currentMapTasks = taskTracker.countMapTasks();
     int maxReduceTasks = taskTracker.getMaxReduceTasks();
     int currentReduceTasks = taskTracker.countReduceTasks();
+    
     if ((maxReduceTasks - currentReduceTasks) > 
     (maxMapTasks - currentMapTasks)) {
-      tasks = reduceScheduler.assignTasks(taskTracker);
+      // get a reduce task first
+      tlr = reduceScheduler.assignTasks(taskTracker);
+      if (TaskLookUpStatus.TASK_FOUND == 
+        tlr.getLookUpStatus()) {
+        // found a task; return
+        return Collections.singletonList(tlr.getTask());
+      }
+      else if (TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS == 
+        tlr.getLookUpStatus()) {
+        // return no task
+        return null;
+      }
       // if we didn't get any, look at map tasks, if TT has space
-      if ((null == tasks) && (maxMapTasks > currentMapTasks)) {
-        tasks = mapScheduler.assignTasks(taskTracker);
+      else if ((TaskLookUpStatus.NO_TASK_IN_QUEUE == 
+        tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+        tlr = mapScheduler.assignTasks(taskTracker);
+        if (TaskLookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+          return Collections.singletonList(tlr.getTask());
+        }
       }
     }
     else {
-      tasks = mapScheduler.assignTasks(taskTracker);
-      // if we didn't get any, look at red tasks, if TT has space
-      if ((null == tasks) && (maxReduceTasks > currentReduceTasks)) {
-        tasks = reduceScheduler.assignTasks(taskTracker);
+      // get a map task first
+      tlr = mapScheduler.assignTasks(taskTracker);
+      if (TaskLookUpStatus.TASK_FOUND == 
+        tlr.getLookUpStatus()) {
+        // found a task; return
+        return Collections.singletonList(tlr.getTask());
+      }
+      else if (TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS == 
+        tlr.getLookUpStatus()) {
+        // return no task
+        return null;
+      }
+      // if we didn't get any, look at reduce tasks, if TT has space
+      else if ((TaskLookUpStatus.NO_TASK_IN_QUEUE == 
+        tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+        tlr = reduceScheduler.assignTasks(taskTracker);
+        if (TaskLookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+          return Collections.singletonList(tlr.getTask());
+        }
       }
     }
-    return tasks;
+
+    return null;
   }
 
   /**

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=731530&r1=731529&r2=731530&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon Jan  5 03:35:22 2009
@@ -1826,6 +1826,77 @@
   }
 
   /**
+   * Test HADOOP-4979. 
+   * Bug fix for making sure we always return null to TT if there is a 
+   * high-mem job, and not look at reduce jobs (if map tasks are high-mem)
+   * or vice-versa.
+   * @throws IOException
+   */
+  public void testHighMemoryBlocking()
+      throws IOException {
+
+    // 2 map and 1 reduce slots
+    taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
+
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+    // Normal job on this TT would be 1GB vmem, 0.5GB pmem
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        1 * 1024 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        3 * 1024 * 1024 * 1024L);
+    resConf.setDefaultPercentOfPmemInVmem(33.3f);
+    resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // We need a situation where the scheduler needs to run a map task, 
+    // but the available one has a high-mem requirement. There should
+    // be another job whose maps or reduces can run, but they shouldn't 
+    // be scheduled.
+    
+    LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of "
+        + "2 map tasks");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
+    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+    LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of "
+        + "2 map/red tasks");
+    jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem
+    jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+    
+    // first, a map from j1 will run
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // at this point, the scheduler tries to schedule another map from j1. 
+    // there isn't enough space. There is space to run the second job's
+    // map or reduce task, but they shouldn't be scheduled
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+  
+  /**
    * test invalid highMemoryJobs
    * @throws IOException
    */



Mime
View raw message