myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dar...@apache.org
Subject incubator-myriad git commit: Continuation of Myriad-171, started to work towards public methods incrementResources and decrementResources, as it's easier to reason about purely additive functions in multithreaded environments. Fixed minor bugs in previous…
Date Thu, 12 May 2016 04:24:48 GMT
Repository: incubator-myriad
Updated Branches:
  refs/heads/master fe493af32 -> 12a679c2b


Continuation of Myriad-171: started working towards public methods incrementResources and
decrementResources (added here as incrementNodeCapacity and decrementNodeCapacity), since purely
additive functions are easier to reason about in multithreaded environments.  Fixed minor bugs in
the previous Myriad-171 patch, placed the guard against Node Managers having negative resources in
setNodeCapacity, and reverted to no longer calling yarnScheduler.updateNodeResource directly (a
NodeResourceUpdateSchedulerEvent is dispatched instead).  Very well tested.

Todo: Figure out how to make setNodeCapacity private.
JIRA:
  [Myriad-171] https://issues.apache.org/jira/browse/MYRIAD-171
Pull Request:
  Closes #70
Author:
  DarinJ <darinj@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/12a679c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/12a679c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/12a679c2

Branch: refs/heads/master
Commit: 12a679c2b05e550e6edabe2a0033aaf206786e6f
Parents: fe493af
Author: darinj <darinj.work@gmail.com>
Authored: Tue May 10 16:47:02 2016 -0400
Committer: darinj <darinj@apache.org>
Committed: Thu May 12 00:23:27 2016 -0400

----------------------------------------------------------------------
 .../scheduler/fgs/YarnNodeCapacityManager.java  | 60 ++++++++++++++------
 .../fgs/YarnNodeCapacityManagerSpec.groovy      |  2 +-
 2 files changed, 43 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/12a679c2/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 1dee5fa..e922fc6 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -24,6 +24,8 @@ import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.inject.Inject;
 
 import org.apache.hadoop.yarn.api.records.Container;
@@ -77,6 +79,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
   private final OfferLifecycleManager offerLifecycleMgr;
   private final NodeStore nodeStore;
   private final SchedulerState state;
+  private static final Lock yarnSchedulerLock = new ReentrantLock();
   private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
   private TaskUtils taskUtils;
 
@@ -123,7 +126,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
     }
   }
 
-  private synchronized void removeYarnTask(RMContainer rmContainer) {
+  private void removeYarnTask(RMContainer rmContainer) {
     if (rmContainer != null && rmContainer.getContainer() != null) {
       Protos.TaskID taskId = containerToTaskId(rmContainer);
       //TODO (darinj) Reliable messaging
@@ -134,8 +137,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
       if (node != null) {
         RMNode rmNode = node.getNode().getRMNode();
         Resource resource = rmContainer.getContainer().getResource();
-        Resource diff = ResourceUtils.componentwiseMax(ZERO_RESOURCE, Resources.subtract(rmNode.getTotalCapability(),
resource));
-        setNodeCapacity(rmNode, diff);
+        decrementNodeCapacity(rmNode, resource);
         LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and {} mem.", rmContainer.getContainer().toString(),
             rmContainer.getContainerExitStatus(), resource.getVirtualCores(), resource.getMemory());
       } else {
@@ -206,8 +208,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
       for (Protos.Offer offer : consumedOffer.getOffers()) {
         offerLifecycleMgr.declineOffer(offer);
       }
-      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers(
-          consumedOffer.getOffers())));
+      decrementNodeCapacity(rmNode, OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()));
     } else {
       LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host,
containersAllocatedByMesosOffer.size());
 
@@ -223,8 +224,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
       // Reduce node capacity to account for unused offers
       Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers());
       Resource resUnused = Resources.subtract(resOffered, resUsed);
-      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), resUnused));
-
+      decrementNodeCapacity(rmNode, resUnused);
       myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks);
     }
 
@@ -232,6 +232,15 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
     node.removeContainerSnapshot();
   }
 
+
+  public void incrementNodeCapacity(RMNode rmNode, Resource addedCapacity) {
+    setNodeCapacity(rmNode, Resources.add(rmNode.getTotalCapability(), addedCapacity));
+  }
+
+  public void decrementNodeCapacity(RMNode rmNode, Resource removedCapacity) {
+    setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), removedCapacity));
+  }
+
   /**
    * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity.
    * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by YARN's scheduler.
@@ -243,19 +252,34 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
   @SuppressWarnings("unchecked")
   public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
     //NOOP prevent YARN warning changing to same size
-    if (!Resources.equals(rmNode.getTotalCapability(), newCapacity)) {
-      rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
-      rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
-      LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
-      // updates the scheduler with the new capacity for the NM.
-      synchronized (yarnScheduler) {
-        if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) != null) {
-          yarnScheduler.updateNodeResource(rmNode,
-              ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
-        } else {
-          LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID());
+    if ((Resources.equals(rmNode.getTotalCapability(), newCapacity))) {
+      return;
+    }
+    if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) == null) {
+      LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID());
+      return;
+    }
+    yarnSchedulerLock.lock();
+    try {
+      if (newCapacity.getMemory() < 0 || newCapacity.getVirtualCores() < 0) {
+        Resource zeroed = ResourceUtils.componentwiseMax(ZERO_RESOURCE, newCapacity);
+        rmNode.getTotalCapability().setMemory(zeroed.getMemory());
+        rmNode.getTotalCapability().setVirtualCores(zeroed.getVirtualCores());
+        LOGGER.warn("Asked to set Node {} to a value less than zero!  Had {}, setting to
{}.",
+            rmNode.getHttpAddress(), rmNode.getTotalCapability().toString(), zeroed.toString());
+      } else {
+        rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
+        rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
         }
       }
+      // updates the scheduler with the new capacity for the NM.
+      // the event is handled by the scheduler asynchronously
+      rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode,
ResourceOption.newInstance(
+          rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    } finally {
+      yarnSchedulerLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/12a679c2/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
index 5d59c68..f7d8c43 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
@@ -117,7 +117,7 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec {
         then:
         zeroNM.getTotalCapability().getMemory() == 2048
         zeroNM.getTotalCapability().getVirtualCores() == 2
-        1 * yarnScheduler.updateNodeResource( _ as RMNode, _ as ResourceOption)
+        1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent)
     }
 
     YarnNodeCapacityManager getYarnNodeCapacityManager() {


Mime
View raw message