hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077623 - /hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
Date Fri, 04 Mar 2011 04:37:33 GMT
Author: omalley
Date: Fri Mar  4 04:37:33 2011
New Revision: 1077623

URL: http://svn.apache.org/viewvc?rev=1077623&view=rev
Log:
commit c67ad53ff8e2a6aba5ea86a05aae66eaef28b265
Author: Arun C Murthy <acmurthy@apache.org>
Date:   Thu Jul 29 23:30:43 2010 -0700

    MAPREDUCE-1872. Fixed a corner case where a queue's waiting jobs count wasn't updated
when an un-initialized job was killed. Also fixed task limits during initialization to do
a 'greater than limit' check rather than a 'greater than or equals to limit' check.

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1077623&r1=1077622&r2=1077623&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
Fri Mar  4 04:37:33 2011
@@ -698,9 +698,13 @@ class CapacitySchedulerQueue {
       ++waitingJobs;
     }
     
-    public void jobInitialized(JobInProgress job) {
+    public void removeWaitingJob(JobInProgress job) {
       --waitingJobs;
-      
+    }
+    
+    public void jobInitialized(JobInProgress job) {
+      removeWaitingJob(job);
+
       ++runningJobs;
       activeTasks += job.desiredTasks();
     }
@@ -820,7 +824,13 @@ class CapacitySchedulerQueue {
   }
   
   synchronized JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
-    return waitingJobs.remove(schedInfo);
+    JobInProgress job = waitingJobs.remove(schedInfo);
+    if (job != null) {
+      String user = job.getProfile().getUser();
+      UserInfo userInfo = users.get(user);
+      userInfo.removeWaitingJob(job);
+    }
+    return job;
   }
 
   synchronized int getNumActiveUsers() {
@@ -1028,7 +1038,7 @@ class CapacitySchedulerQueue {
     }
     
     // Check if queue has too many active tasks
-    if ((activeTasks + currentlyInitializedTasks + job.desiredTasks()) >= 
+    if ((activeTasks + currentlyInitializedTasks + job.desiredTasks()) > 
          maxActiveTasks) {
       LOG.info("Queue '" + getQueueName() + "' has " + activeTasks + 
           " active tasks and " + currentlyInitializedTasks + " tasks about to" +
@@ -1073,7 +1083,7 @@ class CapacitySchedulerQueue {
     
     // Check if the user has too many active tasks
     int userActiveTasks = getNumActiveTasksByUser(user);
-    if ((userActiveTasks + currentlyInitializedTasks) >= 
+    if ((userActiveTasks + currentlyInitializedTasks + job.desiredTasks()) > 
         maxActiveTasksPerUser) {
       LOG.info(getQueueName() + " has " + userActiveTasks + 
           " active tasks and " + currentlyInitializedTasks + " tasks about to" +



Mime
View raw message