Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E7D5BD38B for ; Tue, 17 Jul 2012 01:43:37 +0000 (UTC) Received: (qmail 83209 invoked by uid 500); 17 Jul 2012 01:43:37 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 83153 invoked by uid 500); 17 Jul 2012 01:43:37 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 83145 invoked by uid 99); 17 Jul 2012 01:43:37 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2012 01:43:37 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FRT_OPPORTUN1 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2012 01:43:27 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 560F923888E4; Tue, 17 Jul 2012 01:43:06 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1362332 [1/3] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server/hadoop... Date: Tue, 17 Jul 2012 01:43:04 -0000 To: mapreduce-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120717014306.560F923888E4@eris.apache.org> Author: acmurthy Date: Tue Jul 17 01:43:03 2012 New Revision: 1362332 URL: http://svn.apache.org/viewvc?rev=1362332&view=rev Log: MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal interface to allow schedulers to maintain their own. Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Removed: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jul 17 01:43:03 2012 @@ -132,12 +132,25 @@ Branch-2 ( Unreleased changes ) NEW FEATURES + IMPROVEMENTS + + BUG FIXES + +Release 2.1.0-alpha - Unreleased + + INCOMPATIBLE CHANGES + + NEW FEATURES + MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu) MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu) IMPROVEMENTS + MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal + interface to allow schedulers to maintain their own. (acmurthy) + MAPREDUCE-4146. Support limits on task status string length and number of block locations in branch-2. (Ahmed Radwan via tomwhite) Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Jul 17 01:43:03 2012 @@ -192,7 +192,8 @@ public class RMAppImpl implements RMApp BuilderUtils.newApplicationResourceUsageReport(-1, -1, Resources.createResource(-1), Resources.createResource(-1), Resources.createResource(-1)); - + private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1; + public RMAppImpl(ApplicationId applicationId, RMContext rmContext, Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, String clientTokenStr, @@ -383,6 +384,7 @@ public class RMAppImpl implements RMApp this.readLock.lock(); try { + ApplicationAttemptId currentApplicationAttemptId = null; String clientToken = UNAVAILABLE; String trackingUrl = UNAVAILABLE; String host = UNAVAILABLE; @@ -393,19 +395,27 @@ public class RMAppImpl implements RMApp String diags = UNAVAILABLE; if (allowAccess) { if (this.currentAttempt != null) { + currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); trackingUrl = this.currentAttempt.getTrackingUrl(); origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl(); clientToken = this.currentAttempt.getClientToken(); host = this.currentAttempt.getHost(); rpcPort = this.currentAttempt.getRpcPort(); appUsageReport = currentAttempt.getApplicationResourceUsageReport(); + } else { + currentApplicationAttemptId = + BuilderUtils.newApplicationAttemptId(this.applicationId, + DUMMY_APPLICATION_ATTEMPT_NUMBER); } diags = this.diagnostics.toString(); } else { appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; + currentApplicationAttemptId = + BuilderUtils.newApplicationAttemptId(this.applicationId, + DUMMY_APPLICATION_ATTEMPT_NUMBER); } return BuilderUtils.newApplicationReport(this.applicationId, - this.currentAttempt.getAppAttemptId(), this.user, this.queue, + currentApplicationAttemptId, this.user, this.queue, this.name, host, rpcPort, clientToken, createApplicationState(this.stateMachine.getCurrentState()), diags, trackingUrl, this.startTime, this.finishTime, finishState, Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java Tue Jul 17 01:43:03 2012 @@ -56,7 +56,7 @@ public class ActiveUsersManager { * @param user application user * @param applicationId activated application */ - @Lock({Queue.class, SchedulerApp.class}) + @Lock({Queue.class, SchedulerApplication.class}) synchronized public void activateApplication( String user, ApplicationId applicationId) { Set userApps = usersApplications.get(user); @@ -79,7 +79,7 @@ public class ActiveUsersManager { * @param user application user * @param applicationId deactivated application */ - @Lock({Queue.class, SchedulerApp.class}) + @Lock({Queue.class, SchedulerApplication.class}) synchronized public void deactivateApplication( String user, ApplicationId applicationId) { Set userApps = usersApplications.get(user); @@ -102,7 +102,7 @@ public class ActiveUsersManager { * resource requests. * @return number of active users */ - @Lock({Queue.class, SchedulerApp.class}) + @Lock({Queue.class, SchedulerApplication.class}) synchronized public int getNumActiveUsers() { return activeUsers; } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Tue Jul 17 01:43:03 2012 @@ -245,7 +245,8 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority, + synchronized private void allocateNodeLocal( + SchedulerNode node, Priority priority, ResourceRequest nodeLocalRequest, Container container) { // Update consumption and track allocations allocate(container); @@ -273,7 +274,8 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateRackLocal(SchedulerNode node, Priority priority, + synchronized private void allocateRackLocal( + SchedulerNode node, Priority priority, ResourceRequest rackLocalRequest, Container container) { // Update consumption and track allocations @@ -295,7 +297,8 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority, + synchronized private void allocateOffSwitch( + SchedulerNode node, Priority priority, ResourceRequest offSwitchRequest, Container container) { // Update consumption and track allocations Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Tue Jul 17 01:43:03 2012 @@ -36,7 +36,7 @@ public class SchedulerAppReport { private final Collection reserved; private final boolean pending; - public SchedulerAppReport(SchedulerApp app) { + public SchedulerAppReport(SchedulerApplication app) { this.live = app.getLiveContainers(); this.reserved = app.getReservedContainers(); this.pending = app.isPending(); Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1362332&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Tue Jul 17 01:43:03 2012 @@ -0,0 +1,43 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + * Represents an Application from the viewpoint of the scheduler. + * Each running Application in the RM corresponds to one instance + * of this class. + */ +@Private +@Unstable +public abstract class SchedulerApplication { + + /** + * Get {@link ApplicationAttemptId} of the application master. + * @return ApplicationAttemptId of the application master + */ + public abstract ApplicationAttemptId getApplicationAttemptId(); + + /** + * Get the live containers of the application. + * @return live containers of the application + */ + public abstract Collection getLiveContainers(); + + /** + * Get the reserved containers of the application. + * @return the reserved containers of the application + */ + public abstract Collection getReservedContainers(); + + /** + * Is this application pending? + * @return true if it is else false. + */ + public abstract boolean isPending(); + +} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Tue Jul 17 01:43:03 2012 @@ -18,224 +18,45 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -public class SchedulerNode { +/** + * Represents a YARN Cluster Node from the viewpoint of the scheduler. + */ +@Private +@Unstable +public abstract class SchedulerNode { - private static final Log LOG = LogFactory.getLog(SchedulerNode.class); - - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private Resource availableResource = recordFactory.newRecordInstance(Resource.class); - private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - - private volatile int numContainers; - - private RMContainer reservedContainer; + /** + * Get hostname. + * @return hostname + */ + public abstract String getHostName(); - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap(); + /** + * Get rackname. + * @return rackname + */ + public abstract String getRackName(); - private final RMNode rmNode; - - public static final String ANY = "*"; - - public SchedulerNode(RMNode node) { - this.rmNode = node; - this.availableResource.setMemory(node.getTotalCapability().getMemory()); - } - - public RMNode getRMNode() { - return this.rmNode; - } - - public NodeId getNodeID() { - return this.rmNode.getNodeID(); - } - - public String getHttpAddress() { - return this.rmNode.getHttpAddress(); - } - - public String getHostName() { - return this.rmNode.getHostName(); - } - - public String getRackName() { - return this.rmNode.getRackName(); - } + /** + * Get used resources on the node. + * @return used resources on the node + */ + public abstract Resource getUsedResource(); /** - * The Scheduler has allocated containers on this node to the - * given application. - * - * @param applicationId application - * @param rmContainer allocated container + * Get available resources on the node. + * @return available resources on the node */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { - Container container = rmContainer.getContainer(); - deductAvailableResource(container.getResource()); - ++numContainers; - - launchedContainers.put(container.getId(), rmContainer); - - LOG.info("Assigned container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + - getAvailableResource() + " available"); - } - - public synchronized Resource getAvailableResource() { - return this.availableResource; - } - - public synchronized Resource getUsedResource() { - return this.usedResource; - } - - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) - return true; - return false; - } - - private synchronized void updateResource(Container container) { - addAvailableResource(container.getResource()); - --numContainers; - } - + public abstract Resource getAvailableResource(); + /** - * Release an allocated container on this node. - * @param container container to be released + * Get number of active containers on the node. + * @return number of active containers on the node */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { - LOG.error("Invalid container released " + container); - return; - } - - /* remove the containers from the nodemanger */ - launchedContainers.remove(container.getId()); - updateResource(container); - - LOG.info("Released container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + getAvailableResource() - + " available" + ", release resources=" + true); - } - - - private synchronized void addAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(availableResource, resource); - Resources.subtractFrom(usedResource, resource); - } - - private synchronized void deductAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(availableResource, resource); - Resources.addTo(usedResource, resource); - } - - @Override - public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource().getMemory() + - " used=" + getUsedResource().getMemory(); - } - - public int getNumContainers() { - return numContainers; - } - - public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); - } - - public synchronized void reserveResource( - SchedulerApp application, Priority priority, - RMContainer reservedContainer) { - // Check if it's already reserved - if (this.reservedContainer != null) { - // Sanity check - if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { - throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + - " on node " + reservedContainer.getReservedNode() + - " when currently" + " reserved resource " + this.reservedContainer + - " on node " + this.reservedContainer.getReservedNode()); - } - - // Cannot reserve more than one application on a given node! - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( - reservedContainer.getContainer().getId().getApplicationAttemptId())) { - throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + - " for application " + application.getApplicationId() + - " when currently" + - " reserved container " + this.reservedContainer + - " on node " + this); - } - - LOG.info("Updated reserved container " + - reservedContainer.getContainer().getId() + " on node " + - this + " for application " + application); - } else { - LOG.info("Reserved container " + reservedContainer.getContainer().getId() + - " on node " + this + " for application " + application); - } - this.reservedContainer = reservedContainer; - } - - public synchronized void unreserveResource(SchedulerApp application) { - // Cannot unreserve for wrong application... - ApplicationAttemptId reservedApplication = - reservedContainer.getContainer().getId().getApplicationAttemptId(); - if (!reservedApplication.equals( - application.getApplicationAttemptId())) { - throw new IllegalStateException("Trying to unreserve " + - " for application " + application.getApplicationId() + - " when currently reserved " + - " for application " + reservedApplication.getApplicationId() + - " on node " + this); - } - - reservedContainer = null; - } - - public synchronized RMContainer getReservedContainer() { - return reservedContainer; - } + public abstract int getNumContainers(); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Tue Jul 17 01:43:03 2012 @@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; /** * CSQueue represents a node in the tree of @@ -150,7 +150,7 @@ extends org.apache.hadoop.yarn.server.re * @param user user who submitted the application * @param queue queue to which the application is submitted */ - public void submitApplication(SchedulerApp application, String user, + public void submitApplication(FiCaSchedulerApp application, String user, String queue) throws AccessControlException; @@ -159,7 +159,7 @@ extends org.apache.hadoop.yarn.server.re * @param application * @param queue application queue */ - public void finishApplication(SchedulerApp application, String queue); + public void finishApplication(FiCaSchedulerApp application, String queue); /** * Assign containers to applications in the queue or it's children (if any). @@ -168,7 +168,7 @@ extends org.apache.hadoop.yarn.server.re * @return the assignment */ public CSAssignment assignContainers( - Resource clusterResource, SchedulerNode node); + Resource clusterResource, FiCaSchedulerNode node); /** * A container assigned to the queue has completed. @@ -182,7 +182,7 @@ extends org.apache.hadoop.yarn.server.re * @param event event to be sent to the container */ public void completedContainer(Resource clusterResource, - SchedulerApp application, SchedulerNode node, + FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer container, ContainerStatus containerStatus, RMContainerEventType event); @@ -219,6 +219,6 @@ extends org.apache.hadoop.yarn.server.re * @param application the application for which the container was allocated * @param container the container that was recovered. */ - public void recoverContainer(Resource clusterResource, SchedulerApp application, + public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, Container container); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Jul 17 01:43:03 2012 @@ -63,11 +63,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; @@ -103,10 +103,10 @@ implements ResourceScheduler, CapacitySc } }; - static final Comparator applicationComparator = - new Comparator() { + static final Comparator applicationComparator = + new Comparator() { @Override - public int compare(SchedulerApp a1, SchedulerApp a2) { + public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { return a1.getApplicationId().getId() - a2.getApplicationId().getId(); } }; @@ -131,8 +131,8 @@ implements ResourceScheduler, CapacitySc private Map queues = new ConcurrentHashMap(); - private Map nodes = - new ConcurrentHashMap(); + private Map nodes = + new ConcurrentHashMap(); private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); @@ -141,8 +141,8 @@ implements ResourceScheduler, CapacitySc private Resource minimumAllocation; private Resource maximumAllocation; - private Map applications = - new ConcurrentHashMap(); + private Map applications = + new ConcurrentHashMap(); private boolean initialized = false; @@ -299,7 +299,7 @@ implements ResourceScheduler, CapacitySc CSQueue parent, String queueName, Map queues, Map oldQueues, Comparator queueComparator, - Comparator applicationComparator, + Comparator applicationComparator, QueueHook hook) throws IOException { CSQueue queue; String[] childQueueNames = @@ -370,8 +370,8 @@ implements ResourceScheduler, CapacitySc } // TODO: Fix store - SchedulerApp SchedulerApp = - new SchedulerApp(applicationAttemptId, user, queue, + FiCaSchedulerApp SchedulerApp = + new FiCaSchedulerApp(applicationAttemptId, user, queue, queue.getActiveUsersManager(), rmContext, null); // Submit to the queue @@ -404,7 +404,7 @@ implements ResourceScheduler, CapacitySc LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { // throw new IOException("Unknown application " + applicationId + @@ -456,7 +456,7 @@ implements ResourceScheduler, CapacitySc public Allocation allocate(ApplicationAttemptId applicationAttemptId, List ask, List release) { - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + applicationAttemptId); @@ -551,7 +551,7 @@ implements ResourceScheduler, CapacitySc LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource); } - SchedulerNode node = getNode(nm.getNodeID()); + FiCaSchedulerNode node = getNode(nm.getNodeID()); // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { @@ -578,7 +578,7 @@ implements ResourceScheduler, CapacitySc RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - SchedulerApp reservedApplication = + FiCaSchedulerApp reservedApplication = getApplication(reservedContainer.getApplicationAttemptId()); // Try to fulfill the reservation @@ -601,10 +601,10 @@ implements ResourceScheduler, CapacitySc } - private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) { + private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { // Get the application for the finished container ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + @@ -672,7 +672,7 @@ implements ResourceScheduler, CapacitySc } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager)); + this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); ++numNodeManagers; @@ -681,7 +681,7 @@ implements ResourceScheduler, CapacitySc } private synchronized void removeNode(RMNode nodeInfo) { - SchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); + FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); if (node == null) { return; } @@ -726,7 +726,7 @@ implements ResourceScheduler, CapacitySc // Get the application for the finished container ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Container " + container + " of" + " unknown application " + applicationAttemptId + @@ -735,7 +735,7 @@ implements ResourceScheduler, CapacitySc } // Get the node on which the container was allocated - SchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = getNode(container.getNodeId()); // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); @@ -749,24 +749,24 @@ implements ResourceScheduler, CapacitySc } @Lock(Lock.NoLock.class) - SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) { + FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) { return applications.get(applicationAttemptId); } @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId applicationAttemptId) { - SchedulerApp app = getApplication(applicationAttemptId); + FiCaSchedulerApp app = getApplication(applicationAttemptId); return app == null ? null : new SchedulerAppReport(app); } @Lock(Lock.NoLock.class) - SchedulerNode getNode(NodeId nodeId) { + FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } private RMContainer getRMContainer(ContainerId containerId) { - SchedulerApp application = + FiCaSchedulerApp application = getApplication(containerId.getApplicationAttemptId()); return (application == null) ? null : application.getRMContainer(containerId); } @@ -790,7 +790,7 @@ implements ResourceScheduler, CapacitySc @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { - SchedulerNode node = getNode(nodeId); + FiCaSchedulerNode node = getNode(nodeId); return node == null ? null : new SchedulerNodeReport(node); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Tue Jul 17 01:43:03 2012 @@ -61,9 +61,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -94,11 +94,11 @@ public class LeafQueue implements CSQueu private float usedCapacity = 0.0f; private volatile int numContainers; - Set activeApplications; - Map applicationsMap = - new HashMap(); + Set activeApplications; + Map applicationsMap = + new HashMap(); - Set pendingApplications; + Set pendingApplications; private final Resource minimumAllocation; private final Resource maximumAllocation; @@ -126,7 +126,7 @@ public class LeafQueue implements CSQueu public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, - Comparator applicationComparator, CSQueue old) { + Comparator applicationComparator, CSQueue old) { this.scheduler = cs; this.queueName = queueName; this.parent = parent; @@ -199,8 +199,8 @@ public class LeafQueue implements CSQueu } this.pendingApplications = - new TreeSet(applicationComparator); - this.activeApplications = new TreeSet(applicationComparator); + new TreeSet(applicationComparator); + this.activeApplications = new TreeSet(applicationComparator); } private synchronized void setupQueueConfigs( @@ -580,7 +580,7 @@ public class LeafQueue implements CSQueu } @Override - public void submitApplication(SchedulerApp application, String userName, + public void submitApplication(FiCaSchedulerApp application, String userName, String queue) throws AccessControlException { // Careful! Locking order is important! @@ -644,9 +644,9 @@ public class LeafQueue implements CSQueu } private synchronized void activateApplications() { - for (Iterator i=pendingApplications.iterator(); + for (Iterator i=pendingApplications.iterator(); i.hasNext(); ) { - SchedulerApp application = i.next(); + FiCaSchedulerApp application = i.next(); // Check queue limit if (getNumActiveApplications() >= getMaximumActiveApplications()) { @@ -666,7 +666,7 @@ public class LeafQueue implements CSQueu } } - private synchronized void addApplication(SchedulerApp application, User user) { + private synchronized void addApplication(FiCaSchedulerApp application, User user) { // Accept user.submitApplication(); pendingApplications.add(application); @@ -686,7 +686,7 @@ public class LeafQueue implements CSQueu } @Override - public void finishApplication(SchedulerApp application, String queue) { + public void finishApplication(FiCaSchedulerApp application, String queue) { // Careful! Locking order is important! synchronized (this) { removeApplication(application, getUser(application.getUser())); @@ -696,7 +696,7 @@ public class LeafQueue implements CSQueu parent.finishApplication(application, queue); } - public synchronized void removeApplication(SchedulerApp application, User user) { + public synchronized void removeApplication(FiCaSchedulerApp application, User user) { boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); @@ -728,7 +728,7 @@ public class LeafQueue implements CSQueu ); } - private synchronized SchedulerApp getApplication( + private synchronized FiCaSchedulerApp getApplication( ApplicationAttemptId applicationAttemptId) { return applicationsMap.get(applicationAttemptId); } @@ -738,7 +738,7 @@ public class LeafQueue implements CSQueu @Override public synchronized CSAssignment - assignContainers(Resource clusterResource, SchedulerNode node) { + assignContainers(Resource clusterResource, FiCaSchedulerNode node) { if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getHostName() @@ -748,7 +748,7 @@ public class LeafQueue implements CSQueu // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - SchedulerApp application = + FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); return new CSAssignment( assignReservedContainer(application, node, reservedContainer, @@ -758,7 +758,7 @@ public class LeafQueue implements CSQueu } // Try to assign containers to applications in order - for (SchedulerApp application : activeApplications) { + for (FiCaSchedulerApp application : activeApplications) { if(LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " @@ -836,8 +836,8 @@ public class LeafQueue implements CSQueu } - private synchronized Resource assignReservedContainer(SchedulerApp application, - SchedulerNode node, RMContainer rmContainer, Resource clusterResource) { + private synchronized Resource assignReservedContainer(FiCaSchedulerApp application, + FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) { // Do we still need this reservation? Priority priority = rmContainer.getReservedPriority(); if (application.getTotalRequiredResources(priority) == 0) { @@ -880,9 +880,9 @@ public class LeafQueue implements CSQueu return true; } - @Lock({LeafQueue.class, SchedulerApp.class}) + @Lock({LeafQueue.class, FiCaSchedulerApp.class}) private Resource computeUserLimitAndSetHeadroom( - SchedulerApp application, Resource clusterResource, Resource required) { + FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); @@ -919,7 +919,7 @@ public class LeafQueue implements CSQueu } @Lock(NoLock.class) - private Resource computeUserLimit(SchedulerApp application, + private Resource computeUserLimit(FiCaSchedulerApp application, Resource clusterResource, Resource required) { // What is our current capacity? // * It is equal to the max(required, queue-capacity) if @@ -1007,7 +1007,7 @@ public class LeafQueue implements CSQueu return (a + (b - 1)) / b; } - boolean needContainers(SchedulerApp application, Priority priority, Resource required) { + boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) { int requiredContainers = application.getTotalRequiredResources(priority); int reservedContainers = application.getNumReservedContainers(priority); int starvation = 0; @@ -1036,7 +1036,7 @@ public class LeafQueue implements CSQueu } private CSAssignment assignContainersOnNode(Resource clusterResource, - SchedulerNode node, SchedulerApp application, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { Resource assigned = Resources.none(); @@ -1065,7 +1065,7 @@ public class LeafQueue implements CSQueu } private Resource assignNodeLocalContainers(Resource clusterResource, - SchedulerNode node, SchedulerApp application, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, node.getHostName()); @@ -1081,7 +1081,7 @@ public class LeafQueue implements CSQueu } private Resource assignRackLocalContainers(Resource clusterResource, - SchedulerNode node, SchedulerApp application, Priority priority, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, node.getRackName()); @@ -1095,8 +1095,8 @@ public class LeafQueue implements CSQueu return Resources.none(); } - private Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node, - SchedulerApp application, Priority priority, + private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, RMNode.ANY); @@ -1111,8 +1111,8 @@ public class LeafQueue implements CSQueu return Resources.none(); } - boolean canAssign(SchedulerApp application, Priority priority, - SchedulerNode node, NodeType type, RMContainer reservedContainer) { + boolean canAssign(FiCaSchedulerApp application, Priority priority, + FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) { // Clearly we need containers for this application... if (type == NodeType.OFF_SWITCH) { @@ -1159,14 +1159,14 @@ public class LeafQueue implements CSQueu } private Container getContainer(RMContainer rmContainer, - SchedulerApp application, SchedulerNode node, + FiCaSchedulerApp application, FiCaSchedulerNode node, Resource capability, Priority priority) { return (rmContainer != null) ? rmContainer.getContainer() : createContainer(application, node, capability, priority); } - public Container createContainer(SchedulerApp application, SchedulerNode node, + public Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, Resource capability, Priority priority) { NodeId nodeId = node.getRMNode().getNodeID(); @@ -1192,8 +1192,8 @@ public class LeafQueue implements CSQueu return container; } - private Resource assignContainer(Resource clusterResource, SchedulerNode node, - SchedulerApp application, Priority priority, + private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getHostName() @@ -1267,8 +1267,8 @@ public class LeafQueue implements CSQueu } } - private void reserve(SchedulerApp application, Priority priority, - SchedulerNode node, RMContainer rmContainer, Container container) { + private void reserve(FiCaSchedulerApp application, Priority priority, + FiCaSchedulerNode node, RMContainer rmContainer, Container container) { // Update reserved metrics if this is the first reservation if (rmContainer == null) { getMetrics().reserveResource( @@ -1282,8 +1282,8 @@ public class LeafQueue implements CSQueu node.reserveResource(application, priority, rmContainer); } - private void unreserve(SchedulerApp application, Priority priority, - SchedulerNode node, RMContainer rmContainer) { + private void unreserve(FiCaSchedulerApp application, Priority priority, + FiCaSchedulerNode node, RMContainer rmContainer) { // Done with the reservation? application.unreserve(node, priority); node.unreserveResource(application); @@ -1296,7 +1296,7 @@ public class LeafQueue implements CSQueu @Override public void completedContainer(Resource clusterResource, - SchedulerApp application, SchedulerNode node, RMContainer rmContainer, + FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (application != null) { // Careful! Locking order is important! @@ -1338,7 +1338,7 @@ public class LeafQueue implements CSQueu } synchronized void allocateResource(Resource clusterResource, - SchedulerApp application, Resource resource) { + FiCaSchedulerApp application, Resource resource) { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1363,7 +1363,7 @@ public class LeafQueue implements CSQueu } synchronized void releaseResource(Resource clusterResource, - SchedulerApp application, Resource resource) { + FiCaSchedulerApp application, Resource resource) { // Update queue metrics Resources.subtractFrom(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1401,7 +1401,7 @@ public class LeafQueue implements CSQueu this, parent, clusterResource, minimumAllocation); // Update application properties - for (SchedulerApp application : activeApplications) { + for (FiCaSchedulerApp application : activeApplications) { synchronized (application) { computeUserLimitAndSetHeadroom(application, clusterResource, Resources.none()); @@ -1464,7 +1464,7 @@ public class LeafQueue implements CSQueu @Override public void recoverContainer(Resource clusterResource, - SchedulerApp application, Container container) { + FiCaSchedulerApp application, Container container) { // Careful! Locking order is important! synchronized (this) { allocateResource(clusterResource, application, container.getResource()); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1362332&r1=1362331&r2=1362332&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Tue Jul 17 01:43:03 2012 @@ -51,8 +51,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @Private @Evolving @@ -421,7 +421,7 @@ public class ParentQueue implements CSQu } @Override - public void submitApplication(SchedulerApp application, String user, + public void submitApplication(FiCaSchedulerApp application, String user, String queue) throws AccessControlException { synchronized (this) { @@ -453,7 +453,7 @@ public class ParentQueue implements CSQu } } - private synchronized void addApplication(SchedulerApp application, + private synchronized void addApplication(FiCaSchedulerApp application, String user) { ++numApplications; @@ -466,7 +466,7 @@ public class ParentQueue implements CSQu } @Override - public void finishApplication(SchedulerApp application, String queue) { + public void finishApplication(FiCaSchedulerApp application, String queue) { synchronized (this) { removeApplication(application, application.getUser()); @@ -478,7 +478,7 @@ public class ParentQueue implements CSQu } } - public synchronized void removeApplication(SchedulerApp application, + public synchronized void removeApplication(FiCaSchedulerApp application, String user) { --numApplications; @@ -516,7 +516,7 @@ public class ParentQueue implements CSQu @Override public synchronized CSAssignment assignContainers( - Resource clusterResource, SchedulerNode node) { + Resource clusterResource, FiCaSchedulerNode node) { CSAssignment assignment = new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); @@ -594,14 +594,14 @@ public class ParentQueue implements CSQu } - private boolean canAssign(SchedulerNode node) { + private boolean canAssign(FiCaSchedulerNode node) { return (node.getReservedContainer() == null) && Resources.greaterThanOrEqual(node.getAvailableResource(), minimumAllocation); } synchronized CSAssignment assignContainersToChildQueues(Resource cluster, - SchedulerNode node) { + FiCaSchedulerNode node) { CSAssignment assignment = new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); @@ -654,7 +654,7 @@ public class ParentQueue implements CSQu @Override public void completedContainer(Resource clusterResource, - SchedulerApp application, SchedulerNode node, + FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (application != null) { // Careful! Locking order is important! @@ -715,7 +715,7 @@ public class ParentQueue implements CSQu @Override public void recoverContainer(Resource clusterResource, - SchedulerApp application, Container container) { + FiCaSchedulerApp application, Container container) { // Careful! Locking order is important! synchronized (this) { allocateResource(clusterResource, container.getResource()); Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1362332&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Tue Jul 17 01:43:03 2012 @@ -0,0 +1,478 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; + +/** + * Represents an Application from the viewpoint of the scheduler. + * Each running Application in the RM corresponds to one instance + * of this class. + */ +@SuppressWarnings("unchecked") +@Private +@Unstable +public class FiCaSchedulerApp extends SchedulerApplication { + + private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private final AppSchedulingInfo appSchedulingInfo; + private final Queue queue; + + private final Resource currentConsumption = recordFactory + .newRecordInstance(Resource.class); + private Resource resourceLimit = recordFactory + .newRecordInstance(Resource.class); + + private Map liveContainers + = new HashMap(); + private List newlyAllocatedContainers = + new ArrayList(); + + final Map> reservedContainers = + new HashMap>(); + + /** + * Count how many times the application has been given an opportunity + * to schedule a task at each priority. Each time the scheduler + * asks the application for a task at this priority, it is incremented, + * and each time the application successfully schedules a task, it + * is reset to 0. + */ + Multiset schedulingOpportunities = HashMultiset.create(); + + Multiset reReservations = HashMultiset.create(); + + Resource currentReservation = recordFactory + .newRecordInstance(Resource.class); + + private final RMContext rmContext; + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, ApplicationStore store) { + this.rmContext = rmContext; + this.appSchedulingInfo = + new AppSchedulingInfo(applicationAttemptId, user, queue, + activeUsersManager, store); + this.queue = queue; + } + + public ApplicationId getApplicationId() { + return this.appSchedulingInfo.getApplicationId(); + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return this.appSchedulingInfo.getApplicationAttemptId(); + } + + public String getUser() { + return this.appSchedulingInfo.getUser(); + } + + public synchronized void updateResourceRequests( + List requests) { + this.appSchedulingInfo.updateResourceRequests(requests); + } + + public Map getResourceRequests(Priority priority) { + return this.appSchedulingInfo.getResourceRequests(priority); + } + + public int getNewContainerId() { + return this.appSchedulingInfo.getNewContainerId(); + } + + public Collection getPriorities() { + return this.appSchedulingInfo.getPriorities(); + } + + public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) { + return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress); + } + + public synchronized int getTotalRequiredResources(Priority priority) { + return getResourceRequest(priority, RMNode.ANY).getNumContainers(); + } + + public Resource getResource(Priority priority) { + return this.appSchedulingInfo.getResource(priority); + } + + /** + * Is this application pending? + * @return true if it is else false. + */ + @Override + public boolean isPending() { + return this.appSchedulingInfo.isPending(); + } + + public String getQueueName() { + return this.appSchedulingInfo.getQueueName(); + } + + /** + * Get the list of live containers + * @return All of the live containers + */ + @Override + public synchronized Collection getLiveContainers() { + return new ArrayList(liveContainers.values()); + } + + public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { + // Cleanup all scheduling information + this.appSchedulingInfo.stop(rmAppAttemptFinalState); + } + + public synchronized void containerLaunchedOnNode(ContainerId containerId, + NodeId nodeId) { + // Inform the container + RMContainer rmContainer = + getRMContainer(containerId); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); + return; + } + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); + } + + synchronized public void containerCompleted(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + + Container container = rmContainer.getContainer(); + ContainerId containerId = container.getId(); + + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent( + containerId, + containerStatus, + event) + ); + LOG.info("Completed container: " + rmContainer.getContainerId() + + " in state: " + rmContainer.getState() + " event:" + event); + + // Remove from the list of containers + liveContainers.remove(rmContainer.getContainerId()); + + RMAuditLogger.logSuccess(getUser(), + AuditConstants.RELEASE_CONTAINER, "SchedulerApp", + getApplicationId(), containerId); + + // Update usage metrics + Resource containerResource = rmContainer.getContainer().getResource(); + queue.getMetrics().releaseResources(getUser(), 1, containerResource); + Resources.subtractFrom(currentConsumption, containerResource); + } + + synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, + Priority priority, ResourceRequest request, + Container container) { + + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(priority) <= 0) { + return null; + } + + // Create RMContainer + RMContainer rmContainer = new RMContainerImpl(container, this + .getApplicationAttemptId(), node.getNodeID(), this.rmContext + .getDispatcher().getEventHandler(), this.rmContext + .getContainerAllocationExpirer()); + + // Add it to allContainers list. + newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); + + // Update consumption and track allocations + appSchedulingInfo.allocate(type, node, priority, request, container); + Resources.addTo(currentConsumption, container.getResource()); + + // Inform the container + rmContainer.handle( + new RMContainerEvent(container.getId(), RMContainerEventType.START)); + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + + container.getId().getApplicationAttemptId() + + " container=" + container.getId() + " host=" + + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), + AuditConstants.ALLOC_CONTAINER, "SchedulerApp", + getApplicationId(), container.getId()); + + return rmContainer; + } + + synchronized public List pullNewlyAllocatedContainers() { + List returnContainerList = new ArrayList( + newlyAllocatedContainers.size()); + for (RMContainer rmContainer : newlyAllocatedContainers) { + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); + returnContainerList.add(rmContainer.getContainer()); + } + newlyAllocatedContainers.clear(); + return returnContainerList; + } + + public Resource getCurrentConsumption() { + return this.currentConsumption; + } + + synchronized public void showRequests() { + if (LOG.isDebugEnabled()) { + for (Priority priority : getPriorities()) { + Map requests = getResourceRequests(priority); + if (requests != null) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " headRoom=" + getHeadroom() + + " currentConsumption=" + currentConsumption.getMemory()); + for (ResourceRequest request : requests.values()) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " request=" + request); + } + } + } + } + } + + public synchronized RMContainer getRMContainer(ContainerId id) { + return liveContainers.get(id); + } + + synchronized public void resetSchedulingOpportunities(Priority priority) { + this.schedulingOpportunities.setCount(priority, 0); + } + + synchronized public void addSchedulingOpportunity(Priority priority) { + this.schedulingOpportunities.setCount(priority, + schedulingOpportunities.count(priority) + 1); + } + + /** + * Return the number of times the application has been given an opportunity + * to schedule a task at the given priority since the last time it + * successfully did so. + */ + synchronized public int getSchedulingOpportunities(Priority priority) { + return this.schedulingOpportunities.count(priority); + } + + synchronized void resetReReservations(Priority priority) { + this.reReservations.setCount(priority, 0); + } + + synchronized void addReReservation(Priority priority) { + this.reReservations.add(priority); + } + + synchronized public int getReReservations(Priority priority) { + return this.reReservations.count(priority); + } + + public synchronized int getNumReservedContainers(Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + return (reservedContainers == null) ? 0 : reservedContainers.size(); + } + + /** + * Get total current reservations. + * Used only by unit tests + * @return total current reservations + */ + @Stable + @Private + public synchronized Resource getCurrentReservation() { + return currentReservation; + } + + public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priority, + RMContainer rmContainer, Container container) { + // Create RMContainer if necessary + if (rmContainer == null) { + rmContainer = + new RMContainerImpl(container, getApplicationAttemptId(), + node.getNodeID(), rmContext.getDispatcher().getEventHandler(), + rmContext.getContainerAllocationExpirer()); + + Resources.addTo(currentReservation, container.getResource()); + + // Reset the re-reservation count + resetReReservations(priority); + } else { + // Note down the re-reservation + addReReservation(priority); + } + rmContainer.handle(new RMContainerReservedEvent(container.getId(), + container.getResource(), node.getNodeID(), priority)); + + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers == null) { + reservedContainers = new HashMap(); + this.reservedContainers.put(priority, reservedContainers); + } + reservedContainers.put(node.getNodeID(), rmContainer); + + LOG.info("Application " + getApplicationId() + + " reserved container " + rmContainer + + " on node " + node + ", currently has " + reservedContainers.size() + + " at priority " + priority + + "; currentReservation " + currentReservation.getMemory()); + + return rmContainer; + } + + public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); + if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(priority); + } + + // Reset the re-reservation count + resetReReservations(priority); + + Resource resource = reservedContainer.getContainer().getResource(); + Resources.subtractFrom(currentReservation, resource); + + LOG.info("Application " + getApplicationId() + " unreserved " + " on node " + + node + ", currently has " + reservedContainers.size() + " at priority " + + priority + "; currentReservation " + currentReservation); + } + + /** + * Has the application reserved the given node at the + * given priority? + * @param node node to be checked + * @param priority priority of reserved container + * @return true is reserved, false if not + */ + public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers != null) { + return reservedContainers.containsKey(node.getNodeID()); + } + return false; + } + + public synchronized float getLocalityWaitFactor( + Priority priority, int clusterNodes) { + // Estimate: Required unique resources (i.e. hosts + racks) + int requiredResources = + Math.max(this.getResourceRequests(priority).size() - 1, 0); + + // waitFactor can't be more than '1' + // i.e. no point skipping more than clustersize opportunities + return Math.min(((float)requiredResources / clusterNodes), 1.0f); + } + + /** + * Get the list of reserved containers + * @return All of the reserved containers. + */ + @Override + public synchronized List getReservedContainers() { + List reservedContainers = new ArrayList(); + for (Map.Entry> e : + this.reservedContainers.entrySet()) { + reservedContainers.addAll(e.getValue().values()); + } + return reservedContainers; + } + + public synchronized void setHeadroom(Resource globalLimit) { + this.resourceLimit = globalLimit; + } + + /** + * Get available headroom in terms of resources for the application's user. + * @return available resource headroom + */ + public synchronized Resource getHeadroom() { + // Corner case to deal with applications being slightly over-limit + if (resourceLimit.getMemory() < 0) { + resourceLimit.setMemory(0); + } + + return resourceLimit; + } + + public Queue getQueue() { + return queue; + } +}