myriad-commits mailing list archives

From smare...@apache.org
Subject incubator-myriad git commit: Unit tests for Fine Grained Scaling
Date Sat, 17 Oct 2015 00:08:29 GMT
Repository: incubator-myriad
Updated Branches:
  refs/heads/master d2eaa4d16 -> 645d17d8a


Unit tests for Fine Grained Scaling

Review: https://github.com/mesos/myriad/pull/125
Addresses @kensipe's review comments, except for the one about
using composition over inheritance for the tests' base class.
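
The composition-based alternative was deferred; the sketch below is not part of this commit and only illustrates roughly what it could look like, with the shared wiring moved into a plain fixture object that each spec instantiates instead of a common Specification superclass. The FGSTestFixture name and its exact shape are illustrative assumptions; the specs in this commit continue to extend FGSTestBaseSpec.

    package com.ebay.myriad.scheduler.fgs

    import com.ebay.myriad.scheduler.MyriadDriver
    import org.apache.mesos.SchedulerDriver
    import spock.lang.Specification

    // Hypothetical fixture (not in this commit): holds the shared FGS test wiring.
    class FGSTestFixture {
      final NodeStore nodeStore = new NodeStore()
      final MyriadDriver myriadDriver
      final OfferLifecycleManager offerLifecycleManager

      FGSTestFixture(SchedulerDriver mesosDriver) {
        // Mocks stay in the spec; the fixture only wires the real collaborators around them.
        myriadDriver = new MyriadDriver(mesosDriver)
        offerLifecycleManager = new OfferLifecycleManager(nodeStore, myriadDriver)
      }
    }

    class NMHeartBeatHandlerSpec extends Specification {
      def mesosDriver = Mock(SchedulerDriver)
      def fgs = new FGSTestFixture(mesosDriver)  // composed rather than inherited

      // feature methods would reference fgs.nodeStore, fgs.offerLifecycleManager, etc.
    }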


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/645d17d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/645d17d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/645d17d8

Branch: refs/heads/master
Commit: 645d17d8ad9263e59c18dea7110381b84643bc56
Parents: d2eaa4d
Author: Santosh Marella <marella@gmail.com>
Authored: Fri Oct 16 17:01:28 2015 -0700
Committer: Santosh Marella <marella@gmail.com>
Committed: Fri Oct 16 17:01:28 2015 -0700

----------------------------------------------------------------------
 .../scheduler/fgs/NMHeartBeatHandler.java       |  32 ++--
 .../scheduler/fgs/YarnNodeCapacityManager.java  |  11 +-
 .../myriad/scheduler/SchedulerUtilsSpec.groovy  |  25 +++
 .../myriad/scheduler/fgs/FGSTestBaseSpec.groovy | 152 +++++++++++++++++++
 .../scheduler/fgs/NMHeartBeatHandlerSpec.groovy |  96 ++++++++++++
 .../fgs/YarnNodeCapacityManagerSpec.groovy      | 119 +++++++++++++++
 6 files changed, 410 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/645d17d8/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
index bd125ce..44c4c39 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
@@ -20,10 +20,10 @@ package com.ebay.myriad.scheduler.fgs;
 
 import com.ebay.myriad.scheduler.MyriadDriver;
 import com.ebay.myriad.scheduler.SchedulerUtils;
-import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
 import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
 import com.ebay.myriad.state.SchedulerState;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
 import javax.inject.Inject;
@@ -47,8 +47,8 @@ import org.slf4j.LoggerFactory;
  * Handles node manager heartbeat.
  */
 public class NMHeartBeatHandler extends BaseInterceptor {
-  private static final Logger LOGGER = LoggerFactory.getLogger(
-      NMHeartBeatHandler.class);
+  @VisibleForTesting
+  Logger logger = LoggerFactory.getLogger(NMHeartBeatHandler.class);
 
   private final AbstractYarnScheduler yarnScheduler;
   private final MyriadDriver myriadDriver;
@@ -97,10 +97,10 @@ public class NMHeartBeatHandler extends BaseInterceptor {
         Resource totalCapability = rmNode.getTotalCapability();
         if (totalCapability.getMemory() != 0 ||
             totalCapability.getVirtualCores() != 0) {
-          LOGGER.warn("FineGrainedScaling feature got invoked for a " +
-                  "NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the NM's capacity to (0G,0CPU)",
-              rmNode.getHostName(),
-              totalCapability.getMemory(), totalCapability.getVirtualCores());
+          logger.warn("FineGrainedScaling feature got invoked for a " +
+              "NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the NM's capacity to (0G,0CPU)",
+            rmNode.getHostName(),
+            totalCapability.getMemory(), totalCapability.getVirtualCores());
           totalCapability.setMemory(0);
           totalCapability.setVirtualCores(0);
         }
@@ -117,10 +117,11 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     }
   }
 
-  private void handleStatusUpdate(RMNodeEvent event, RMContext context) {
+  @VisibleForTesting
+  protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
     if (!(event instanceof RMNodeStatusEvent)) {
-      LOGGER.error("{} not an instance of {}", event.getClass().getName(),
-          RMNodeStatusEvent.class.getName());
+      logger.error("{} not an instance of {}", event.getClass().getName(),
+        RMNodeStatusEvent.class.getName());
       return;
     }
 
@@ -144,7 +145,7 @@ public class NMHeartBeatHandler extends BaseInterceptor {
   private Resource getNewResourcesOfferedByMesos(String hostname) {
     OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname);
     if (feed == null) {
-      LOGGER.debug("No offer feed for: {}", hostname);
+      logger.debug("No offer feed for: {}", hostname);
       return Resource.newInstance(0, 0);
     }
     List<Offer> offers = new ArrayList<>();
@@ -155,8 +156,8 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     }
     Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers);
 
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("NM on host {} got {} CPUs and {} memory from mesos",
+    if (logger.isDebugEnabled()) {
+      logger.debug("NM on host {} got {} CPUs and {} memory from mesos",
           hostname, fromMesosOffers.getVirtualCores(), fromMesosOffers.getMemory());
     }
 
@@ -177,9 +178,4 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     }
     return usedResources;
   }
-
-  private Protos.ExecutorID getExecutorId(Protos.SlaveID slaveId) {
-    return Protos.ExecutorID.newBuilder().setValue(
-        TaskFactory.NMTaskFactoryImpl.EXECUTOR_PREFIX + slaveId.getValue()).build();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/645d17d8/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index a987885..ae28bda 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -21,12 +21,11 @@ package com.ebay.myriad.scheduler.fgs;
 import com.ebay.myriad.configuration.NodeManagerConfiguration;
 import com.ebay.myriad.executor.ContainerTaskStatusRequest;
 import com.ebay.myriad.scheduler.MyriadDriver;
-import com.ebay.myriad.scheduler.NMTaskFactoryAnnotation;
 import com.ebay.myriad.scheduler.SchedulerUtils;
-import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
 import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
 import com.ebay.myriad.state.SchedulerState;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -74,15 +73,13 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
     private final MyriadDriver myriadDriver;
     private final OfferLifecycleManager offerLifecycleMgr;
     private final NodeStore nodeStore;
-    private final TaskFactory taskFactory;
-    private final SchedulerState state;
+  private final SchedulerState state;
 
     @Inject
     public YarnNodeCapacityManager(InterceptorRegistry registry,
                                    AbstractYarnScheduler yarnScheduler,
                                    RMContext rmContext,
                                    MyriadDriver myriadDriver,
-                                   @NMTaskFactoryAnnotation TaskFactory taskFactory,
                                    OfferLifecycleManager offerLifecycleMgr,
                                    NodeStore nodeStore,
                                    SchedulerState state) {
@@ -92,7 +89,6 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
         this.yarnScheduler = yarnScheduler;
         this.rmContext = rmContext;
         this.myriadDriver = myriadDriver;
-        this.taskFactory = taskFactory;
         this.offerLifecycleMgr = offerLifecycleMgr;
         this.nodeStore = nodeStore;
         this.state = state;
@@ -152,7 +148,8 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
      * capacity depending on what portion of the consumed offers were actually
      * used.
      */
-    private void handleContainerAllocation(RMNode rmNode) {
+    @VisibleForTesting
+    protected void handleContainerAllocation(RMNode rmNode) {
       String host = rmNode.getNodeID().getHost();
 
       ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/645d17d8/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
index efd91ff..e936207 100644
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
@@ -1,6 +1,8 @@
 package com.ebay.myriad.scheduler
 
+import com.ebay.myriad.configuration.NodeManagerConfiguration
 import com.ebay.myriad.state.NodeTask
+import com.ebay.myriad.state.SchedulerState
 import org.apache.mesos.Protos
 import spock.lang.Specification
 
@@ -29,6 +31,29 @@ class SchedulerUtilsSpec extends Specification {
 
     }
 
+    def "is eligible for Fine Grained Scaling"() {
+        given:
+        def state = Mock(SchedulerState)
+        def tasks = []
+        def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("zero", 0, 0), 1.0, 2.0), null)
+        def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("low", 2, 4096), 1.0, 2.0), null)
+        fgsNMTask.setHostname("test_fgs_hostname")
+        cgsNMTask.setHostname("test_cgs_hostname")
+        tasks << fgsNMTask << cgsNMTask
+        state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX) >> tasks
+
+        expect:
+        returnValue == SchedulerUtils.isEligibleForFineGrainedScaling(hostName, state)
+
+        where:
+        hostName            | returnValue
+        "test_fgs_hostname" | true
+        "test_cgs_hostname" | false
+        "blah"              | false
+        ""                  | false
+        null                | false
+    }
+
     ArrayList<NodeTask> createNodeTaskList(String... hostnames) {
         def list = []
         hostnames.each { hostname ->

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/645d17d8/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/FGSTestBaseSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
new file mode 100644
index 0000000..bee223d
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
@@ -0,0 +1,152 @@
+package com.ebay.myriad.scheduler.fgs
+
+import com.ebay.myriad.configuration.MyriadConfiguration
+import com.ebay.myriad.scheduler.MyriadDriver
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import org.apache.hadoop.yarn.api.records.*
+import org.apache.hadoop.yarn.event.Dispatcher
+import org.apache.hadoop.yarn.event.EventHandler
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode
+import org.apache.hadoop.yarn.util.resource.Resources
+import org.apache.mesos.Protos
+import org.apache.mesos.SchedulerDriver
+import spock.lang.Specification
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ *
+ * Base class for testing Fine Grained Scaling.
+ *
+ */
+class FGSTestBaseSpec extends Specification {
+  def nodeStore = new NodeStore()
+  def mesosDriver = Mock(SchedulerDriver)
+  def myriadDriver = new MyriadDriver(mesosDriver)
+  def offerLifecycleManager = new OfferLifecycleManager(nodeStore, myriadDriver)
+
+  def cfg = new MyriadConfiguration()
+
+  void setup() {
+    ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
+    cfg = mapper.readValue(
+        Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"),
+        MyriadConfiguration.class)
+  }
+/******************* Nodes Related ****************/
+
+  def rmNodes = new ConcurrentHashMap<NodeId, RMNode>()
+
+  RMNode getRMNode(int cpu, int mem, String host, Protos.SlaveID slaveId) {
+    RMNode rmNode = MockNodes.newNodeInfo(0, Resources.createResource(mem, cpu), 0, host)
+    if (rmNodes[rmNode.getNodeID()]) {
+      throw new IllegalArgumentException("Node with hostname: " + host + " already exists")
+    }
+    rmNodes.put(rmNode.getNodeID(), rmNode)
+    nodeStore.add(getSchedulerNode(rmNode))
+    def node = nodeStore.getNode(host)
+    node.setSlaveId(slaveId)
+
+    return rmNode
+  }
+
+  SchedulerNode getSchedulerNode(RMNode rmNode) {
+    SchedulerNode schedulerNode = new SchedulerNode(rmNode, false) {
+
+      @Override
+      void reserveResource(SchedulerApplicationAttempt attempt, Priority priority, RMContainer container) {
+      }
+
+      @Override
+      void unreserveResource(SchedulerApplicationAttempt attempt) {
+      }
+    }
+    return schedulerNode
+  }
+
+  /******************* RMContext Related ****************/
+
+  def publisher = Mock(SystemMetricsPublisher) {}
+  def writer = Mock(RMApplicationHistoryWriter) {}
+  def handler = Mock(EventHandler) {}
+
+  def dispatcher = Mock(Dispatcher) {
+    getEventHandler() >> handler
+  }
+
+  def rmContext = Mock(RMContext) {
+    getDispatcher() >> dispatcher
+    getRMApplicationHistoryWriter() >> writer
+    getSystemMetricsPublisher() >> publisher
+    getRMNodes() >> rmNodes
+  }
+
+  /******************* Offers Related ****************/
+
+  Protos.Offer addOfferToFeed(Protos.SlaveID slaveID, String host, int cpu, int mem) {
+    def offer = Protos.Offer.newBuilder()
+        .setId(Protos.OfferID.newBuilder().setValue("test_offer_id"))
+        .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("test_framework_id"))
+        .setSlaveId(slaveID)
+        .setHostname(host)
+        .addResources(Protos.Resource.newBuilder()
+        .setName("cpus")
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu))
+        .setType(Protos.Value.Type.SCALAR).build())
+        .addResources(Protos.Resource.newBuilder()
+        .setName("mem")
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem))
+        .setType(Protos.Value.Type.SCALAR).build())
+        .build()
+    offerLifecycleManager.addOffers(offer)
+    return offer
+  }
+
+  /******************* Containers Related ****************/
+
+  class FGSContainer {
+    ContainerId containerId
+    Container container
+    RMContainer rmContainer
+    ContainerStatus containerStatus
+  }
+
+  def fgsContainers = new HashMap<>()
+
+  AbstractYarnScheduler yarnScheduler = Mock(AbstractYarnScheduler) {
+    getRMContainer(_ as ContainerId) >> { ContainerId cid -> fgsContainers.get(cid).rmContainer }
+  }
+
+  FGSContainer getFGSContainer(RMNode node, int cid, int cpu, int mem, ContainerState state) {
+    FGSContainer fgsContainer = createFGSContainer(node, cid, cpu, mem, state)
+    if (!fgsContainers[fgsContainer.containerId]) {
+      fgsContainers[fgsContainer.containerId] =  fgsContainer
+    }
+    return fgsContainer
+  }
+
+  private FGSContainer createFGSContainer(RMNode node, int cid, int cpu, int mem, ContainerState state) {
+    ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(123456789, 1), 1), cid)
+    FGSContainer fgsContainer = new FGSContainer()
+    fgsContainer.containerId = containerId
+    fgsContainer.container = Container.newInstance(containerId, node.getNodeID(), node.getHttpAddress(),
+        Resources.createResource(mem, cpu), null, null)
+    fgsContainer.rmContainer = new RMContainerImpl(fgsContainer.container, containerId.getApplicationAttemptId(),
+        node.getNodeID(), "user1", rmContext)
+    nodeStore.getNode(node.getNodeID().getHost()).getNode().allocateContainer(fgsContainer.rmContainer)
+    fgsContainer.containerStatus = ContainerStatus.newInstance(containerId, state, "", 0)
+    return fgsContainer
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/645d17d8/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
new file mode 100644
index 0000000..c9d7d40
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
@@ -0,0 +1,96 @@
+package com.ebay.myriad.scheduler.fgs
+
+import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry
+import com.ebay.myriad.state.SchedulerState
+import org.apache.hadoop.yarn.api.records.ContainerState
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent
+import org.apache.hadoop.yarn.util.resource.Resources
+import org.apache.mesos.Protos
+import org.slf4j.Logger
+
+/**
+ *
+ * Tests for NMHeartBeatHandler
+ *
+ */
+class NMHeartBeatHandlerSpec extends FGSTestBaseSpec {
+
+  def "Node Manager registration"() {
+    given:
+    def hbHandler = getNMHeartBeatHandler()
+    hbHandler.logger = Mock(Logger)
+
+    def nonZeroNM = getRMNode(2, 2048, "test_host1", null)
+    def zeroNM = getRMNode(0, 0, "test_host2", null)
+
+    when:
+    hbHandler.beforeRMNodeEventHandled(getNMRegistrationEvent(nonZeroNM), rmContext)
+
+    then:
+    1 * hbHandler.logger.warn('FineGrainedScaling feature got invoked for a NM with non-zero capacity. ' +
+        'Host: {}, Mem: {}, CPU: {}. Setting the NM\'s capacity to (0G,0CPU)', 'test_host1', 2048, 2)
+    nonZeroNM.getTotalCapability().getMemory() == 0
+    nonZeroNM.getTotalCapability().getVirtualCores() == 0
+
+    when:
+    hbHandler.beforeRMNodeEventHandled(getNMRegistrationEvent(zeroNM), rmContext)
+
+    then:
+    0 * hbHandler.logger.warn(_) // no logger.warn invoked
+    nonZeroNM.getTotalCapability().getMemory() == 0
+    nonZeroNM.getTotalCapability().getVirtualCores() == 0
+  }
+
+  def "Node Manager HeartBeat"() {
+    given:
+    def host = "test_host"
+    def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build()
+    def zeroNM = getRMNode(0, 0, host, slaveId)
+
+    def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING)
+    def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.COMPLETE)
+    def fgsContainer3 = getFGSContainer(zeroNM, 3, 1, 1024, ContainerState.RUNNING)
+
+    addOfferToFeed(slaveId, host, 2, 2048)
+
+    def yarnNodeCapacityManager = Mock(YarnNodeCapacityManager)
+    def hbHandler = getNMHeartBeatHandler(yarnNodeCapacityManager)
+
+    when:
+    hbHandler.handleStatusUpdate(
+        getHBEvent(
+            zeroNM,
+            fgsContainer1.containerStatus,
+            fgsContainer2.containerStatus,
+            fgsContainer3.containerStatus),
+        rmContext)
+
+    then:
+    nodeStore.getNode(host).getContainerSnapshot().size() == 3
+    1 * yarnNodeCapacityManager.setNodeCapacity(zeroNM, Resources.createResource(4096, 4))
+  }
+
+
+  RMNodeStartedEvent getNMRegistrationEvent(RMNode node) {
+    new RMNodeStartedEvent(node.getNodeID(), null, null)
+  }
+
+  RMNodeStatusEvent getHBEvent(RMNode node, ContainerStatus... statuses) {
+    return new RMNodeStatusEvent(node.getNodeID(), null, Arrays.asList(statuses), null, null)
+  }
+
+  NMHeartBeatHandler getNMHeartBeatHandler() {
+    return getNMHeartBeatHandler(Mock(YarnNodeCapacityManager))
+  }
+
+  NMHeartBeatHandler getNMHeartBeatHandler(YarnNodeCapacityManager yarnNodeCapacityMgr) {
+    def registry = Mock(InterceptorRegistry)
+    def state = Mock(SchedulerState)
+    return new NMHeartBeatHandler(registry, yarnScheduler, myriadDriver,
+        yarnNodeCapacityMgr, offerLifecycleManager, nodeStore, state)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/645d17d8/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
new file mode 100644
index 0000000..0bd16b7
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
@@ -0,0 +1,119 @@
+package com.ebay.myriad.scheduler.fgs
+
+import com.ebay.myriad.configuration.NodeManagerConfiguration
+import com.ebay.myriad.scheduler.TaskFactory
+import com.ebay.myriad.scheduler.TaskUtils
+import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry
+import com.ebay.myriad.state.NodeTask
+import com.ebay.myriad.state.SchedulerState
+import org.apache.hadoop.yarn.api.records.ContainerState
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent
+import org.apache.hadoop.yarn.util.resource.Resources
+import org.apache.mesos.Protos
+
+/**
+ *
+ * Tests for YarnNodeCapacityManager
+ *
+ */
+class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec {
+
+  def "No Containers Allocated Due To Mesos Offers"() {
+    given:
+    def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
+
+    def host = "test_host"
+    def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build()
+    def zeroNM = getRMNode(0, 0, host, slaveId)
+
+    // have a mesos offer before HB
+    def offer = addOfferToFeed(slaveId, host, 4, 4096)
+    offerLifecycleManager.markAsConsumed(offer)
+
+    //  2 containers before HB.
+    def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING)
+    def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.RUNNING)
+    nodeStore.getNode(host).snapshotRunningContainers()
+
+    // Node's capacity set to match the size of 2 containers + mesos offers
+    yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(6144, 6))
+
+    // no new container allocations
+
+    when:
+    yarnNodeCapacityMgr.handleContainerAllocation(zeroNM)
+
+    then:
+    nodeStore.getNode(host).getNode().getRunningContainers().size() == 2  // 2 containers still running
+    1 * mesosDriver.declineOffer(offer.getId())   // offer rejected, as it's not used to allocate more containers
+    zeroNM.getTotalCapability().getVirtualCores() == 2 // capacity returns back to match size of running containers
+    zeroNM.getTotalCapability().getMemory() == 2048
+    nodeStore.getNode(host).getContainerSnapshot() == null // container snapshot is released
+  }
+
+  def "Containers Allocated Due To Mesos Offers"() {
+    given:
+    def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
+
+    def host = "test_host"
+    def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build()
+    def zeroNM = getRMNode(0, 0, host, slaveId)
+
+    // have a mesos offer before HB
+    def offer = addOfferToFeed(slaveId, host, 4, 4096)
+    offerLifecycleManager.markAsConsumed(offer)
+
+    //  2 containers before HB.
+    def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING)
+    def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.RUNNING)
+    nodeStore.getNode(host).snapshotRunningContainers()
+
+    // Node's capacity set to match the size of 2 running containers + mesos offers
+    yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(6144, 6))
+
+    // 2 new containers allocated after HB
+    def fgsContainer3 = getFGSContainer(zeroNM, 3, 1, 1024, ContainerState.NEW)
+    def fgsContainer4 = getFGSContainer(zeroNM, 4, 1, 1024, ContainerState.NEW)
+
+    when:
+    yarnNodeCapacityMgr.handleContainerAllocation(zeroNM)
+
+    then:
+    nodeStore.getNode(host).getNode().getRunningContainers().size() == 4  // 2 running + 2 new
+    1 * mesosDriver.launchTasks(_ as Collection<Protos.OfferID>, _ as List<Protos.TaskInfo>) // for placeholder tasks
+    zeroNM.getTotalCapability().getVirtualCores() == 4 // capacity equals size of running + new containers
+    zeroNM.getTotalCapability().getMemory() == 4096
+    nodeStore.getNode(host).getContainerSnapshot() == null // container snapshot is released
+  }
+
+  def "Set Node Capacity"() {
+    given:
+    def zeroNM = getRMNode(0, 0, "test_host", null)
+    def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
+
+    when:
+    yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(2048, 2))
+
+    then:
+    zeroNM.getTotalCapability().getMemory() == 2048
+    zeroNM.getTotalCapability().getVirtualCores() == 2
+    1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent)
+  }
+
+  YarnNodeCapacityManager getYarnNodeCapacityManager() {
+    def registry = Mock(InterceptorRegistry)
+    def executorInfo = Protos.ExecutorInfo.newBuilder()
+        .setExecutorId(Protos.ExecutorID.newBuilder().setValue("some_id"))
+        .setCommand(Protos.CommandInfo.newBuilder())
+        .build()
+    def nodeTask = Mock(NodeTask) {
+      getExecutorInfo() >> executorInfo
+    }
+    def state = Mock(SchedulerState) {
+      getNodeTask(_, NodeManagerConfiguration.NM_TASK_PREFIX) >> nodeTask
+    }
+    return new YarnNodeCapacityManager(registry, yarnScheduler, rmContext,
+        myriadDriver, offerLifecycleManager, nodeStore, state)
+
+  }
+}

