hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153453 [2/2] - in /hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager: rmcontainer/ scheduler/ scheduler/capacity/ scheduler/fifo/
Date Wed, 03 Aug 2011 11:51:22 GMT
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug  3 11:51:20 2011
@@ -36,9 +36,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -47,20 +46,18 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
@@ -85,8 +82,10 @@ public class LeafQueue implements Queue 
   private float usedCapacity = 0.0f;
   private volatile int numContainers;
 
-  Set<CSApp> applications;
-
+  Set<SchedulerApp> applications;
+  Map<ApplicationAttemptId, SchedulerApp> applicationsMap = 
+      new HashMap<ApplicationAttemptId, SchedulerApp>();
+  
   public final Resource minimumAllocation;
 
   private ContainerTokenSecretManager containerTokenSecretManager;
@@ -109,7 +108,7 @@ public class LeafQueue implements Queue 
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, Queue parent, 
-      Comparator<CSApp> applicationComparator, Queue old) {
+      Comparator<SchedulerApp> applicationComparator, Queue old) {
     this.scheduler = cs;
     this.queueName = queueName;
     this.parent = parent;
@@ -158,7 +157,7 @@ public class LeafQueue implements Queue 
         " name=" + queueName + 
         ", fullname=" + getQueuePath());
 
-    this.applications = new TreeSet<CSApp>(applicationComparator);
+    this.applications = new TreeSet<SchedulerApp>(applicationComparator);
   }
 
   private synchronized void setupQueueConfigs(
@@ -362,7 +361,7 @@ public class LeafQueue implements Queue 
   }
 
   @Override
-  public void submitApplication(CSApp application, String userName,
+  public void submitApplication(SchedulerApp application, String userName,
       String queue)  throws AccessControlException {
     // Careful! Locking order is important!
 
@@ -423,10 +422,11 @@ public class LeafQueue implements Queue 
     }
   }
 
-  private synchronized void addApplication(CSApp application, User user) {
+  private synchronized void addApplication(SchedulerApp application, User user) {
     // Accept 
     user.submitApplication();
     applications.add(application);
+    applicationsMap.put(application.getApplicationAttemptId(), application);
 
     LOG.info("Application added -" +
         " appId: " + application.getApplicationId() +
@@ -436,7 +436,7 @@ public class LeafQueue implements Queue 
   }
 
   @Override
-  public void finishApplication(CSApp application, String queue) {
+  public void finishApplication(SchedulerApp application, String queue) {
     // Careful! Locking order is important!
     synchronized (this) {
       removeApplication(application, getUser(application.getUser()));
@@ -446,8 +446,9 @@ public class LeafQueue implements Queue 
     parent.finishApplication(application, queue);
   }
 
-  public synchronized void removeApplication(CSApp application, User user) {
+  public synchronized void removeApplication(SchedulerApp application, User user) {
     applications.remove(application);
+    applicationsMap.remove(application.getApplicationAttemptId());
 
     user.finishApplication();
     if (user.getApplications() == 0) {
@@ -461,24 +462,31 @@ public class LeafQueue implements Queue 
         " #user-applications: " + user.getApplications() + 
         " #queue-applications: " + getNumApplications());
   }
+  
+  private synchronized SchedulerApp getApplication(
+      ApplicationAttemptId applicationAttemptId) {
+    return applicationsMap.get(applicationAttemptId);
+  }
 
   @Override
   public synchronized Resource 
-  assignContainers(Resource clusterResource, CSNode node) {
+  assignContainers(Resource clusterResource, SchedulerNode node) {
 
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getNodeAddress() + 
         " #applications=" + applications.size());
     
     // Check for reserved resources
-    CSApp reservedApplication = node.getReservedApplication();
-    if (reservedApplication != null) {
-      return assignReservedContainers(reservedApplication, node, 
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      SchedulerApp application = 
+          getApplication(reservedContainer.getApplicationAttemptId());
+      return assignReservedContainer(application, node, reservedContainer, 
           clusterResource);
     }
-
-    // Try to assign containers to applications in fifo order
-    for (CSApp application : applications) {
+    
+    // Try to assign containers to applications in order
+    for (SchedulerApp application : applications) {
       
       LOG.info("DEBUG --- pre-assignContainers for application "
           + application.getApplicationId());
@@ -497,6 +505,7 @@ public class LeafQueue implements Queue 
           }
 
           // Are we going over limits by allocating to this application?
+          
           ResourceRequest required = 
             application.getResourceRequest(priority, RMNode.ANY);
           
@@ -520,7 +529,7 @@ public class LeafQueue implements Queue 
           // Try to schedule
           Resource assigned = 
             assignContainersOnNode(clusterResource, node, application, priority, 
-                false);
+                null);
   
           // Did we schedule or reserve a container?
           if (Resources.greaterThan(assigned, Resources.none())) {
@@ -552,30 +561,21 @@ public class LeafQueue implements Queue 
 
   }
 
-  private synchronized Resource assignReservedContainers(CSApp application, 
-      CSNode node, Resource clusterResource) {
-    synchronized (application) {
-      for (Priority priority : application.getPriorities()) {
-
-        // Do we reserve containers at this 'priority'?
-        if (application.isReserved(node, priority)) {
-          
-          // Do we really need this reservation still?
-          ResourceRequest offSwitchRequest = 
-            application.getResourceRequest(priority, RMNode.ANY);
-          if (offSwitchRequest.getNumContainers() == 0) {
-            // Release
-            unreserve(application, priority, node);
-            return offSwitchRequest.getCapability();
-          }
-
-          // Try to assign if we have sufficient resources
-          assignContainersOnNode(clusterResource, node, application, priority, 
-              true);
-        }
-      }
+  private synchronized Resource assignReservedContainer(SchedulerApp application, 
+      SchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
+    // Do we still need this reservation?
+    Priority priority = rmContainer.getReservedPriority();
+    if (application.getTotalRequiredResources(priority) == 0) {
+      // Release
+      Container container = rmContainer.getContainer();
+      completedContainer(clusterResource, application, node, 
+          rmContainer, RMContainerEventType.RELEASED);
+      return container.getResource();
     }
 
+    // Try to assign if we have sufficient resources
+    assignContainersOnNode(clusterResource, node, application, priority, rmContainer);
+    
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
     return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
@@ -599,12 +599,12 @@ public class LeafQueue implements Queue 
     return true;
   }
 
-  private void setUserResourceLimit(CSApp application, Resource resourceLimit) {
+  private void setUserResourceLimit(SchedulerApp application, Resource resourceLimit) {
     application.setAvailableResourceLimit(resourceLimit);
     metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit);
   }
   
-  private Resource computeUserLimit(CSApp application, 
+  private Resource computeUserLimit(SchedulerApp application, 
       Resource clusterResource, Resource required) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
@@ -688,80 +688,87 @@ public class LeafQueue implements Queue 
     return (a + (b - 1)) / b;
   }
 
-  boolean needContainers(CSApp application, Priority priority) {
-    ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, RMNode.ANY);
-
-    int requiredContainers = offSwitchRequest.getNumContainers();
-    int reservedContainers = application.getReservedContainers(priority);
+  boolean needContainers(SchedulerApp application, Priority priority) {
+    int requiredContainers = application.getTotalRequiredResources(priority);
+    int reservedContainers = application.getNumReservedContainers(priority);
     return ((requiredContainers - reservedContainers) > 0);
   }
 
-  Resource assignContainersOnNode(Resource clusterResource, CSNode node, 
-      CSApp application, Priority priority, boolean reserved) {
+  Resource assignContainersOnNode(Resource clusterResource, SchedulerNode node, 
+      SchedulerApp application, Priority priority, RMContainer reservedContainer) {
 
     Resource assigned = Resources.none();
 
     // Data-local
-    assigned = assignNodeLocalContainers(clusterResource, node, application, priority); 
+    assigned = 
+        assignNodeLocalContainers(clusterResource, node, application, priority,
+            reservedContainer); 
     if (Resources.greaterThan(assigned, Resources.none())) {
       return assigned;
     }
 
     // Rack-local
-    assigned = assignRackLocalContainers(clusterResource, node, application, priority);
+    assigned = 
+        assignRackLocalContainers(clusterResource, node, application, priority, 
+            reservedContainer);
     if (Resources.greaterThan(assigned, Resources.none())) {
-    return assigned;
+      return assigned;
     }
     
     // Off-switch
     return assignOffSwitchContainers(clusterResource, node, application, 
-        priority, reserved);
+        priority, reservedContainer);
   }
 
-  Resource assignNodeLocalContainers(Resource clusterResource, CSNode node, 
-      CSApp application, Priority priority) {
+  Resource assignNodeLocalContainers(Resource clusterResource, SchedulerNode node, 
+      SchedulerApp application, Priority priority, 
+      RMContainer reservedContainer) {
     ResourceRequest request = application.getResourceRequest(priority, node
         .getNodeAddress());
     if (request != null) {
-      if (canAssign(application, priority, node, NodeType.DATA_LOCAL, false)) {
+      if (canAssign(application, priority, node, NodeType.DATA_LOCAL, 
+          reservedContainer)) {
         return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.DATA_LOCAL);
+            NodeType.DATA_LOCAL, reservedContainer);
       }
     }
     
     return Resources.none();
   }
 
-  Resource assignRackLocalContainers(Resource clusterResource, CSNode node, 
-      CSApp application, Priority priority) {
+  Resource assignRackLocalContainers(Resource clusterResource,  
+      SchedulerNode node, SchedulerApp application, Priority priority,
+      RMContainer reservedContainer) {
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRackName());
     if (request != null) {
-      if (canAssign(application, priority, node, NodeType.RACK_LOCAL, false)) {
+      if (canAssign(application, priority, node, NodeType.RACK_LOCAL, 
+          reservedContainer)) {
         return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.RACK_LOCAL);
+            NodeType.RACK_LOCAL, reservedContainer);
       }
     }
     return Resources.none();
   }
 
-  Resource assignOffSwitchContainers(Resource clusterResource, CSNode node, 
-      CSApp application, Priority priority, boolean reserved) {
+  Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node, 
+      SchedulerApp application, Priority priority, 
+      RMContainer reservedContainer) {
     ResourceRequest request = 
       application.getResourceRequest(priority, RMNode.ANY);
     if (request != null) {
-      if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reserved)) {
+      if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
+          reservedContainer)) {
         return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.OFF_SWITCH);
+            NodeType.OFF_SWITCH, reservedContainer);
       }
     }
     
     return Resources.none();
   }
 
-  boolean canAssign(CSApp application, Priority priority, 
-      CSNode node, NodeType type, boolean reserved) {
+  boolean canAssign(SchedulerApp application, Priority priority, 
+      SchedulerNode node, NodeType type, RMContainer reservedContainer) {
 
     ResourceRequest offSwitchRequest = 
       application.getResourceRequest(priority, RMNode.ANY);
@@ -781,18 +788,18 @@ public class LeafQueue implements Queue 
       
       if (requiredContainers > 0) {
         // No 'delay' for reserved containers
-        if (reserved) {
+        if (reservedContainer != null) {
           return true;
         }
         
-//        // Check if we have waited long enough
-//        if (missedNodes < (requiredContainers * localityWaitFactor)) {
-//          LOG.info("Application " + application.getApplicationId() + 
-//              " has missed " + missedNodes + " opportunities," +
-//              " waitFactor= " + localityWaitFactor + 
-//              " for cluster of size " + scheduler.getNumClusterNodes());
-//          return false;
-//        }
+        // Check if we have waited long enough
+        if (missedNodes < (requiredContainers * localityWaitFactor)) {
+          LOG.info("Application " + application.getApplicationId() + 
+              " has missed " + missedNodes + " opportunities," +
+              " waitFactor= " + localityWaitFactor + 
+              " for cluster of size " + scheduler.getNumClusterNodes());
+          return false;
+        }
         return true;
       }
       return false;
@@ -830,157 +837,162 @@ public class LeafQueue implements Queue 
 
     return false;
   }
-  private Resource assignContainer(Resource clusterResource, CSNode node, 
-      CSApp application, 
-      Priority priority, ResourceRequest request, NodeType type) {
+  
+  private Container getContainer(RMContainer rmContainer, 
+      SchedulerApp application, SchedulerNode node, Resource capability) {
+    if (rmContainer != null) {
+      return rmContainer.getContainer();
+    }
+
+    Container container = 
+          BuilderUtils.newContainer(this.recordFactory,
+              application.getApplicationAttemptId(),
+              application.getNewContainerId(),
+              node.getNodeID(),
+              node.getHttpAddress(), capability);
+
+    // If security is enabled, send the container-tokens too.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      ContainerToken containerToken = 
+          this.recordFactory.newRecordInstance(ContainerToken.class);
+      ContainerTokenIdentifier tokenidentifier =
+          new ContainerTokenIdentifier(container.getId(),
+              container.getNodeId().toString(), container.getResource());
+      containerToken.setIdentifier(
+          ByteBuffer.wrap(tokenidentifier.getBytes()));
+      containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
+      containerToken.setPassword(
+          ByteBuffer.wrap(
+              containerTokenSecretManager.createPassword(tokenidentifier))
+          );
+      containerToken.setService(container.getNodeId().toString());
+      container.setContainerToken(containerToken);
+    }
+
+    return container;
+  }
+  
+  private Resource assignContainer(Resource clusterResource, SchedulerNode node, 
+      SchedulerApp application, Priority priority, 
+      ResourceRequest request, NodeType type, RMContainer rmContainer) {
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getNodeAddress() + 
         " application=" + application.getApplicationId().getId() + 
         " priority=" + priority.getPriority() + 
         " request=" + request + " type=" + type);
     Resource capability = request.getCapability();
-    
-    Resource available = node.getAvailableResource();
 
-    if (available.getMemory() >  0) {
-      
-      int availableContainers = 
-        available.getMemory() / capability.getMemory();         // TODO: A buggy
-                                                                // application
-                                                                // with this
-                                                                // zero would
-                                                                // crash the
-                                                                // scheduler.
-    
-    if (availableContainers > 0) {
-      List<Container> containers =
-        new ArrayList<Container>();
-      Container container =
-         BuilderUtils.newContainer(this.recordFactory,
-                    application.getApplicationAttemptId(),
-                    application.getNewContainerId(),
-                    node.getNodeID(),
-                    node.getHttpAddress(), capability);
-      
-      // If security is enabled, send the container-tokens too.
-      if (UserGroupInformation.isSecurityEnabled()) {
-        ContainerToken containerToken = this.recordFactory.newRecordInstance(ContainerToken.class);
-        ContainerTokenIdentifier tokenidentifier =
-          new ContainerTokenIdentifier(container.getId(),
-              container.getNodeId().toString(), container.getResource());
-        containerToken.setIdentifier(ByteBuffer.wrap(tokenidentifier.getBytes()));
-        containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
-        containerToken.setPassword(ByteBuffer.wrap(containerTokenSecretManager
-              .createPassword(tokenidentifier)));
-          containerToken.setService(container.getNodeId().toString());
-          container.setContainerToken(containerToken);
-        }
-
-        containers.add(container);
-
-        // Allocate
-        allocate(application, type, priority, request, node, containers);
+    Resource available = node.getAvailableResource();
 
-        // Did we previously reserve containers at this 'priority'?
-        if (application.isReserved(node, priority)){
-          unreserve(application, priority, node);
-        }
-        
-        LOG.info("assignedContainer" +
-            " application=" + application.getApplicationId() +
-            " container=" + container + 
-            " queue=" + this.toString() + 
-            " util=" + getUtilization() + 
-            " used=" + usedResources + 
-            " cluster=" + clusterResource);
+    assert (available.getMemory() >  0);
 
-        return container.getResource();
-      } else {
-        // Reserve by 'charging' in advance...
-        reserve(application, priority, node, request.getCapability());
-        
-        LOG.info("Reserved container " + 
-            " application=" + application.getApplicationId() +
-            " resource=" + request.getCapability() + 
-            " queue=" + this.toString() + 
-            " util=" + getUtilization() + 
-            " used=" + usedResources + 
-            " cluster=" + clusterResource);
+    // Create the container if necessary
+    Container container = 
+        getContainer(rmContainer, application, node, capability);
+
+    // Can we allocate a container on this node?
+    int availableContainers = 
+        available.getMemory() / capability.getMemory();         
+    if (availableContainers > 0) {
+      // Allocate...
 
-        return request.getCapability();
+      // Did we previously reserve containers at this 'priority'?
+      if (rmContainer != null){
+        unreserve(application, priority, node, rmContainer);
+      }
 
+      // Inform the application
+      RMContainer allocatedContainer = 
+          application.allocate(type, node, priority, request, container);
+      if (allocatedContainer == null) {
+        // Did the application need this resource?
+        return Resources.none();
       }
-    }
 
-    return Resources.none();
+      // Inform the node
+      node.allocateContainer(application.getApplicationId(), 
+          allocatedContainer);
+
+      LOG.info("assignedContainer" +
+          " application=" + application.getApplicationId() +
+          " container=" + container + 
+          " containerId=" + container.getId() + 
+          " queue=" + this + 
+          " util=" + getUtilization() + 
+          " used=" + usedResources + 
+          " cluster=" + clusterResource);
+
+      return container.getResource();
+    } else {
+      // Reserve by 'charging' in advance...
+      reserve(application, priority, node, rmContainer, container);
+
+      LOG.info("Reserved container " + 
+          " application=" + application.getApplicationId() +
+          " resource=" + request.getCapability() + 
+          " queue=" + this.toString() + 
+          " util=" + getUtilization() + 
+          " used=" + usedResources + 
+          " cluster=" + clusterResource);
+
+      return request.getCapability();
+    }
   }
 
-  private void allocate(CSApp application, NodeType type, 
-      Priority priority, ResourceRequest request, 
-      CSNode node, List<Container> containers) {
-    // Allocate container to the application
-    // TODO: acm: refactor2 FIXME
-    application.allocate(type, node, priority, request, null);
-
-    for (Container container : containers) {
-      // Create the container and 'start' it.
-      ContainerId containerId = container.getId();
-      RMContext rmContext = this.scheduler.getRMContext();
-      EventHandler eventHandler = rmContext.getDispatcher().getEventHandler();
-      RMContainer rmContainer = new RMContainerImpl(container, application
-          .getApplicationAttemptId(), node.getNodeID(),
-          eventHandler, rmContext.getContainerAllocationExpirer());
-      // TODO: FIX
-//      if (rmContext.getRMContainers().putIfAbsent(containerId, rmContainer) != null) {
-//        LOG.error("Duplicate container addition! ContainerID :  "
-//            + containerId);
-//      } else {
-//        eventHandler.handle(new RMContainerEvent(containerId,
-//            RMContainerEventType.START));
-//      }
-    }
-
-    // Inform the NodeManager about the allocation
-    // TODO: acm: refactor2 FIXME
-//    node.allocateContainer(application.getApplicationId(),
-//        containers);
-  }
-
-  private void reserve(CSApp application, Priority priority, 
-      CSNode node, Resource resource) {
-    application.reserveResource(node, priority, resource);
-    node.reserveResource(application, priority, resource);
+  private void reserve(SchedulerApp application, Priority priority, 
+      SchedulerNode node, RMContainer rmContainer, Container container) {
+    rmContainer = application.reserve(node, priority, rmContainer, container);
+    node.reserveResource(application, priority, rmContainer);
+    
+    // Update reserved metrics if this is the first reservation
+    if (rmContainer == null) {
+      getMetrics().reserveResource(
+          application.getUser(), container.getResource());
+    }
   }
 
-  private void unreserve(CSApp application, Priority priority, 
-      CSNode node) {
+  private void unreserve(SchedulerApp application, Priority priority, 
+      SchedulerNode node, RMContainer rmContainer) {
     // Done with the reservation?
-    if (application.isReserved(node, priority)) {
-      application.unreserveResource(node, priority);
-      node.unreserveResource(application, priority);
-    }
+    application.unreserve(node, priority);
+    node.unreserveResource(application);
+      
+      // Update reserved metrics
+    getMetrics().unreserveResource(
+        application.getUser(), rmContainer.getContainer().getResource());
   }
 
 
   @Override
   public void completedContainer(Resource clusterResource, 
-      Container container, Resource containerResource, CSApp application) {
+      SchedulerApp application, SchedulerNode node, RMContainer rmContainer, 
+      RMContainerEventType event) {
     if (application != null) {
       // Careful! Locking order is important!
       synchronized (this) {
+
+        Container container = rmContainer.getContainer();
         
-        // Inform the application - this might be an allocated container or
-        // an unfulfilled reservation
-        // TODO: acm: refactor2 FIXME
-        //application.completedContainer(container, containerResource);
-        
+        // Inform the application & the node
+        // Note: It's safe to assume that all state changes to RMContainer
+        // happen under scheduler's lock... 
+        // So, this is, in effect, a transaction across application & node
+        if (rmContainer.getState() == RMContainerState.RESERVED) {
+          application.unreserve(node, rmContainer.getReservedPriority());
+          node.unreserveResource(application);
+        } else {
+          application.containerCompleted(rmContainer, event);
+          node.releaseContainer(container);
+        }
+
+
         // Book-keeping
         releaseResource(clusterResource, 
-            application.getUser(), containerResource);
+            application.getUser(), container.getResource());
 
         LOG.info("completedContainer" +
             " container=" + container +
-            " resource=" + containerResource +
+            " resource=" + container.getResource() +
         		" queue=" + this + 
             " util=" + getUtilization() + 
             " used=" + usedResources + 
@@ -988,29 +1000,41 @@ public class LeafQueue implements Queue 
       }
 
       // Inform the parent queue
-      parent.completedContainer(clusterResource, container, 
-          containerResource, application);
+      parent.completedContainer(clusterResource, application, 
+          node, rmContainer, event);
     }
   }
 
   private synchronized void allocateResource(Resource clusterResource, 
       String userName, Resource resource) {
+    // Update queue metrics
     Resources.addTo(usedResources, resource);
     updateResource(clusterResource);
     ++numContainers;
 
+    // Update user metrics
     User user = getUser(userName);
     user.assignContainer(resource);
+    
+    LOG.info(getQueueName() + 
+        " used=" + usedResources + " numContainers=" + numContainers + 
+        " user=" + userName + " resources=" + user.getConsumedResources());
   }
 
   private synchronized void releaseResource(Resource clusterResource, 
       String userName, Resource resource) {
+    // Update queue metrics
     Resources.subtractFrom(usedResources, resource);
     updateResource(clusterResource);
     --numContainers;
 
+    // Update user metrics
     User user = getUser(userName);
     user.releaseContainer(resource);
+    
+    LOG.info(getQueueName() + 
+        " used=" + usedResources + " numContainers=" + numContainers + 
+        " user=" + userName + " resources=" + user.getConsumedResources());
   }
 
   @Override
@@ -1062,7 +1086,7 @@ public class LeafQueue implements Queue 
 
   @Override
   public void recoverContainer(Resource clusterResource,
-      CSApp application, Container container) {
+      SchedulerApp application, Container container) {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, application.getUser(), container.getResource());

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Aug  3 11:51:20 2011
@@ -36,7 +36,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -46,7 +45,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 @Private
 @Evolving
@@ -396,7 +399,7 @@ public class ParentQueue implements Queu
   }
 
   @Override
-  public void submitApplication(CSApp application, String user,
+  public void submitApplication(SchedulerApp application, String user,
       String queue) throws AccessControlException {
     
     synchronized (this) {
@@ -428,7 +431,7 @@ public class ParentQueue implements Queu
     }
   }
 
-  private synchronized void addApplication(CSApp application, 
+  private synchronized void addApplication(SchedulerApp application, 
       String user) {
   
     ++numApplications;
@@ -441,7 +444,7 @@ public class ParentQueue implements Queu
   }
   
   @Override
-  public void finishApplication(CSApp application, String queue) {
+  public void finishApplication(SchedulerApp application, String queue) {
     
     synchronized (this) {
       removeApplication(application, application.getUser());
@@ -453,7 +456,7 @@ public class ParentQueue implements Queu
     }
   }
 
-  public synchronized void removeApplication(CSApp application, 
+  public synchronized void removeApplication(SchedulerApp application, 
       String user) {
     
     --numApplications;
@@ -475,7 +478,7 @@ public class ParentQueue implements Queu
 
   @Override
   public synchronized Resource assignContainers(
-      Resource clusterResource, CSNode node) {
+      Resource clusterResource, SchedulerNode node) {
     Resource assigned = Resources.createResource(0);
 
     while (canAssign(node)) {
@@ -539,14 +542,14 @@ public class ParentQueue implements Queu
 
   }
   
-  private boolean canAssign(CSNode node) {
-    return (node.getReservedApplication() == null) && 
+  private boolean canAssign(SchedulerNode node) {
+    return (node.getReservedContainer() == null) && 
         Resources.greaterThanOrEqual(node.getAvailableResource(), 
                                      minimumAllocation);
   }
   
   synchronized Resource assignContainersToChildQueues(Resource cluster, 
-      CSNode node) {
+      SchedulerNode node) {
     Resource assigned = Resources.createResource(0);
     
     printChildQueues();
@@ -588,13 +591,14 @@ public class ParentQueue implements Queu
   
   @Override
   public void completedContainer(Resource clusterResource,
-      Container container, Resource containerResource, 
-      CSApp application) {
+      SchedulerApp application, SchedulerNode node, 
+      RMContainer rmContainer, RMContainerEventType event) {
     if (application != null) {
       // Careful! Locking order is important!
       // Book keeping
       synchronized (this) {
-        releaseResource(clusterResource, containerResource);
+        releaseResource(clusterResource, 
+            rmContainer.getContainer().getResource());
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
@@ -605,8 +609,8 @@ public class ParentQueue implements Queu
 
       // Inform the parent
       if (parent != null) {
-        parent.completedContainer(clusterResource, container, 
-            containerResource, application);
+        parent.completedContainer(clusterResource, application, 
+            node, rmContainer, event);
       }    
     }
   }
@@ -646,7 +650,7 @@ public class ParentQueue implements Queu
   
   @Override
   public void recoverContainer(Resource clusterResource,
-      CSApp application, Container container) {
+      SchedulerApp application, Container container) {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, container.getResource());

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Wed Aug  3 11:51:20 2011
@@ -26,12 +26,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 /**
  * Queue represents a node in the tree of 
@@ -138,7 +139,7 @@ extends org.apache.hadoop.yarn.server.re
    * @param user user who submitted the application
    * @param queue queue to which the application is submitted
    */
-  public void submitApplication(CSApp application, String user, 
+  public void submitApplication(SchedulerApp application, String user, 
       String queue) 
   throws AccessControlException;
   
@@ -147,7 +148,7 @@ extends org.apache.hadoop.yarn.server.re
    * @param application
    * @param queue application queue 
    */
-  public void finishApplication(CSApp application, String queue);
+  public void finishApplication(SchedulerApp application, String queue);
   
   /**
    * Assign containers to applications in the queue or its children (if any).
@@ -155,19 +156,20 @@ extends org.apache.hadoop.yarn.server.re
    * @param node node on which resources are available
    * @return
    */
-  public Resource assignContainers(Resource clusterResource, CSNode node);
+  public Resource assignContainers(Resource clusterResource, SchedulerNode node);
   
   /**
    * A container assigned to the queue has completed.
    * @param clusterResource the resource of the cluster
+   * @param application application to which the container was assigned
+   * @param node node on which the container completed
    * @param container completed container, 
    *                  <code>null</code> if it was just a reservation
-   * @param containerResource allocated resource
-   * @param application application to which the container was assigned
+   * @param event event to be sent to the container
    */
   public void completedContainer(Resource clusterResource,
-      Container container, Resource containerResource, 
-      CSApp application);
+      SchedulerApp application, SchedulerNode node, 
+      RMContainer container, RMContainerEventType event);
 
   /**
    * Get the number of applications in the queue.
@@ -196,6 +198,6 @@ extends org.apache.hadoop.yarn.server.re
    * @param application the application for which the container was allocated
    * @param container the container that was recovered.
    */
-  public void recoverContainer(Resource clusterResource, CSApp application, 
+  public void recoverContainer(Resource clusterResource, SchedulerApp application, 
       Container container);
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug  3 11:51:20 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -62,7 +63,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -225,7 +225,8 @@ public class FifoScheduler implements Re
 
     // Release containers
     for (Container releasedContainer : release) {
-      containerCompleted(releasedContainer, RMContainerEventType.RELEASED);
+      containerCompleted(getRMContainer(releasedContainer), 
+          RMContainerEventType.RELEASED);
     }
 
     if (!ask.isEmpty()) {
@@ -261,8 +262,9 @@ public class FifoScheduler implements Re
   private void normalizeRequest(ResourceRequest ask) {
     int memory = ask.getCapability().getMemory();
     // FIXME: TestApplicationCleanup is relying on unnormalized behavior.
-    memory = MINIMUM_MEMORY *
-    ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
+    memory = 
+        MINIMUM_MEMORY * 
+        ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
     ask.setCapability(Resources.createResource(memory));
   }
 
@@ -279,12 +281,12 @@ public class FifoScheduler implements Re
       String queueName, String user) {
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
         appAttemptId, queueName, user, null);
-    SchedulerApp schedulerApp = new SchedulerApp(appSchedulingInfo,
-        DEFAULT_QUEUE);
+    SchedulerApp schedulerApp = 
+        new SchedulerApp(this.rmContext, appSchedulingInfo, DEFAULT_QUEUE);
     applications.put(appAttemptId, schedulerApp);
     metrics.submitApp(user);
-    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + " from " + user + 
-        ", currently active: " + applications.size());
+    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
+        " from " + user + ", currently active: " + applications.size());
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(appAttemptId,
             RMAppAttemptEventType.APP_ACCEPTED));
@@ -302,7 +304,7 @@ public class FifoScheduler implements Re
 
     // Kill all 'live' containers
     for (RMContainer container : application.getLiveContainers()) {
-      containerCompleted(container.getContainer(), RMContainerEventType.KILL);
+      containerCompleted(container, RMContainerEventType.KILL);
     }
 
     // Clean up pending requests, metrics etc.
@@ -428,7 +430,7 @@ public class FifoScheduler implements Re
                 NodeType.DATA_LOCAL), 
                 request.getNumContainers());
       assignedContainers = 
-        assignContainers(node, application, priority, 
+        assignContainer(node, application, priority, 
             assignableContainers, request, NodeType.DATA_LOCAL);
     }
     return assignedContainers;
@@ -446,7 +448,7 @@ public class FifoScheduler implements Re
                 NodeType.RACK_LOCAL), 
                 request.getNumContainers());
       assignedContainers = 
-        assignContainers(node, application, priority, 
+        assignContainer(node, application, priority, 
             assignableContainers, request, NodeType.RACK_LOCAL);
     }
     return assignedContainers;
@@ -459,13 +461,13 @@ public class FifoScheduler implements Re
       application.getResourceRequest(priority, SchedulerNode.ANY);
     if (request != null) {
       assignedContainers = 
-        assignContainers(node, application, priority, 
+        assignContainer(node, application, priority, 
             request.getNumContainers(), request, NodeType.OFF_SWITCH);
     }
     return assignedContainers;
   }
 
-  private int assignContainers(SchedulerNode node, SchedulerApp application, 
+  private int assignContainer(SchedulerNode node, SchedulerApp application, 
       Priority priority, int assignableContainers, 
       ResourceRequest request, NodeType type) {
     LOG.debug("assignContainers:" +
@@ -495,10 +497,6 @@ public class FifoScheduler implements Re
                 application.getNewContainerId(),
                 node.getRMNode().getNodeID(),
                 node.getRMNode().getHttpAddress(), capability);
-        RMContainer rmContainer = new RMContainerImpl(container, application
-            .getApplicationAttemptId(), node.getNodeID(), this.rmContext
-            .getDispatcher().getEventHandler(), this.rmContext
-            .getContainerAllocationExpirer());
         
         // If security is enabled, send the container-tokens too.
         if (UserGroupInformation.isSecurityEnabled()) {
@@ -518,10 +516,14 @@ public class FifoScheduler implements Re
         }
         
         // Allocate!
-        application.allocate(type, node, priority, request, 
-            Collections.singletonList(rmContainer));
+        
+        // Inform the application
+        RMContainer rmContainer =
+            application.allocate(type, node, priority, request, container);
+        
+        // Inform the node
         node.allocateContainer(application.getApplicationId(), 
-            container);
+            rmContainer);
       }
       
       // Update total usage
@@ -541,7 +543,8 @@ public class FifoScheduler implements Re
         if (container.getState() == ContainerState.RUNNING) {
           containerLaunchedOnNode(container, node);
         } else { // has to COMPLETE
-          containerCompleted(container, RMContainerEventType.FINISHED);
+          containerCompleted(getRMContainer(container), 
+              RMContainerEventType.FINISHED);
         }
       }
     }
@@ -607,7 +610,7 @@ public class FifoScheduler implements Re
     {
       ContainerExpiredSchedulerEvent containerExpiredEvent = 
           (ContainerExpiredSchedulerEvent) event;
-      containerCompleted(containerExpiredEvent.getContainer(), 
+      containerCompleted(getRMContainer(containerExpiredEvent.getContainer()), 
           RMContainerEventType.EXPIRE);
     }
     break;
@@ -631,9 +634,10 @@ public class FifoScheduler implements Re
   }
 
   @Lock(FifoScheduler.class)
-  private synchronized void containerCompleted(Container container,
+  private synchronized void containerCompleted(RMContainer rmContainer,
       RMContainerEventType event) {
     // Get the application for the finished container
+    Container container = rmContainer.getContainer();
     ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
     SchedulerApp application = getApplication(applicationAttemptId);
     
@@ -649,7 +653,7 @@ public class FifoScheduler implements Re
     }
 
     // Inform the application
-    application.containerCompleted(container, event);
+    application.containerCompleted(rmContainer, event);
 
     // Inform the node
     node.releaseContainer(container);
@@ -667,7 +671,7 @@ public class FifoScheduler implements Re
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = getNode(nodeInfo.getNodeID());
     // Kill running containers
-    for(Container container : node.getRunningContainers()) {
+    for(RMContainer container : node.getRunningContainers()) {
       containerCompleted(container, RMContainerEventType.KILL);
     }
     
@@ -696,6 +700,7 @@ public class FifoScheduler implements Re
 
   @Override
   public void recover(RMState state) {
+    // TODO fix recovery
 //    for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
 //      ApplicationId appId = entry.getKey();
 //      ApplicationInfo appInfo = entry.getValue();
@@ -710,4 +715,12 @@ public class FifoScheduler implements Re
     return new SchedulerNodeReport(
         node.getUsedResource(), node.getNumContainers());
   }
+  
+  private RMContainer getRMContainer(Container container) {
+    ContainerId containerId = container.getId();
+    SchedulerApp application = 
+        getApplication(container.getId().getAppAttemptId());
+    return application.getRMContainer(containerId);
+  }
+
 }



Mime
View raw message