hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077538 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/
Date Fri, 04 Mar 2011 04:26:09 GMT
Author: omalley
Date: Fri Mar  4 04:26:09 2011
New Revision: 1077538

URL: http://svn.apache.org/viewvc?rev=1077538&view=rev
Log:
commit cb8abfee874e281591ecbfb97619dd018125f466
Author: Arun C Murthy <acmurthy@apache.org>
Date:   Mon Jul 12 02:05:36 2010 -0700

    MAPREDUCE-517. Enhance the CapacityScheduler to assign multiple tasks per-heartbeat.
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-517. Enhance the CapacityScheduler to assign multiple tasks
    +    per-heartbeat. (acmurthy)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077538&r1=1077537&r2=1077538&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java	(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java	Fri Mar  4 04:26:09 2011
@@ -342,28 +342,32 @@ class CapacityTaskScheduler extends Task
   private static class TaskLookupResult {
 
     static enum LookUpStatus {
-      TASK_FOUND,
+      LOCAL_TASK_FOUND,
       NO_TASK_FOUND,
       TASK_FAILING_MEMORY_REQUIREMENT,
+      OFF_SWITCH_TASK_FOUND
     }
     // constant TaskLookupResult objects. Should not be accessed directly.
     private static final TaskLookupResult NoTaskLookupResult = 
-      new TaskLookupResult(null, TaskLookupResult.LookUpStatus.NO_TASK_FOUND);
+      new TaskLookupResult(null, null, 
+          TaskLookupResult.LookUpStatus.NO_TASK_FOUND);
     private static final TaskLookupResult MemFailedLookupResult = 
-      new TaskLookupResult(null, 
+      new TaskLookupResult(null, null,
           TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT);
 
     private LookUpStatus lookUpStatus;
     private Task task;
-
+    private JobInProgress job;
+    
     // should not call this constructor directly. use static factory methods.
-    private TaskLookupResult(Task t, LookUpStatus lUStatus) {
+    private TaskLookupResult(Task t, JobInProgress job, LookUpStatus lUStatus) {
       this.task = t;
+      this.job = job;
       this.lookUpStatus = lUStatus;
     }
     
-    static TaskLookupResult getTaskFoundResult(Task t) {
-      return new TaskLookupResult(t, LookUpStatus.TASK_FOUND);
+    static TaskLookupResult getTaskFoundResult(Task t, JobInProgress job) {
+      return new TaskLookupResult(t, job, LookUpStatus.LOCAL_TASK_FOUND);
     }
     static TaskLookupResult getNoTaskFoundResult() {
       return NoTaskLookupResult;
@@ -371,12 +375,19 @@ class CapacityTaskScheduler extends Task
     static TaskLookupResult getMemFailedResult() {
       return MemFailedLookupResult;
     }
-    
+    static TaskLookupResult getOffSwitchTaskFoundResult(Task t, 
+                                                        JobInProgress job) {
+      return new TaskLookupResult(t, job, LookUpStatus.OFF_SWITCH_TASK_FOUND);
+    }
 
     Task getTask() {
       return task;
     }
 
+    JobInProgress getJob() {
+      return job;
+    }
+    
     LookUpStatus getLookUpStatus() {
       return lookUpStatus;
     }
@@ -394,8 +405,11 @@ class CapacityTaskScheduler extends Task
     protected CapacityTaskScheduler scheduler;
     protected TaskType type = null;
 
-    abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
-        JobInProgress job) throws IOException;
+    abstract void updateTSI(QueueSchedulingInfo qsi, String user, 
+                            int numRunningTasks, int numSlotsOccupied);
+
+    abstract TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
+        JobInProgress job, boolean assignOffSwitch) throws IOException;
 
     int getSlotsOccupied(JobInProgress job) {
       return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) * 
@@ -557,7 +571,9 @@ class CapacityTaskScheduler extends Task
      * Always return a TaskLookupResult object. Don't return null. 
      */
     private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
-                                              QueueSchedulingInfo qsi)
+                                              int availableSlots,
+                                              QueueSchedulingInfo qsi,
+                                              boolean assignOffSwitch)
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
       // we only look at jobs in the running queues, as these are the ones
@@ -583,13 +599,18 @@ class CapacityTaskScheduler extends Task
         //a task to be scheduled on the task tracker.
         //if we find a job then we pass it on.
         if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
-                                                              taskTrackerStatus)) {
+                                                              taskTrackerStatus,
+                                                              availableSlots)) {
           // We found a suitable job. Get task from it.
-          Task t = obtainNewTask(taskTrackerStatus, j);
+          TaskLookupResult tlr = 
+            obtainNewTask(taskTrackerStatus, j, assignOffSwitch);
           //if there is a task return it immediately.
-          if (t != null) {
+          if (tlr.getLookUpStatus() == 
+                  TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND || 
+              tlr.getLookUpStatus() == 
+                  TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
             // we're successful in getting a task
-            return TaskLookupResult.getTaskFoundResult(t);
+            return tlr;
           } else {
             //skip to the next job in the queue.
             if (LOG.isDebugEnabled()) {
@@ -648,13 +669,17 @@ class CapacityTaskScheduler extends Task
         }
         
         if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
-            taskTrackerStatus)) {
+            taskTrackerStatus, availableSlots)) {
           // We found a suitable job. Get task from it.
-          Task t = obtainNewTask(taskTrackerStatus, j);
-          //if there is a task return it immediately.
-          if (t != null) {
+          TaskLookupResult tlr = 
+            obtainNewTask(taskTrackerStatus, j, assignOffSwitch);
+          
+          if (tlr.getLookUpStatus() == 
+                  TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND || 
+              tlr.getLookUpStatus() == 
+                  TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
             // we're successful in getting a task
-            return TaskLookupResult.getTaskFoundResult(t);
+            return tlr;
           } else {
             //skip to the next job in the queue.
             continue;
@@ -682,7 +707,9 @@ class CapacityTaskScheduler extends Task
     // Always return a TaskLookupResult object. Don't return null. 
     // The caller is responsible for ensuring that the QSI objects and the 
     // collections are up-to-date.
-    private TaskLookupResult assignTasks(TaskTracker taskTracker) 
+    private TaskLookupResult assignTasks(TaskTracker taskTracker, 
+                                         int availableSlots, 
+                                         boolean assignOffSwitch) 
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
 
@@ -691,7 +718,6 @@ class CapacityTaskScheduler extends Task
       // Check if this tasktracker has been reserved for a job...
       JobInProgress job = taskTracker.getJobForFallowSlot(type);
       if (job != null) {
-        int availableSlots = taskTracker.getAvailableSlots(type);
         if (LOG.isDebugEnabled()) {
           LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " + 
                     taskTracker.getTrackerName() + " with " + availableSlots + 
@@ -703,17 +729,11 @@ class CapacityTaskScheduler extends Task
           taskTracker.unreserveSlots(type, job);
           
           // We found a suitable job. Get task from it.
-          Task t = obtainNewTask(taskTrackerStatus, job);
-          //if there is a task return it immediately.
-          if (t != null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.info(job.getJobID() + ": Got " + t.getTaskID() + 
-                       " for reserved tasktracker " + 
-                       taskTracker.getTrackerName());
-            }
-            // we're successful in getting a task
-            return TaskLookupResult.getTaskFoundResult(t);
-          } 
+          if (type == TaskType.MAP) {
+            // Don't care about locality!
+            job.overrideSchedulingOpportunities();
+          }
+          return obtainNewTask(taskTrackerStatus, job, true);
         } else {
           // Re-reserve the current tasktracker
           taskTracker.reserveSlots(type, job, availableSlots);
@@ -740,7 +760,8 @@ class CapacityTaskScheduler extends Task
         if(this.areTasksInQueueOverMaxCapacity(qsi,1)) {
           continue;
         }
-        TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
+        TaskLookupResult tlr = 
+          getTaskFromQueue(taskTracker, availableSlots, qsi, assignOffSwitch);
         TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
 
         if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
@@ -748,7 +769,8 @@ class CapacityTaskScheduler extends Task
         }
 
         // if we find a task, return
-        if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) {
+        if (lookUpStatus == TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND ||
+            lookUpStatus == TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
           return tlr;
         }
         // if there was a memory mismatch, return
@@ -817,6 +839,22 @@ class CapacityTaskScheduler extends Task
         }
         LOG.debug(s);
       }
+      
+      StringBuffer s = new StringBuffer();
+      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
+        TaskSchedulingInfo tsi = getTSI(qsi);
+        Collection<JobInProgress> runJobs =
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+        s.append(
+          String.format(
+            " Queue '%s'(%s): runningTasks=%d, "
+              + "occupiedSlots=%d, capacity=%d, runJobs=%d  maxCapacity=%d ",
+            qsi.queueName,
+            this.type, Integer.valueOf(tsi.numRunningTasks), Integer
+              .valueOf(tsi.numSlotsOccupied), Integer
+              .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
+            Integer.valueOf(tsi.getMaxCapacity())));
+      }
     }
     
     /**
@@ -854,13 +892,47 @@ class CapacityTaskScheduler extends Task
     }
 
     @Override
-    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    void updateTSI(QueueSchedulingInfo qsi, String user, 
+                   int numRunningTasks, int numSlotsOccupied) {
+      qsi.mapTSI.numRunningTasks += numRunningTasks;
+      qsi.mapTSI.numSlotsOccupied += numSlotsOccupied;
+      Integer i = qsi.mapTSI.numSlotsOccupiedByUser.get(user);
+      int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
+      qsi.mapTSI.numSlotsOccupiedByUser.put(user, slots);
+    }
+    
+    @Override
+    TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
+                                   JobInProgress job, boolean assignOffSwitch) 
     throws IOException {
       ClusterStatus clusterStatus = 
         scheduler.taskTrackerManager.getClusterStatus();
       int numTaskTrackers = clusterStatus.getTaskTrackers();
-      return job.obtainNewMapTask(taskTracker, numTaskTrackers, 
-          scheduler.taskTrackerManager.getNumberOfUniqueHosts());
+      int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts();
+      
+      // Inform the job it is about to get a scheduling opportunity
+      job.schedulingOpportunity();
+      
+      // First, try to get a 'local' task
+      Task t = 
+        job.obtainNewLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
+      
+      if (t != null) {
+        return TaskLookupResult.getTaskFoundResult(t, job); 
+      }
+      
+      // Next, try to get an 'off-switch' task if appropriate
+      // Do not bother as much about locality for High-RAM jobs
+      if (job.getNumSlotsPerMap() > 1 || 
+          (assignOffSwitch && 
+              job.scheduleOffSwitch(numTaskTrackers))) {
+        t = 
+          job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
+      }
+      
+      return (t != null) ? 
+          TaskLookupResult.getOffSwitchTaskFoundResult(t, job) :
+          TaskLookupResult.getNoTaskFoundResult();
     }
 
     @Override
@@ -915,13 +987,27 @@ class CapacityTaskScheduler extends Task
     }
 
     @Override
-    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    void updateTSI(QueueSchedulingInfo qsi, String user, 
+                   int numRunningTasks, int numSlotsOccupied) {
+      qsi.reduceTSI.numRunningTasks += numRunningTasks;
+      qsi.reduceTSI.numSlotsOccupied += numSlotsOccupied;
+      Integer i = qsi.reduceTSI.numSlotsOccupiedByUser.get(user);
+      qsi.reduceTSI.numSlotsOccupiedByUser.put(user,
+          Integer.valueOf(i.intValue() + numSlotsOccupied));
+    }
+
+    @Override
+    TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
+                                   JobInProgress job, boolean unused) 
     throws IOException {
       ClusterStatus clusterStatus = 
         scheduler.taskTrackerManager.getClusterStatus();
       int numTaskTrackers = clusterStatus.getTaskTrackers();
-      return job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+      Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
+      
+      return (t != null) ? TaskLookupResult.getTaskFoundResult(t, job) :
+        TaskLookupResult.getNoTaskFoundResult();
     }
 
     @Override
@@ -998,7 +1084,6 @@ class CapacityTaskScheduler extends Task
   private long memSizeForReduceSlotOnJT;
   private long limitMaxMemForMapTasks;
   private long limitMaxMemForReduceTasks;
-  private boolean assignMultipleTasks = true;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -1244,6 +1329,8 @@ class CapacityTaskScheduler extends Task
     updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
     mapScheduler.updateCollectionOfQSIs();
     reduceScheduler.updateCollectionOfQSIs();
+    mapScheduler.printQSIs();
+    reduceScheduler.printQSIs();
   }
 
   /**
@@ -1307,23 +1394,20 @@ class CapacityTaskScheduler extends Task
         int numReservedReduceSlotsForThisJob = 
           (reduceScheduler.getNumReservedTaskTrackers(j) * 
            reduceScheduler.getSlotsPerTask(j)); 
+        
         j.setSchedulingInfo(getJobQueueSchedInfo(numMapsRunningForThisJob, 
                               numRunningMapSlots,
                               numReservedMapSlotsForThisJob,
                               numReducesRunningForThisJob, 
                               numRunningReduceSlots,
                               numReservedReduceSlotsForThisJob));
-        qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
-        qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
-        qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
-        qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob;
-        Integer i =
-            qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
-        qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
-            Integer.valueOf(i.intValue() + numMapSlotsForThisJob));
-        i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
-        qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
-            Integer.valueOf(i.intValue() + numReduceSlotsForThisJob));
+        
+        mapScheduler.updateTSI(qsi, j.getProfile().getUser(), 
+                               numMapsRunningForThisJob, numMapSlotsForThisJob);
+        reduceScheduler.updateTSI(qsi, j.getProfile().getUser(), 
+                                  numReducesRunningForThisJob, 
+                                  numReduceSlotsForThisJob);
+
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
               + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
@@ -1372,18 +1456,6 @@ class CapacityTaskScheduler extends Task
     return sb.toString();
   }
 
-  /**
-   * Sets whether the scheduler can assign multiple tasks in a heartbeat
-   * or not.
-   * 
-   * This method is used only for testing purposes.
-   * 
-   * @param assignMultipleTasks true, to assign multiple tasks per heartbeat
-   */
-  void setAssignMultipleTasks(boolean assignMultipleTasks) {
-    this.assignMultipleTasks = assignMultipleTasks;
-  }
-
   /*
    * The grand plan for assigning a task.
    * 
@@ -1427,33 +1499,12 @@ class CapacityTaskScheduler extends Task
      * becomes expensive, do it once every few heartbeats only.
      */ 
     updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+    
+    // schedule tasks
     List<Task> result = new ArrayList<Task>();
-    if (assignMultipleTasks) {
-      addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
-      addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
-    } else {
-      /* 
-       * If TT has Map and Reduce slot free, we need to figure out whether to
-       * give it a Map or Reduce task.
-       * Number of ways to do this. For now, base decision on how much is needed
-       * versus how much is used (default to Map, if equal).
-       */
-      if ((maxReduceSlots - currentReduceSlots) 
-          > (maxMapSlots - currentMapSlots)) {
-        addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
-        if (result.size() == 0) {
-          addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
-        }
-      } else {
-        addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
-        if (result.size() == 0) {
-          addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
-        }
-      }
-      if (result.size() == 0) {
-        return null;
-      }
-    }
+    addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
+    int numMapsAssigned = result.size(); 
+    addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
     return result;
   }
 
@@ -1462,10 +1513,12 @@ class CapacityTaskScheduler extends Task
   private void addReduceTask(TaskTracker taskTracker, List<Task> tasks,
                                   int maxReduceSlots, int currentReduceSlots) 
                     throws IOException {
-    if (maxReduceSlots > currentReduceSlots) {
+    int availableSlots = maxReduceSlots - currentReduceSlots;
+    if (availableSlots > 0) {
       reduceScheduler.updateCollectionOfQSIs();
-      TaskLookupResult tlr = reduceScheduler.assignTasks(taskTracker);
-      if (TaskLookupResult.LookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+      TaskLookupResult tlr = 
+        reduceScheduler.assignTasks(taskTracker, availableSlots, true);
+      if (TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND == tlr.getLookUpStatus()) {
         tasks.add(tlr.getTask());
       }
     }
@@ -1473,15 +1526,37 @@ class CapacityTaskScheduler extends Task
   
   // Pick a map task and add to the list of tasks, if there's space
   // on the TT to run one.
-  private void addMapTask(TaskTracker taskTracker, List<Task> tasks, 
+  private void addMapTasks(TaskTracker taskTracker, List<Task> tasks, 
                               int maxMapSlots, int currentMapSlots)
                     throws IOException {
-    if (maxMapSlots > currentMapSlots) {
+    int availableSlots = maxMapSlots - currentMapSlots;
+    boolean assignOffSwitch = true;
+    while (availableSlots > 0) {
       mapScheduler.updateCollectionOfQSIs();
-      TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker);
-      if (TaskLookupResult.LookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
-        tasks.add(tlr.getTask());
-      }
+      TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker, 
+                                                      availableSlots,
+                                                      assignOffSwitch);
+      if (TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
+            tlr.getLookUpStatus() || 
+          TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
+            tlr.getLookUpStatus()) {
+        break;
+      }
+
+      Task t = tlr.getTask();
+      JobInProgress job = tlr.getJob();
+
+      tasks.add(t);
+
+      if (TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND == 
+        tlr.getLookUpStatus()) {
+        // Atmost 1 off-switch task per-heartbeat
+        assignOffSwitch = false;
+      }
+      availableSlots -= t.getNumSlotsRequired();
+      mapScheduler.updateTSI(queueInfoMap.get(job.getProfile().getQueueName()), 
+                             job.getProfile().getUser(), 1, 
+                             t.getNumSlotsRequired());
     }
   }
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=1077538&r1=1077537&r2=1077538&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java	(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java	Fri Mar  4 04:26:09 2011
@@ -45,17 +45,21 @@ class MemoryMatcher {
     return true;
   }
 
+  
   /**
    * Find the memory that is already used by all the running tasks
    * residing on the given TaskTracker.
    * 
    * @param taskTracker
    * @param taskType 
+   * @param availableSlots
    * @return amount of memory that is used by the residing tasks,
    *          null if memory cannot be computed for some reason.
    */
-  synchronized Long getMemReservedForTasks(
-      TaskTrackerStatus taskTracker, TaskType taskType) {
+  synchronized long getMemReservedForTasks(
+      TaskTrackerStatus taskTracker, TaskType taskType, int availableSlots) {
+    int currentlyScheduled = 
+      currentlyScheduled(taskTracker, taskType, availableSlots);
     long vmem = 0;
 
     for (TaskStatus task : taskTracker.getTaskReports()) {
@@ -80,18 +84,38 @@ class MemoryMatcher {
       }
     }
 
-    return Long.valueOf(vmem);
+    long currentlyScheduledVMem = 
+      currentlyScheduled * ((taskType == TaskType.MAP) ? 
+          scheduler.getMemSizeForMapSlot() : 
+            scheduler.getMemSizeForReduceSlot());
+    return vmem + currentlyScheduledVMem; 
   }
 
+  private int currentlyScheduled(TaskTrackerStatus taskTracker, 
+                                 TaskType taskType, int availableSlots) {
+    int scheduled = 0;
+    if (taskType == TaskType.MAP) {
+      scheduled = 
+        (taskTracker.getMaxMapSlots() - taskTracker.countOccupiedMapSlots()) - 
+            availableSlots;
+    } else {
+      scheduled = 
+        (taskTracker.getMaxReduceSlots() - 
+            taskTracker.countOccupiedReduceSlots()) - availableSlots;
+    }
+    return scheduled;
+  }
   /**
    * Check if a TT has enough memory to run of task specified from this job.
    * @param job
    * @param taskType 
    * @param taskTracker
+   * @param availableSlots
    * @return true if this TT has enough memory for this job. False otherwise.
    */
   boolean matchesMemoryRequirements(JobInProgress job,TaskType taskType, 
-                                    TaskTrackerStatus taskTracker) {
+                                    TaskTrackerStatus taskTracker, 
+                                    int availableSlots) {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Matching memory requirements of " + job.getJobID().toString()
@@ -106,7 +130,8 @@ class MemoryMatcher {
       return true;
     }
 
-    Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
+    long memUsedOnTT = 
+      getMemReservedForTasks(taskTracker, taskType, availableSlots);
     long totalMemUsableOnTT = 0;
     long memForThisTask = 0;
     if (taskType == TaskType.MAP) {
@@ -120,7 +145,7 @@ class MemoryMatcher {
               * taskTracker.getMaxReduceSlots();
     }
 
-    long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
+    long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT;
     if (memForThisTask > freeMemOnTT) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("



Mime
View raw message