hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1130838 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/ha...
Date: Thu, 02 Jun 2011 21:56:12 GMT
Author: acmurthy
Date: Thu Jun  2 21:56:12 2011
New Revision: 1130838

URL: http://svn.apache.org/viewvc?rev=1130838&view=rev
Log:
Fix a corner case in headroom computation - now reservations are taken into account and headroom
is computed much later to account for allocations/reservations.  
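
In effect, the headroom an application sees is now its limit minus current consumption minus current reservations, clamped at zero. A minimal standalone sketch of that arithmetic, using plain integer megabytes in place of the branch's Resource records (illustrative only, not part of this commit):

final class HeadroomSketch {
    // headroom = user/queue limit - already consumed - currently reserved, never negative
    static int headroomMB(int limitMB, int consumedMB, int reservedMB) {
        int headroom = limitMB - consumedMB - reservedMB;
        // Corner case: an application can be slightly over-limit, so clamp at zero.
        return Math.max(headroom, 0);
    }

    public static void main(String[] args) {
        // e.g. limit 8192 MB, 6144 MB consumed, 1024 MB reserved -> 1024 MB headroom
        System.out.println(headroomMB(8192, 6144, 1024));
    }
}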

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1130838&r1=1130837&r2=1130838&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Thu Jun  2 21:56:12 2011
@@ -5,6 +5,10 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    Fix a corner case in headroom computation - now reservations are taken
+    into account and headroom is computed much later to account for
+    allocations/reservations. (acmurthy) 
+
     Fix to report job status if the application is KILLED/FAILED. (mahadev)
 
     Disable Job acls until fixed (mahadev)

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1130838&r1=1130837&r2=1130838&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Thu Jun  2 21:56:12 2011
@@ -32,8 +32,8 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
@@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.res
  * This class keeps track of all the consumption of an application. This also
  * keeps track of current running/completed containers for the application.
  */
-@LimitedPrivate("yarn")
-@Evolving
+@Private
+@Unstable
 public class Application {
   
   private static final Log LOG = LogFactory.getLog(Application.class);
@@ -66,9 +66,12 @@ public class Application {
 
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
-  final Map<Priority, Map<String, ResourceRequest>> requests = new HashMap<Priority, Map<String, ResourceRequest>>();
+  final Map<Priority, Map<String, ResourceRequest>> requests = 
+    new HashMap<Priority, Map<String, ResourceRequest>>();
   final Resource currentConsumption = recordFactory
       .newRecordInstance(Resource.class);
+  final Resource currentReservation = recordFactory
+      .newRecordInstance(Resource.class);
   final Resource overallConsumption = recordFactory
       .newRecordInstance(Resource.class);
   Resource resourceLimit = recordFactory.newRecordInstance(Resource.class);
@@ -266,6 +269,11 @@ public class Application {
     return (nodeRequests == null) ? null : nodeRequests.get(nodeAddress);
   }
 
+  public synchronized Resource getResource(Priority priority) {
+    ResourceRequest request = getResourceRequest(priority, NodeManager.ANY);
+    return request.getCapability();
+  }
+
   synchronized public void completedContainer(Container container, 
       Resource containerResource) {
     if (container != null) {
@@ -443,15 +451,17 @@ public class Application {
   }
 
   synchronized public void showRequests() {
-    for (Priority priority : getPriorities()) {
-      Map<String, ResourceRequest> requests = getResourceRequests(priority);
-      if (requests != null) {
-        LOG.debug("showRequests:" + " application=" + applicationId + 
-            " available=" + getResourceLimit() + 
-            " current=" + currentConsumption + " state=" + getState());
-        for (ResourceRequest request : requests.values()) {
-          LOG.debug("showRequests:" + " application=" + applicationId
-              + " request=" + request);
+    if (LOG.isDebugEnabled()) {
+      for (Priority priority : getPriorities()) {
+        Map<String, ResourceRequest> requests = getResourceRequests(priority);
+        if (requests != null) {
+          LOG.debug("showRequests:" + " application=" + applicationId + 
+              " available=" + getHeadroom() + 
+              " current=" + currentConsumption + " state=" + getState());
+          for (ResourceRequest request : requests.values()) {
+            LOG.debug("showRequests:" + " application=" + applicationId
+                + " request=" + request);
+          }
         }
       }
     }
@@ -492,9 +502,11 @@ public class Application {
       reservedContainers.put(priority, reservedNodes);
     }
     reservedNodes.add(node);
+    Resources.add(currentReservation, resource);
     LOG.info("Application " + applicationId + " reserved " + resource
         + " on node " + node + ", currently has " + reservedNodes.size()
-        + " at priority " + priority);
+        + " at priority " + priority 
+        + "; currentReservation " + currentReservation);
     queue.getMetrics().reserveResource(user, resource);
   }
 
@@ -504,10 +516,13 @@ public class Application {
     if (reservedNodes.isEmpty()) {
       this.reservedContainers.remove(priority);
     }
+    
+    Resource resource = getResource(priority);
+    Resources.subtract(currentReservation, resource);
 
     LOG.info("Application " + applicationId + " unreserved " + " on node "
         + node + ", currently has " + reservedNodes.size() + " at priority "
-        + priority);
+        + priority + "; currentReservation " + currentReservation);
     queue.getMetrics().unreserveResource(user, node.getReservedResource());
   }
 
@@ -547,19 +562,23 @@ public class Application {
   }
 
   public synchronized void setAvailableResourceLimit(Resource globalLimit) {
-    resourceLimit = Resources.subtract(globalLimit, currentConsumption);
-    
-    // Corner case to deal with applications being slightly over-limit
-    if (resourceLimit.getMemory() < 0) {
-      resourceLimit.setMemory(0);
-    }
+    this.resourceLimit = globalLimit; 
   }
 
   /**
    * Get available headroom in terms of resources for the application's user.
    * @return available resource headroom
    */
-  public synchronized Resource getResourceLimit() {
-    return resourceLimit;
+  public synchronized Resource getHeadroom() {
+    Resource limit = 
+      Resources.subtract(Resources.subtract(resourceLimit, currentConsumption), 
+          currentReservation);
+
+    // Corner case to deal with applications being slightly over-limit
+    if (limit.getMemory() < 0) {
+      limit.setMemory(0);
+    }
+    
+    return limit;
   }
 }
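
For context (not part of the patch): setAvailableResourceLimit() now only stores the limit handed down by the scheduler, and getHeadroom() subtracts consumption and reservations at read time, so reservations made later in the same scheduling pass are still reflected. A hypothetical, self-contained illustration of that ordering, again using plain integers rather than the branch's Resource/Resources classes:

final class LateHeadroomDemo {
    private int limitMB;     // stored by setAvailableResourceLimit()
    private int consumedMB;  // grows as containers are allocated
    private int reservedMB;  // grows as containers are reserved

    void setAvailableResourceLimit(int limitMB) { this.limitMB = limitMB; }
    void allocate(int mb) { consumedMB += mb; }
    void reserve(int mb)  { reservedMB += mb; }

    // Computed only when asked for, so it reflects everything recorded above.
    int getHeadroom() { return Math.max(limitMB - consumedMB - reservedMB, 0); }

    public static void main(String[] args) {
        LateHeadroomDemo app = new LateHeadroomDemo();
        app.setAvailableResourceLimit(8192); // limit for this scheduling pass
        app.allocate(4096);                  // container allocated during the pass
        app.reserve(2048);                   // reservation made later in the pass
        System.out.println(app.getHeadroom()); // prints 2048, not 4096
    }
}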

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1130838&r1=1130837&r2=1130838&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Jun  2 21:56:12 2011
@@ -365,7 +365,7 @@ implements ResourceScheduler, CapacitySc
     List<Container> allocatedContainers = application.acquire();
 
     // Resource limit
-    Resource limit = application.getResourceLimit();
+    Resource limit = application.getHeadroom();
     
     LOG.info("DEBUG --- allocate:" +
         " applicationId=" + applicationId + 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1130838&r1=1130837&r2=1130838&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Jun  2 21:56:12 2011
@@ -537,7 +537,7 @@ public class LeafQueue implements Queue 
             computeUserLimit(application, clusterResource, 
                 required.getCapability());
           if (!assignToUser(application.getUser(), userLimit)) {
-            continue; 
+            break; 
           }
 
           // Inform the application it is about to get a scheduling opportunity
@@ -644,8 +644,8 @@ public class LeafQueue implements Queue 
           divideAndCeil(
               (int)(absoluteCapacity * clusterResource.getMemory()), 
               minimumAllocation.getMemory()) 
-              * minimumAllocation.getMemory(),           // round up 
-              required.getMemory());
+                * minimumAllocation.getMemory(),           // round up 
+          required.getMemory());
 
     final int consumed = usedResources.getMemory();
     final int currentCapacity = 
@@ -669,10 +669,11 @@ public class LeafQueue implements Queue 
       );
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("User limit computatation for " + userName + 
+      LOG.debug("User limit computation for " + userName + 
           " in queue " + getQueueName() + 
           " required: " + required + 
-          " consumed: " + user.getConsumedResources() + " limit: " + limit +
+          " consumed: " + user.getConsumedResources() + 
+          " limit: " + limit +
           " queueCapacity: " + queueCapacity + 
           " qconsumed: " + consumed +
           " currentCapacity: " + currentCapacity +

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1130838&r1=1130837&r2=1130838&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Jun  2 21:56:12 2011
@@ -202,7 +202,7 @@ public class FifoScheduler implements Re
       application.showRequests();
 
       allocatedContainers = application.acquire();
-      limit = application.getResourceLimit();
+      limit = application.getHeadroom();
       LOG.debug("allocate:" +
           " applicationId=" + applicationId + 
           " #ask=" + ask.size() + 


