From: vinodkv@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm
Subject: svn commit: r1153453 [2/2] - in /hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager: rmcontainer/ scheduler/ scheduler/capacity/ scheduler/fifo/
Date: Wed, 03 Aug 2011 11:51:22 -0000
Message-Id: <20110803115123.0155B238896F@eris.apache.org>

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug 3 11:51:20 2011
@@ -36,9 +36,8 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -47,20 +46,18 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -85,8 +82,10 @@ public class LeafQueue implements Queue
private float usedCapacity = 0.0f;
private volatile int numContainers;
- Set<CSApp> applications;
-
+ Set<SchedulerApp> applications;
+ Map<ApplicationAttemptId, SchedulerApp> applicationsMap =
+   new HashMap<ApplicationAttemptId, SchedulerApp>();
+
public final Resource minimumAllocation;
private ContainerTokenSecretManager containerTokenSecretManager;
@@ -109,7 +108,7 @@ public class LeafQueue implements Queue
public LeafQueue(CapacitySchedulerContext cs,
String queueName, Queue parent,
- Comparator<CSApp> applicationComparator, Queue old) {
+ Comparator<SchedulerApp> applicationComparator, Queue old) {
this.scheduler = cs;
this.queueName = queueName;
this.parent = parent;
@@ -158,7 +157,7 @@ public class LeafQueue implements Queue
" name=" + queueName +
", fullname=" + getQueuePath());
- this.applications = new TreeSet<CSApp>(applicationComparator);
+ this.applications = new TreeSet<SchedulerApp>(applicationComparator);
}
private synchronized void setupQueueConfigs(
@@ -362,7 +361,7 @@ public class LeafQueue implements Queue
}
@Override
- public void submitApplication(CSApp application, String userName,
+ public void submitApplication(SchedulerApp application, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
@@ -423,10 +422,11 @@ public class LeafQueue implements Queue } } - private synchronized void addApplication(CSApp application, User user) { + private synchronized void addApplication(SchedulerApp application, User user) { // Accept user.submitApplication(); applications.add(application); + applicationsMap.put(application.getApplicationAttemptId(), application); LOG.info("Application added -" + " appId: " + application.getApplicationId() + @@ -436,7 +436,7 @@ public class LeafQueue implements Queue } @Override - public void finishApplication(CSApp application, String queue) { + public void finishApplication(SchedulerApp application, String queue) { // Careful! Locking order is important! synchronized (this) { removeApplication(application, getUser(application.getUser())); @@ -446,8 +446,9 @@ public class LeafQueue implements Queue parent.finishApplication(application, queue); } - public synchronized void removeApplication(CSApp application, User user) { + public synchronized void removeApplication(SchedulerApp application, User user) { applications.remove(application); + applicationsMap.remove(application.getApplicationAttemptId()); user.finishApplication(); if (user.getApplications() == 0) { @@ -461,24 +462,31 @@ public class LeafQueue implements Queue " #user-applications: " + user.getApplications() + " #queue-applications: " + getNumApplications()); } + + private synchronized SchedulerApp getApplication( + ApplicationAttemptId applicationAttemptId) { + return applicationsMap.get(applicationAttemptId); + } @Override public synchronized Resource - assignContainers(Resource clusterResource, CSNode node) { + assignContainers(Resource clusterResource, SchedulerNode node) { LOG.info("DEBUG --- assignContainers:" + " node=" + node.getNodeAddress() + " #applications=" + applications.size()); // Check for reserved resources - CSApp reservedApplication = node.getReservedApplication(); - if (reservedApplication != null) { - return assignReservedContainers(reservedApplication, node, + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + SchedulerApp application = + getApplication(reservedContainer.getApplicationAttemptId()); + return assignReservedContainer(application, node, reservedContainer, clusterResource); } - - // Try to assign containers to applications in fifo order - for (CSApp application : applications) { + + // Try to assign containers to applications in order + for (SchedulerApp application : applications) { LOG.info("DEBUG --- pre-assignContainers for application " + application.getApplicationId()); @@ -497,6 +505,7 @@ public class LeafQueue implements Queue } // Are we going over limits by allocating to this application? + ResourceRequest required = application.getResourceRequest(priority, RMNode.ANY); @@ -520,7 +529,7 @@ public class LeafQueue implements Queue // Try to schedule Resource assigned = assignContainersOnNode(clusterResource, node, application, priority, - false); + null); // Did we schedule or reserve a container? if (Resources.greaterThan(assigned, Resources.none())) { @@ -552,30 +561,21 @@ public class LeafQueue implements Queue } - private synchronized Resource assignReservedContainers(CSApp application, - CSNode node, Resource clusterResource) { - synchronized (application) { - for (Priority priority : application.getPriorities()) { - - // Do we reserve containers at this 'priority'? - if (application.isReserved(node, priority)) { - - // Do we really need this reservation still? 
- ResourceRequest offSwitchRequest = - application.getResourceRequest(priority, RMNode.ANY); - if (offSwitchRequest.getNumContainers() == 0) { - // Release - unreserve(application, priority, node); - return offSwitchRequest.getCapability(); - } - - // Try to assign if we have sufficient resources - assignContainersOnNode(clusterResource, node, application, priority, - true); - } - } + private synchronized Resource assignReservedContainer(SchedulerApp application, + SchedulerNode node, RMContainer rmContainer, Resource clusterResource) { + // Do we still need this reservation? + Priority priority = rmContainer.getReservedPriority(); + if (application.getTotalRequiredResources(priority) == 0) { + // Release + Container container = rmContainer.getContainer(); + completedContainer(clusterResource, application, node, + rmContainer, RMContainerEventType.RELEASED); + return container.getResource(); } + // Try to assign if we have sufficient resources + assignContainersOnNode(clusterResource, node, application, priority, rmContainer); + // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE; @@ -599,12 +599,12 @@ public class LeafQueue implements Queue return true; } - private void setUserResourceLimit(CSApp application, Resource resourceLimit) { + private void setUserResourceLimit(SchedulerApp application, Resource resourceLimit) { application.setAvailableResourceLimit(resourceLimit); metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit); } - private Resource computeUserLimit(CSApp application, + private Resource computeUserLimit(SchedulerApp application, Resource clusterResource, Resource required) { // What is our current capacity? 
// * It is equal to the max(required, queue-capacity) if @@ -688,80 +688,87 @@ public class LeafQueue implements Queue return (a + (b - 1)) / b; } - boolean needContainers(CSApp application, Priority priority) { - ResourceRequest offSwitchRequest = - application.getResourceRequest(priority, RMNode.ANY); - - int requiredContainers = offSwitchRequest.getNumContainers(); - int reservedContainers = application.getReservedContainers(priority); + boolean needContainers(SchedulerApp application, Priority priority) { + int requiredContainers = application.getTotalRequiredResources(priority); + int reservedContainers = application.getNumReservedContainers(priority); return ((requiredContainers - reservedContainers) > 0); } - Resource assignContainersOnNode(Resource clusterResource, CSNode node, - CSApp application, Priority priority, boolean reserved) { + Resource assignContainersOnNode(Resource clusterResource, SchedulerNode node, + SchedulerApp application, Priority priority, RMContainer reservedContainer) { Resource assigned = Resources.none(); // Data-local - assigned = assignNodeLocalContainers(clusterResource, node, application, priority); + assigned = + assignNodeLocalContainers(clusterResource, node, application, priority, + reservedContainer); if (Resources.greaterThan(assigned, Resources.none())) { return assigned; } // Rack-local - assigned = assignRackLocalContainers(clusterResource, node, application, priority); + assigned = + assignRackLocalContainers(clusterResource, node, application, priority, + reservedContainer); if (Resources.greaterThan(assigned, Resources.none())) { - return assigned; + return assigned; } // Off-switch return assignOffSwitchContainers(clusterResource, node, application, - priority, reserved); + priority, reservedContainer); } - Resource assignNodeLocalContainers(Resource clusterResource, CSNode node, - CSApp application, Priority priority) { + Resource assignNodeLocalContainers(Resource clusterResource, SchedulerNode node, + SchedulerApp application, Priority priority, + RMContainer reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, node .getNodeAddress()); if (request != null) { - if (canAssign(application, priority, node, NodeType.DATA_LOCAL, false)) { + if (canAssign(application, priority, node, NodeType.DATA_LOCAL, + reservedContainer)) { return assignContainer(clusterResource, node, application, priority, request, - NodeType.DATA_LOCAL); + NodeType.DATA_LOCAL, reservedContainer); } } return Resources.none(); } - Resource assignRackLocalContainers(Resource clusterResource, CSNode node, - CSApp application, Priority priority) { + Resource assignRackLocalContainers(Resource clusterResource, + SchedulerNode node, SchedulerApp application, Priority priority, + RMContainer reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, node.getRackName()); if (request != null) { - if (canAssign(application, priority, node, NodeType.RACK_LOCAL, false)) { + if (canAssign(application, priority, node, NodeType.RACK_LOCAL, + reservedContainer)) { return assignContainer(clusterResource, node, application, priority, request, - NodeType.RACK_LOCAL); + NodeType.RACK_LOCAL, reservedContainer); } } return Resources.none(); } - Resource assignOffSwitchContainers(Resource clusterResource, CSNode node, - CSApp application, Priority priority, boolean reserved) { + Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node, + SchedulerApp application, Priority priority, + RMContainer 
reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, RMNode.ANY); if (request != null) { - if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reserved)) { + if (canAssign(application, priority, node, NodeType.OFF_SWITCH, + reservedContainer)) { return assignContainer(clusterResource, node, application, priority, request, - NodeType.OFF_SWITCH); + NodeType.OFF_SWITCH, reservedContainer); } } return Resources.none(); } - boolean canAssign(CSApp application, Priority priority, - CSNode node, NodeType type, boolean reserved) { + boolean canAssign(SchedulerApp application, Priority priority, + SchedulerNode node, NodeType type, RMContainer reservedContainer) { ResourceRequest offSwitchRequest = application.getResourceRequest(priority, RMNode.ANY); @@ -781,18 +788,18 @@ public class LeafQueue implements Queue if (requiredContainers > 0) { // No 'delay' for reserved containers - if (reserved) { + if (reservedContainer != null) { return true; } -// // Check if we have waited long enough -// if (missedNodes < (requiredContainers * localityWaitFactor)) { -// LOG.info("Application " + application.getApplicationId() + -// " has missed " + missedNodes + " opportunities," + -// " waitFactor= " + localityWaitFactor + -// " for cluster of size " + scheduler.getNumClusterNodes()); -// return false; -// } + // Check if we have waited long enough + if (missedNodes < (requiredContainers * localityWaitFactor)) { + LOG.info("Application " + application.getApplicationId() + + " has missed " + missedNodes + " opportunities," + + " waitFactor= " + localityWaitFactor + + " for cluster of size " + scheduler.getNumClusterNodes()); + return false; + } return true; } return false; @@ -830,157 +837,162 @@ public class LeafQueue implements Queue return false; } - private Resource assignContainer(Resource clusterResource, CSNode node, - CSApp application, - Priority priority, ResourceRequest request, NodeType type) { + + private Container getContainer(RMContainer rmContainer, + SchedulerApp application, SchedulerNode node, Resource capability) { + if (rmContainer != null) { + return rmContainer.getContainer(); + } + + Container container = + BuilderUtils.newContainer(this.recordFactory, + application.getApplicationAttemptId(), + application.getNewContainerId(), + node.getNodeID(), + node.getHttpAddress(), capability); + + // If security is enabled, send the container-tokens too. 
+ if (UserGroupInformation.isSecurityEnabled()) { + ContainerToken containerToken = + this.recordFactory.newRecordInstance(ContainerToken.class); + ContainerTokenIdentifier tokenidentifier = + new ContainerTokenIdentifier(container.getId(), + container.getNodeId().toString(), container.getResource()); + containerToken.setIdentifier( + ByteBuffer.wrap(tokenidentifier.getBytes())); + containerToken.setKind(ContainerTokenIdentifier.KIND.toString()); + containerToken.setPassword( + ByteBuffer.wrap( + containerTokenSecretManager.createPassword(tokenidentifier)) + ); + containerToken.setService(container.getNodeId().toString()); + container.setContainerToken(containerToken); + } + + return container; + } + + private Resource assignContainer(Resource clusterResource, SchedulerNode node, + SchedulerApp application, Priority priority, + ResourceRequest request, NodeType type, RMContainer rmContainer) { LOG.info("DEBUG --- assignContainers:" + " node=" + node.getNodeAddress() + " application=" + application.getApplicationId().getId() + " priority=" + priority.getPriority() + " request=" + request + " type=" + type); Resource capability = request.getCapability(); - - Resource available = node.getAvailableResource(); - if (available.getMemory() > 0) { - - int availableContainers = - available.getMemory() / capability.getMemory(); // TODO: A buggy - // application - // with this - // zero would - // crash the - // scheduler. - - if (availableContainers > 0) { - List containers = - new ArrayList(); - Container container = - BuilderUtils.newContainer(this.recordFactory, - application.getApplicationAttemptId(), - application.getNewContainerId(), - node.getNodeID(), - node.getHttpAddress(), capability); - - // If security is enabled, send the container-tokens too. - if (UserGroupInformation.isSecurityEnabled()) { - ContainerToken containerToken = this.recordFactory.newRecordInstance(ContainerToken.class); - ContainerTokenIdentifier tokenidentifier = - new ContainerTokenIdentifier(container.getId(), - container.getNodeId().toString(), container.getResource()); - containerToken.setIdentifier(ByteBuffer.wrap(tokenidentifier.getBytes())); - containerToken.setKind(ContainerTokenIdentifier.KIND.toString()); - containerToken.setPassword(ByteBuffer.wrap(containerTokenSecretManager - .createPassword(tokenidentifier))); - containerToken.setService(container.getNodeId().toString()); - container.setContainerToken(containerToken); - } - - containers.add(container); - - // Allocate - allocate(application, type, priority, request, node, containers); + Resource available = node.getAvailableResource(); - // Did we previously reserve containers at this 'priority'? - if (application.isReserved(node, priority)){ - unreserve(application, priority, node); - } - - LOG.info("assignedContainer" + - " application=" + application.getApplicationId() + - " container=" + container + - " queue=" + this.toString() + - " util=" + getUtilization() + - " used=" + usedResources + - " cluster=" + clusterResource); + assert (available.getMemory() > 0); - return container.getResource(); - } else { - // Reserve by 'charging' in advance... 
- reserve(application, priority, node, request.getCapability()); - - LOG.info("Reserved container " + - " application=" + application.getApplicationId() + - " resource=" + request.getCapability() + - " queue=" + this.toString() + - " util=" + getUtilization() + - " used=" + usedResources + - " cluster=" + clusterResource); + // Create the container if necessary + Container container = + getContainer(rmContainer, application, node, capability); + + // Can we allocate a container on this node? + int availableContainers = + available.getMemory() / capability.getMemory(); + if (availableContainers > 0) { + // Allocate... - return request.getCapability(); + // Did we previously reserve containers at this 'priority'? + if (rmContainer != null){ + unreserve(application, priority, node, rmContainer); + } + // Inform the application + RMContainer allocatedContainer = + application.allocate(type, node, priority, request, container); + if (allocatedContainer == null) { + // Did the application need this resource? + return Resources.none(); } - } - return Resources.none(); + // Inform the node + node.allocateContainer(application.getApplicationId(), + allocatedContainer); + + LOG.info("assignedContainer" + + " application=" + application.getApplicationId() + + " container=" + container + + " containerId=" + container.getId() + + " queue=" + this + + " util=" + getUtilization() + + " used=" + usedResources + + " cluster=" + clusterResource); + + return container.getResource(); + } else { + // Reserve by 'charging' in advance... + reserve(application, priority, node, rmContainer, container); + + LOG.info("Reserved container " + + " application=" + application.getApplicationId() + + " resource=" + request.getCapability() + + " queue=" + this.toString() + + " util=" + getUtilization() + + " used=" + usedResources + + " cluster=" + clusterResource); + + return request.getCapability(); + } } - private void allocate(CSApp application, NodeType type, - Priority priority, ResourceRequest request, - CSNode node, List containers) { - // Allocate container to the application - // TODO: acm: refactor2 FIXME - application.allocate(type, node, priority, request, null); - - for (Container container : containers) { - // Create the container and 'start' it. - ContainerId containerId = container.getId(); - RMContext rmContext = this.scheduler.getRMContext(); - EventHandler eventHandler = rmContext.getDispatcher().getEventHandler(); - RMContainer rmContainer = new RMContainerImpl(container, application - .getApplicationAttemptId(), node.getNodeID(), - eventHandler, rmContext.getContainerAllocationExpirer()); - // TODO: FIX -// if (rmContext.getRMContainers().putIfAbsent(containerId, rmContainer) != null) { -// LOG.error("Duplicate container addition! 
ContainerID : " -// + containerId); -// } else { -// eventHandler.handle(new RMContainerEvent(containerId, -// RMContainerEventType.START)); -// } - } - - // Inform the NodeManager about the allocation - // TODO: acm: refactor2 FIXME -// node.allocateContainer(application.getApplicationId(), -// containers); - } - - private void reserve(CSApp application, Priority priority, - CSNode node, Resource resource) { - application.reserveResource(node, priority, resource); - node.reserveResource(application, priority, resource); + private void reserve(SchedulerApp application, Priority priority, + SchedulerNode node, RMContainer rmContainer, Container container) { + rmContainer = application.reserve(node, priority, rmContainer, container); + node.reserveResource(application, priority, rmContainer); + + // Update reserved metrics if this is the first reservation + if (rmContainer == null) { + getMetrics().reserveResource( + application.getUser(), container.getResource()); + } } - private void unreserve(CSApp application, Priority priority, - CSNode node) { + private void unreserve(SchedulerApp application, Priority priority, + SchedulerNode node, RMContainer rmContainer) { // Done with the reservation? - if (application.isReserved(node, priority)) { - application.unreserveResource(node, priority); - node.unreserveResource(application, priority); - } + application.unreserve(node, priority); + node.unreserveResource(application); + + // Update reserved metrics + getMetrics().unreserveResource( + application.getUser(), rmContainer.getContainer().getResource()); } @Override public void completedContainer(Resource clusterResource, - Container container, Resource containerResource, CSApp application) { + SchedulerApp application, SchedulerNode node, RMContainer rmContainer, + RMContainerEventType event) { if (application != null) { // Careful! Locking order is important! synchronized (this) { + + Container container = rmContainer.getContainer(); - // Inform the application - this might be an allocated container or - // an unfulfilled reservation - // TODO: acm: refactor2 FIXME - //application.completedContainer(container, containerResource); - + // Inform the application & the node + // Note: It's safe to assume that all state changes to RMContainer + // happen under scheduler's lock... 
+ // So, this is, in effect, a transaction across application & node + if (rmContainer.getState() == RMContainerState.RESERVED) { + application.unreserve(node, rmContainer.getReservedPriority()); + node.unreserveResource(application); + } else { + application.containerCompleted(rmContainer, event); + node.releaseContainer(container); + } + + // Book-keeping releaseResource(clusterResource, - application.getUser(), containerResource); + application.getUser(), container.getResource()); LOG.info("completedContainer" + " container=" + container + - " resource=" + containerResource + + " resource=" + container.getResource() + " queue=" + this + " util=" + getUtilization() + " used=" + usedResources + @@ -988,29 +1000,41 @@ public class LeafQueue implements Queue } // Inform the parent queue - parent.completedContainer(clusterResource, container, - containerResource, application); + parent.completedContainer(clusterResource, application, + node, rmContainer, event); } } private synchronized void allocateResource(Resource clusterResource, String userName, Resource resource) { + // Update queue metrics Resources.addTo(usedResources, resource); updateResource(clusterResource); ++numContainers; + // Update user metrics User user = getUser(userName); user.assignContainer(resource); + + LOG.info(getQueueName() + + " used=" + usedResources + " numContainers=" + numContainers + + " user=" + userName + " resources=" + user.getConsumedResources()); } private synchronized void releaseResource(Resource clusterResource, String userName, Resource resource) { + // Update queue metrics Resources.subtractFrom(usedResources, resource); updateResource(clusterResource); --numContainers; + // Update user metrics User user = getUser(userName); user.releaseContainer(resource); + + LOG.info(getQueueName() + + " used=" + usedResources + " numContainers=" + numContainers + + " user=" + userName + " resources=" + user.getConsumedResources()); } @Override @@ -1062,7 +1086,7 @@ public class LeafQueue implements Queue @Override public void recoverContainer(Resource clusterResource, - CSApp application, Container container) { + SchedulerApp application, Container container) { // Careful! Locking order is important! synchronized (this) { allocateResource(clusterResource, application.getUser(), container.getResource()); Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1153453&r1=1153452&r2=1153453&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Aug 3 11:51:20 2011 @@ -36,7 +36,6 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -46,7 +45,11 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @Private @Evolving @@ -396,7 +399,7 @@ public class ParentQueue implements Queu } @Override - public void submitApplication(CSApp application, String user, + public void submitApplication(SchedulerApp application, String user, String queue) throws AccessControlException { synchronized (this) { @@ -428,7 +431,7 @@ public class ParentQueue implements Queu } } - private synchronized void addApplication(CSApp application, + private synchronized void addApplication(SchedulerApp application, String user) { ++numApplications; @@ -441,7 +444,7 @@ public class ParentQueue implements Queu } @Override - public void finishApplication(CSApp application, String queue) { + public void finishApplication(SchedulerApp application, String queue) { synchronized (this) { removeApplication(application, application.getUser()); @@ -453,7 +456,7 @@ public class ParentQueue implements Queu } } - public synchronized void removeApplication(CSApp application, + public synchronized void removeApplication(SchedulerApp application, String user) { --numApplications; @@ -475,7 +478,7 @@ public class ParentQueue implements Queu @Override public synchronized Resource assignContainers( - Resource clusterResource, CSNode node) { + Resource clusterResource, SchedulerNode node) { Resource assigned = Resources.createResource(0); while (canAssign(node)) { @@ -539,14 +542,14 @@ public class ParentQueue implements Queu } - private boolean canAssign(CSNode node) { - return (node.getReservedApplication() == null) && + private boolean canAssign(SchedulerNode node) { + return (node.getReservedContainer() == null) && Resources.greaterThanOrEqual(node.getAvailableResource(), minimumAllocation); } synchronized Resource assignContainersToChildQueues(Resource cluster, - CSNode node) { + SchedulerNode node) { Resource assigned = Resources.createResource(0); printChildQueues(); @@ -588,13 +591,14 @@ public class ParentQueue implements Queu @Override public void completedContainer(Resource clusterResource, - Container container, Resource containerResource, - CSApp application) { + SchedulerApp application, SchedulerNode node, + RMContainer rmContainer, RMContainerEventType event) { if (application != null) { // Careful! Locking order is important! 
// Book keeping synchronized (this) { - releaseResource(clusterResource, containerResource); + releaseResource(clusterResource, + rmContainer.getContainer().getResource()); LOG.info("completedContainer" + " queue=" + getQueueName() + @@ -605,8 +609,8 @@ public class ParentQueue implements Queu // Inform the parent if (parent != null) { - parent.completedContainer(clusterResource, container, - containerResource, application); + parent.completedContainer(clusterResource, application, + node, rmContainer, event); } } } @@ -646,7 +650,7 @@ public class ParentQueue implements Queu @Override public void recoverContainer(Resource clusterResource, - CSApp application, Container container) { + SchedulerApp application, Container container) { // Careful! Locking order is important! synchronized (this) { allocateResource(clusterResource, container.getResource()); Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1153453&r1=1153452&r2=1153453&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Wed Aug 3 11:51:20 2011 @@ -26,12 +26,13 @@ import org.apache.hadoop.classification. import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; /** * Queue represents a node in the tree of @@ -138,7 +139,7 @@ extends org.apache.hadoop.yarn.server.re * @param user user who submitted the application * @param queue queue to which the application is submitted */ - public void submitApplication(CSApp application, String user, + public void submitApplication(SchedulerApp application, String user, String queue) throws AccessControlException; @@ -147,7 +148,7 @@ extends org.apache.hadoop.yarn.server.re * @param application * @param queue application queue */ - public void finishApplication(CSApp application, String queue); + public void finishApplication(SchedulerApp application, String queue); /** * Assign containers to applications in the queue or it's children (if any). 
@@ -155,19 +156,20 @@ extends org.apache.hadoop.yarn.server.re * @param node node on which resources are available * @return */ - public Resource assignContainers(Resource clusterResource, CSNode node); + public Resource assignContainers(Resource clusterResource, SchedulerNode node); /** * A container assigned to the queue has completed. * @param clusterResource the resource of the cluster + * @param application application to which the container was assigned + * @param node node on which the container completed * @param container completed container, * null if it was just a reservation - * @param containerResource allocated resource - * @param application application to which the container was assigned + * @param event event to be sent to the container */ public void completedContainer(Resource clusterResource, - Container container, Resource containerResource, - CSApp application); + SchedulerApp application, SchedulerNode node, + RMContainer container, RMContainerEventType event); /** * Get the number of applications in the queue. @@ -196,6 +198,6 @@ extends org.apache.hadoop.yarn.server.re * @param application the application for which the container was allocated * @param container the container that was recovered. */ - public void recoverContainer(Resource clusterResource, CSApp application, + public void recoverContainer(Resource clusterResource, SchedulerApp application, Container container); } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153453&r1=1153452&r2=1153453&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug 3 11:51:20 2011 @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.Lock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.NodeId; @@ -62,7 +63,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; @@ -225,7 +225,8 @@ public class FifoScheduler implements Re // Release containers for (Container releasedContainer : release) { - 
containerCompleted(releasedContainer, RMContainerEventType.RELEASED); + containerCompleted(getRMContainer(releasedContainer), + RMContainerEventType.RELEASED); } if (!ask.isEmpty()) { @@ -261,8 +262,9 @@ public class FifoScheduler implements Re private void normalizeRequest(ResourceRequest ask) { int memory = ask.getCapability().getMemory(); // FIXME: TestApplicationCleanup is relying on unnormalized behavior. - memory = MINIMUM_MEMORY * - ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0)); + memory = + MINIMUM_MEMORY * + ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0)); ask.setCapability(Resources.createResource(memory)); } @@ -279,12 +281,12 @@ public class FifoScheduler implements Re String queueName, String user) { AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo( appAttemptId, queueName, user, null); - SchedulerApp schedulerApp = new SchedulerApp(appSchedulingInfo, - DEFAULT_QUEUE); + SchedulerApp schedulerApp = + new SchedulerApp(this.rmContext, appSchedulingInfo, DEFAULT_QUEUE); applications.put(appAttemptId, schedulerApp); metrics.submitApp(user); - LOG.info("Application Submission: " + appAttemptId.getApplicationId() + " from " + user + - ", currently active: " + applications.size()); + LOG.info("Application Submission: " + appAttemptId.getApplicationId() + + " from " + user + ", currently active: " + applications.size()); rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.APP_ACCEPTED)); @@ -302,7 +304,7 @@ public class FifoScheduler implements Re // Kill all 'live' containers for (RMContainer container : application.getLiveContainers()) { - containerCompleted(container.getContainer(), RMContainerEventType.KILL); + containerCompleted(container, RMContainerEventType.KILL); } // Clean up pending requests, metrics etc. @@ -428,7 +430,7 @@ public class FifoScheduler implements Re NodeType.DATA_LOCAL), request.getNumContainers()); assignedContainers = - assignContainers(node, application, priority, + assignContainer(node, application, priority, assignableContainers, request, NodeType.DATA_LOCAL); } return assignedContainers; @@ -446,7 +448,7 @@ public class FifoScheduler implements Re NodeType.RACK_LOCAL), request.getNumContainers()); assignedContainers = - assignContainers(node, application, priority, + assignContainer(node, application, priority, assignableContainers, request, NodeType.RACK_LOCAL); } return assignedContainers; @@ -459,13 +461,13 @@ public class FifoScheduler implements Re application.getResourceRequest(priority, SchedulerNode.ANY); if (request != null) { assignedContainers = - assignContainers(node, application, priority, + assignContainer(node, application, priority, request.getNumContainers(), request, NodeType.OFF_SWITCH); } return assignedContainers; } - private int assignContainers(SchedulerNode node, SchedulerApp application, + private int assignContainer(SchedulerNode node, SchedulerApp application, Priority priority, int assignableContainers, ResourceRequest request, NodeType type) { LOG.debug("assignContainers:" + @@ -495,10 +497,6 @@ public class FifoScheduler implements Re application.getNewContainerId(), node.getRMNode().getNodeID(), node.getRMNode().getHttpAddress(), capability); - RMContainer rmContainer = new RMContainerImpl(container, application - .getApplicationAttemptId(), node.getNodeID(), this.rmContext - .getDispatcher().getEventHandler(), this.rmContext - .getContainerAllocationExpirer()); // If security is enabled, send the container-tokens too. 
if (UserGroupInformation.isSecurityEnabled()) { @@ -518,10 +516,14 @@ public class FifoScheduler implements Re } // Allocate! - application.allocate(type, node, priority, request, - Collections.singletonList(rmContainer)); + + // Inform the application + RMContainer rmContainer = + application.allocate(type, node, priority, request, container); + + // Inform the node node.allocateContainer(application.getApplicationId(), - container); + rmContainer); } // Update total usage @@ -541,7 +543,8 @@ public class FifoScheduler implements Re if (container.getState() == ContainerState.RUNNING) { containerLaunchedOnNode(container, node); } else { // has to COMPLETE - containerCompleted(container, RMContainerEventType.FINISHED); + containerCompleted(getRMContainer(container), + RMContainerEventType.FINISHED); } } } @@ -607,7 +610,7 @@ public class FifoScheduler implements Re { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; - containerCompleted(containerExpiredEvent.getContainer(), + containerCompleted(getRMContainer(containerExpiredEvent.getContainer()), RMContainerEventType.EXPIRE); } break; @@ -631,9 +634,10 @@ public class FifoScheduler implements Re } @Lock(FifoScheduler.class) - private synchronized void containerCompleted(Container container, + private synchronized void containerCompleted(RMContainer rmContainer, RMContainerEventType event) { // Get the application for the finished container + Container container = rmContainer.getContainer(); ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId(); SchedulerApp application = getApplication(applicationAttemptId); @@ -649,7 +653,7 @@ public class FifoScheduler implements Re } // Inform the application - application.containerCompleted(container, event); + application.containerCompleted(rmContainer, event); // Inform the node node.releaseContainer(container); @@ -667,7 +671,7 @@ public class FifoScheduler implements Re private synchronized void removeNode(RMNode nodeInfo) { SchedulerNode node = getNode(nodeInfo.getNodeID()); // Kill running containers - for(Container container : node.getRunningContainers()) { + for(RMContainer container : node.getRunningContainers()) { containerCompleted(container, RMContainerEventType.KILL); } @@ -696,6 +700,7 @@ public class FifoScheduler implements Re @Override public void recover(RMState state) { + // TODO fix recovery // for (Map.Entry entry: state.getStoredApplications().entrySet()) { // ApplicationId appId = entry.getKey(); // ApplicationInfo appInfo = entry.getValue(); @@ -710,4 +715,12 @@ public class FifoScheduler implements Re return new SchedulerNodeReport( node.getUsedResource(), node.getNumContainers()); } + + private RMContainer getRMContainer(Container container) { + ContainerId containerId = container.getId(); + SchedulerApp application = + getApplication(container.getId().getAppAttemptId()); + return application.getRMContainer(containerId); + } + }
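For readers tracing this refactor, the sketch below illustrates (not authoritatively) how a caller might resolve an RMContainer and report a released container through the completedContainer signature introduced by this commit. The YARN types and calls (SchedulerApp, SchedulerNode, RMContainer, RMContainerEventType.RELEASED, getRMContainer, getId().getAppAttemptId()) are taken from the diff above; the wrapper class ReleaseSketch, its fields, and its wiring are hypothetical and exist only for illustration, and it would compile only against the MR-279 branch at this revision.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Queue;

// Hypothetical helper, not part of this commit: shows the lookup-then-notify
// pattern that FifoScheduler and the capacity-scheduler queues now share.
public class ReleaseSketch {

  // Assumed to be populated the way FifoScheduler populates its own map of
  // attempt id -> SchedulerApp when applications are added.
  private final Map<ApplicationAttemptId, SchedulerApp> applications =
      new HashMap<ApplicationAttemptId, SchedulerApp>();

  private Queue rootQueue;            // assumed: the scheduler's root queue
  private Resource clusterResource;   // assumed: tracked by the scheduler

  // Mirrors the private FifoScheduler#getRMContainer(Container) added in this
  // commit: resolve the live RMContainer from the application that owns it.
  private RMContainer getRMContainer(Container container) {
    ContainerId containerId = container.getId();
    SchedulerApp application =
        applications.get(containerId.getAppAttemptId());
    return (application == null) ? null
        : application.getRMContainer(containerId);
  }

  // Report an AM-released container through the new completedContainer API,
  // which now takes the SchedulerApp, SchedulerNode, RMContainer and event
  // instead of the raw Container plus its Resource.
  void releaseContainer(Container released, SchedulerNode node) {
    RMContainer rmContainer = getRMContainer(released);
    if (rmContainer == null) {
      return; // unknown container; nothing to clean up
    }
    SchedulerApp application =
        applications.get(released.getId().getAppAttemptId());
    rootQueue.completedContainer(clusterResource, application, node,
        rmContainer, RMContainerEventType.RELEASED);
  }
}

As the LeafQueue hunks show, passing the RMContainer (rather than a bare Container/Resource pair) lets the queue check RMContainerState.RESERVED and unreserve instead of releasing, so reservations and live allocations are booked back through one code path.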