hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [2/3] hadoop git commit: YARN-3139. Improve locks in AbstractYarnScheduler/CapacityScheduler/FairScheduler. Contributed by Wangda Tan
Date Thu, 06 Oct 2016 15:01:06 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/caafa980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5696c71..10df751 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -39,7 +39,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -268,8 +267,7 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public synchronized RMContainerTokenSecretManager 
-  getContainerTokenSecretManager() {
+  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
     return this.rmContext.getContainerTokenSecretManager();
   }
 
@@ -294,52 +292,62 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public synchronized RMContext getRMContext() {
+  public RMContext getRMContext() {
     return this.rmContext;
   }
 
   @Override
-  public synchronized void setRMContext(RMContext rmContext) {
+  public void setRMContext(RMContext rmContext) {
     this.rmContext = rmContext;
   }
 
-  private synchronized void initScheduler(Configuration configuration) throws
+  private void initScheduler(Configuration configuration) throws
       IOException {
-    this.conf = loadCapacitySchedulerConfiguration(configuration);
-    validateConf(this.conf);
-    this.minimumAllocation = this.conf.getMinimumAllocation();
-    initMaximumResourceCapability(this.conf.getMaximumAllocation());
-    this.calculator = this.conf.getResourceCalculator();
-    this.usePortForNodeName = this.conf.getUsePortForNodeName();
-    this.applications = new ConcurrentHashMap<>();
-    this.labelManager = rmContext.getNodeLabelManager();
-    authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
-    this.activitiesManager = new ActivitiesManager(rmContext);
-    activitiesManager.init(conf);
-    initializeQueues(this.conf);
-    this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
-
-    scheduleAsynchronously = this.conf.getScheduleAynschronously();
-    asyncScheduleInterval =
-        this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
-            DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-    if (scheduleAsynchronously) {
-      asyncSchedulerThread = new AsyncScheduleThread(this);
-    }
-
-    LOG.info("Initialized CapacityScheduler with " +
-        "calculator=" + getResourceCalculator().getClass() + ", " +
-        "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
-        "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
-        "asynchronousScheduling=" + scheduleAsynchronously + ", " +
-        "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-  }
-
-  private synchronized void startSchedulerThreads() {
-    if (scheduleAsynchronously) {
-      Preconditions.checkNotNull(asyncSchedulerThread,
-          "asyncSchedulerThread is null");
-      asyncSchedulerThread.start();
+    try {
+      writeLock.lock();
+      this.conf = loadCapacitySchedulerConfiguration(configuration);
+      validateConf(this.conf);
+      this.minimumAllocation = this.conf.getMinimumAllocation();
+      initMaximumResourceCapability(this.conf.getMaximumAllocation());
+      this.calculator = this.conf.getResourceCalculator();
+      this.usePortForNodeName = this.conf.getUsePortForNodeName();
+      this.applications = new ConcurrentHashMap<>();
+      this.labelManager = rmContext.getNodeLabelManager();
+      authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
+      this.activitiesManager = new ActivitiesManager(rmContext);
+      activitiesManager.init(conf);
+      initializeQueues(this.conf);
+      this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
+
+      scheduleAsynchronously = this.conf.getScheduleAynschronously();
+      asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+          DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+      if (scheduleAsynchronously) {
+        asyncSchedulerThread = new AsyncScheduleThread(this);
+      }
+
+      LOG.info("Initialized CapacityScheduler with " + "calculator="
+          + getResourceCalculator().getClass() + ", " + "minimumAllocation=<"
+          + getMinimumResourceCapability() + ">, " + "maximumAllocation=<"
+          + getMaximumResourceCapability() + ">, " + "asynchronousScheduling="
+          + scheduleAsynchronously + ", " + "asyncScheduleInterval="
+          + asyncScheduleInterval + "ms");
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void startSchedulerThreads() {
+    try {
+      writeLock.lock();
+      activitiesManager.start();
+      if (scheduleAsynchronously) {
+        Preconditions.checkNotNull(asyncSchedulerThread,
+            "asyncSchedulerThread is null");
+        asyncSchedulerThread.start();
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -353,40 +361,48 @@ public class CapacityScheduler extends
   @Override
   public void serviceStart() throws Exception {
     startSchedulerThreads();
-    activitiesManager.start();
     super.serviceStart();
   }
 
   @Override
   public void serviceStop() throws Exception {
-    synchronized (this) {
+    try {
+      writeLock.lock();
       if (scheduleAsynchronously && asyncSchedulerThread != null) {
         asyncSchedulerThread.interrupt();
         asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
+    } finally {
+      writeLock.unlock();
     }
+
     super.serviceStop();
   }
 
   @Override
-  public synchronized void
-  reinitialize(Configuration conf, RMContext rmContext) throws IOException {
-    Configuration configuration = new Configuration(conf);
-    CapacitySchedulerConfiguration oldConf = this.conf;
-    this.conf = loadCapacitySchedulerConfiguration(configuration);
-    validateConf(this.conf);
+  public void reinitialize(Configuration newConf, RMContext rmContext)
+      throws IOException {
     try {
-      LOG.info("Re-initializing queues...");
-      refreshMaximumAllocation(this.conf.getMaximumAllocation());
-      reinitializeQueues(this.conf);
-    } catch (Throwable t) {
-      this.conf = oldConf;
-      refreshMaximumAllocation(this.conf.getMaximumAllocation());
-      throw new IOException("Failed to re-init queues", t);
-    }
+      writeLock.lock();
+      Configuration configuration = new Configuration(newConf);
+      CapacitySchedulerConfiguration oldConf = this.conf;
+      this.conf = loadCapacitySchedulerConfiguration(configuration);
+      validateConf(this.conf);
+      try {
+        LOG.info("Re-initializing queues...");
+        refreshMaximumAllocation(this.conf.getMaximumAllocation());
+        reinitializeQueues(this.conf);
+      } catch (Throwable t) {
+        this.conf = oldConf;
+        refreshMaximumAllocation(this.conf.getMaximumAllocation());
+        throw new IOException("Failed to re-init queues", t);
+      }
 
-    // update lazy preemption
-    this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
+      // update lazy preemption
+      this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   long getAsyncScheduleInterval() {
@@ -450,10 +466,6 @@ public class CapacityScheduler extends
     }
 
   }
-  
-  @Private
-  public static final String ROOT_QUEUE = 
-    CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT;
 
   static class QueueHook {
     public CSQueue hook(CSQueue queue) {
@@ -463,38 +475,41 @@ public class CapacityScheduler extends
   private static final QueueHook noop = new QueueHook();
 
   @VisibleForTesting
-  public synchronized UserGroupMappingPlacementRule
+  public UserGroupMappingPlacementRule
       getUserGroupMappingPlacementRule() throws IOException {
-    boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
-    LOG.info("Initialized queue mappings, override: "
-        + overrideWithQueueMappings);
-
-    // Get new user/group mappings
-    List<UserGroupMappingPlacementRule.QueueMapping> newMappings =
-        conf.getQueueMappings();
-    // check if mappings refer to valid queues
-    for (QueueMapping mapping : newMappings) {
-      String mappingQueue = mapping.getQueue();
-      if (!mappingQueue
-          .equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING)
-          && !mappingQueue
-              .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
-        CSQueue queue = queues.get(mappingQueue);
-        if (queue == null || !(queue instanceof LeafQueue)) {
-          throw new IOException("mapping contains invalid or non-leaf queue "
-              + mappingQueue);
+    try {
+      readLock.lock();
+      boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+      LOG.info(
+          "Initialized queue mappings, override: " + overrideWithQueueMappings);
+
+      // Get new user/group mappings
+      List<QueueMapping> newMappings = conf.getQueueMappings();
+      // check if mappings refer to valid queues
+      for (QueueMapping mapping : newMappings) {
+        String mappingQueue = mapping.getQueue();
+        if (!mappingQueue.equals(
+            UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
+            .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
+          CSQueue queue = queues.get(mappingQueue);
+          if (queue == null || !(queue instanceof LeafQueue)) {
+            throw new IOException(
+                "mapping contains invalid or non-leaf queue " + mappingQueue);
+          }
         }
       }
-    }
 
-    // initialize groups if mappings are present
-    if (newMappings.size() > 0) {
-      Groups groups = new Groups(conf);
-      return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
-          newMappings, groups);
-    }
+      // initialize groups if mappings are present
+      if (newMappings.size() > 0) {
+        Groups groups = new Groups(conf);
+        return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
+            newMappings, groups);
+      }
 
-    return null;
+      return null;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   private void updatePlacementRules() throws IOException {
@@ -527,12 +542,12 @@ public class CapacityScheduler extends
   }
 
   @Lock(CapacityScheduler.class)
-  private void reinitializeQueues(CapacitySchedulerConfiguration conf) 
+  private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
   throws IOException {
     // Parse new queues
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
     CSQueue newRoot = 
-        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
+        parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT,
             newQueues, queues, noop);
     
     // Ensure all existing queues are still present
@@ -694,248 +709,279 @@ public class CapacityScheduler extends
     return queues.get(queueName);
   }
 
-  private synchronized void addApplicationOnRecovery(
+  private void addApplicationOnRecovery(
       ApplicationId applicationId, String queueName, String user,
       Priority priority) {
-    CSQueue queue = getQueue(queueName);
-    if (queue == null) {
-      //During a restart, this indicates a queue was removed, which is
-      //not presently supported
-      if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMAppEvent(applicationId, RMAppEventType.KILL,
-            "Application killed on recovery as it was submitted to queue " +
-            queueName + " which no longer exists after restart."));
-        return;
-      } else {
-        String queueErrorMsg = "Queue named " + queueName
-            + " missing during application recovery."
-            + " Queue removal during recovery is not presently supported by the"
-            + " capacity scheduler, please restart with all queues configured"
-            + " which were present before shutdown/restart.";
-        LOG.fatal(queueErrorMsg);
-        throw new QueueInvalidException(queueErrorMsg);
+    try {
+      writeLock.lock();
+      CSQueue queue = getQueue(queueName);
+      if (queue == null) {
+        //During a restart, this indicates a queue was removed, which is
+        //not presently supported
+        if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.KILL,
+                  "Application killed on recovery as it was submitted to queue "
+                      + queueName + " which no longer exists after restart."));
+          return;
+        } else{
+          String queueErrorMsg = "Queue named " + queueName
+              + " missing during application recovery."
+              + " Queue removal during recovery is not presently "
+              + "supported by the capacity scheduler, please "
+              + "restart with all queues configured"
+              + " which were present before shutdown/restart.";
+          LOG.fatal(queueErrorMsg);
+          throw new QueueInvalidException(queueErrorMsg);
+        }
       }
-    }
-    if (!(queue instanceof LeafQueue)) {
-      // During RM restart, this means leaf queue was converted to a parent
-      // queue, which is not supported for running apps.
-      if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMAppEvent(applicationId, RMAppEventType.KILL,
-            "Application killed on recovery as it was submitted to queue " +
-            queueName + " which is no longer a leaf queue after restart."));
-        return;
-      } else {
-        String queueErrorMsg = "Queue named " + queueName
-            + " is no longer a leaf queue during application recovery."
-            + " Changing a leaf queue to a parent queue during recovery is"
-            + " not presently supported by the capacity scheduler. Please"
-            + " restart with leaf queues before shutdown/restart continuing"
-            + " as leaf queues.";
-        LOG.fatal(queueErrorMsg);
-        throw new QueueInvalidException(queueErrorMsg);
+      if (!(queue instanceof LeafQueue)) {
+        // During RM restart, this means leaf queue was converted to a parent
+        // queue, which is not supported for running apps.
+        if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.KILL,
+                  "Application killed on recovery as it was submitted to queue "
+                      + queueName
+                      + " which is no longer a leaf queue after restart."));
+          return;
+        } else{
+          String queueErrorMsg = "Queue named " + queueName
+              + " is no longer a leaf queue during application recovery."
+              + " Changing a leaf queue to a parent queue during recovery is"
+              + " not presently supported by the capacity scheduler. Please"
+              + " restart with leaf queues before shutdown/restart continuing"
+              + " as leaf queues.";
+          LOG.fatal(queueErrorMsg);
+          throw new QueueInvalidException(queueErrorMsg);
+        }
       }
-    }
-    // Submit to the queue
-    try {
-      queue.submitApplication(applicationId, user, queueName);
-    } catch (AccessControlException ace) {
-      // Ignore the exception for recovered app as the app was previously
-      // accepted.
-    }
-    queue.getMetrics().submitApp(user);
-    SchedulerApplication<FiCaSchedulerApp> application =
-        new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
-    applications.put(applicationId, application);
-    LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queueName);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      // Submit to the queue
+      try {
+        queue.submitApplication(applicationId, user, queueName);
+      } catch (AccessControlException ace) {
+        // Ignore the exception for recovered app as the app was previously
+        // accepted.
+      }
+      queue.getMetrics().submitApp(user);
+      SchedulerApplication<FiCaSchedulerApp> application =
+          new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
+      applications.put(applicationId, application);
+      LOG.info("Accepted application " + applicationId + " from user: " + user
+          + ", in queue: " + queueName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  private synchronized void addApplication(ApplicationId applicationId,
+  private void addApplication(ApplicationId applicationId,
       String queueName, String user, Priority priority) {
-    // Sanity checks.
-    CSQueue queue = getQueue(queueName);
-    if (queue == null) {
-      String message = "Application " + applicationId +
-      " submitted by user " + user + " to unknown queue: " + queueName;
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppEvent(applicationId,
-              RMAppEventType.APP_REJECTED, message));
-      return;
-    }
-    if (!(queue instanceof LeafQueue)) {
-      String message = "Application " + applicationId + 
-          " submitted by user " + user + " to non-leaf queue: " + queueName;
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppEvent(applicationId,
-              RMAppEventType.APP_REJECTED, message));
-      return;
-    }
-    // Submit to the queue
     try {
-      queue.submitApplication(applicationId, user, queueName);
-    } catch (AccessControlException ace) {
-      LOG.info("Failed to submit application " + applicationId + " to queue "
-          + queueName + " from user " + user, ace);
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppEvent(applicationId,
-              RMAppEventType.APP_REJECTED, ace.toString()));
-      return;
+      writeLock.lock();
+      // Sanity checks.
+      CSQueue queue = getQueue(queueName);
+      if (queue == null) {
+        String message =
+            "Application " + applicationId + " submitted by user " + user
+                + " to unknown queue: " + queueName;
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                message));
+        return;
+      }
+      if (!(queue instanceof LeafQueue)) {
+        String message =
+            "Application " + applicationId + " submitted by user " + user
+                + " to non-leaf queue: " + queueName;
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                message));
+        return;
+      }
+      // Submit to the queue
+      try {
+        queue.submitApplication(applicationId, user, queueName);
+      } catch (AccessControlException ace) {
+        LOG.info("Failed to submit application " + applicationId + " to queue "
+            + queueName + " from user " + user, ace);
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                ace.toString()));
+        return;
+      }
+      // update the metrics
+      queue.getMetrics().submitApp(user);
+      SchedulerApplication<FiCaSchedulerApp> application =
+          new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
+      applications.put(applicationId, application);
+      LOG.info("Accepted application " + applicationId + " from user: " + user
+          + ", in queue: " + queueName);
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    } finally {
+      writeLock.unlock();
     }
-    // update the metrics
-    queue.getMetrics().submitApp(user);
-    SchedulerApplication<FiCaSchedulerApp> application =
-        new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
-    applications.put(applicationId, application);
-    LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
   }
 
-  private synchronized void addApplicationAttempt(
+  private void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
       boolean isAttemptRecovering) {
-    SchedulerApplication<FiCaSchedulerApp> application =
-        applications.get(applicationAttemptId.getApplicationId());
-    if (application == null) {
-      LOG.warn("Application " + applicationAttemptId.getApplicationId() +
-          " cannot be found in scheduler.");
-      return;
-    }
-    CSQueue queue = (CSQueue) application.getQueue();
-
-    FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
-        application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
-            application.getPriority(), isAttemptRecovering, activitiesManager);
-    if (transferStateFromPreviousAttempt) {
-      attempt.transferStateFromPreviousAttempt(
-          application.getCurrentAppAttempt());
-    }
-    application.setCurrentAppAttempt(attempt);
-
-    // Update attempt priority to the latest to avoid race condition i.e
-    // SchedulerApplicationAttempt is created with old priority but it is not
-    // set to SchedulerApplication#setCurrentAppAttempt.
-    // Scenario would occur is
-    // 1. SchdulerApplicationAttempt is created with old priority.
-    // 2. updateApplicationPriority() updates SchedulerApplication. Since
-    // currentAttempt is null, it just return.
-    // 3. ScheduelerApplcationAttempt is set in
-    // SchedulerApplication#setCurrentAppAttempt.
-    attempt.setPriority(application.getPriority());
-
-    queue.submitApplicationAttempt(attempt, application.getUser());
-    LOG.info("Added Application Attempt " + applicationAttemptId
-        + " to scheduler from user " + application.getUser() + " in queue "
-        + queue.getQueueName());
-    if (isAttemptRecovering) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(applicationAttemptId
-            + " is recovering. Skipping notifying ATTEMPT_ADDED");
+    try {
+      writeLock.lock();
+      SchedulerApplication<FiCaSchedulerApp> application = applications.get(
+          applicationAttemptId.getApplicationId());
+      if (application == null) {
+        LOG.warn("Application " + applicationAttemptId.getApplicationId()
+            + " cannot be found in scheduler.");
+        return;
       }
-    } else {
-      rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
+      CSQueue queue = (CSQueue) application.getQueue();
+
+      FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
+          application.getUser(), queue, queue.getActiveUsersManager(),
+          rmContext, application.getPriority(), isAttemptRecovering,
+          activitiesManager);
+      if (transferStateFromPreviousAttempt) {
+        attempt.transferStateFromPreviousAttempt(
+            application.getCurrentAppAttempt());
+      }
+      application.setCurrentAppAttempt(attempt);
+
+      // Update attempt priority to the latest to avoid race condition i.e
+      // SchedulerApplicationAttempt is created with old priority but it is not
+      // set to SchedulerApplication#setCurrentAppAttempt.
+      // Scenario would occur is
+      // 1. SchdulerApplicationAttempt is created with old priority.
+      // 2. updateApplicationPriority() updates SchedulerApplication. Since
+      // currentAttempt is null, it just return.
+      // 3. ScheduelerApplcationAttempt is set in
+      // SchedulerApplication#setCurrentAppAttempt.
+      attempt.setPriority(application.getPriority());
+
+      queue.submitApplicationAttempt(attempt, application.getUser());
+      LOG.info("Added Application Attempt " + applicationAttemptId
+          + " to scheduler from user " + application.getUser() + " in queue "
+          + queue.getQueueName());
+      if (isAttemptRecovering) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(applicationAttemptId
+              + " is recovering. Skipping notifying ATTEMPT_ADDED");
+        }
+      } else{
+        rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppAttemptEvent(applicationAttemptId,
+                RMAppAttemptEventType.ATTEMPT_ADDED));
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  private synchronized void doneApplication(ApplicationId applicationId,
+  private void doneApplication(ApplicationId applicationId,
       RMAppState finalState) {
-    SchedulerApplication<FiCaSchedulerApp> application =
-        applications.get(applicationId);
-    if (application == null){
-      // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
-      // ignore it.
-      LOG.warn("Couldn't find application " + applicationId);
-      return;
-    }
-    CSQueue queue = (CSQueue) application.getQueue();
-    if (!(queue instanceof LeafQueue)) {
-      LOG.error("Cannot finish application " + "from non-leaf queue: "
-          + queue.getQueueName());
-    } else {
-      queue.finishApplication(applicationId, application.getUser());
+    try {
+      writeLock.lock();
+      SchedulerApplication<FiCaSchedulerApp> application = applications.get(
+          applicationId);
+      if (application == null) {
+        // The AppRemovedSchedulerEvent maybe sent on recovery for completed
+        // apps, ignore it.
+        LOG.warn("Couldn't find application " + applicationId);
+        return;
+      }
+      CSQueue queue = (CSQueue) application.getQueue();
+      if (!(queue instanceof LeafQueue)) {
+        LOG.error("Cannot finish application " + "from non-leaf queue: " + queue
+            .getQueueName());
+      } else{
+        queue.finishApplication(applicationId, application.getUser());
+      }
+      application.stop(finalState);
+      applications.remove(applicationId);
+    } finally {
+      writeLock.unlock();
     }
-    application.stop(finalState);
-    applications.remove(applicationId);
   }
 
-  private synchronized void doneApplicationAttempt(
+  private void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
-    LOG.info("Application Attempt " + applicationAttemptId + " is done." +
-        " finalState=" + rmAppAttemptFinalState);
-    
-    FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
-    SchedulerApplication<FiCaSchedulerApp> application =
-        applications.get(applicationAttemptId.getApplicationId());
+    try {
+      writeLock.lock();
+      LOG.info("Application Attempt " + applicationAttemptId + " is done."
+          + " finalState=" + rmAppAttemptFinalState);
 
-    if (application == null || attempt == null) {
-      LOG.info("Unknown application " + applicationAttemptId + " has completed!");
-      return;
-    }
+      FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
+      SchedulerApplication<FiCaSchedulerApp> application = applications.get(
+          applicationAttemptId.getApplicationId());
 
-    // Release all the allocated, acquired, running containers
-    for (RMContainer rmContainer : attempt.getLiveContainers()) {
-      if (keepContainers
-          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
-        // do not kill the running container in the case of work-preserving AM
-        // restart.
-        LOG.info("Skip killing " + rmContainer.getContainerId());
-        continue;
+      if (application == null || attempt == null) {
+        LOG.info(
+            "Unknown application " + applicationAttemptId + " has completed!");
+        return;
       }
-      super.completedContainer(
-        rmContainer,
-        SchedulerUtils.createAbnormalContainerStatus(
-          rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
-        RMContainerEventType.KILL);
-    }
 
-    // Release all reserved containers
-    for (RMContainer rmContainer : attempt.getReservedContainers()) {
-      super.completedContainer(
-        rmContainer,
-        SchedulerUtils.createAbnormalContainerStatus(
-          rmContainer.getContainerId(), "Application Complete"),
-        RMContainerEventType.KILL);
-    }
+      // Release all the allocated, acquired, running containers
+      for (RMContainer rmContainer : attempt.getLiveContainers()) {
+        if (keepContainers && rmContainer.getState().equals(
+            RMContainerState.RUNNING)) {
+          // do not kill the running container in the case of work-preserving AM
+          // restart.
+          LOG.info("Skip killing " + rmContainer.getContainerId());
+          continue;
+        }
+        super.completedContainer(rmContainer, SchedulerUtils
+                .createAbnormalContainerStatus(rmContainer.getContainerId(),
+                    SchedulerUtils.COMPLETED_APPLICATION),
+            RMContainerEventType.KILL);
+      }
 
-    // Clean up pending requests, metrics etc.
-    attempt.stop(rmAppAttemptFinalState);
+      // Release all reserved containers
+      for (RMContainer rmContainer : attempt.getReservedContainers()) {
+        super.completedContainer(rmContainer, SchedulerUtils
+            .createAbnormalContainerStatus(rmContainer.getContainerId(),
+                "Application Complete"), RMContainerEventType.KILL);
+      }
 
-    // Inform the queue
-    String queueName = attempt.getQueue().getQueueName();
-    CSQueue queue = queues.get(queueName);
-    if (!(queue instanceof LeafQueue)) {
-      LOG.error("Cannot finish application " + "from non-leaf queue: "
-          + queueName);
-    } else {
-      queue.finishApplicationAttempt(attempt, queue.getQueueName());
+      // Clean up pending requests, metrics etc.
+      attempt.stop(rmAppAttemptFinalState);
+
+      // Inform the queue
+      String queueName = attempt.getQueue().getQueueName();
+      CSQueue queue = queues.get(queueName);
+      if (!(queue instanceof LeafQueue)) {
+        LOG.error(
+            "Cannot finish application " + "from non-leaf queue: " + queueName);
+      } else{
+        queue.finishApplicationAttempt(attempt, queue.getQueueName());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  // It is crucial to acquire leaf queue lock first to prevent:
-  // 1. Race condition when calculating the delta resource in
-  //    SchedContainerChangeRequest
-  // 2. Deadlock with the scheduling thread.
   private LeafQueue updateIncreaseRequests(
-      List<UpdateContainerRequest> increaseRequests,
-      FiCaSchedulerApp app) {
+      List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) {
     if (null == increaseRequests || increaseRequests.isEmpty()) {
       return null;
     }
+
     // Pre-process increase requests
     List<SchedContainerChangeRequest> schedIncreaseRequests =
         createSchedContainerChangeRequests(increaseRequests, true);
     LeafQueue leafQueue = (LeafQueue) app.getQueue();
-    synchronized(leafQueue) {
+
+    try {
+      /*
+       * Acquire application's lock here to make sure application won't
+       * finish when updateIncreaseRequest is called.
+       */
+      app.getWriteLock().lock();
       // make sure we aren't stopping/removing the application
       // when the allocate comes in
       if (app.isStopped()) {
@@ -945,8 +991,12 @@ public class CapacityScheduler extends
       if (app.updateIncreaseRequests(schedIncreaseRequests)) {
         return leafQueue;
       }
-      return null;
+    } finally {
+      app.getWriteLock().unlock();
     }
+
+
+    return null;
   }
 
   @Override
@@ -956,7 +1006,6 @@ public class CapacityScheduler extends
       List<String> blacklistAdditions, List<String> blacklistRemovals,
       List<UpdateContainerRequest> increaseRequests,
       List<UpdateContainerRequest> decreaseRequests) {
-
     FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       return EMPTY_ALLOCATION;
@@ -966,42 +1015,43 @@ public class CapacityScheduler extends
     releaseContainers(release, application);
 
     // update increase requests
-    LeafQueue updateDemandForQueue =
-        updateIncreaseRequests(increaseRequests, application);
+    LeafQueue updateDemandForQueue = updateIncreaseRequests(increaseRequests,
+        application);
 
     // Decrease containers
     decreaseContainers(decreaseRequests, application);
 
     // Sanity check for new allocation requests
-    SchedulerUtils.normalizeRequests(
-        ask, getResourceCalculator(), getClusterResource(),
-        getMinimumResourceCapability(), getMaximumResourceCapability());
+    SchedulerUtils.normalizeRequests(ask, getResourceCalculator(),
+        getClusterResource(), getMinimumResourceCapability(),
+        getMaximumResourceCapability());
 
     Allocation allocation;
 
-    synchronized (application) {
-
-      // make sure we aren't stopping/removing the application
-      // when the allocate comes in
+    // make sure we aren't stopping/removing the application
+    // when the allocate comes in
+    try {
+      application.getWriteLock().lock();
       if (application.isStopped()) {
         return EMPTY_ALLOCATION;
       }
 
       // Process resource requests
       if (!ask.isEmpty()) {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("allocate: pre-update " + applicationAttemptId +
-              " ask size =" + ask.size());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "allocate: pre-update " + applicationAttemptId + " ask size ="
+                  + ask.size());
           application.showRequests();
         }
 
         // Update application requests
-        if (application.updateResourceRequests(ask)
-            && (updateDemandForQueue == null)) {
+        if (application.updateResourceRequests(ask) && (updateDemandForQueue
+            == null)) {
           updateDemandForQueue = (LeafQueue) application.getQueue();
         }
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
           LOG.debug("allocate: post-update");
           application.showRequests();
         }
@@ -1011,6 +1061,8 @@ public class CapacityScheduler extends
 
       allocation = application.getAllocation(getResourceCalculator(),
           getClusterResource(), getMinimumResourceCapability());
+    } finally {
+      application.getWriteLock().unlock();
     }
 
     if (updateDemandForQueue != null && !application
@@ -1019,7 +1071,6 @@ public class CapacityScheduler extends
     }
 
     return allocation;
-
   }
 
   @Override
@@ -1049,142 +1100,159 @@ public class CapacityScheduler extends
     return root.getQueueUserAclInfo(user);
   }
 
-  private synchronized void nodeUpdate(RMNode nm) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm +
-          " clusterResources: " + getClusterResource());
-    }
+  private void nodeUpdate(RMNode nm) {
+    try {
+      writeLock.lock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "nodeUpdate: " + nm + " clusterResources: " + getClusterResource());
+      }
 
-    Resource releaseResources = Resource.newInstance(0, 0);
+      Resource releaseResources = Resource.newInstance(0, 0);
 
-    FiCaSchedulerNode node = getNode(nm.getNodeID());
-    
-    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
-    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
-    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
-    for(UpdatedContainerInfo containerInfo : containerInfoList) {
-      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
-      completedContainers.addAll(containerInfo.getCompletedContainers());
-    }
-    
-    // Processing the newly launched containers
-    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
-    }
-    
-    // Processing the newly increased containers
-    List<Container> newlyIncreasedContainers =
-        nm.pullNewlyIncreasedContainers();
-    for (Container container : newlyIncreasedContainers) {
-      containerIncreasedOnNode(container.getId(), node, container);
-    }
-
-    // Process completed containers
-    int releasedContainers = 0;
-    for (ContainerStatus completedContainer : completedContainers) {
-      ContainerId containerId = completedContainer.getContainerId();
-      RMContainer container = getRMContainer(containerId);
-      super.completedContainer(container, completedContainer,
-        RMContainerEventType.FINISHED);
-      if (container != null) {
-        releasedContainers++;
-        Resource rs = container.getAllocatedResource();
-        if (rs != null) {
-          Resources.addTo(releaseResources, rs);
-        }
-        rs = container.getReservedResource();
-        if (rs != null) {
-          Resources.addTo(releaseResources, rs);
+      FiCaSchedulerNode node = getNode(nm.getNodeID());
+
+      List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+      List<ContainerStatus> newlyLaunchedContainers =
+          new ArrayList<ContainerStatus>();
+      List<ContainerStatus> completedContainers =
+          new ArrayList<ContainerStatus>();
+      for (UpdatedContainerInfo containerInfo : containerInfoList) {
+        newlyLaunchedContainers.addAll(
+            containerInfo.getNewlyLaunchedContainers());
+        completedContainers.addAll(containerInfo.getCompletedContainers());
+      }
+
+      // Processing the newly launched containers
+      for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+        containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+      }
+
+      // Processing the newly increased containers
+      List<Container> newlyIncreasedContainers =
+          nm.pullNewlyIncreasedContainers();
+      for (Container container : newlyIncreasedContainers) {
+        containerIncreasedOnNode(container.getId(), node, container);
+      }
+
+      // Process completed containers
+      int releasedContainers = 0;
+      for (ContainerStatus completedContainer : completedContainers) {
+        ContainerId containerId = completedContainer.getContainerId();
+        RMContainer container = getRMContainer(containerId);
+        super.completedContainer(container, completedContainer,
+            RMContainerEventType.FINISHED);
+        if (container != null) {
+          releasedContainers++;
+          Resource rs = container.getAllocatedResource();
+          if (rs != null) {
+            Resources.addTo(releaseResources, rs);
+          }
+          rs = container.getReservedResource();
+          if (rs != null) {
+            Resources.addTo(releaseResources, rs);
+          }
         }
       }
-    }
 
-    // If the node is decommissioning, send an update to have the total
-    // resource equal to the used resource, so no available resource to
-    // schedule.
-    // TODO: Fix possible race-condition when request comes in before
-    // update is propagated
-    if (nm.getState() == NodeState.DECOMMISSIONING) {
-      this.rmContext
-          .getDispatcher()
-          .getEventHandler()
-          .handle(
-              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
-                  .newInstance(getSchedulerNode(nm.getNodeID())
-                      .getAllocatedResource(), 0)));
-    }
-    schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
-      releaseResources);
-    schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
+      // If the node is decommissioning, send an update to have the total
+      // resource equal to the used resource, so no available resource to
+      // schedule.
+      // TODO: Fix possible race-condition when request comes in before
+      // update is propagated
+      if (nm.getState() == NodeState.DECOMMISSIONING) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+                .newInstance(
+                    getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
+                    0)));
+      }
+      schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
+          releaseResources);
+      schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
 
-    // Updating node resource utilization
-    node.setAggregatedContainersUtilization(
-        nm.getAggregatedContainersUtilization());
-    node.setNodeUtilization(nm.getNodeUtilization());
+      // Updating node resource utilization
+      node.setAggregatedContainersUtilization(
+          nm.getAggregatedContainersUtilization());
+      node.setNodeUtilization(nm.getNodeUtilization());
 
-    // Now node data structures are upto date and ready for scheduling.
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Node being looked for scheduling " + nm +
-          " availableResource: " + node.getUnallocatedResource());
+      // Now node data structures are upto date and ready for scheduling.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Node being looked for scheduling " + nm + " availableResource: "
+                + node.getUnallocatedResource());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
   
   /**
    * Process resource update on a node.
    */
-  private synchronized void updateNodeAndQueueResource(RMNode nm, 
+  private void updateNodeAndQueueResource(RMNode nm,
       ResourceOption resourceOption) {
-    updateNodeResource(nm, resourceOption);
-    Resource clusterResource = getClusterResource();
-    root.updateClusterResource(clusterResource, new ResourceLimits(
-        clusterResource));
+    try {
+      writeLock.lock();
+      updateNodeResource(nm, resourceOption);
+      Resource clusterResource = getClusterResource();
+      root.updateClusterResource(clusterResource,
+          new ResourceLimits(clusterResource));
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   /**
    * Process node labels update on a node.
    */
-  private synchronized void updateLabelsOnNode(NodeId nodeId,
+  private void updateLabelsOnNode(NodeId nodeId,
       Set<String> newLabels) {
-    FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
-    if (null == node) {
-      return;
-    }
-    
-    // Get new partition, we have only one partition per node
-    String newPartition;
-    if (newLabels.isEmpty()) {
-      newPartition = RMNodeLabelsManager.NO_LABEL;
-    } else {
-      newPartition = newLabels.iterator().next();
-    }
+    try {
+      writeLock.lock();
+      FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
+      if (null == node) {
+        return;
+      }
 
-    // old partition as well
-    String oldPartition = node.getPartition();
+      // Get new partition, we have only one partition per node
+      String newPartition;
+      if (newLabels.isEmpty()) {
+        newPartition = RMNodeLabelsManager.NO_LABEL;
+      } else{
+        newPartition = newLabels.iterator().next();
+      }
 
-    // Update resources of these containers
-    for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
-      FiCaSchedulerApp application =
-          getApplicationAttempt(rmContainer.getApplicationAttemptId());
-      if (null != application) {
-        application.nodePartitionUpdated(rmContainer, oldPartition,
-            newPartition);
-      } else {
-        LOG.warn("There's something wrong, some RMContainers running on"
-            + " a node, but we cannot find SchedulerApplicationAttempt for it. Node="
-            + node.getNodeID() + " applicationAttemptId="
-            + rmContainer.getApplicationAttemptId());
-        continue;
+      // old partition as well
+      String oldPartition = node.getPartition();
+
+      // Update resources of these containers
+      for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
+        FiCaSchedulerApp application = getApplicationAttempt(
+            rmContainer.getApplicationAttemptId());
+        if (null != application) {
+          application.nodePartitionUpdated(rmContainer, oldPartition,
+              newPartition);
+        } else{
+          LOG.warn("There's something wrong, some RMContainers running on"
+              + " a node, but we cannot find SchedulerApplicationAttempt "
+              + "for it. Node=" + node.getNodeID() + " applicationAttemptId="
+              + rmContainer.getApplicationAttemptId());
+          continue;
+        }
       }
+
+      // Unreserve container on this node
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (null != reservedContainer) {
+        killReservedContainer(reservedContainer);
+      }
+
+      // Update node labels after we've done this
+      node.updateLabels(newLabels);
+    } finally {
+      writeLock.unlock();
     }
-    
-    // Unreserve container on this node
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (null != reservedContainer) {
-      killReservedContainer(reservedContainer);
-    }
-    
-    // Update node labels after we've done this
-    node.updateLabels(newLabels);
   }
 
   private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
@@ -1219,134 +1287,134 @@ public class CapacityScheduler extends
  }
 
   @VisibleForTesting
-  public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
-    if (rmContext.isWorkPreservingRecoveryEnabled()
-        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
-      return;
-    }
+  public void allocateContainersToNode(FiCaSchedulerNode node) {
+    try {
+      writeLock.lock();
+      if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
+          .isSchedulerReadyForAllocatingContainers()) {
+        return;
+      }
 
-    if (!nodeTracker.exists(node.getNodeID())) {
-      LOG.info("Skipping scheduling as the node " + node.getNodeID() +
-          " has been removed");
-      return;
-    }
+      if (!nodeTracker.exists(node.getNodeID())) {
+        LOG.info("Skipping scheduling as the node " + node.getNodeID()
+            + " has been removed");
+        return;
+      }
 
-    // reset allocation and reservation stats before we start doing any work
-    updateSchedulerHealth(lastNodeUpdateTime, node,
-      new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
+      // reset allocation and reservation stats before we start doing any work
+      updateSchedulerHealth(lastNodeUpdateTime, node,
+          new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
 
-    CSAssignment assignment;
+      CSAssignment assignment;
 
-    // Assign new containers...
-    // 1. Check for reserved applications
-    // 2. Schedule if there are no reservations
+      // Assign new containers...
+      // 1. Check for reserved applications
+      // 2. Schedule if there are no reservations
 
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (reservedContainer != null) {
 
-      FiCaSchedulerApp reservedApplication =
-          getCurrentAttemptForContainer(reservedContainer.getContainerId());
+        FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
+            reservedContainer.getContainerId());
 
-      // Try to fulfill the reservation
-      LOG.info("Trying to fulfill reservation for application "
-          + reservedApplication.getApplicationId() + " on node: "
-          + node.getNodeID());
+        // Try to fulfill the reservation
+        LOG.info("Trying to fulfill reservation for application "
+            + reservedApplication.getApplicationId() + " on node: " + node
+            .getNodeID());
 
-      LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
-      assignment =
-          queue.assignContainers(
-              getClusterResource(),
-              node,
-              // TODO, now we only consider limits for parent for non-labeled
-              // resources, should consider labeled resources as well.
-              new ResourceLimits(labelManager.getResourceByLabel(
-                  RMNodeLabelsManager.NO_LABEL, getClusterResource())),
-              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-      if (assignment.isFulfilledReservation()) {
-        CSAssignment tmp =
-            new CSAssignment(reservedContainer.getReservedResource(),
-                assignment.getType());
-        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-            reservedContainer.getReservedResource());
-        tmp.getAssignmentInformation().addAllocationDetails(
-            reservedContainer.getContainerId(), queue.getQueuePath());
-        tmp.getAssignmentInformation().incrAllocations();
-        updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
-        schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
-
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            queue.getParent().getQueueName(), queue.getQueueName(),
-            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
-            node, reservedContainer.getContainerId(),
-            AllocationState.ALLOCATED_FROM_RESERVED);
-      } else {
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            queue.getParent().getQueueName(), queue.getQueueName(),
-            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
-            node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
+        LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+        assignment = queue.assignContainers(getClusterResource(), node,
+            // TODO, now we only consider limits for parent for non-labeled
+            // resources, should consider labeled resources as well.
+            new ResourceLimits(labelManager
+                .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+                    getClusterResource())),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        if (assignment.isFulfilledReservation()) {
+          CSAssignment tmp = new CSAssignment(
+              reservedContainer.getReservedResource(), assignment.getType());
+          Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+              reservedContainer.getReservedResource());
+          tmp.getAssignmentInformation().addAllocationDetails(
+              reservedContainer.getContainerId(), queue.getQueuePath());
+          tmp.getAssignmentInformation().incrAllocations();
+          updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
+          schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+              queue.getParent().getQueueName(), queue.getQueueName(),
+              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+          ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+              node, reservedContainer.getContainerId(),
+              AllocationState.ALLOCATED_FROM_RESERVED);
+        } else{
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+              queue.getParent().getQueueName(), queue.getQueueName(),
+              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+          ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+              node, reservedContainer.getContainerId(),
+              AllocationState.SKIPPED);
+        }
       }
-    }
 
-    // Try to schedule more if there are no reservations to fulfill
-    if (node.getReservedContainer() == null) {
-      if (calculator.computeAvailableContainers(Resources
-              .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
-          minimumAllocation) > 0) {
+      // Try to schedule more if there are no reservations to fulfill
+      if (node.getReservedContainer() == null) {
+        if (calculator.computeAvailableContainers(Resources
+            .add(node.getUnallocatedResource(),
+                node.getTotalKillableResources()), minimumAllocation) > 0) {
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Trying to schedule on node: " + node.getNodeName() +
-              ", available: " + node.getUnallocatedResource());
-        }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Trying to schedule on node: " + node.getNodeName()
+                + ", available: " + node.getUnallocatedResource());
+          }
 
-        assignment = root.assignContainers(
-            getClusterResource(),
-            node,
-            new ResourceLimits(labelManager.getResourceByLabel(
-                node.getPartition(), getClusterResource())),
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-        if (Resources.greaterThan(calculator, getClusterResource(),
-            assignment.getResource(), Resources.none())) {
-          updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
-          return;
-        }
-        
-        // Only do non-exclusive allocation when node has node-labels.
-        if (StringUtils.equals(node.getPartition(),
-            RMNodeLabelsManager.NO_LABEL)) {
-          return;
-        }
-        
-        // Only do non-exclusive allocation when the node-label supports that
-        try {
-          if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
-              node.getPartition())) {
+          assignment = root.assignContainers(getClusterResource(), node,
+              new ResourceLimits(labelManager
+                  .getResourceByLabel(node.getPartition(),
+                      getClusterResource())),
+              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+          if (Resources.greaterThan(calculator, getClusterResource(),
+              assignment.getResource(), Resources.none())) {
+            updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
             return;
           }
-        } catch (IOException e) {
-          LOG.warn("Exception when trying to get exclusivity of node label="
-              + node.getPartition(), e);
-          return;
+
+          // Only do non-exclusive allocation when node has node-labels.
+          if (StringUtils.equals(node.getPartition(),
+              RMNodeLabelsManager.NO_LABEL)) {
+            return;
+          }
+
+          // Only do non-exclusive allocation when the node-label supports that
+          try {
+            if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
+                node.getPartition())) {
+              return;
+            }
+          } catch (IOException e) {
+            LOG.warn(
+                "Exception when trying to get exclusivity of node label=" + node
+                    .getPartition(), e);
+            return;
+          }
+
+          // Try to use NON_EXCLUSIVE
+          assignment = root.assignContainers(getClusterResource(), node,
+              // TODO, now we only consider limits for parent for non-labeled
+              // resources, should consider labeled resources as well.
+              new ResourceLimits(labelManager
+                  .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+                      getClusterResource())),
+              SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
+          updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
         }
-        
-        // Try to use NON_EXCLUSIVE
-        assignment = root.assignContainers(
-            getClusterResource(),
-            node,
-            // TODO, now we only consider limits for parent for non-labeled
-            // resources, should consider labeled resources as well.
-            new ResourceLimits(labelManager.getResourceByLabel(
-                RMNodeLabelsManager.NO_LABEL, getClusterResource())),
-            SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
-        updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+      } else{
+        LOG.info("Skipping scheduling since node " + node.getNodeID()
+            + " is reserved by application " + node.getReservedContainer()
+            .getContainerId().getApplicationAttemptId());
       }
-    } else {
-      LOG.info("Skipping scheduling since node "
-          + node.getNodeID()
-          + " is reserved by application "
-          + node.getReservedContainer().getContainerId()
-              .getApplicationAttemptId());
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -1499,100 +1567,108 @@ public class CapacityScheduler extends
     }
   }
 
-  private synchronized void addNode(RMNode nodeManager) {
-    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName, nodeManager.getNodeLabels());
-    nodeTracker.addNode(schedulerNode);
-
-    // update this node to node label manager
-    if (labelManager != null) {
-      labelManager.activateNode(nodeManager.getNodeID(),
-          schedulerNode.getTotalResource());
-    }
+  private void addNode(RMNode nodeManager) {
+    try {
+      writeLock.lock();
+      FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+          usePortForNodeName, nodeManager.getNodeLabels());
+      nodeTracker.addNode(schedulerNode);
+
+      // update this node to node label manager
+      if (labelManager != null) {
+        labelManager.activateNode(nodeManager.getNodeID(),
+            schedulerNode.getTotalResource());
+      }
 
-    Resource clusterResource = getClusterResource();
-    root.updateClusterResource(clusterResource, new ResourceLimits(
-        clusterResource));
+      Resource clusterResource = getClusterResource();
+      root.updateClusterResource(clusterResource,
+          new ResourceLimits(clusterResource));
 
-    LOG.info("Added node " + nodeManager.getNodeAddress() + 
-        " clusterResource: " + clusterResource);
+      LOG.info(
+          "Added node " + nodeManager.getNodeAddress() + " clusterResource: "
+              + clusterResource);
 
-    if (scheduleAsynchronously && getNumClusterNodes() == 1) {
-      asyncSchedulerThread.beginSchedule();
+      if (scheduleAsynchronously && getNumClusterNodes() == 1) {
+        asyncSchedulerThread.beginSchedule();
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  private synchronized void removeNode(RMNode nodeInfo) {
-    // update this node to node label manager
-    if (labelManager != null) {
-      labelManager.deactivateNode(nodeInfo.getNodeID());
-    }
+  private void removeNode(RMNode nodeInfo) {
+    try {
+      writeLock.lock();
+      // update this node to node label manager
+      if (labelManager != null) {
+        labelManager.deactivateNode(nodeInfo.getNodeID());
+      }
 
-    NodeId nodeId = nodeInfo.getNodeID();
-    FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
-    if (node == null) {
-      LOG.error("Attempting to remove non-existent node " + nodeId);
-      return;
-    }
+      NodeId nodeId = nodeInfo.getNodeID();
+      FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
+      if (node == null) {
+        LOG.error("Attempting to remove non-existent node " + nodeId);
+        return;
+      }
 
-    // Remove running containers
-    List<RMContainer> runningContainers = node.getCopiedListOfRunningContainers();
-    for (RMContainer container : runningContainers) {
-      super.completedContainer(container,
-          SchedulerUtils.createAbnormalContainerStatus(
-              container.getContainerId(), 
-              SchedulerUtils.LOST_CONTAINER), 
-          RMContainerEventType.KILL);
-    }
-    
-    // Remove reservations, if any
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
-      super.completedContainer(reservedContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              reservedContainer.getContainerId(), 
-              SchedulerUtils.LOST_CONTAINER), 
-          RMContainerEventType.KILL);
-    }
+      // Remove running containers
+      List<RMContainer> runningContainers =
+          node.getCopiedListOfRunningContainers();
+      for (RMContainer container : runningContainers) {
+        super.completedContainer(container, SchedulerUtils
+            .createAbnormalContainerStatus(container.getContainerId(),
+                SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+      }
 
-    nodeTracker.removeNode(nodeId);
-    Resource clusterResource = getClusterResource();
-    root.updateClusterResource(clusterResource, new ResourceLimits(
-        clusterResource));
-    int numNodes = nodeTracker.nodeCount();
+      // Remove reservations, if any
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (reservedContainer != null) {
+        super.completedContainer(reservedContainer, SchedulerUtils
+            .createAbnormalContainerStatus(reservedContainer.getContainerId(),
+                SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+      }
 
-    if (scheduleAsynchronously && numNodes == 0) {
-      asyncSchedulerThread.suspendSchedule();
-    }
+      nodeTracker.removeNode(nodeId);
+      Resource clusterResource = getClusterResource();
+      root.updateClusterResource(clusterResource,
+          new ResourceLimits(clusterResource));
+      int numNodes = nodeTracker.nodeCount();
+
+      if (scheduleAsynchronously && numNodes == 0) {
+        asyncSchedulerThread.suspendSchedule();
+      }
 
-    LOG.info("Removed node " + nodeInfo.getNodeAddress() + 
-        " clusterResource: " + getClusterResource());
+      LOG.info(
+          "Removed node " + nodeInfo.getNodeAddress() + " clusterResource: "
+              + getClusterResource());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private void rollbackContainerResource(
       ContainerId containerId) {
     RMContainer rmContainer = getRMContainer(containerId);
     if (rmContainer == null) {
-      LOG.info("Cannot rollback resource for container " + containerId +
-          ". The container does not exist.");
+      LOG.info("Cannot rollback resource for container " + containerId
+          + ". The container does not exist.");
       return;
     }
     FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
     if (application == null) {
-      LOG.info("Cannot rollback resource for container " + containerId +
-          ". The application that the container belongs to does not exist.");
+      LOG.info("Cannot rollback resource for container " + containerId
+          + ". The application that the container "
+          + "belongs to does not exist.");
       return;
     }
     LOG.info("Roll back resource for container " + containerId);
-    LeafQueue leafQueue = (LeafQueue) application.getQueue();
-    synchronized(leafQueue) {
-      SchedulerNode schedulerNode =
-          getSchedulerNode(rmContainer.getAllocatedNode());
-      SchedContainerChangeRequest decreaseRequest =
-          new SchedContainerChangeRequest(this.rmContext, schedulerNode,
-              rmContainer, rmContainer.getLastConfirmedResource());
-      decreaseContainer(decreaseRequest, application);
-    }
+
+    SchedulerNode schedulerNode = getSchedulerNode(
+        rmContainer.getAllocatedNode());
+    SchedContainerChangeRequest decreaseRequest =
+        new SchedContainerChangeRequest(this.rmContext, schedulerNode,
+            rmContainer, rmContainer.getLastConfirmedResource());
+    decreaseContainer(decreaseRequest, application);
   }
 
   @Override
@@ -1601,23 +1677,29 @@ public class CapacityScheduler extends
       RMContainerEventType event) {
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
-    
+
     // Get the application for the finished container
-    FiCaSchedulerApp application =
-        getCurrentAttemptForContainer(container.getId());
+    FiCaSchedulerApp application = getCurrentAttemptForContainer(
+        container.getId());
     ApplicationId appId =
         containerId.getApplicationAttemptId().getApplicationId();
     if (application == null) {
-      LOG.info("Container " + container + " of" + " finished application "
-          + appId + " completed with event " + event);
+      LOG.info(
+          "Container " + container + " of" + " finished application " + appId
+              + " completed with event " + event);
       return;
     }
-    
+
     // Get the node on which the container was allocated
     FiCaSchedulerNode node = getNode(container.getNodeId());
-    
+    if (null == node) {
+      LOG.info("Container " + container + " of" + " removed node " + container
+          .getNodeId() + " completed with event " + event);
+      return;
+    }
+
     // Inform the queue
-    LeafQueue queue = (LeafQueue)application.getQueue();
+    LeafQueue queue = (LeafQueue) application.getQueue();
     queue.completedContainer(getClusterResource(), application, node,
         rmContainer, containerStatus, event, null, true);
   }
@@ -1628,19 +1710,19 @@ public class CapacityScheduler extends
     RMContainer rmContainer = decreaseRequest.getRMContainer();
     // Check container status before doing decrease
     if (rmContainer.getState() != RMContainerState.RUNNING) {
-      LOG.info("Trying to decrease a container not in RUNNING state, container="
-          + rmContainer + " state=" + rmContainer.getState().name());
+      LOG.info(
+          "Trying to decrease a container not in RUNNING state, container="
+              + rmContainer + " state=" + rmContainer.getState().name());
       return;
     }
-    FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
+    FiCaSchedulerApp app = (FiCaSchedulerApp) attempt;
     LeafQueue queue = (LeafQueue) attempt.getQueue();
     try {
       queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
       // Notify RMNode that the container can be pulled by NodeManager in the
       // next heartbeat
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMNodeDecreaseContainerEvent(
-              decreaseRequest.getNodeId(),
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
               Collections.singletonList(rmContainer.getContainer())));
     } catch (InvalidResourceRequestException e) {
       LOG.warn("Error happens when checking decrease request, Ignoring.."
@@ -1701,70 +1783,81 @@ public class CapacityScheduler extends
     }
   }
 
-  public synchronized void markContainerForKillable(
+  public void markContainerForKillable(
       RMContainer killableContainer) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
-          + killableContainer.toString());
+    try {
+      writeLock.lock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
+            + killableContainer.toString());
+      }
+
+      if (!isLazyPreemptionEnabled) {
+        super.completedContainer(killableContainer, SchedulerUtils
+            .createPreemptedContainerStatus(killableContainer.getContainerId(),
+                SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
+      } else{
+        FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
+            killableContainer.getAllocatedNode());
+
+        FiCaSchedulerApp application = getCurrentAttemptForContainer(
+            killableContainer.getContainerId());
+
+        node.markContainerToKillable(killableContainer.getContainerId());
+
+        // notify PreemptionManager, but only when a current application
+        // attempt was found for the container being marked killable
+        if (null != application) {
+          String leafQueueName = application.getCSLeafQueue().getQueueName();
+          getPreemptionManager().addKillableContainer(
+              new KillableContainer(killableContainer, node.getPartition(),
+                  leafQueueName));
+        }
+      }
+    } finally {
+      writeLock.unlock();
     }
+  }
+
+  private void markContainerForNonKillable(
+      RMContainer nonKillableContainer) {
+    try {
+      writeLock.lock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
+                + nonKillableContainer.toString());
+      }
 
-    if (!isLazyPreemptionEnabled) {
-      super.completedContainer(killableContainer, SchedulerUtils
-          .createPreemptedContainerStatus(killableContainer.getContainerId(),
-              SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
-    } else {
       FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
-          killableContainer.getAllocatedNode());
+          nonKillableContainer.getAllocatedNode());
 
       FiCaSchedulerApp application = getCurrentAttemptForContainer(
-          killableContainer.getContainerId());
+          nonKillableContainer.getContainerId());
 
-      node.markContainerToKillable(killableContainer.getContainerId());
+      node.markContainerToNonKillable(nonKillableContainer.getContainerId());
 
       // notify PreemptionManager
       // Get the application for the finished container
       if (null != application) {
         String leafQueueName = application.getCSLeafQueue().getQueueName();
-        getPreemptionManager().addKillableContainer(
-            new KillableContainer(killableContainer, node.getPartition(),
+        getPreemptionManager().removeKillableContainer(
+            new KillableContainer(nonKillableContainer, node.getPartition(),
                 leafQueueName));
-      }    }
-  }
-
-  private synchronized void markContainerForNonKillable(
-      RMContainer nonKillableContainer) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
-              + nonKillableContainer.toString());
-    }
-
-    FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
-        nonKillableContainer.getAllocatedNode());
-
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(
-        nonKillableContainer.getContainerId());
-
-    node.markContainerToNonKillable(nonKillableContainer.getContainerId());
-
-    // notify PreemptionManager
-    // Get the application for the finished container
-    if (null != application) {
-      String leafQueueName = application.getCSLeafQueue().getQueueName();
-      getPreemptionManager().removeKillableContainer(
-          new KillableContainer(nonKillableContainer, node.getPartition(),
-              leafQueueName));
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
   @Override
-  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+  public boolean checkAccess(UserGroupInformation callerUGI,
       QueueACL acl, String queueName) {
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("ACL not found for queue access-type " + acl
-            + " for queue " + queueName);
+        LOG.debug("ACL not found for queue access-type " + acl + " for queue "
+            + queueName);
       }
       return false;
     }
@@ -1803,179 +1896,213 @@ public class CapacityScheduler extends
     return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
   }
 
-  private synchronized String resolveReservationQueueName(String queueName,
+  private String resolveReservationQueueName(String queueName,
       ApplicationId applicationId, ReservationId reservationID,
       boolean isRecovering) {
-    CSQueue queue = getQueue(queueName);
-    // Check if the queue is a plan queue
-    if ((queue == null) || !(queue instanceof PlanQueue)) {
-      return queueName;
-    }
-    if (reservationID != null) {
-      String resQName = reservationID.toString();
-      queue = getQueue(resQName);
-      if (queue == null) {
-        // reservation has terminated during failover
-        if (isRecovering
-            && conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) {
-          // move to the default child queue of the plan
-          return getDefaultReservationQueueName(queueName);
-        }
-        String message =
-            "Application " + applicationId
-                + " submitted to a reservation which is not currently active: "
-                + resQName;
-        this.rmContext.getDispatcher().getEventHandler()
-            .handle(new RMAppEvent(applicationId,
-                RMAppEventType.APP_REJECTED, message));
-        return null;
+    try {
+      readLock.lock();
+      CSQueue queue = getQueue(queueName);
+      // Check if the queue is a plan queue
+      if ((queue == null) || !(queue instanceof PlanQueue)) {
+        return queueName;
       }
-      if (!queue.getParent().getQueueName().equals(queueName)) {
-        String message =
-            "Application: " + applicationId + " submitted to a reservation "
-                + resQName + " which does not belong to the specified queue: "
-                + queueName;
-        this.rmContext.getDispatcher().getEventHandler()
-            .handle(new RMAppEvent(applicationId,
-                RMAppEventType.APP_REJECTED, message));
-        return null;
+      if (reservationID != null) {
+        String resQName = reservationID.toString();
+        queue = getQueue(resQName);
+        if (queue == null) {
+          // reservation has terminated during failover
+          if (isRecovering && conf.getMoveOnExpiry(
+              getQueue(queueName).getQueuePath())) {
+            // move to the default child queue of the plan
+            return getDefaultReservationQueueName(queueName);
+          }
+          String message = "Application " + applicationId
+              + " submitted to a reservation which is not currently active: "
+              + resQName;
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                  message));
+          return null;
+        }
+        if (!queue.getParent().getQueueName().equals(queueName)) {
+          String message =
+              "Application: " + applicationId + " submitted to a reservation "
+                  + resQName + " which does not belong to the specified queue: "
+                  + queueName;
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                  message));
+          return null;
+        }
+        // use the reservation queue to run the app
+        queueName = resQName;
+      } else{
+        // use the default child queue of the plan for unreserved apps
+        queueName = getDefaultReservationQueueName(queueName);
       }
-      // use the reservation queue to run the app
-      queueName = resQName;
-    } else {
-      // use the default child queue of the plan for unreserved apps
-      queueName = getDefaultReservationQueueName(queueName);
+      return queueName;
+    } finally {
+      readLock.unlock();
     }
-    return queueName;
+
   }
 
   @Override
-  public synchronized void removeQueue(String queueName)
+  public void removeQueue(String queueName)
       throws SchedulerDynamicEditException {
-    LOG.info("Removing queue: " + queueName);
-    CSQueue q = this.getQueue(queueName);
-    if (!(q instanceof ReservationQueue)) {
-      throw new SchedulerDynamicEditException("The queue that we are asked "
-          + "to remove (" + queueName + ") is not a ReservationQueue");
-    }
-    ReservationQueue disposableLeafQueue = (ReservationQueue) q;
-    // at this point we should have no more apps
-    if (disposableLeafQueue.getNumApplications() > 0) {
-      throw new SchedulerDynamicEditException("The queue " + queueName
-          + " is not empty " + disposableLeafQueue.getApplications().size()
-          + " active apps " + disposableLeafQueue.getPendingApplications().size()
-          + " pending apps");
-    }
+    try {
+      writeLock.lock();
+      LOG.info("Removing queue: " + queueName);
+      CSQueue q = this.getQueue(queueName);
+      if (!(q instanceof ReservationQueue)) {
+        throw new SchedulerDynamicEditException(
+            "The queue that we are asked " + "to remove (" + queueName
+                + ") is not a ReservationQueue");
+      }
+      ReservationQueue disposableLeafQueue = (ReservationQueue) q;
+      // at this point we should have no more apps
+      if (disposableLeafQueue.getNumApplications() > 0) {
+        throw new SchedulerDynamicEditException(
+            "The queue " + queueName + " is not empty " + disposableLeafQueue
+                .getApplications().size() + " active apps "
+                + disposableLeafQueue.getPendingApplications().size()
+                + " pending apps");
+      }
 
-    ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
-    this.queues.remove(queueName);
-    LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
+      ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
+      this.queues.remove(queueName);
+      LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
-  public synchronized void addQueue(Queue queue)
+  public void addQueue(Queue queue)
       throws SchedulerDynamicEditException {
+    try {
+      writeLock.lock();
+      if (!(queue instanceof ReservationQueue)) {
+        throw new SchedulerDynamicEditException(
+            "Queue " + queue.getQueueName() + " is not a ReservationQueue");
+      }
 
-    if (!(queue instanceof ReservationQueue)) {
-      throw new SchedulerDynamicEditException("Queue " + queue.getQueueName()
-          + " is not a ReservationQueue");
-    }
+      ReservationQueue newQueue = (ReservationQueue) queue;
 
-    ReservationQueue newQueue = (ReservationQueue) queue;
+      if (newQueue.getParent() == null || !(newQueue
+          .getParent() instanceof PlanQueue)) {
+        throw new SchedulerDynamicEditException(
+            "ParentQueue for " + newQueue.getQueueName()
+                + " is not properly set (should be set and be a PlanQueue)");
+      }
 
-    if (newQueue.getParent() == null
-        || !(newQueue.getParent() instanceof PlanQueue)) {
-      throw new SchedulerDynamicEditException("ParentQueue for "
-          + newQueue.getQueueName()
-          + " is not properly set (should be set and be a PlanQueue)");
+      PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
+      String queuename = newQueue.getQueueName();
+      parentPlan.addChildQueue(newQueue);
+      this.queues.put(queuename, newQueue);
+      LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
+    } finally {
+      writeLock.unlock();
     }
-
-    PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
-    String queuename = newQueue.getQueueName();
-    parentPlan.addChildQueue(newQueue);
-    this.queues.put(queuename, newQueue);
-    LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
   }
 
   @Override
-  public synchronized void setEntitlement(String inQueue,
-      QueueEntitlement entitlement) throws SchedulerDynamicEditException,
-      YarnException {
-    LeafQueue queue = getAndCheckLeafQueue(inQueue);
-    ParentQueue parent = (ParentQueue) queue.getParent();
-
-    if (!(queue instanceof ReservationQueue)) {
-      throw new SchedulerDynamicEditException("Entitlement can not be"
-          + " modified dynamically since queue " + inQueue
-          + " is not a ReservationQueue");
-    }
+  public void setEntitlement(String inQueue, QueueEntitlement entitlement)
+      throws YarnException {
+    try {
+      writeLock.lock();
+      LeafQueue queue = getAndCheckLeafQueue(inQueue);
+      ParentQueue parent = (ParentQueue) queue.getParent();
+
+      if (!(queue instanceof ReservationQueue)) {
+        throw new SchedulerDynamicEditException(
+            "Entitlement can not be" + " modified dynamically since queue "
+                + inQueue + " is not a ReservationQueue");
+      }
 
-    if (!(parent instanceof PlanQueue)) {
-      throw new SchedulerDynamicEditException("The parent of ReservationQueue "
-          + inQueue + " must be an PlanQueue");
-    }
+      if (!(parent instanceof PlanQueue)) {
+        throw new SchedulerDynamicEditException(
+            "The parent of ReservationQueue " + inQueue
+                + " must be an PlanQueue");
+      }
 
-    ReservationQueue newQueue = (ReservationQueue) queue;
+      ReservationQueue newQueue = (ReservationQueue) queue;
 
-    float sumChilds = ((PlanQueue) parent).sumOfChildCapacities();
-    float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity();
+      float sumChilds = ((PlanQueue) parent).sumOfChildCapacities();
+      float newChildCap =
+          sumChilds - queue.getCapacity() + entitlement.getCapacity();
 
-    if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) {
-      // note: epsilon checks here are not ok, as the epsilons might accumulate
-      // and become a problem in aggregate
-      if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0
-          && Math.abs(entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) {
-        return;
+      if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) {
+        // note: epsilon checks here are not ok, as the epsilons might
+        // accumulate and become a problem in aggregate
+        if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0
+            && Math.abs(
+            entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) {
+          return;
+        }
+        newQueue.setEntitlement(entitlement);
+      } else{
+        throw new SchedulerDynamicEditException(
+            "Sum of child queues would exceed 100% for PlanQueue: " + parent
+                .getQueueName());
       }
-      newQueue.setEntitlement(entitlement);
-    } else {
-      throw new SchedulerDynamicEditException(
-          "Sum of child queues would exceed 100% for PlanQueue: "
-              + parent.getQueueName());
+      LOG.info(
+          "Set entitlement for ReservationQueue " + inQueue + "  to " + queue
+              .getCapacity() + " request was (" + entitlement.getCapacity()
+              + ")");
+    } finally {
+      writeLock.unlock();
     }
-    LOG.info("Set entitlement for ReservationQueue " + inQueue + "  to "
-        + queue.getCapacity() + " request was (" + entitlement.getCapacity() + ")");
   }
 
   @Override
-  public synchronized String moveApplication(ApplicationId appId,
+  public String moveApplication(ApplicationId appId,
       String targetQueueName) throws YarnException {
-    FiCaSchedulerApp app =
-        getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
-    String sourceQueueName = app.getQueue().getQueueName();
-    LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
-    String destQueueName = handleMoveToPlanQueue(targetQueueName);
-    LeafQueue dest = getAndCheckLeafQueue(destQueueName);
-    // Validation check - ACLs, submission limits for user & queue
-    String user = app.getUser();
-    checkQueuePartition(app, dest);
     try {
-      dest.submitApplication(appId, user, destQueueName);
-    } catch (AccessControlException e) {
-      throw new YarnException(e);
-    }
-    // Move all live containers
-    for (RMContainer rmContainer : app.getLiveContainers()) {
-      source.detachContainer(getClusterResource(), app, rmContainer);
-      // attach the Container to another queue
-      dest.attachContainer(getClusterResource(), app, rmContainer);
-    }
-    // Detach the application..
-    source.finishApplicationAttempt(app, sourceQueueName);
-    source.getParent().finishApplication(appId, app.getUser());
-    // Finish app & update metrics
-    app.move(dest);
-    // Submit to a new queue
-    dest.submitApplicationAttempt(app, user);
-    applications.get(appId).setQueue(dest);
-    LOG.info("App: " + app.getApplicationId() + " successfully moved from "
-        + sourceQueueName + " to: " + destQueueName);
-    return targetQueueName;
+      writeLock.lock();
+      FiCaSchedulerApp app = getApplicationAttempt(
+          ApplicationAttemptId.newInstance(appId, 0));
+      String sourceQueueName = app.getQueue().getQueueName();
+      LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+      String destQueueName = handleMoveToPlanQueue(targetQueueName);
+      LeafQueue dest = getAndCheckLeafQueue(destQueueName);
+      // Validation check - ACLs, submission limits for user & queue
+      String user = app.getUser();
+      checkQueuePartition(app, dest);
+      try {
+        dest.submitApplication(appId, user, destQueueName);
+      } catch (AccessControlException e) {
+        throw new YarnException(e);
+      }
+      // Move all live containers
+      for (RMContainer rmContainer : app.getLiveContainers()) {
+        source.detachContainer(getClusterResource(), app, rmContainer);
+        // attach the Container to another queue
+        dest.attachContainer(getClusterResource(), app, rmContainer);
+      }
+      // Detach the application..
+      source.finishApplicationAttempt(app, sourceQueueName);
+      source.getParent().finishApplication(appId, app.getUser());
+      // Finish app & update metrics
+      app.move(dest);
+      // Submit to a new queue
+      dest.submitApplicationAttempt(app, user);
+      applications.get(appId).setQueue(dest);
+      LOG.info("App: " + app.getApplicationId() + " successfully moved from "
+          + sourceQueueName + " to: " + destQueueName);
+      return targetQueueName;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   /*
    * Check application can be moved to queue with labels enabled. All labels in
    * application life time will be checked
+   *
+   * @param app application being moved
+   * @param dest leaf queue the application is being moved to
+   * @throws YarnException if the check fails
    */
   private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest)
       throws YarnException {
@@ -2163,16 +2290,8 @@ public class CapacityScheduler extends
     // As we use iterator over a TreeSet for OrderingPolicy, once we change
     // priority then reinsert back to make order correct.
     LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
-    synchronized (queue) {
-      queue.getOrderingPolicy().removeSchedulableEntity(
-          application.getCurrentAppAttempt());
 
-      // Update new priority in SchedulerApplication
-      application.setPriority(appPriority);
-
-      queu

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message