Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC9CE200B96 for ; Thu, 6 Oct 2016 17:01:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AB10B160AC6; Thu, 6 Oct 2016 15:01:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 052E8160AAD for ; Thu, 6 Oct 2016 17:01:08 +0200 (CEST) Received: (qmail 51520 invoked by uid 500); 6 Oct 2016 15:01:05 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 51491 invoked by uid 99); 6 Oct 2016 15:01:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Oct 2016 15:01:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 24C35E055E; Thu, 6 Oct 2016 15:01:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Thu, 06 Oct 2016 15:01:06 -0000 Message-Id: In-Reply-To: <015887a63a2747b49cb0f995eb223924@git.apache.org> References: <015887a63a2747b49cb0f995eb223924@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] hadoop git commit: YARN-3139. Improve locks in AbstractYarnScheduler/CapacityScheduler/FairScheduler. Contributed by Wangda Tan archived-at: Thu, 06 Oct 2016 15:01:11 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/caafa980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5696c71..10df751 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -39,7 +39,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -268,8 +267,7 @@ public class CapacityScheduler extends } @Override - public synchronized RMContainerTokenSecretManager - getContainerTokenSecretManager() { + public RMContainerTokenSecretManager getContainerTokenSecretManager() { return this.rmContext.getContainerTokenSecretManager(); } @@ -294,52 +292,62 @@ public class CapacityScheduler extends } @Override - public synchronized RMContext getRMContext() { + public RMContext getRMContext() { return this.rmContext; } @Override - public synchronized void setRMContext(RMContext rmContext) { + public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } - private synchronized void initScheduler(Configuration configuration) throws + private void initScheduler(Configuration configuration) throws IOException { - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); - this.minimumAllocation = this.conf.getMinimumAllocation(); - initMaximumResourceCapability(this.conf.getMaximumAllocation()); - this.calculator = this.conf.getResourceCalculator(); - this.usePortForNodeName = this.conf.getUsePortForNodeName(); - this.applications = new ConcurrentHashMap<>(); - this.labelManager = rmContext.getNodeLabelManager(); - authorizer = YarnAuthorizationProvider.getInstance(yarnConf); - this.activitiesManager = new ActivitiesManager(rmContext); - activitiesManager.init(conf); - initializeQueues(this.conf); - this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); - - scheduleAsynchronously = this.conf.getScheduleAynschronously(); - asyncScheduleInterval = - this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, - DEFAULT_ASYNC_SCHEDULER_INTERVAL); - if (scheduleAsynchronously) { - asyncSchedulerThread = new AsyncScheduleThread(this); - } - - LOG.info("Initialized CapacityScheduler with " + - "calculator=" + getResourceCalculator().getClass() + ", " + - "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + - "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + - "asynchronousScheduling=" + scheduleAsynchronously + ", " + - "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); - } - - private synchronized void startSchedulerThreads() { - if (scheduleAsynchronously) { - Preconditions.checkNotNull(asyncSchedulerThread, - "asyncSchedulerThread is null"); - asyncSchedulerThread.start(); + try { + writeLock.lock(); + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + this.minimumAllocation = this.conf.getMinimumAllocation(); + initMaximumResourceCapability(this.conf.getMaximumAllocation()); + this.calculator = this.conf.getResourceCalculator(); + this.usePortForNodeName = this.conf.getUsePortForNodeName(); + this.applications = new ConcurrentHashMap<>(); + this.labelManager = rmContext.getNodeLabelManager(); + authorizer = YarnAuthorizationProvider.getInstance(yarnConf); + this.activitiesManager = new ActivitiesManager(rmContext); + activitiesManager.init(conf); + initializeQueues(this.conf); + this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); + + scheduleAsynchronously = this.conf.getScheduleAynschronously(); + asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, + DEFAULT_ASYNC_SCHEDULER_INTERVAL); + if (scheduleAsynchronously) { + asyncSchedulerThread = new AsyncScheduleThread(this); + } + + LOG.info("Initialized CapacityScheduler with " + "calculator=" + + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + + getMinimumResourceCapability() + ">, " + "maximumAllocation=<" + + getMaximumResourceCapability() + ">, " + "asynchronousScheduling=" + + scheduleAsynchronously + ", " + "asyncScheduleInterval=" + + asyncScheduleInterval + "ms"); + } finally { + writeLock.unlock(); + } + } + + private void startSchedulerThreads() { + try { + writeLock.lock(); + activitiesManager.start(); + if (scheduleAsynchronously) { + Preconditions.checkNotNull(asyncSchedulerThread, + "asyncSchedulerThread is null"); + asyncSchedulerThread.start(); + } + } finally { + writeLock.unlock(); } } @@ -353,40 +361,48 @@ public class CapacityScheduler extends @Override public void serviceStart() throws Exception { startSchedulerThreads(); - activitiesManager.start(); super.serviceStart(); } @Override public void serviceStop() throws Exception { - synchronized (this) { + try { + writeLock.lock(); if (scheduleAsynchronously && asyncSchedulerThread != null) { asyncSchedulerThread.interrupt(); asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS); } + } finally { + writeLock.unlock(); } + super.serviceStop(); } @Override - public synchronized void - reinitialize(Configuration conf, RMContext rmContext) throws IOException { - Configuration configuration = new Configuration(conf); - CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); + public void reinitialize(Configuration newConf, RMContext rmContext) + throws IOException { try { - LOG.info("Re-initializing queues..."); - refreshMaximumAllocation(this.conf.getMaximumAllocation()); - reinitializeQueues(this.conf); - } catch (Throwable t) { - this.conf = oldConf; - refreshMaximumAllocation(this.conf.getMaximumAllocation()); - throw new IOException("Failed to re-init queues", t); - } + writeLock.lock(); + Configuration configuration = new Configuration(newConf); + CapacitySchedulerConfiguration oldConf = this.conf; + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + try { + LOG.info("Re-initializing queues..."); + refreshMaximumAllocation(this.conf.getMaximumAllocation()); + reinitializeQueues(this.conf); + } catch (Throwable t) { + this.conf = oldConf; + refreshMaximumAllocation(this.conf.getMaximumAllocation()); + throw new IOException("Failed to re-init queues", t); + } - // update lazy preemption - this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled(); + // update lazy preemption + this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled(); + } finally { + writeLock.unlock(); + } } long getAsyncScheduleInterval() { @@ -450,10 +466,6 @@ public class CapacityScheduler extends } } - - @Private - public static final String ROOT_QUEUE = - CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; static class QueueHook { public CSQueue hook(CSQueue queue) { @@ -463,38 +475,41 @@ public class CapacityScheduler extends private static final QueueHook noop = new QueueHook(); @VisibleForTesting - public synchronized UserGroupMappingPlacementRule + public UserGroupMappingPlacementRule getUserGroupMappingPlacementRule() throws IOException { - boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); - LOG.info("Initialized queue mappings, override: " - + overrideWithQueueMappings); - - // Get new user/group mappings - List newMappings = - conf.getQueueMappings(); - // check if mappings refer to valid queues - for (QueueMapping mapping : newMappings) { - String mappingQueue = mapping.getQueue(); - if (!mappingQueue - .equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) - && !mappingQueue - .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { - CSQueue queue = queues.get(mappingQueue); - if (queue == null || !(queue instanceof LeafQueue)) { - throw new IOException("mapping contains invalid or non-leaf queue " - + mappingQueue); + try { + readLock.lock(); + boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info( + "Initialized queue mappings, override: " + overrideWithQueueMappings); + + // Get new user/group mappings + List newMappings = conf.getQueueMappings(); + // check if mappings refer to valid queues + for (QueueMapping mapping : newMappings) { + String mappingQueue = mapping.getQueue(); + if (!mappingQueue.equals( + UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue + .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { + CSQueue queue = queues.get(mappingQueue); + if (queue == null || !(queue instanceof LeafQueue)) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mappingQueue); + } } } - } - // initialize groups if mappings are present - if (newMappings.size() > 0) { - Groups groups = new Groups(conf); - return new UserGroupMappingPlacementRule(overrideWithQueueMappings, - newMappings, groups); - } + // initialize groups if mappings are present + if (newMappings.size() > 0) { + Groups groups = new Groups(conf); + return new UserGroupMappingPlacementRule(overrideWithQueueMappings, + newMappings, groups); + } - return null; + return null; + } finally { + readLock.unlock(); + } } private void updatePlacementRules() throws IOException { @@ -527,12 +542,12 @@ public class CapacityScheduler extends } @Lock(CapacityScheduler.class) - private void reinitializeQueues(CapacitySchedulerConfiguration conf) + private void reinitializeQueues(CapacitySchedulerConfiguration newConf) throws IOException { // Parse new queues Map newQueues = new HashMap(); CSQueue newRoot = - parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, + parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, noop); // Ensure all existing queues are still present @@ -694,248 +709,279 @@ public class CapacityScheduler extends return queues.get(queueName); } - private synchronized void addApplicationOnRecovery( + private void addApplicationOnRecovery( ApplicationId applicationId, String queueName, String user, Priority priority) { - CSQueue queue = getQueue(queueName); - if (queue == null) { - //During a restart, this indicates a queue was removed, which is - //not presently supported - if (!YarnConfiguration.shouldRMFailFast(getConfig())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.KILL, - "Application killed on recovery as it was submitted to queue " + - queueName + " which no longer exists after restart.")); - return; - } else { - String queueErrorMsg = "Queue named " + queueName - + " missing during application recovery." - + " Queue removal during recovery is not presently supported by the" - + " capacity scheduler, please restart with all queues configured" - + " which were present before shutdown/restart."; - LOG.fatal(queueErrorMsg); - throw new QueueInvalidException(queueErrorMsg); + try { + writeLock.lock(); + CSQueue queue = getQueue(queueName); + if (queue == null) { + //During a restart, this indicates a queue was removed, which is + //not presently supported + if (!YarnConfiguration.shouldRMFailFast(getConfig())) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed on recovery as it was submitted to queue " + + queueName + " which no longer exists after restart.")); + return; + } else{ + String queueErrorMsg = "Queue named " + queueName + + " missing during application recovery." + + " Queue removal during recovery is not presently " + + "supported by the capacity scheduler, please " + + "restart with all queues configured" + + " which were present before shutdown/restart."; + LOG.fatal(queueErrorMsg); + throw new QueueInvalidException(queueErrorMsg); + } } - } - if (!(queue instanceof LeafQueue)) { - // During RM restart, this means leaf queue was converted to a parent - // queue, which is not supported for running apps. - if (!YarnConfiguration.shouldRMFailFast(getConfig())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.KILL, - "Application killed on recovery as it was submitted to queue " + - queueName + " which is no longer a leaf queue after restart.")); - return; - } else { - String queueErrorMsg = "Queue named " + queueName - + " is no longer a leaf queue during application recovery." - + " Changing a leaf queue to a parent queue during recovery is" - + " not presently supported by the capacity scheduler. Please" - + " restart with leaf queues before shutdown/restart continuing" - + " as leaf queues."; - LOG.fatal(queueErrorMsg); - throw new QueueInvalidException(queueErrorMsg); + if (!(queue instanceof LeafQueue)) { + // During RM restart, this means leaf queue was converted to a parent + // queue, which is not supported for running apps. + if (!YarnConfiguration.shouldRMFailFast(getConfig())) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed on recovery as it was submitted to queue " + + queueName + + " which is no longer a leaf queue after restart.")); + return; + } else{ + String queueErrorMsg = "Queue named " + queueName + + " is no longer a leaf queue during application recovery." + + " Changing a leaf queue to a parent queue during recovery is" + + " not presently supported by the capacity scheduler. Please" + + " restart with leaf queues before shutdown/restart continuing" + + " as leaf queues."; + LOG.fatal(queueErrorMsg); + throw new QueueInvalidException(queueErrorMsg); + } } - } - // Submit to the queue - try { - queue.submitApplication(applicationId, user, queueName); - } catch (AccessControlException ace) { - // Ignore the exception for recovered app as the app was previously - // accepted. - } - queue.getMetrics().submitApp(user); - SchedulerApplication application = - new SchedulerApplication(queue, user, priority); - applications.put(applicationId, application); - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queueName); - if (LOG.isDebugEnabled()) { - LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + // Submit to the queue + try { + queue.submitApplication(applicationId, user, queueName); + } catch (AccessControlException ace) { + // Ignore the exception for recovered app as the app was previously + // accepted. + } + queue.getMetrics().submitApp(user); + SchedulerApplication application = + new SchedulerApplication(queue, user, priority); + applications.put(applicationId, application); + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queueName); + if (LOG.isDebugEnabled()) { + LOG.debug( + applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } finally { + writeLock.unlock(); } } - private synchronized void addApplication(ApplicationId applicationId, + private void addApplication(ApplicationId applicationId, String queueName, String user, Priority priority) { - // Sanity checks. - CSQueue queue = getQueue(queueName); - if (queue == null) { - String message = "Application " + applicationId + - " submitted by user " + user + " to unknown queue: " + queueName; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, message)); - return; - } - if (!(queue instanceof LeafQueue)) { - String message = "Application " + applicationId + - " submitted by user " + user + " to non-leaf queue: " + queueName; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, message)); - return; - } - // Submit to the queue try { - queue.submitApplication(applicationId, user, queueName); - } catch (AccessControlException ace) { - LOG.info("Failed to submit application " + applicationId + " to queue " - + queueName + " from user " + user, ace); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, ace.toString())); - return; + writeLock.lock(); + // Sanity checks. + CSQueue queue = getQueue(queueName); + if (queue == null) { + String message = + "Application " + applicationId + " submitted by user " + user + + " to unknown queue: " + queueName; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return; + } + if (!(queue instanceof LeafQueue)) { + String message = + "Application " + applicationId + " submitted by user " + user + + " to non-leaf queue: " + queueName; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return; + } + // Submit to the queue + try { + queue.submitApplication(applicationId, user, queueName); + } catch (AccessControlException ace) { + LOG.info("Failed to submit application " + applicationId + " to queue " + + queueName + " from user " + user, ace); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + ace.toString())); + return; + } + // update the metrics + queue.getMetrics().submitApp(user); + SchedulerApplication application = + new SchedulerApplication(queue, user, priority); + applications.put(applicationId, application); + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queueName); + rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } finally { + writeLock.unlock(); } - // update the metrics - queue.getMetrics().submitApp(user); - SchedulerApplication application = - new SchedulerApplication(queue, user, priority); - applications.put(applicationId, application); - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } - private synchronized void addApplicationAttempt( + private void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { - SchedulerApplication application = - applications.get(applicationAttemptId.getApplicationId()); - if (application == null) { - LOG.warn("Application " + applicationAttemptId.getApplicationId() + - " cannot be found in scheduler."); - return; - } - CSQueue queue = (CSQueue) application.getQueue(); - - FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, - application.getUser(), queue, queue.getActiveUsersManager(), rmContext, - application.getPriority(), isAttemptRecovering, activitiesManager); - if (transferStateFromPreviousAttempt) { - attempt.transferStateFromPreviousAttempt( - application.getCurrentAppAttempt()); - } - application.setCurrentAppAttempt(attempt); - - // Update attempt priority to the latest to avoid race condition i.e - // SchedulerApplicationAttempt is created with old priority but it is not - // set to SchedulerApplication#setCurrentAppAttempt. - // Scenario would occur is - // 1. SchdulerApplicationAttempt is created with old priority. - // 2. updateApplicationPriority() updates SchedulerApplication. Since - // currentAttempt is null, it just return. - // 3. ScheduelerApplcationAttempt is set in - // SchedulerApplication#setCurrentAppAttempt. - attempt.setPriority(application.getPriority()); - - queue.submitApplicationAttempt(attempt, application.getUser()); - LOG.info("Added Application Attempt " + applicationAttemptId - + " to scheduler from user " + application.getUser() + " in queue " - + queue.getQueueName()); - if (isAttemptRecovering) { - if (LOG.isDebugEnabled()) { - LOG.debug(applicationAttemptId - + " is recovering. Skipping notifying ATTEMPT_ADDED"); + try { + writeLock.lock(); + SchedulerApplication application = applications.get( + applicationAttemptId.getApplicationId()); + if (application == null) { + LOG.warn("Application " + applicationAttemptId.getApplicationId() + + " cannot be found in scheduler."); + return; } - } else { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + CSQueue queue = (CSQueue) application.getQueue(); + + FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, + application.getUser(), queue, queue.getActiveUsersManager(), + rmContext, application.getPriority(), isAttemptRecovering, + activitiesManager); + if (transferStateFromPreviousAttempt) { + attempt.transferStateFromPreviousAttempt( + application.getCurrentAppAttempt()); + } + application.setCurrentAppAttempt(attempt); + + // Update attempt priority to the latest to avoid race condition i.e + // SchedulerApplicationAttempt is created with old priority but it is not + // set to SchedulerApplication#setCurrentAppAttempt. + // Scenario would occur is + // 1. SchdulerApplicationAttempt is created with old priority. + // 2. updateApplicationPriority() updates SchedulerApplication. Since + // currentAttempt is null, it just return. + // 3. ScheduelerApplcationAttempt is set in + // SchedulerApplication#setCurrentAppAttempt. + attempt.setPriority(application.getPriority()); + + queue.submitApplicationAttempt(attempt, application.getUser()); + LOG.info("Added Application Attempt " + applicationAttemptId + + " to scheduler from user " + application.getUser() + " in queue " + + queue.getQueueName()); + if (isAttemptRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); + } + } else{ + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } + } finally { + writeLock.unlock(); } } - private synchronized void doneApplication(ApplicationId applicationId, + private void doneApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication application = - applications.get(applicationId); - if (application == null){ - // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps, - // ignore it. - LOG.warn("Couldn't find application " + applicationId); - return; - } - CSQueue queue = (CSQueue) application.getQueue(); - if (!(queue instanceof LeafQueue)) { - LOG.error("Cannot finish application " + "from non-leaf queue: " - + queue.getQueueName()); - } else { - queue.finishApplication(applicationId, application.getUser()); + try { + writeLock.lock(); + SchedulerApplication application = applications.get( + applicationId); + if (application == null) { + // The AppRemovedSchedulerEvent maybe sent on recovery for completed + // apps, ignore it. + LOG.warn("Couldn't find application " + applicationId); + return; + } + CSQueue queue = (CSQueue) application.getQueue(); + if (!(queue instanceof LeafQueue)) { + LOG.error("Cannot finish application " + "from non-leaf queue: " + queue + .getQueueName()); + } else{ + queue.finishApplication(applicationId, application.getUser()); + } + application.stop(finalState); + applications.remove(applicationId); + } finally { + writeLock.unlock(); } - application.stop(finalState); - applications.remove(applicationId); } - private synchronized void doneApplicationAttempt( + private void doneApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { - LOG.info("Application Attempt " + applicationAttemptId + " is done." + - " finalState=" + rmAppAttemptFinalState); - - FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); - SchedulerApplication application = - applications.get(applicationAttemptId.getApplicationId()); + try { + writeLock.lock(); + LOG.info("Application Attempt " + applicationAttemptId + " is done." + + " finalState=" + rmAppAttemptFinalState); - if (application == null || attempt == null) { - LOG.info("Unknown application " + applicationAttemptId + " has completed!"); - return; - } + FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); + SchedulerApplication application = applications.get( + applicationAttemptId.getApplicationId()); - // Release all the allocated, acquired, running containers - for (RMContainer rmContainer : attempt.getLiveContainers()) { - if (keepContainers - && rmContainer.getState().equals(RMContainerState.RUNNING)) { - // do not kill the running container in the case of work-preserving AM - // restart. - LOG.info("Skip killing " + rmContainer.getContainerId()); - continue; + if (application == null || attempt == null) { + LOG.info( + "Unknown application " + applicationAttemptId + " has completed!"); + return; } - super.completedContainer( - rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); - } - // Release all reserved containers - for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer( - rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - rmContainer.getContainerId(), "Application Complete"), - RMContainerEventType.KILL); - } + // Release all the allocated, acquired, running containers + for (RMContainer rmContainer : attempt.getLiveContainers()) { + if (keepContainers && rmContainer.getState().equals( + RMContainerState.RUNNING)) { + // do not kill the running container in the case of work-preserving AM + // restart. + LOG.info("Skip killing " + rmContainer.getContainerId()); + continue; + } + super.completedContainer(rmContainer, SchedulerUtils + .createAbnormalContainerStatus(rmContainer.getContainerId(), + SchedulerUtils.COMPLETED_APPLICATION), + RMContainerEventType.KILL); + } - // Clean up pending requests, metrics etc. - attempt.stop(rmAppAttemptFinalState); + // Release all reserved containers + for (RMContainer rmContainer : attempt.getReservedContainers()) { + super.completedContainer(rmContainer, SchedulerUtils + .createAbnormalContainerStatus(rmContainer.getContainerId(), + "Application Complete"), RMContainerEventType.KILL); + } - // Inform the queue - String queueName = attempt.getQueue().getQueueName(); - CSQueue queue = queues.get(queueName); - if (!(queue instanceof LeafQueue)) { - LOG.error("Cannot finish application " + "from non-leaf queue: " - + queueName); - } else { - queue.finishApplicationAttempt(attempt, queue.getQueueName()); + // Clean up pending requests, metrics etc. + attempt.stop(rmAppAttemptFinalState); + + // Inform the queue + String queueName = attempt.getQueue().getQueueName(); + CSQueue queue = queues.get(queueName); + if (!(queue instanceof LeafQueue)) { + LOG.error( + "Cannot finish application " + "from non-leaf queue: " + queueName); + } else{ + queue.finishApplicationAttempt(attempt, queue.getQueueName()); + } + } finally { + writeLock.unlock(); } } - // It is crucial to acquire leaf queue lock first to prevent: - // 1. Race condition when calculating the delta resource in - // SchedContainerChangeRequest - // 2. Deadlock with the scheduling thread. private LeafQueue updateIncreaseRequests( - List increaseRequests, - FiCaSchedulerApp app) { + List increaseRequests, FiCaSchedulerApp app) { if (null == increaseRequests || increaseRequests.isEmpty()) { return null; } + // Pre-process increase requests List schedIncreaseRequests = createSchedContainerChangeRequests(increaseRequests, true); LeafQueue leafQueue = (LeafQueue) app.getQueue(); - synchronized(leafQueue) { + + try { + /* + * Acquire application's lock here to make sure application won't + * finish when updateIncreaseRequest is called. + */ + app.getWriteLock().lock(); // make sure we aren't stopping/removing the application // when the allocate comes in if (app.isStopped()) { @@ -945,8 +991,12 @@ public class CapacityScheduler extends if (app.updateIncreaseRequests(schedIncreaseRequests)) { return leafQueue; } - return null; + } finally { + app.getWriteLock().unlock(); } + + + return null; } @Override @@ -956,7 +1006,6 @@ public class CapacityScheduler extends List blacklistAdditions, List blacklistRemovals, List increaseRequests, List decreaseRequests) { - FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { return EMPTY_ALLOCATION; @@ -966,42 +1015,43 @@ public class CapacityScheduler extends releaseContainers(release, application); // update increase requests - LeafQueue updateDemandForQueue = - updateIncreaseRequests(increaseRequests, application); + LeafQueue updateDemandForQueue = updateIncreaseRequests(increaseRequests, + application); // Decrease containers decreaseContainers(decreaseRequests, application); // Sanity check for new allocation requests - SchedulerUtils.normalizeRequests( - ask, getResourceCalculator(), getClusterResource(), - getMinimumResourceCapability(), getMaximumResourceCapability()); + SchedulerUtils.normalizeRequests(ask, getResourceCalculator(), + getClusterResource(), getMinimumResourceCapability(), + getMaximumResourceCapability()); Allocation allocation; - synchronized (application) { - - // make sure we aren't stopping/removing the application - // when the allocate comes in + // make sure we aren't stopping/removing the application + // when the allocate comes in + try { + application.getWriteLock().lock(); if (application.isStopped()) { return EMPTY_ALLOCATION; } // Process resource requests if (!ask.isEmpty()) { - if(LOG.isDebugEnabled()) { - LOG.debug("allocate: pre-update " + applicationAttemptId + - " ask size =" + ask.size()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "allocate: pre-update " + applicationAttemptId + " ask size =" + + ask.size()); application.showRequests(); } // Update application requests - if (application.updateResourceRequests(ask) - && (updateDemandForQueue == null)) { + if (application.updateResourceRequests(ask) && (updateDemandForQueue + == null)) { updateDemandForQueue = (LeafQueue) application.getQueue(); } - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("allocate: post-update"); application.showRequests(); } @@ -1011,6 +1061,8 @@ public class CapacityScheduler extends allocation = application.getAllocation(getResourceCalculator(), getClusterResource(), getMinimumResourceCapability()); + } finally { + application.getWriteLock().unlock(); } if (updateDemandForQueue != null && !application @@ -1019,7 +1071,6 @@ public class CapacityScheduler extends } return allocation; - } @Override @@ -1049,142 +1100,159 @@ public class CapacityScheduler extends return root.getQueueUserAclInfo(user); } - private synchronized void nodeUpdate(RMNode nm) { - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " clusterResources: " + getClusterResource()); - } + private void nodeUpdate(RMNode nm) { + try { + writeLock.lock(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "nodeUpdate: " + nm + " clusterResources: " + getClusterResource()); + } - Resource releaseResources = Resource.newInstance(0, 0); + Resource releaseResources = Resource.newInstance(0, 0); - FiCaSchedulerNode node = getNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Processing the newly increased containers - List newlyIncreasedContainers = - nm.pullNewlyIncreasedContainers(); - for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), node, container); - } - - // Process completed containers - int releasedContainers = 0; - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - RMContainer container = getRMContainer(containerId); - super.completedContainer(container, completedContainer, - RMContainerEventType.FINISHED); - if (container != null) { - releasedContainers++; - Resource rs = container.getAllocatedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - rs = container.getReservedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); + FiCaSchedulerNode node = getNode(nm.getNodeID()); + + List containerInfoList = nm.pullContainerUpdates(); + List newlyLaunchedContainers = + new ArrayList(); + List completedContainers = + new ArrayList(); + for (UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers.addAll( + containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Processing the newly increased containers + List newlyIncreasedContainers = + nm.pullNewlyIncreasedContainers(); + for (Container container : newlyIncreasedContainers) { + containerIncreasedOnNode(container.getId(), node, container); + } + + // Process completed containers + int releasedContainers = 0; + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + RMContainer container = getRMContainer(containerId); + super.completedContainer(container, completedContainer, + RMContainerEventType.FINISHED); + if (container != null) { + releasedContainers++; + Resource rs = container.getAllocatedResource(); + if (rs != null) { + Resources.addTo(releaseResources, rs); + } + rs = container.getReservedResource(); + if (rs != null) { + Resources.addTo(releaseResources, rs); + } } } - } - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - // TODO: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } - schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, - releaseResources); - schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); + // If the node is decommissioning, send an update to have the total + // resource equal to the used resource, so no available resource to + // schedule. + // TODO: Fix possible race-condition when request comes in before + // update is propagated + if (nm.getState() == NodeState.DECOMMISSIONING) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption + .newInstance( + getSchedulerNode(nm.getNodeID()).getAllocatedResource(), + 0))); + } + schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, + releaseResources); + schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); - // Now node data structures are upto date and ready for scheduling. - if(LOG.isDebugEnabled()) { - LOG.debug("Node being looked for scheduling " + nm + - " availableResource: " + node.getUnallocatedResource()); + // Now node data structures are upto date and ready for scheduling. + if (LOG.isDebugEnabled()) { + LOG.debug( + "Node being looked for scheduling " + nm + " availableResource: " + + node.getUnallocatedResource()); + } + } finally { + writeLock.unlock(); } } /** * Process resource update on a node. */ - private synchronized void updateNodeAndQueueResource(RMNode nm, + private void updateNodeAndQueueResource(RMNode nm, ResourceOption resourceOption) { - updateNodeResource(nm, resourceOption); - Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, new ResourceLimits( - clusterResource)); + try { + writeLock.lock(); + updateNodeResource(nm, resourceOption); + Resource clusterResource = getClusterResource(); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + } finally { + writeLock.unlock(); + } } /** * Process node labels update on a node. */ - private synchronized void updateLabelsOnNode(NodeId nodeId, + private void updateLabelsOnNode(NodeId nodeId, Set newLabels) { - FiCaSchedulerNode node = nodeTracker.getNode(nodeId); - if (null == node) { - return; - } - - // Get new partition, we have only one partition per node - String newPartition; - if (newLabels.isEmpty()) { - newPartition = RMNodeLabelsManager.NO_LABEL; - } else { - newPartition = newLabels.iterator().next(); - } + try { + writeLock.lock(); + FiCaSchedulerNode node = nodeTracker.getNode(nodeId); + if (null == node) { + return; + } - // old partition as well - String oldPartition = node.getPartition(); + // Get new partition, we have only one partition per node + String newPartition; + if (newLabels.isEmpty()) { + newPartition = RMNodeLabelsManager.NO_LABEL; + } else{ + newPartition = newLabels.iterator().next(); + } - // Update resources of these containers - for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) { - FiCaSchedulerApp application = - getApplicationAttempt(rmContainer.getApplicationAttemptId()); - if (null != application) { - application.nodePartitionUpdated(rmContainer, oldPartition, - newPartition); - } else { - LOG.warn("There's something wrong, some RMContainers running on" - + " a node, but we cannot find SchedulerApplicationAttempt for it. Node=" - + node.getNodeID() + " applicationAttemptId=" - + rmContainer.getApplicationAttemptId()); - continue; + // old partition as well + String oldPartition = node.getPartition(); + + // Update resources of these containers + for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) { + FiCaSchedulerApp application = getApplicationAttempt( + rmContainer.getApplicationAttemptId()); + if (null != application) { + application.nodePartitionUpdated(rmContainer, oldPartition, + newPartition); + } else{ + LOG.warn("There's something wrong, some RMContainers running on" + + " a node, but we cannot find SchedulerApplicationAttempt " + + "for it. Node=" + node.getNodeID() + " applicationAttemptId=" + + rmContainer.getApplicationAttemptId()); + continue; + } } + + // Unreserve container on this node + RMContainer reservedContainer = node.getReservedContainer(); + if (null != reservedContainer) { + killReservedContainer(reservedContainer); + } + + // Update node labels after we've done this + node.updateLabels(newLabels); + } finally { + writeLock.unlock(); } - - // Unreserve container on this node - RMContainer reservedContainer = node.getReservedContainer(); - if (null != reservedContainer) { - killReservedContainer(reservedContainer); - } - - // Update node labels after we've done this - node.updateLabels(newLabels); } private void updateSchedulerHealth(long now, FiCaSchedulerNode node, @@ -1219,134 +1287,134 @@ public class CapacityScheduler extends } @VisibleForTesting - public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } + public void allocateContainersToNode(FiCaSchedulerNode node) { + try { + writeLock.lock(); + if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext + .isSchedulerReadyForAllocatingContainers()) { + return; + } - if (!nodeTracker.exists(node.getNodeID())) { - LOG.info("Skipping scheduling as the node " + node.getNodeID() + - " has been removed"); - return; - } + if (!nodeTracker.exists(node.getNodeID())) { + LOG.info("Skipping scheduling as the node " + node.getNodeID() + + " has been removed"); + return; + } - // reset allocation and reservation stats before we start doing any work - updateSchedulerHealth(lastNodeUpdateTime, node, - new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); + // reset allocation and reservation stats before we start doing any work + updateSchedulerHealth(lastNodeUpdateTime, node, + new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); - CSAssignment assignment; + CSAssignment assignment; - // Assign new containers... - // 1. Check for reserved applications - // 2. Schedule if there are no reservations + // Assign new containers... + // 1. Check for reserved applications + // 2. Schedule if there are no reservations - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { - FiCaSchedulerApp reservedApplication = - getCurrentAttemptForContainer(reservedContainer.getContainerId()); + FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer( + reservedContainer.getContainerId()); - // Try to fulfill the reservation - LOG.info("Trying to fulfill reservation for application " - + reservedApplication.getApplicationId() + " on node: " - + node.getNodeID()); + // Try to fulfill the reservation + LOG.info("Trying to fulfill reservation for application " + + reservedApplication.getApplicationId() + " on node: " + node + .getNodeID()); - LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); - assignment = - queue.assignContainers( - getClusterResource(), - node, - // TODO, now we only consider limits for parent for non-labeled - // resources, should consider labeled resources as well. - new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, getClusterResource())), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - if (assignment.isFulfilledReservation()) { - CSAssignment tmp = - new CSAssignment(reservedContainer.getReservedResource(), - assignment.getType()); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - reservedContainer.getReservedResource()); - tmp.getAssignmentInformation().addAllocationDetails( - reservedContainer.getContainerId(), queue.getQueuePath()); - tmp.getAssignmentInformation().incrAllocations(); - updateSchedulerHealth(lastNodeUpdateTime, node, tmp); - schedulerHealth.updateSchedulerFulfilledReservationCounts(1); - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - queue.getParent().getQueueName(), queue.getQueueName(), - ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); - ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, - node, reservedContainer.getContainerId(), - AllocationState.ALLOCATED_FROM_RESERVED); - } else { - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - queue.getParent().getQueueName(), queue.getQueueName(), - ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); - ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, - node, reservedContainer.getContainerId(), AllocationState.SKIPPED); + LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); + assignment = queue.assignContainers(getClusterResource(), node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, + getClusterResource())), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + if (assignment.isFulfilledReservation()) { + CSAssignment tmp = new CSAssignment( + reservedContainer.getReservedResource(), assignment.getType()); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + reservedContainer.getReservedResource()); + tmp.getAssignmentInformation().addAllocationDetails( + reservedContainer.getContainerId(), queue.getQueuePath()); + tmp.getAssignmentInformation().incrAllocations(); + updateSchedulerHealth(lastNodeUpdateTime, node, tmp); + schedulerHealth.updateSchedulerFulfilledReservationCounts(1); + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), + AllocationState.ALLOCATED_FROM_RESERVED); + } else{ + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), + AllocationState.SKIPPED); + } } - } - // Try to schedule more if there are no reservations to fulfill - if (node.getReservedContainer() == null) { - if (calculator.computeAvailableContainers(Resources - .add(node.getUnallocatedResource(), node.getTotalKillableResources()), - minimumAllocation) > 0) { + // Try to schedule more if there are no reservations to fulfill + if (node.getReservedContainer() == null) { + if (calculator.computeAvailableContainers(Resources + .add(node.getUnallocatedResource(), + node.getTotalKillableResources()), minimumAllocation) > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to schedule on node: " + node.getNodeName() + - ", available: " + node.getUnallocatedResource()); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to schedule on node: " + node.getNodeName() + + ", available: " + node.getUnallocatedResource()); + } - assignment = root.assignContainers( - getClusterResource(), - node, - new ResourceLimits(labelManager.getResourceByLabel( - node.getPartition(), getClusterResource())), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - if (Resources.greaterThan(calculator, getClusterResource(), - assignment.getResource(), Resources.none())) { - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); - return; - } - - // Only do non-exclusive allocation when node has node-labels. - if (StringUtils.equals(node.getPartition(), - RMNodeLabelsManager.NO_LABEL)) { - return; - } - - // Only do non-exclusive allocation when the node-label supports that - try { - if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( - node.getPartition())) { + assignment = root.assignContainers(getClusterResource(), node, + new ResourceLimits(labelManager + .getResourceByLabel(node.getPartition(), + getClusterResource())), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + if (Resources.greaterThan(calculator, getClusterResource(), + assignment.getResource(), Resources.none())) { + updateSchedulerHealth(lastNodeUpdateTime, node, assignment); return; } - } catch (IOException e) { - LOG.warn("Exception when trying to get exclusivity of node label=" - + node.getPartition(), e); - return; + + // Only do non-exclusive allocation when node has node-labels. + if (StringUtils.equals(node.getPartition(), + RMNodeLabelsManager.NO_LABEL)) { + return; + } + + // Only do non-exclusive allocation when the node-label supports that + try { + if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( + node.getPartition())) { + return; + } + } catch (IOException e) { + LOG.warn( + "Exception when trying to get exclusivity of node label=" + node + .getPartition(), e); + return; + } + + // Try to use NON_EXCLUSIVE + assignment = root.assignContainers(getClusterResource(), node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, + getClusterResource())), + SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); + updateSchedulerHealth(lastNodeUpdateTime, node, assignment); } - - // Try to use NON_EXCLUSIVE - assignment = root.assignContainers( - getClusterResource(), - node, - // TODO, now we only consider limits for parent for non-labeled - // resources, should consider labeled resources as well. - new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, getClusterResource())), - SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + } else{ + LOG.info("Skipping scheduling since node " + node.getNodeID() + + " is reserved by application " + node.getReservedContainer() + .getContainerId().getApplicationAttemptId()); } - } else { - LOG.info("Skipping scheduling since node " - + node.getNodeID() - + " is reserved by application " - + node.getReservedContainer().getContainerId() - .getApplicationAttemptId()); + } finally { + writeLock.unlock(); } } @@ -1499,100 +1567,108 @@ public class CapacityScheduler extends } } - private synchronized void addNode(RMNode nodeManager) { - FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, - usePortForNodeName, nodeManager.getNodeLabels()); - nodeTracker.addNode(schedulerNode); - - // update this node to node label manager - if (labelManager != null) { - labelManager.activateNode(nodeManager.getNodeID(), - schedulerNode.getTotalResource()); - } + private void addNode(RMNode nodeManager) { + try { + writeLock.lock(); + FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, + usePortForNodeName, nodeManager.getNodeLabels()); + nodeTracker.addNode(schedulerNode); + + // update this node to node label manager + if (labelManager != null) { + labelManager.activateNode(nodeManager.getNodeID(), + schedulerNode.getTotalResource()); + } - Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, new ResourceLimits( - clusterResource)); + Resource clusterResource = getClusterResource(); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); - LOG.info("Added node " + nodeManager.getNodeAddress() + - " clusterResource: " + clusterResource); + LOG.info( + "Added node " + nodeManager.getNodeAddress() + " clusterResource: " + + clusterResource); - if (scheduleAsynchronously && getNumClusterNodes() == 1) { - asyncSchedulerThread.beginSchedule(); + if (scheduleAsynchronously && getNumClusterNodes() == 1) { + asyncSchedulerThread.beginSchedule(); + } + } finally { + writeLock.unlock(); } } - private synchronized void removeNode(RMNode nodeInfo) { - // update this node to node label manager - if (labelManager != null) { - labelManager.deactivateNode(nodeInfo.getNodeID()); - } + private void removeNode(RMNode nodeInfo) { + try { + writeLock.lock(); + // update this node to node label manager + if (labelManager != null) { + labelManager.deactivateNode(nodeInfo.getNodeID()); + } - NodeId nodeId = nodeInfo.getNodeID(); - FiCaSchedulerNode node = nodeTracker.getNode(nodeId); - if (node == null) { - LOG.error("Attempting to remove non-existent node " + nodeId); - return; - } + NodeId nodeId = nodeInfo.getNodeID(); + FiCaSchedulerNode node = nodeTracker.getNode(nodeId); + if (node == null) { + LOG.error("Attempting to remove non-existent node " + nodeId); + return; + } - // Remove running containers - List runningContainers = node.getCopiedListOfRunningContainers(); - for (RMContainer container : runningContainers) { - super.completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); - } - - // Remove reservations, if any - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - super.completedContainer(reservedContainer, - SchedulerUtils.createAbnormalContainerStatus( - reservedContainer.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); - } + // Remove running containers + List runningContainers = + node.getCopiedListOfRunningContainers(); + for (RMContainer container : runningContainers) { + super.completedContainer(container, SchedulerUtils + .createAbnormalContainerStatus(container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + } - nodeTracker.removeNode(nodeId); - Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, new ResourceLimits( - clusterResource)); - int numNodes = nodeTracker.nodeCount(); + // Remove reservations, if any + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + super.completedContainer(reservedContainer, SchedulerUtils + .createAbnormalContainerStatus(reservedContainer.getContainerId(), + SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + } - if (scheduleAsynchronously && numNodes == 0) { - asyncSchedulerThread.suspendSchedule(); - } + nodeTracker.removeNode(nodeId); + Resource clusterResource = getClusterResource(); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + int numNodes = nodeTracker.nodeCount(); + + if (scheduleAsynchronously && numNodes == 0) { + asyncSchedulerThread.suspendSchedule(); + } - LOG.info("Removed node " + nodeInfo.getNodeAddress() + - " clusterResource: " + getClusterResource()); + LOG.info( + "Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + + getClusterResource()); + } finally { + writeLock.unlock(); + } } private void rollbackContainerResource( ContainerId containerId) { RMContainer rmContainer = getRMContainer(containerId); if (rmContainer == null) { - LOG.info("Cannot rollback resource for container " + containerId + - ". The container does not exist."); + LOG.info("Cannot rollback resource for container " + containerId + + ". The container does not exist."); return; } FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); if (application == null) { - LOG.info("Cannot rollback resource for container " + containerId + - ". The application that the container belongs to does not exist."); + LOG.info("Cannot rollback resource for container " + containerId + + ". The application that the container " + + "belongs to does not exist."); return; } LOG.info("Roll back resource for container " + containerId); - LeafQueue leafQueue = (LeafQueue) application.getQueue(); - synchronized(leafQueue) { - SchedulerNode schedulerNode = - getSchedulerNode(rmContainer.getAllocatedNode()); - SchedContainerChangeRequest decreaseRequest = - new SchedContainerChangeRequest(this.rmContext, schedulerNode, - rmContainer, rmContainer.getLastConfirmedResource()); - decreaseContainer(decreaseRequest, application); - } + + SchedulerNode schedulerNode = getSchedulerNode( + rmContainer.getAllocatedNode()); + SchedContainerChangeRequest decreaseRequest = + new SchedContainerChangeRequest(this.rmContext, schedulerNode, + rmContainer, rmContainer.getLastConfirmedResource()); + decreaseContainer(decreaseRequest, application); } @Override @@ -1601,23 +1677,29 @@ public class CapacityScheduler extends RMContainerEventType event) { Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); - + // Get the application for the finished container - FiCaSchedulerApp application = - getCurrentAttemptForContainer(container.getId()); + FiCaSchedulerApp application = getCurrentAttemptForContainer( + container.getId()); ApplicationId appId = containerId.getApplicationAttemptId().getApplicationId(); if (application == null) { - LOG.info("Container " + container + " of" + " finished application " - + appId + " completed with event " + event); + LOG.info( + "Container " + container + " of" + " finished application " + appId + + " completed with event " + event); return; } - + // Get the node on which the container was allocated FiCaSchedulerNode node = getNode(container.getNodeId()); - + if (null == node) { + LOG.info("Container " + container + " of" + " removed node " + container + .getNodeId() + " completed with event " + event); + return; + } + // Inform the queue - LeafQueue queue = (LeafQueue)application.getQueue(); + LeafQueue queue = (LeafQueue) application.getQueue(); queue.completedContainer(getClusterResource(), application, node, rmContainer, containerStatus, event, null, true); } @@ -1628,19 +1710,19 @@ public class CapacityScheduler extends RMContainer rmContainer = decreaseRequest.getRMContainer(); // Check container status before doing decrease if (rmContainer.getState() != RMContainerState.RUNNING) { - LOG.info("Trying to decrease a container not in RUNNING state, container=" - + rmContainer + " state=" + rmContainer.getState().name()); + LOG.info( + "Trying to decrease a container not in RUNNING state, container=" + + rmContainer + " state=" + rmContainer.getState().name()); return; } - FiCaSchedulerApp app = (FiCaSchedulerApp)attempt; + FiCaSchedulerApp app = (FiCaSchedulerApp) attempt; LeafQueue queue = (LeafQueue) attempt.getQueue(); try { queue.decreaseContainer(getClusterResource(), decreaseRequest, app); // Notify RMNode that the container can be pulled by NodeManager in the // next heartbeat - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeDecreaseContainerEvent( - decreaseRequest.getNodeId(), + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(), Collections.singletonList(rmContainer.getContainer()))); } catch (InvalidResourceRequestException e) { LOG.warn("Error happens when checking decrease request, Ignoring.." @@ -1701,70 +1783,81 @@ public class CapacityScheduler extends } } - public synchronized void markContainerForKillable( + public void markContainerForKillable( RMContainer killableContainer) { - if (LOG.isDebugEnabled()) { - LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container" - + killableContainer.toString()); + try { + writeLock.lock(); + if (LOG.isDebugEnabled()) { + LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container" + + killableContainer.toString()); + } + + if (!isLazyPreemptionEnabled) { + super.completedContainer(killableContainer, SchedulerUtils + .createPreemptedContainerStatus(killableContainer.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); + } else{ + FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( + killableContainer.getAllocatedNode()); + + FiCaSchedulerApp application = getCurrentAttemptForContainer( + killableContainer.getContainerId()); + + node.markContainerToKillable(killableContainer.getContainerId()); + + // notify PreemptionManager + // Get the application for the finished container + if (null != application) { + String leafQueueName = application.getCSLeafQueue().getQueueName(); + getPreemptionManager().addKillableContainer( + new KillableContainer(killableContainer, node.getPartition(), + leafQueueName)); + } + } + } finally { + writeLock.unlock(); } + } + + private void markContainerForNonKillable( + RMContainer nonKillableContainer) { + try { + writeLock.lock(); + if (LOG.isDebugEnabled()) { + LOG.debug( + SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container" + + nonKillableContainer.toString()); + } - if (!isLazyPreemptionEnabled) { - super.completedContainer(killableContainer, SchedulerUtils - .createPreemptedContainerStatus(killableContainer.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); - } else { FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( - killableContainer.getAllocatedNode()); + nonKillableContainer.getAllocatedNode()); FiCaSchedulerApp application = getCurrentAttemptForContainer( - killableContainer.getContainerId()); + nonKillableContainer.getContainerId()); - node.markContainerToKillable(killableContainer.getContainerId()); + node.markContainerToNonKillable(nonKillableContainer.getContainerId()); // notify PreemptionManager // Get the application for the finished container if (null != application) { String leafQueueName = application.getCSLeafQueue().getQueueName(); - getPreemptionManager().addKillableContainer( - new KillableContainer(killableContainer, node.getPartition(), + getPreemptionManager().removeKillableContainer( + new KillableContainer(nonKillableContainer, node.getPartition(), leafQueueName)); - } } - } - - private synchronized void markContainerForNonKillable( - RMContainer nonKillableContainer) { - if (LOG.isDebugEnabled()) { - LOG.debug( - SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container" - + nonKillableContainer.toString()); - } - - FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( - nonKillableContainer.getAllocatedNode()); - - FiCaSchedulerApp application = getCurrentAttemptForContainer( - nonKillableContainer.getContainerId()); - - node.markContainerToNonKillable(nonKillableContainer.getContainerId()); - - // notify PreemptionManager - // Get the application for the finished container - if (null != application) { - String leafQueueName = application.getCSLeafQueue().getQueueName(); - getPreemptionManager().removeKillableContainer( - new KillableContainer(nonKillableContainer, node.getPartition(), - leafQueueName)); + } + } finally { + writeLock.unlock(); } } @Override - public synchronized boolean checkAccess(UserGroupInformation callerUGI, + public boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { CSQueue queue = getQueue(queueName); if (queue == null) { if (LOG.isDebugEnabled()) { - LOG.debug("ACL not found for queue access-type " + acl - + " for queue " + queueName); + LOG.debug("ACL not found for queue access-type " + acl + " for queue " + + queueName); } return false; } @@ -1803,179 +1896,213 @@ public class CapacityScheduler extends return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; } - private synchronized String resolveReservationQueueName(String queueName, + private String resolveReservationQueueName(String queueName, ApplicationId applicationId, ReservationId reservationID, boolean isRecovering) { - CSQueue queue = getQueue(queueName); - // Check if the queue is a plan queue - if ((queue == null) || !(queue instanceof PlanQueue)) { - return queueName; - } - if (reservationID != null) { - String resQName = reservationID.toString(); - queue = getQueue(resQName); - if (queue == null) { - // reservation has terminated during failover - if (isRecovering - && conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) { - // move to the default child queue of the plan - return getDefaultReservationQueueName(queueName); - } - String message = - "Application " + applicationId - + " submitted to a reservation which is not currently active: " - + resQName; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, message)); - return null; + try { + readLock.lock(); + CSQueue queue = getQueue(queueName); + // Check if the queue is a plan queue + if ((queue == null) || !(queue instanceof PlanQueue)) { + return queueName; } - if (!queue.getParent().getQueueName().equals(queueName)) { - String message = - "Application: " + applicationId + " submitted to a reservation " - + resQName + " which does not belong to the specified queue: " - + queueName; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, message)); - return null; + if (reservationID != null) { + String resQName = reservationID.toString(); + queue = getQueue(resQName); + if (queue == null) { + // reservation has terminated during failover + if (isRecovering && conf.getMoveOnExpiry( + getQueue(queueName).getQueuePath())) { + // move to the default child queue of the plan + return getDefaultReservationQueueName(queueName); + } + String message = "Application " + applicationId + + " submitted to a reservation which is not currently active: " + + resQName; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return null; + } + if (!queue.getParent().getQueueName().equals(queueName)) { + String message = + "Application: " + applicationId + " submitted to a reservation " + + resQName + " which does not belong to the specified queue: " + + queueName; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return null; + } + // use the reservation queue to run the app + queueName = resQName; + } else{ + // use the default child queue of the plan for unreserved apps + queueName = getDefaultReservationQueueName(queueName); } - // use the reservation queue to run the app - queueName = resQName; - } else { - // use the default child queue of the plan for unreserved apps - queueName = getDefaultReservationQueueName(queueName); + return queueName; + } finally { + readLock.unlock(); } - return queueName; + } @Override - public synchronized void removeQueue(String queueName) + public void removeQueue(String queueName) throws SchedulerDynamicEditException { - LOG.info("Removing queue: " + queueName); - CSQueue q = this.getQueue(queueName); - if (!(q instanceof ReservationQueue)) { - throw new SchedulerDynamicEditException("The queue that we are asked " - + "to remove (" + queueName + ") is not a ReservationQueue"); - } - ReservationQueue disposableLeafQueue = (ReservationQueue) q; - // at this point we should have no more apps - if (disposableLeafQueue.getNumApplications() > 0) { - throw new SchedulerDynamicEditException("The queue " + queueName - + " is not empty " + disposableLeafQueue.getApplications().size() - + " active apps " + disposableLeafQueue.getPendingApplications().size() - + " pending apps"); - } + try { + writeLock.lock(); + LOG.info("Removing queue: " + queueName); + CSQueue q = this.getQueue(queueName); + if (!(q instanceof ReservationQueue)) { + throw new SchedulerDynamicEditException( + "The queue that we are asked " + "to remove (" + queueName + + ") is not a ReservationQueue"); + } + ReservationQueue disposableLeafQueue = (ReservationQueue) q; + // at this point we should have no more apps + if (disposableLeafQueue.getNumApplications() > 0) { + throw new SchedulerDynamicEditException( + "The queue " + queueName + " is not empty " + disposableLeafQueue + .getApplications().size() + " active apps " + + disposableLeafQueue.getPendingApplications().size() + + " pending apps"); + } - ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); - this.queues.remove(queueName); - LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); + ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); + this.queues.remove(queueName); + LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); + } finally { + writeLock.unlock(); + } } @Override - public synchronized void addQueue(Queue queue) + public void addQueue(Queue queue) throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (!(queue instanceof ReservationQueue)) { + throw new SchedulerDynamicEditException( + "Queue " + queue.getQueueName() + " is not a ReservationQueue"); + } - if (!(queue instanceof ReservationQueue)) { - throw new SchedulerDynamicEditException("Queue " + queue.getQueueName() - + " is not a ReservationQueue"); - } + ReservationQueue newQueue = (ReservationQueue) queue; - ReservationQueue newQueue = (ReservationQueue) queue; + if (newQueue.getParent() == null || !(newQueue + .getParent() instanceof PlanQueue)) { + throw new SchedulerDynamicEditException( + "ParentQueue for " + newQueue.getQueueName() + + " is not properly set (should be set and be a PlanQueue)"); + } - if (newQueue.getParent() == null - || !(newQueue.getParent() instanceof PlanQueue)) { - throw new SchedulerDynamicEditException("ParentQueue for " - + newQueue.getQueueName() - + " is not properly set (should be set and be a PlanQueue)"); + PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); + String queuename = newQueue.getQueueName(); + parentPlan.addChildQueue(newQueue); + this.queues.put(queuename, newQueue); + LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); + } finally { + writeLock.unlock(); } - - PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); - String queuename = newQueue.getQueueName(); - parentPlan.addChildQueue(newQueue); - this.queues.put(queuename, newQueue); - LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); } @Override - public synchronized void setEntitlement(String inQueue, - QueueEntitlement entitlement) throws SchedulerDynamicEditException, - YarnException { - LeafQueue queue = getAndCheckLeafQueue(inQueue); - ParentQueue parent = (ParentQueue) queue.getParent(); - - if (!(queue instanceof ReservationQueue)) { - throw new SchedulerDynamicEditException("Entitlement can not be" - + " modified dynamically since queue " + inQueue - + " is not a ReservationQueue"); - } + public void setEntitlement(String inQueue, QueueEntitlement entitlement) + throws YarnException { + try { + writeLock.lock(); + LeafQueue queue = getAndCheckLeafQueue(inQueue); + ParentQueue parent = (ParentQueue) queue.getParent(); + + if (!(queue instanceof ReservationQueue)) { + throw new SchedulerDynamicEditException( + "Entitlement can not be" + " modified dynamically since queue " + + inQueue + " is not a ReservationQueue"); + } - if (!(parent instanceof PlanQueue)) { - throw new SchedulerDynamicEditException("The parent of ReservationQueue " - + inQueue + " must be an PlanQueue"); - } + if (!(parent instanceof PlanQueue)) { + throw new SchedulerDynamicEditException( + "The parent of ReservationQueue " + inQueue + + " must be an PlanQueue"); + } - ReservationQueue newQueue = (ReservationQueue) queue; + ReservationQueue newQueue = (ReservationQueue) queue; - float sumChilds = ((PlanQueue) parent).sumOfChildCapacities(); - float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity(); + float sumChilds = ((PlanQueue) parent).sumOfChildCapacities(); + float newChildCap = + sumChilds - queue.getCapacity() + entitlement.getCapacity(); - if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { - // note: epsilon checks here are not ok, as the epsilons might accumulate - // and become a problem in aggregate - if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0 - && Math.abs(entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { - return; + if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { + // note: epsilon checks here are not ok, as the epsilons might + // accumulate and become a problem in aggregate + if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0 + && Math.abs( + entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { + return; + } + newQueue.setEntitlement(entitlement); + } else{ + throw new SchedulerDynamicEditException( + "Sum of child queues would exceed 100% for PlanQueue: " + parent + .getQueueName()); } - newQueue.setEntitlement(entitlement); - } else { - throw new SchedulerDynamicEditException( - "Sum of child queues would exceed 100% for PlanQueue: " - + parent.getQueueName()); + LOG.info( + "Set entitlement for ReservationQueue " + inQueue + " to " + queue + .getCapacity() + " request was (" + entitlement.getCapacity() + + ")"); + } finally { + writeLock.unlock(); } - LOG.info("Set entitlement for ReservationQueue " + inQueue + " to " - + queue.getCapacity() + " request was (" + entitlement.getCapacity() + ")"); } @Override - public synchronized String moveApplication(ApplicationId appId, + public String moveApplication(ApplicationId appId, String targetQueueName) throws YarnException { - FiCaSchedulerApp app = - getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0)); - String sourceQueueName = app.getQueue().getQueueName(); - LeafQueue source = getAndCheckLeafQueue(sourceQueueName); - String destQueueName = handleMoveToPlanQueue(targetQueueName); - LeafQueue dest = getAndCheckLeafQueue(destQueueName); - // Validation check - ACLs, submission limits for user & queue - String user = app.getUser(); - checkQueuePartition(app, dest); try { - dest.submitApplication(appId, user, destQueueName); - } catch (AccessControlException e) { - throw new YarnException(e); - } - // Move all live containers - for (RMContainer rmContainer : app.getLiveContainers()) { - source.detachContainer(getClusterResource(), app, rmContainer); - // attach the Container to another queue - dest.attachContainer(getClusterResource(), app, rmContainer); - } - // Detach the application.. - source.finishApplicationAttempt(app, sourceQueueName); - source.getParent().finishApplication(appId, app.getUser()); - // Finish app & update metrics - app.move(dest); - // Submit to a new queue - dest.submitApplicationAttempt(app, user); - applications.get(appId).setQueue(dest); - LOG.info("App: " + app.getApplicationId() + " successfully moved from " - + sourceQueueName + " to: " + destQueueName); - return targetQueueName; + writeLock.lock(); + FiCaSchedulerApp app = getApplicationAttempt( + ApplicationAttemptId.newInstance(appId, 0)); + String sourceQueueName = app.getQueue().getQueueName(); + LeafQueue source = getAndCheckLeafQueue(sourceQueueName); + String destQueueName = handleMoveToPlanQueue(targetQueueName); + LeafQueue dest = getAndCheckLeafQueue(destQueueName); + // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); + checkQueuePartition(app, dest); + try { + dest.submitApplication(appId, user, destQueueName); + } catch (AccessControlException e) { + throw new YarnException(e); + } + // Move all live containers + for (RMContainer rmContainer : app.getLiveContainers()) { + source.detachContainer(getClusterResource(), app, rmContainer); + // attach the Container to another queue + dest.attachContainer(getClusterResource(), app, rmContainer); + } + // Detach the application.. + source.finishApplicationAttempt(app, sourceQueueName); + source.getParent().finishApplication(appId, app.getUser()); + // Finish app & update metrics + app.move(dest); + // Submit to a new queue + dest.submitApplicationAttempt(app, user); + applications.get(appId).setQueue(dest); + LOG.info("App: " + app.getApplicationId() + " successfully moved from " + + sourceQueueName + " to: " + destQueueName); + return targetQueueName; + } finally { + writeLock.unlock(); + } } /* * Check application can be moved to queue with labels enabled. All labels in * application life time will be checked + * + * @param app + * @param dest + * @throws YarnException */ private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest) throws YarnException { @@ -2163,16 +2290,8 @@ public class CapacityScheduler extends // As we use iterator over a TreeSet for OrderingPolicy, once we change // priority then reinsert back to make order correct. LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); - synchronized (queue) { - queue.getOrderingPolicy().removeSchedulableEntity( - application.getCurrentAppAttempt()); - // Update new priority in SchedulerApplication - application.setPriority(appPriority); - - queu --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org