Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5E30E11C05 for ; Tue, 19 Aug 2014 23:53:41 +0000 (UTC) Received: (qmail 59356 invoked by uid 500); 19 Aug 2014 23:53:41 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 59277 invoked by uid 500); 19 Aug 2014 23:53:41 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 59256 invoked by uid 99); 19 Aug 2014 23:53:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Aug 2014 23:53:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Aug 2014 23:53:36 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 69C512388CF6; Tue, 19 Aug 2014 23:51:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1619012 [14/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api... Date: Tue, 19 Aug 2014 23:51:01 -0000 To: yarn-commits@hadoop.apache.org From: cmccabe@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140819235119.69C512388CF6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Aug 19 23:49:39 2014 @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,10 +39,10 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -49,19 +50,13 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -69,16 +64,15 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -96,17 +90,20 @@ import org.apache.hadoop.yarn.util.resou import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") -public class CapacityScheduler extends AbstractYarnScheduler - implements PreemptableResourceScheduler, CapacitySchedulerContext, - Configurable { +public class CapacityScheduler extends + AbstractYarnScheduler implements + PreemptableResourceScheduler, CapacitySchedulerContext, Configurable { private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); private CSQueue root; + // timeout to join when we stop this service + protected final long THREAD_JOIN_TIMEOUT_MS = 1000; static final Comparator queueComparator = new Comparator() { @Override @@ -182,17 +179,7 @@ public class CapacityScheduler extends A private Map queues = new ConcurrentHashMap(); - private Map nodes = - new ConcurrentHashMap(); - - private Resource clusterResource = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); - private int numNodeManagers = 0; - - private Resource minimumAllocation; - private Resource maximumAllocation; - - private boolean initialized = false; + private AtomicInteger numNodeManagers = new AtomicInteger(0); private ResourceCalculator calculator; private boolean usePortForNodeName; @@ -209,7 +196,19 @@ public class CapacityScheduler extends A + ".scheduling-interval-ms"; private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; - public CapacityScheduler() {} + private boolean overrideWithQueueMappings = false; + private List mappings = null; + private Groups groups; + + @VisibleForTesting + public synchronized String getMappedQueueForTest(String user) + throws IOException { + return getMappedQueue(user); + } + + public CapacityScheduler() { + super(CapacityScheduler.class.getName()); + } @Override public QueueMetrics getRootQueueMetrics() { @@ -231,16 +230,6 @@ public class CapacityScheduler extends A } @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; - } - - @Override public Comparator getApplicationComparator() { return applicationComparator; } @@ -256,66 +245,95 @@ public class CapacityScheduler extends A } @Override - public synchronized int getNumClusterNodes() { - return numNodeManagers; + public int getNumClusterNodes() { + return numNodeManagers.get(); } @Override - public RMContext getRMContext() { + public synchronized RMContext getRMContext() { return this.rmContext; } @Override - public Resource getClusterResources() { - return clusterResource; + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; } - + + private synchronized void initScheduler(Configuration configuration) throws + IOException { + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + this.minimumAllocation = this.conf.getMinimumAllocation(); + this.maximumAllocation = this.conf.getMaximumAllocation(); + this.calculator = this.conf.getResourceCalculator(); + this.usePortForNodeName = this.conf.getUsePortForNodeName(); + this.applications = + new ConcurrentHashMap>(); + initializeQueues(this.conf); + + scheduleAsynchronously = this.conf.getScheduleAynschronously(); + asyncScheduleInterval = + this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, + DEFAULT_ASYNC_SCHEDULER_INTERVAL); + if (scheduleAsynchronously) { + asyncSchedulerThread = new AsyncScheduleThread(this); + } + + LOG.info("Initialized CapacityScheduler with " + + "calculator=" + getResourceCalculator().getClass() + ", " + + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + + "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + + "asynchronousScheduling=" + scheduleAsynchronously + ", " + + "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); + } + + private synchronized void startSchedulerThreads() { + if (scheduleAsynchronously) { + Preconditions.checkNotNull(asyncSchedulerThread, + "asyncSchedulerThread is null"); + asyncSchedulerThread.start(); + } + } + @Override - public synchronized void - reinitialize(Configuration conf, RMContext rmContext) throws IOException { + public void serviceInit(Configuration conf) throws Exception { Configuration configuration = new Configuration(conf); - if (!initialized) { - this.rmContext = rmContext; - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); - this.minimumAllocation = this.conf.getMinimumAllocation(); - this.maximumAllocation = this.conf.getMaximumAllocation(); - this.calculator = this.conf.getResourceCalculator(); - this.usePortForNodeName = this.conf.getUsePortForNodeName(); - this.applications = - new ConcurrentHashMap(); + initScheduler(configuration); + super.serviceInit(conf); + } - initializeQueues(this.conf); - - scheduleAsynchronously = this.conf.getScheduleAynschronously(); - asyncScheduleInterval = - this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, - DEFAULT_ASYNC_SCHEDULER_INTERVAL); - if (scheduleAsynchronously) { - asyncSchedulerThread = new AsyncScheduleThread(this); - asyncSchedulerThread.start(); - } - - initialized = true; - LOG.info("Initialized CapacityScheduler with " + - "calculator=" + getResourceCalculator().getClass() + ", " + - "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + - "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + - "asynchronousScheduling=" + scheduleAsynchronously + ", " + - "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); - - } else { - CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); - try { - LOG.info("Re-initializing queues..."); - reinitializeQueues(this.conf); - } catch (Throwable t) { - this.conf = oldConf; - throw new IOException("Failed to re-init queues", t); + @Override + public void serviceStart() throws Exception { + startSchedulerThreads(); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + synchronized (this) { + if (scheduleAsynchronously && asyncSchedulerThread != null) { + asyncSchedulerThread.interrupt(); + asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS); } } + super.serviceStop(); + } + + @Override + public synchronized void + reinitialize(Configuration conf, RMContext rmContext) throws IOException { + Configuration configuration = new Configuration(conf); + CapacitySchedulerConfiguration oldConf = this.conf; + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + try { + LOG.info("Re-initializing queues..."); + reinitializeQueues(this.conf); + } catch (Throwable t) { + this.conf = oldConf; + throw new IOException("Failed to re-init queues", t); + } } long getAsyncScheduleInterval() { @@ -390,7 +408,32 @@ public class CapacityScheduler extends A } } private static final QueueHook noop = new QueueHook(); - + + private void initializeQueueMappings() throws IOException { + overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info("Initialized queue mappings, override: " + + overrideWithQueueMappings); + // Get new user/group mappings + List newMappings = conf.getQueueMappings(); + //check if mappings refer to valid queues + for (QueueMapping mapping : newMappings) { + if (!mapping.queue.equals(CURRENT_USER_MAPPING) && + !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { + CSQueue queue = queues.get(mapping.queue); + if (queue == null || !(queue instanceof LeafQueue)) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping.queue); + } + } + } + //apply the new mappings since they are valid + mappings = newMappings; + // initialize groups if mappings are present + if (mappings.size() > 0) { + groups = new Groups(conf); + } + } + @Lock(CapacityScheduler.class) private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { @@ -398,7 +441,9 @@ public class CapacityScheduler extends A root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, noop); + LOG.info("Initialized root queue " + root); + initializeQueueMappings(); } @Lock(CapacityScheduler.class) @@ -418,6 +463,7 @@ public class CapacityScheduler extends A // Re-configure queues root.reinitialize(newRoot, clusterResource); + initializeQueueMappings(); } /** @@ -505,12 +551,73 @@ public class CapacityScheduler extends A } synchronized CSQueue getQueue(String queueName) { + if (queueName == null) { + return null; + } return queues.get(queueName); } + private static final String CURRENT_USER_MAPPING = "%user"; + + private static final String PRIMARY_GROUP_MAPPING = "%primary_group"; + + private String getMappedQueue(String user) throws IOException { + for (QueueMapping mapping : mappings) { + if (mapping.type == MappingType.USER) { + if (mapping.source.equals(CURRENT_USER_MAPPING)) { + if (mapping.queue.equals(CURRENT_USER_MAPPING)) { + return user; + } + else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { + return groups.getGroups(user).get(0); + } + else { + return mapping.queue; + } + } + if (user.equals(mapping.source)) { + return mapping.queue; + } + } + if (mapping.type == MappingType.GROUP) { + for (String userGroups : groups.getGroups(user)) { + if (userGroups.equals(mapping.source)) { + return mapping.queue; + } + } + } + } + return null; + } + private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { - // santiy checks. + String queueName, String user, boolean isAppRecovering) { + + if (mappings != null && mappings.size() > 0) { + try { + String mappedQueue = getMappedQueue(user); + if (mappedQueue != null) { + // We have a mapping, should we use it? + if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + || overrideWithQueueMappings) { + LOG.info("Application " + applicationId + " user " + user + + " mapping [" + queueName + "] to [" + mappedQueue + + "] override " + overrideWithQueueMappings); + queueName = mappedQueue; + RMApp rmApp = rmContext.getRMApps().get(applicationId); + rmApp.setQueue(queueName); + } + } + } catch (IOException ioex) { + String message = "Failed to submit application " + applicationId + + " submitted by user " + user + " reason: " + ioex.getMessage(); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return; + } + } + + // sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { String message = "Application " + applicationId + @@ -536,19 +643,28 @@ public class CapacityScheduler extends A .handle(new RMAppRejectedEvent(applicationId, ace.toString())); return; } - SchedulerApplication application = - new SchedulerApplication(queue, user); + // update the metrics + queue.getMetrics().submitApp(user); + SchedulerApplication application = + new SchedulerApplication(queue, user); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { - SchedulerApplication application = + boolean transferStateFromPreviousAttempt, + boolean isAttemptRecovering) { + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -565,14 +681,22 @@ public class CapacityScheduler extends A LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - rmContext.getDispatcher().getEventHandler() .handle( + if (isAttemptRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); + } + } else { + rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + RMAppAttemptEventType.ATTEMPT_ADDED)); + } } private synchronized void doneApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication application = applications.get(applicationId); + SchedulerApplication application = + applications.get(applicationId); if (application == null){ // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps, // ignore it. @@ -597,7 +721,7 @@ public class CapacityScheduler extends A " finalState=" + rmAppAttemptFinalState); FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); if (application == null || attempt == null) { @@ -659,25 +783,11 @@ public class CapacityScheduler extends A // Sanity check SchedulerUtils.normalizeRequests( - ask, getResourceCalculator(), getClusterResources(), + ask, getResourceCalculator(), getClusterResource(), getMinimumResourceCapability(), maximumAllocation); // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "CapacityScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -757,7 +867,10 @@ public class CapacityScheduler extends A FiCaSchedulerNode node = getNode(nm.getNodeID()); // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); + if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, + LOG)) { + root.updateClusterResource(clusterResource); + } List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); @@ -822,7 +935,7 @@ public class CapacityScheduler extends A // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - if (Resources.greaterThanOrEqual(calculator, getClusterResources(), + if (Resources.greaterThanOrEqual(calculator, getClusterResource(), node.getAvailableResource(), minimumAllocation)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + @@ -839,21 +952,6 @@ public class CapacityScheduler extends A } - private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { - // Get the application for the finished container - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - @Override public void handle(SchedulerEvent event) { switch(event.getType()) { @@ -861,6 +959,8 @@ public class CapacityScheduler extends A { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); } break; case NODE_REMOVED: @@ -883,7 +983,8 @@ public class CapacityScheduler extends A { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), + appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: @@ -898,7 +999,8 @@ public class CapacityScheduler extends A AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getIsAttemptRecovering()); } break; case APP_ATTEMPT_REMOVED: @@ -932,25 +1034,25 @@ public class CapacityScheduler extends A usePortForNodeName)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); - ++numNodeManagers; + int numNodes = numNodeManagers.incrementAndGet(); LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); - if (scheduleAsynchronously && numNodeManagers == 1) { + if (scheduleAsynchronously && numNodes == 1) { asyncSchedulerThread.beginSchedule(); } } private synchronized void removeNode(RMNode nodeInfo) { - FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); + FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID()); if (node == null) { return; } Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); root.updateClusterResource(clusterResource); - --numNodeManagers; + int numNodes = numNodeManagers.decrementAndGet(); - if (scheduleAsynchronously && numNodeManagers == 0) { + if (scheduleAsynchronously && numNodes == 0) { asyncSchedulerThread.suspendSchedule(); } @@ -980,7 +1082,8 @@ public class CapacityScheduler extends A } @Lock(CapacityScheduler.class) - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -1015,28 +1118,10 @@ public class CapacityScheduler extends A @Lock(Lock.NoLock.class) @VisibleForTesting - public FiCaSchedulerApp getApplicationAttempt( - ApplicationAttemptId applicationAttemptId) { - SchedulerApplication app = - applications.get(applicationAttemptId.getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : new SchedulerAppReport(app); - } - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( + public FiCaSchedulerApp getApplicationAttempt( ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : app.getResourceUsageReport(); + return super.getApplicationAttempt(applicationAttemptId); } @Lock(Lock.NoLock.class) @@ -1048,24 +1133,6 @@ public class CapacityScheduler extends A Map getAllNodes() { return nodes; } - - @Override - public RMContainer getRMContainer(ContainerId containerId) { - FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); - return (attempt == null) ? null : attempt.getRMContainer(containerId); - } - - @VisibleForTesting - public FiCaSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } @Override @Lock(Lock.NoLock.class) @@ -1074,12 +1141,6 @@ public class CapacityScheduler extends A } @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - FiCaSchedulerNode node = getNode(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - - @Override public void dropContainerReservation(RMContainer container) { if(LOG.isDebugEnabled()){ LOG.debug("DROP_RESERVATION:" + container.toString()); @@ -1105,14 +1166,13 @@ public class CapacityScheduler extends A @Override public void killContainer(RMContainer cont) { - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } - completedContainer(cont, - SchedulerUtils.createPreemptedContainerStatus( - cont.getContainerId(),"Container being forcibly preempted:" - + cont.getContainerId()), - RMContainerEventType.KILL); + recoverResourceRequestForContainer(cont); + completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( + cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL); } @Override @@ -1156,4 +1216,59 @@ public class CapacityScheduler extends A throw new IOException(e); } } + + @Override + public synchronized String moveApplication(ApplicationId appId, + String targetQueueName) throws YarnException { + FiCaSchedulerApp app = + getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0)); + String sourceQueueName = app.getQueue().getQueueName(); + LeafQueue source = getAndCheckLeafQueue(sourceQueueName); + LeafQueue dest = getAndCheckLeafQueue(targetQueueName); + // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); + try { + dest.submitApplication(appId, user, targetQueueName); + } catch (AccessControlException e) { + throw new YarnException(e); + } + // Move all live containers + for (RMContainer rmContainer : app.getLiveContainers()) { + source.detachContainer(clusterResource, app, rmContainer); + // attach the Container to another queue + dest.attachContainer(clusterResource, app, rmContainer); + } + // Detach the application.. + source.finishApplicationAttempt(app, sourceQueueName); + source.getParent().finishApplication(appId, app.getUser()); + // Finish app & update metrics + app.move(dest); + // Submit to a new queue + dest.submitApplicationAttempt(app, user); + applications.get(appId).setQueue(dest); + LOG.info("App: " + app.getApplicationId() + " successfully moved from " + + sourceQueueName + " to: " + targetQueueName); + return targetQueueName; + } + + /** + * Check that the String provided in input is the name of an existing, + * LeafQueue, if successful returns the queue. + * + * @param queue + * @return the LeafQueue + * @throws YarnException + */ + private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { + CSQueue ret = this.getQueue(queue); + if (ret == null) { + throw new YarnException("The specified Queue: " + queue + + " doesn't exist"); + } + if (!(ret instanceof LeafQueue)) { + throw new YarnException("The specified Queue: " + queue + + " is not a Leaf Queue. Move is supported only for Leaf Queues."); + } + return (LeafQueue) ret; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Tue Aug 19 23:49:39 2014 @@ -18,8 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -145,6 +144,44 @@ public class CapacitySchedulerConfigurat @Private public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false; + + @Private + public static final String QUEUE_MAPPING = PREFIX + "queue-mappings"; + + @Private + public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable"; + + @Private + public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false; + + @Private + public static class QueueMapping { + + public enum MappingType { + + USER("u"), + GROUP("g"); + private final String type; + private MappingType(String type) { + this.type = type; + } + + public String toString() { + return type; + } + + }; + + MappingType type; + String source; + String queue; + + public QueueMapping(MappingType type, String source, String queue) { + this.type = type; + this.source = source; + this.queue = queue; + } + } public CapacitySchedulerConfiguration() { this(new Configuration()); @@ -378,4 +415,82 @@ public class CapacitySchedulerConfigurat setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async); } + public boolean getOverrideWithQueueMappings() { + return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, + DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); + } + + /** + * Returns a collection of strings, trimming leading and trailing whitespeace + * on each value + * + * @param str + * String to parse + * @param delim + * delimiter to separate the values + * @return Collection of parsed elements. + */ + private static Collection getTrimmedStringCollection(String str, + String delim) { + List values = new ArrayList(); + if (str == null) + return values; + StringTokenizer tokenizer = new StringTokenizer(str, delim); + while (tokenizer.hasMoreTokens()) { + String next = tokenizer.nextToken(); + if (next == null || next.trim().isEmpty()) { + continue; + } + values.add(next.trim()); + } + return values; + } + + /** + * Get user/group mappings to queues. + * + * @return user/groups mappings or null on illegal configs + */ + public List getQueueMappings() { + List mappings = + new ArrayList(); + Collection mappingsString = + getTrimmedStringCollection(QUEUE_MAPPING); + for (String mappingValue : mappingsString) { + String[] mapping = + getTrimmedStringCollection(mappingValue, ":") + .toArray(new String[] {}); + if (mapping.length != 3 || mapping[1].length() == 0 + || mapping[2].length() == 0) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + QueueMapping m; + try { + QueueMapping.MappingType mappingType; + if (mapping[0].equals("u")) { + mappingType = QueueMapping.MappingType.USER; + } else if (mapping[0].equals("g")) { + mappingType = QueueMapping.MappingType.GROUP; + } else { + throw new IllegalArgumentException( + "unknown mapping prefix " + mapping[0]); + } + m = new QueueMapping( + mappingType, + mapping[1], + mapping[2]); + } catch (Throwable t) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + if (m != null) { + mappings.add(m); + } + } + + return mappings; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Tue Aug 19 23:49:39 2014 @@ -43,7 +43,7 @@ public interface CapacitySchedulerContex RMContext getRMContext(); - Resource getClusterResources(); + Resource getClusterResource(); /** * Get the yarn configuration. Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Tue Aug 19 23:49:39 2014 @@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @Private @Unstable public class LeafQueue implements CSQueue { @@ -174,12 +176,12 @@ public class LeafQueue implements CSQueu int maxActiveApplications = CSQueueUtils.computeMaxActiveApplications( resourceCalculator, - cs.getClusterResources(), this.minimumAllocation, + cs.getClusterResource(), this.minimumAllocation, maxAMResourcePerQueuePercent, absoluteMaxCapacity); this.maxActiveAppsUsingAbsCap = CSQueueUtils.computeMaxActiveApplications( resourceCalculator, - cs.getClusterResources(), this.minimumAllocation, + cs.getClusterResource(), this.minimumAllocation, maxAMResourcePerQueuePercent, absoluteCapacity); int maxActiveApplicationsPerUser = CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, @@ -195,7 +197,7 @@ public class LeafQueue implements CSQueu cs.getConfiguration().getAcls(getQueuePath()); setupQueueConfigs( - cs.getClusterResources(), + cs.getClusterResource(), capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, @@ -564,7 +566,8 @@ public class LeafQueue implements CSQueu "numContainers=" + getNumContainers(); } - private synchronized User getUser(String userName) { + @VisibleForTesting + public synchronized User getUser(String userName) { User user = users.get(userName); if (user == null) { user = new User(); @@ -640,7 +643,10 @@ public class LeafQueue implements CSQueu addApplicationAttempt(application, user); } - metrics.submitAppAttempt(userName); + // We don't want to update metrics for move app + if (application.isPending()) { + metrics.submitAppAttempt(userName); + } getParent().submitApplicationAttempt(application, userName); } @@ -698,7 +704,6 @@ public class LeafQueue implements CSQueu throw ace; } - metrics.submitApp(userName); } private synchronized void activateApplications() { @@ -973,13 +978,18 @@ public class LeafQueue implements CSQueu Resource userLimit = // User limit computeUserLimit(application, clusterResource, required); - + + //Max avail capacity needs to take into account usage by ancestor-siblings + //which are greater than their base capacity, so we are interested in "max avail" + //capacity + float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, this); Resource queueMaxCap = // Queue Max-Capacity Resources.multiplyAndNormalizeDown( resourceCalculator, clusterResource, - absoluteMaxCapacity, + absoluteMaxAvailCapacity, minimumAllocation); Resource userConsumed = getUser(user).getConsumedResources(); @@ -1346,8 +1356,7 @@ public class LeafQueue implements CSQueu } // Inform the node - node.allocateContainer(application.getApplicationId(), - allocatedContainer); + node.allocateContainer(allocatedContainer); LOG.info("assignedContainer" + " application attempt=" + application.getApplicationAttemptId() + @@ -1446,7 +1455,7 @@ public class LeafQueue implements CSQueu } synchronized void allocateResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { + SchedulerApplicationAttempt application, Resource resource) { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1530,7 +1539,8 @@ public class LeafQueue implements CSQueu return metrics; } - static class User { + @VisibleForTesting + public static class User { Resource consumed = Resources.createResource(0, 0); int pendingApplications = 0; int activeApplications = 0; @@ -1580,13 +1590,16 @@ public class LeafQueue implements CSQueu @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, application, container.getResource()); + allocateResource(clusterResource, attempt, rmContainer.getContainer() + .getResource()); } - getParent().recoverContainer(clusterResource, application, container); - + getParent().recoverContainer(clusterResource, attempt, rmContainer); } /** @@ -1609,9 +1622,43 @@ public class LeafQueue implements CSQueu @Override public void collectSchedulerApplications( Collection apps) { + for (FiCaSchedulerApp pendingApp : pendingApplications) { + apps.add(pendingApp.getApplicationAttemptId()); + } for (FiCaSchedulerApp app : activeApplications) { apps.add(app.getApplicationAttemptId()); } } + @Override + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + allocateResource(clusterResource, application, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + + " resource=" + rmContainer.getContainer().getResource() + + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + + usedResources + " cluster=" + clusterResource); + // Inform the parent queue + getParent().attachContainer(clusterResource, application, rmContainer); + } + } + + @Override + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + releaseResource(clusterResource, application, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + + " resource=" + rmContainer.getContainer().getResource() + + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + + usedResources + " cluster=" + clusterResource); + // Inform the parent queue + getParent().detachContainer(clusterResource, application, rmContainer); + } + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Tue Aug 19 23:49:39 2014 @@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factories. import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -143,7 +144,7 @@ public class ParentQueue implements CSQu this.queueInfo.setQueueName(queueName); this.queueInfo.setChildQueues(new ArrayList()); - setupQueueConfigs(cs.getClusterResources(), + setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, state, acls); @@ -770,13 +771,16 @@ public class ParentQueue implements CSQu @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, container.getResource()); + allocateResource(clusterResource,rmContainer.getContainer().getResource()); } if (parent != null) { - parent.recoverContainer(clusterResource, application, container); + parent.recoverContainer(clusterResource, attempt, rmContainer); } } @@ -787,4 +791,37 @@ public class ParentQueue implements CSQu queue.collectSchedulerApplications(apps); } } + + @Override + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + allocateResource(clusterResource, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + clusterResource); + // Inform the parent + if (parent != null) { + parent.attachContainer(clusterResource, application, rmContainer); + } + } + } + + @Override + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + releaseResource(clusterResource, rmContainer.getContainer().getResource()); + LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + clusterResource); + // Inform the parent + if (parent != null) { + parent.detachContainer(clusterResource, application, rmContainer); + } + } + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Tue Aug 19 23:49:39 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc if (null == liveContainers.remove(rmContainer.getContainerId())) { return false; } + + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); @@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations - appSchedulingInfo.allocate(type, node, priority, request, container); + List resourceRequestList = appSchedulingInfo.allocate( + type, node, priority, request, container); Resources.addTo(currentConsumption, container.getResource()); + + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList); // Inform the container rmContainer.handle( Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Tue Aug 19 23:49:39 2014 @@ -18,248 +18,85 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.util.resource.Resources; public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private Resource availableResource = recordFactory.newRecordInstance(Resource.class); - private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - private Resource totalResourceCapability; - - private volatile int numContainers; - - private RMContainer reservedContainer; - - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap(); - - private final RMNode rmNode; - private final String nodeName; - public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { - this.rmNode = node; - this.availableResource.setMemory(node.getTotalCapability().getMemory()); - this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores()); - totalResourceCapability = - Resource.newInstance(node.getTotalCapability().getMemory(), node - .getTotalCapability().getVirtualCores()); - if (usePortForNodeName) { - nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); - } else { - nodeName = rmNode.getHostName(); - } - } - - public RMNode getRMNode() { - return this.rmNode; - } - - public NodeId getNodeID() { - return this.rmNode.getNodeID(); - } - - public String getHttpAddress() { - return this.rmNode.getHttpAddress(); - } - - @Override - public String getNodeName() { - return nodeName; - } - - @Override - public String getRackName() { - return this.rmNode.getRackName(); - } - - /** - * The Scheduler has allocated containers on this node to the - * given application. - * - * @param applicationId application - * @param rmContainer allocated container - */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { - Container container = rmContainer.getContainer(); - deductAvailableResource(container.getResource()); - ++numContainers; - - launchedContainers.put(container.getId(), rmContainer); - - LOG.info("Assigned container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + - getAvailableResource() + " available"); - } - - @Override - public synchronized Resource getAvailableResource() { - return this.availableResource; - } - - @Override - public synchronized Resource getUsedResource() { - return this.usedResource; - } - - @Override - public Resource getTotalResource() { - return this.totalResourceCapability; - } - - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) - return true; - return false; - } - - private synchronized void updateResource(Container container) { - addAvailableResource(container.getResource()); - --numContainers; - } - - /** - * Release an allocated container on this node. - * @param container container to be released - */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { - LOG.error("Invalid container released " + container); - return; - } - - /* remove the containers from the nodemanger */ - if (null != launchedContainers.remove(container.getId())) { - updateResource(container); - } - - LOG.info("Released container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + getAvailableResource() - + " available" + ", release resources=" + true); - } - - - private synchronized void addAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(availableResource, resource); - Resources.subtractFrom(usedResource, resource); - } - - private synchronized void deductAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(availableResource, resource); - Resources.addTo(usedResource, resource); + super(node, usePortForNodeName); } @Override - public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource().getMemory() + - " used=" + getUsedResource().getMemory(); - } - - @Override - public int getNumContainers() { - return numContainers; - } - - public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); - } - public synchronized void reserveResource( - SchedulerApplicationAttempt application, Priority priority, - RMContainer reservedContainer) { + SchedulerApplicationAttempt application, Priority priority, + RMContainer container) { // Check if it's already reserved - if (this.reservedContainer != null) { + RMContainer reservedContainer = getReservedContainer(); + if (reservedContainer != null) { // Sanity check - if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { + if (!container.getContainer().getNodeId().equals(getNodeID())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + - " on node " + reservedContainer.getReservedNode() + - " when currently" + " reserved resource " + this.reservedContainer + - " on node " + this.reservedContainer.getReservedNode()); + " container " + container + + " on node " + container.getReservedNode() + + " when currently" + " reserved resource " + reservedContainer + + " on node " + reservedContainer.getReservedNode()); } // Cannot reserve more than one application attempt on a given node! // Reservation is still against attempt. - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( - reservedContainer.getContainer().getId().getApplicationAttemptId())) { + if (!reservedContainer.getContainer().getId().getApplicationAttemptId() + .equals(container.getContainer().getId().getApplicationAttemptId())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + + " container " + container + " for application " + application.getApplicationAttemptId() + " when currently" + - " reserved container " + this.reservedContainer + + " reserved container " + reservedContainer + " on node " + this); } if (LOG.isDebugEnabled()) { LOG.debug("Updated reserved container " - + reservedContainer.getContainer().getId() + " on node " + this + + container.getContainer().getId() + " on node " + this + " for application attempt " + application.getApplicationAttemptId()); } } else { if (LOG.isDebugEnabled()) { LOG.debug("Reserved container " - + reservedContainer.getContainer().getId() + " on node " + this + + container.getContainer().getId() + " on node " + this + " for application attempt " + application.getApplicationAttemptId()); } } - this.reservedContainer = reservedContainer; + setReservedContainer(container); } + @Override public synchronized void unreserveResource( SchedulerApplicationAttempt application) { - + // adding NP checks as this can now be called for preemption - if (reservedContainer != null - && reservedContainer.getContainer() != null - && reservedContainer.getContainer().getId() != null - && reservedContainer.getContainer().getId().getApplicationAttemptId() != null) { + if (getReservedContainer() != null + && getReservedContainer().getContainer() != null + && getReservedContainer().getContainer().getId() != null + && getReservedContainer().getContainer().getId() + .getApplicationAttemptId() != null) { // Cannot unreserve for wrong application... ApplicationAttemptId reservedApplication = - reservedContainer.getContainer().getId().getApplicationAttemptId(); + getReservedContainer().getContainer().getId() + .getApplicationAttemptId(); if (!reservedApplication.equals( application.getApplicationAttemptId())) { throw new IllegalStateException("Trying to unreserve " + @@ -269,17 +106,6 @@ public class FiCaSchedulerNode extends S " on node " + this); } } - reservedContainer = null; - } - - public synchronized RMContainer getReservedContainer() { - return reservedContainer; + setReservedContainer(null); } - - @Override - public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { - // we can only adjust available resource if total resource is changed. - Resources.addTo(this.availableResource, deltaResource); - } - } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014 @@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte private final ApplicationId applicationId; private final String queue; private final String user; + private final boolean isAppRecovering; public AppAddedSchedulerEvent( ApplicationId applicationId, String queue, String user) { + this(applicationId, queue, user, false); + } + + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user, boolean isAppRecovering) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; + this.isAppRecovering = isAppRecovering; } public ApplicationId getApplicationId() { @@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte return user; } + public boolean getIsAppRecovering() { + return isAppRecovering; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014 @@ -24,13 +24,22 @@ public class AppAttemptAddedSchedulerEve private final ApplicationAttemptId applicationAttemptId; private final boolean transferStateFromPreviousAttempt; + private final boolean isAttemptRecovering; public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { + this(applicationAttemptId, transferStateFromPreviousAttempt, false); + } + + public AppAttemptAddedSchedulerEvent( + ApplicationAttemptId applicationAttemptId, + boolean transferStateFromPreviousAttempt, + boolean isAttemptRecovering) { super(SchedulerEventType.APP_ATTEMPT_ADDED); this.applicationAttemptId = applicationAttemptId; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; + this.isAttemptRecovering = isAttemptRecovering; } public ApplicationAttemptId getApplicationAttemptId() { @@ -40,4 +49,8 @@ public class AppAttemptAddedSchedulerEve public boolean getTransferStateFromPreviousAttempt() { return transferStateFromPreviousAttempt; } + + public boolean getIsAttemptRecovering() { + return isAttemptRecovering; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014 @@ -18,19 +18,34 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import java.util.List; + +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class NodeAddedSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; + private final List containerReports; public NodeAddedSchedulerEvent(RMNode rmNode) { super(SchedulerEventType.NODE_ADDED); this.rmNode = rmNode; + this.containerReports = null; + } + + public NodeAddedSchedulerEvent(RMNode rmNode, + List containerReports) { + super(SchedulerEventType.NODE_ADDED); + this.rmNode = rmNode; + this.containerReports = containerReports; } public RMNode getAddedRMNode() { return rmNode; } + public List getContainerReports() { + return containerReports; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Tue Aug 19 23:49:39 2014 @@ -53,6 +53,10 @@ public class AllocationConfiguration { private final int userMaxAppsDefault; private final int queueMaxAppsDefault; + // Maximum resource share for each leaf queue that can be used to run AMs + final Map queueMaxAMShares; + private final float queueMaxAMShareDefault; + // ACL's for each queue. Only specifies non-default ACL's from configuration. private final Map> queueAcls; @@ -77,26 +81,32 @@ public class AllocationConfiguration { @VisibleForTesting QueuePlacementPolicy placementPolicy; + //Configured queues in the alloc xml @VisibleForTesting - Set queueNames; + Map> configuredQueues; - public AllocationConfiguration(Map minQueueResources, - Map maxQueueResources, + public AllocationConfiguration(Map minQueueResources, + Map maxQueueResources, Map queueMaxApps, Map userMaxApps, - Map queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, Map schedulingPolicies, + Map queueWeights, + Map queueMaxAMShares, int userMaxAppsDefault, + int queueMaxAppsDefault, float queueMaxAMShareDefault, + Map schedulingPolicies, SchedulingPolicy defaultSchedulingPolicy, - Map minSharePreemptionTimeouts, + Map minSharePreemptionTimeouts, Map> queueAcls, long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout, - QueuePlacementPolicy placementPolicy, Set queueNames) { + QueuePlacementPolicy placementPolicy, + Map> configuredQueues) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; this.userMaxApps = userMaxApps; + this.queueMaxAMShares = queueMaxAMShares; this.queueWeights = queueWeights; this.userMaxAppsDefault = userMaxAppsDefault; this.queueMaxAppsDefault = queueMaxAppsDefault; + this.queueMaxAMShareDefault = queueMaxAMShareDefault; this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; @@ -104,7 +114,7 @@ public class AllocationConfiguration { this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; this.placementPolicy = placementPolicy; - this.queueNames = queueNames; + this.configuredQueues = configuredQueues; } public AllocationConfiguration(Configuration conf) { @@ -113,17 +123,22 @@ public class AllocationConfiguration { queueWeights = new HashMap(); queueMaxApps = new HashMap(); userMaxApps = new HashMap(); + queueMaxAMShares = new HashMap(); userMaxAppsDefault = Integer.MAX_VALUE; queueMaxAppsDefault = Integer.MAX_VALUE; + queueMaxAMShareDefault = -1.0f; queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; fairSharePreemptionTimeout = Long.MAX_VALUE; schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; + configuredQueues = new HashMap>(); + for (FSQueueType queueType : FSQueueType.values()) { + configuredQueues.put(queueType, new HashSet()); + } placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, - new HashSet()); - queueNames = new HashSet(); + configuredQueues); } /** @@ -178,6 +193,11 @@ public class AllocationConfiguration { return (maxApps == null) ? queueMaxAppsDefault : maxApps; } + public float getQueueMaxAMShare(String queue) { + Float maxAMShare = queueMaxAMShares.get(queue); + return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare; + } + /** * Get the minimum resource allocation for the given queue. * @return the cap set on this queue, or 0 if not set. @@ -221,8 +241,8 @@ public class AllocationConfiguration { return defaultSchedulingPolicy; } - public Set getQueueNames() { - return queueNames; + public Map> getConfiguredQueues() { + return configuredQueues; } public QueuePlacementPolicy getPlacementPolicy() {