myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dar...@apache.org
Subject [2/3] incubator-myriad git commit: Implementation of MYRIAD-229, MYRIAD-237, MYRIAD-238, MYRIAD-225 JIRA: [MYRIAD-225] https://issues.apache.org/jira/browse/MYRIAD-225 [MYRIAD-229] https://issues.apache.org/jira/browse/MYRIAD-229 [MYRIAD
Date Tue, 30 Aug 2016 17:18:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsTest.java
new file mode 100644
index 0000000..cf47b8c
--- /dev/null
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.myriad.BaseConfigurableTest;
+import org.apache.myriad.TestObjectFactory;
+import org.apache.myriad.state.NodeTask;
+import org.apache.myriad.state.SchedulerState;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Unit tests for SchedulerUtils
+ */
+public class SchedulerUtilsTest extends BaseConfigurableTest {
+  private SchedulerState sState;
+  private TaskID idOne, idTwo, idThree;
+  private NodeTask taskOne, taskTwo, taskThree;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    sState = TestObjectFactory.getSchedulerState(this.cfg, "tmp/scheduler-utils-test");
+    idOne = TaskID.newBuilder().setValue("Task1").build();
+    idTwo = TaskID.newBuilder().setValue("Task2").build();
+    idThree = TaskID.newBuilder().setValue("Task3").build();
+    taskOne = TestObjectFactory.getNodeTask("zero", "server1", 0.0, 0.0, Long.valueOf(1), Long.valueOf(2));
+    taskTwo = TestObjectFactory.getNodeTask("low", "localhost", 0.2, 1024.0, Long.valueOf(1), Long.valueOf(2));
+    taskThree = TestObjectFactory.getNodeTask("medium", "localhost", 0.4, 2048.0, Long.valueOf(1), Long.valueOf(2));
+
+    sState.addTask(idOne, taskOne);
+    sState.addTask(idTwo, taskTwo);  
+    sState.addTask(idThree, taskThree);
+    
+    this.baseStateStoreDirectory = "/tmp/scheduler-utils-test";
+  }
+  
+  @Test
+  public void testIsUniqueFilenameTrue() throws Exception {
+    List<NodeTask> tasks = Lists.newArrayList(taskOne, taskTwo, taskThree);
+    NodeTask newTask = TestObjectFactory.getNodeTask("medium", "server1", 0.4, 2048.0, Long.valueOf(1), Long.valueOf(2));
+    Offer offer = TestObjectFactory.getOffer("server2", "slave1", "mock-framework", "offer1", 0.0, 0.0);
+    assertTrue(SchedulerUtils.isUniqueHostname(offer, newTask, tasks));
+  }
+  
+  @Test
+  public void testIsUniqueFilenameFalse() throws Exception {
+    List<NodeTask> tasks = Lists.newArrayList(taskOne, taskTwo, taskThree);
+    NodeTask newTask = TestObjectFactory.getNodeTask("medium", "localhost", 0.4, 2048.0, Long.valueOf(1), Long.valueOf(2));
+    Offer offer = TestObjectFactory.getOffer("localhost", "slave1", "mock-framework", "offer1", 0.2, 512.0);
+    assertFalse(SchedulerUtils.isUniqueHostname(offer, newTask, tasks));
+  }
+  
+  @Test
+  public void testIsEligibleForFineGrainedSchedulingFalse() throws Exception {
+    assertFalse(SchedulerUtils.isEligibleForFineGrainedScaling("localhost", sState));
+  }
+  
+  @Test
+  public void testIsEligibleForFineGrainedSchedulingTrue() throws Exception {
+    assertFalse(SchedulerUtils.isEligibleForFineGrainedScaling("server1", sState));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java
index 21b5ad0..c4b2b45 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java
@@ -127,5 +127,4 @@ public class TestTaskUtils extends BaseConfigurableTest {
     checkResourceList(taskUtils.getScalarResource(createScalarOffer("cpus", 0.0, 2.0), "cpus", 1.0, 1.0), "cpus", 0.0, 1.0);
     checkResourceList(taskUtils.getScalarResource(createScalarOffer("cpus", 0.0, 2.0), "cpus", 0.5, 1.5), "cpus", 0.0, 0.5);
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintSpec.groovy
deleted file mode 100644
index b56d23b..0000000
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintSpec.groovy
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler.constraints
-
-import com.google.common.collect.Lists
-import org.apache.mesos.Protos
-import spock.lang.Specification
-
-import static org.apache.mesos.Protos.Value.Text
-import static org.apache.mesos.Protos.Value.Type.TEXT
-
-/**
- *
- * Test for LikeConstraint
- *
- */
-class LikeConstraintSpec extends Specification {
-
-    def "is matching host name"() {
-        given:
-        def constraint = new LikeConstraint("hostname", "host-[0-9]*.example.com")
-
-        expect:
-        returnValue == constraint.matchesHostName(inputHostName)
-
-        where:
-        inputHostName         | returnValue
-        null                  | false
-        ""                    | false
-        "blah-blue"           | false
-        "host-12.example.com" | true
-        "host-1.example.com"  | true
-        "host-2.example.com"  | true
-    }
-
-    def "is matching dfs attribute"() {
-        given:
-        def constraint = new LikeConstraint("dfs", "true")
-
-        expect:
-        returnValue == constraint.matchesSlaveAttributes(attributes)
-
-        where:
-        attributes                                                     | returnValue
-        null                                                           | false
-        Lists.newArrayList()                                           | false
-        Lists.newArrayList(getTextAttribute("dfs", ""))                | false
-        Lists.newArrayList(getTextAttribute("dfs", "false"))           | false
-        Lists.newArrayList(getTextAttribute("Distributed FS", "true")) | false
-        Lists.newArrayList(getTextAttribute("dfs", "true"))            | true
-        Lists.newArrayList(getTextAttribute("dfs", "true"),
-                getTextAttribute("random", "random value"))            | true
-    }
-
-    def "equals"() {
-        given:
-        def constraint1 = new LikeConstraint("hostname", "perfnode13[3-4].perf.lab")
-        def constraint2 = new LikeConstraint("hostname", "perfnode13[3-4].perf.lab")
-        def constraint3 = new LikeConstraint("hostname", "perfnode133.perf.lab")
-
-        expect:
-        constraint1.equals(constraint2)
-        !constraint1.equals(constraint3)
-        !constraint2.equals(constraint3)
-    }
-
-    private static Protos.Attribute getTextAttribute(String name, String value) {
-        Protos.Attribute.newBuilder()
-                .setName(name)
-                .setType(TEXT)
-                .setText(Text.newBuilder()
-                .setValue(value))
-                .build()
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintTest.java
new file mode 100644
index 0000000..101c5e9
--- /dev/null
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.myriad.scheduler.constraints;
+
+import static org.apache.mesos.Protos.Value.Type.TEXT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Attribute;
+import org.apache.mesos.Protos.Value.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+/**
+ * Unit tests for LikeConstraint
+ */
+public class LikeConstraintTest {
+  private LikeConstraint constraintOne, constraintTwo, constraintThree;
+  
+  @Before
+  public void setUp() {
+    constraintOne = new LikeConstraint("hostname", "host-[0-9]*.example.com");
+    constraintTwo = new LikeConstraint("hostname", "host-[0-9]*.example.com");   
+    constraintThree = new LikeConstraint("dfs", "dfs-test");
+  }
+
+  private Protos.Attribute getTextAttribute(String name, String value) {
+    return Protos.Attribute.newBuilder()
+      .setName(name)
+      .setType(TEXT)
+      .setText(Text.newBuilder()
+      .setValue(value))
+      .build();
+  }  
+  
+  @Test
+  public void testEquals() throws Exception {    
+    assertTrue(constraintOne.equals(constraintTwo));
+    assertFalse(constraintOne.equals(constraintThree));
+  }
+  
+  @Test
+  public void testMatchesHostName() throws Exception {
+    assertTrue(constraintOne.matchesHostName("host-1.example.com"));
+    assertTrue(constraintTwo.matchesHostName("host-1.example.com"));
+  }
+  
+  @Test
+  public void testConstraintOnHostame() throws Exception {
+    assertTrue(constraintOne.isConstraintOnHostName());
+    assertTrue(constraintTwo.isConstraintOnHostName());
+    assertFalse(constraintThree.isConstraintOnHostName());
+  }
+  
+  @Test
+  public void testGetType() throws Exception {
+    assertEquals("LIKE", constraintOne.getType().toString());
+    assertEquals("LIKE", constraintTwo.getType().toString());
+  }
+  
+  @Test
+  public void testMatchSlaveAttributes() throws Exception {
+    List<Attribute> attributes = Lists.newArrayList(getTextAttribute("dfs", "dfs-test"));
+    assertTrue(constraintThree.matchesSlaveAttributes(attributes));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
deleted file mode 100644
index b87c1be..0000000
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler.fgs
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import org.apache.hadoop.yarn.api.records.*
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.event.Dispatcher
-import org.apache.hadoop.yarn.event.EventHandler
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext
-import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode
-import org.apache.hadoop.yarn.util.resource.Resources
-import org.apache.mesos.Protos
-import org.apache.mesos.SchedulerDriver
-import org.apache.myriad.configuration.MyriadConfiguration
-import org.apache.myriad.scheduler.MyriadDriver
-import spock.lang.Specification
-
-import java.util.concurrent.ConcurrentHashMap
-
-/**
- *
- * Base class for testing Fine Grained Scaling.
- *
- */
-class FGSTestBaseSpec extends Specification {
-    def nodeStore = new NodeStore()
-    def mesosDriver = Mock(SchedulerDriver)
-    def myriadDriver = new MyriadDriver(mesosDriver)
-    def offerLifecycleManager = new OfferLifecycleManager(nodeStore, myriadDriver)
-
-    def cfg = new MyriadConfiguration()
-
-    void setup() {
-        ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
-        cfg = mapper.readValue(
-                Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"),
-                MyriadConfiguration.class)
-    }
-/******************* Nodes Related ****************/
-
-    def rmNodes = new ConcurrentHashMap<NodeId, RMNode>()
-
-
-    RMNode getRMNode(int cpu, int mem, String host, Protos.SlaveID slaveId) {
-        RMNode rmNode = MockNodes.newNodeInfo(0, Resources.createResource(mem, cpu), 0, host)
-        if (rmNodes[rmNode.getNodeID()]) {
-            throw new IllegalArgumentException("Node with hostname: " + host + " already exists")
-        }
-        rmNodes.put(rmNode.getNodeID(), rmNode)
-        nodeStore.add(getSchedulerNode(rmNode))
-        def node = nodeStore.getNode(host)
-        node.setSlaveId(slaveId)
-
-        return rmNode
-    }
-
-    SchedulerNode getSchedulerNode(RMNode rmNode) {
-        SchedulerNode schedulerNode = new SchedulerNode(rmNode, false) {
-            @Override
-            void reserveResource(SchedulerApplicationAttempt attempt, Priority priority, RMContainer container) {
-            }
-            @Override
-            void unreserveResource(SchedulerApplicationAttempt attempt) {
-            }
-        }
-        return schedulerNode
-    }
-
-
-    /******************* RMContext Related ****************/
-
-    def publisher = Mock(SystemMetricsPublisher) {}
-    def writer = Mock(RMApplicationHistoryWriter) {}
-    def handler = Mock(EventHandler) {}
-
-    def dispatcher = Mock(Dispatcher) {
-        getEventHandler() >> handler
-    }
-
-    def rmContext = Mock(RMContext) {
-        getDispatcher() >> dispatcher
-        getRMApplicationHistoryWriter() >> writer
-        getSystemMetricsPublisher() >> publisher
-        getRMNodes() >> rmNodes
-        getYarnConfiguration() >> new YarnConfiguration()
-    }
-
-    /******************* Offers Related ****************/
-
-    Protos.Offer addOfferToFeed(Protos.SlaveID slaveID, String host, int cpu, int mem) {
-        def offer = Protos.Offer.newBuilder()
-                .setId(Protos.OfferID.newBuilder().setValue("test_offer_id"))
-                .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("test_framework_id"))
-                .setSlaveId(slaveID)
-                .setHostname(host)
-                .addResources(Protos.Resource.newBuilder()
-                .setName("cpus")
-                .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu))
-                .setType(Protos.Value.Type.SCALAR).build())
-                .addResources(Protos.Resource.newBuilder()
-                .setName("mem")
-                .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem))
-                .setType(Protos.Value.Type.SCALAR).build())
-                .build()
-        offerLifecycleManager.addOffers(offer)
-        return offer
-    }
-
-    /******************* Containers Related ****************/
-
-    class FGSContainer {
-        ContainerId containerId
-        Container container
-        RMContainer rmContainer
-        ContainerStatus containerStatus
-    }
-
-    def fgsContainers = new HashMap<>()
-
-    AbstractYarnScheduler yarnScheduler = Mock(AbstractYarnScheduler) {
-        getRMContainer(_ as ContainerId) >> { ContainerId cid -> fgsContainers.get(cid).rmContainer }
-        getSchedulerNode(_ as NodeId) >> { NodeId nodeId -> getSchedulerNode(rmNodes.get(nodeId)) }
-        updateNodeResource(_ as RMNode, _ as ResourceOption) >> { }
-    }
-
-    FGSContainer getFGSContainer(RMNode node, int cid, int cpu, int mem, ContainerState state) {
-        FGSContainer fgsContainer = createFGSContainer(node, cid, cpu, mem, state)
-        if (!fgsContainers[fgsContainer.containerId]) {
-            fgsContainers[fgsContainer.containerId] = fgsContainer
-        }
-        return fgsContainer
-    }
-
-    private FGSContainer createFGSContainer(RMNode node, int cid, int cpu, int mem, ContainerState state) {
-        ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId.newInstance(
-                ApplicationId.newInstance(123456789, 1), 1), cid)
-        FGSContainer fgsContainer = new FGSContainer()
-        fgsContainer.containerId = containerId
-        fgsContainer.container = Container.newInstance(containerId, node.getNodeID(), node.getHttpAddress(),
-                Resources.createResource(mem, cpu), null, null)
-        fgsContainer.rmContainer = new RMContainerImpl(fgsContainer.container, containerId.getApplicationAttemptId(),
-                node.getNodeID(), "user1", rmContext)
-        nodeStore.getNode(node.getNodeID().getHost()).getNode().allocateContainer(fgsContainer.rmContainer)
-        fgsContainer.containerStatus = ContainerStatus.newInstance(containerId, state, "", 0)
-        return fgsContainer
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
deleted file mode 100644
index 4505676..0000000
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler.fgs
-
-import org.apache.hadoop.yarn.api.records.ContainerState
-import org.apache.hadoop.yarn.api.records.ContainerStatus
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent
-import org.apache.hadoop.yarn.util.resource.Resources
-import org.apache.mesos.Protos
-import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry
-import org.apache.myriad.state.SchedulerState
-import org.slf4j.Logger
-
-/**
- *
- * Tests for NMHeartBeatHandler
- *
- */
-class NMHeartBeatHandlerSpec extends FGSTestBaseSpec {
-
-    def "Node Manager registration"() {
-        given:
-        def hbHandler = getNMHeartBeatHandler()
-        hbHandler.logger = Mock(Logger)
-
-        def nonZeroNM = getRMNode(2, 2048, "test_host1", null)
-        def zeroNM = getRMNode(0, 0, "test_host2", null)
-
-        when:
-        hbHandler.beforeRMNodeEventHandled(getNMRegistrationEvent(nonZeroNM), rmContext)
-
-        then:
-        1 * hbHandler.logger.warn('FineGrainedScaling feature got invoked for a NM with non-zero capacity. ' +
-                'Host: {}, Mem: {}, CPU: {}. Setting the NM\'s capacity to (0G,0CPU)', 'test_host1', 2048, 2)
-        nonZeroNM.getTotalCapability().getMemory() == 0
-        nonZeroNM.getTotalCapability().getVirtualCores() == 0
-
-        when:
-        hbHandler.beforeRMNodeEventHandled(getNMRegistrationEvent(zeroNM), rmContext)
-
-        then:
-        0 * hbHandler.logger.warn(_) // no logger.warn invoked
-        nonZeroNM.getTotalCapability().getMemory() == 0
-        nonZeroNM.getTotalCapability().getVirtualCores() == 0
-    }
-
-    def "Node Manager HeartBeat"() {
-        given:
-        def host = "test_host"
-        def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build()
-        def zeroNM = getRMNode(0, 0, host, slaveId)
-
-        def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING)
-        def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.COMPLETE)
-        def fgsContainer3 = getFGSContainer(zeroNM, 3, 1, 1024, ContainerState.RUNNING)
-
-        addOfferToFeed(slaveId, host, 2, 2048)
-
-        def yarnNodeCapacityManager = Mock(YarnNodeCapacityManager)
-        def hbHandler = getNMHeartBeatHandler(yarnNodeCapacityManager)
-
-        when:
-        hbHandler.handleStatusUpdate(
-                getHBEvent(
-                        zeroNM,
-                        fgsContainer1.containerStatus,
-                        fgsContainer2.containerStatus,
-                        fgsContainer3.containerStatus),
-                rmContext)
-
-        then:
-        nodeStore.getNode(host).getContainerSnapshot().size() == 3
-        1 * yarnNodeCapacityManager.setNodeCapacity(zeroNM, Resources.createResource(4096, 4))
-    }
-
-
-    RMNodeStartedEvent getNMRegistrationEvent(RMNode node) {
-        new RMNodeStartedEvent(node.getNodeID(), null, null)
-    }
-
-    RMNodeStatusEvent getHBEvent(RMNode node, ContainerStatus... statuses) {
-        return new RMNodeStatusEvent(node.getNodeID(), null, Arrays.asList(statuses), null, null)
-    }
-
-    NMHeartBeatHandler getNMHeartBeatHandler() {
-        return getNMHeartBeatHandler(Mock(YarnNodeCapacityManager))
-    }
-
-    NMHeartBeatHandler getNMHeartBeatHandler(YarnNodeCapacityManager yarnNodeCapacityMgr) {
-        def registry = Mock(InterceptorRegistry)
-        def state = Mock(SchedulerState)
-        return new NMHeartBeatHandler(registry, yarnScheduler, myriadDriver,
-                yarnNodeCapacityMgr, offerLifecycleManager, nodeStore, state)
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerTest.java
new file mode 100644
index 0000000..6cdf8df
--- /dev/null
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeHealthStatusPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.apache.mesos.Protos.Offer;
+import org.apache.myriad.BaseConfigurableTest;
+import org.apache.myriad.TestObjectFactory;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.MockSchedulerDriver;
+import org.apache.myriad.scheduler.MyriadDriver;
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.scheduler.TaskUtils;
+import org.apache.myriad.scheduler.yarn.MyriadFairScheduler;
+import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
+import org.apache.myriad.state.MockRMContext;
+import org.apache.myriad.state.NodeTask;
+import org.apache.myriad.state.SchedulerState;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Contains test cases for NMHeartBeatHandler
+ */
+public class NMHeartBeatHandlerTest extends BaseConfigurableTest {
+  private YarnNodeCapacityManager manager;
+  private NMHeartBeatHandler handler;
+  private NodeStore store; 
+  private ServiceResourceProfile profileZero, profileSmall;
+  private RMNode nodeOne, nodeTwo;
+  private NodeTask nodeTaskOne, nodeTaskTwo;
+  private FSSchedulerNode sNodeOne, sNodeTwo;
+  private SchedulerState state;
+  private MockRMContext context;
+  private OfferLifecycleManager olManager;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    this.baseStateStoreDirectory = "/tmp/nm-heartbeat-handler-test";
+    context = new MockRMContext();
+    context.setDispatcher(TestObjectFactory.getMockDispatcher());
+    context.setSystemMetricsPublisher(new SystemMetricsPublisher());
+    
+    profileZero = TestObjectFactory.getServiceResourceProfile("zero", Double.valueOf(0.0), Double.valueOf(0.0), 
+             Long.valueOf(0), Long.valueOf(0));
+    profileSmall = TestObjectFactory.getServiceResourceProfile("small", Double.valueOf(2.0), Double.valueOf(2048.0), 
+            Long.valueOf(1), Long.valueOf(1024));
+    
+    nodeOne = TestObjectFactory.getRMNode("localhost-one", 8800, Resource.newInstance(0, 0));
+    nodeTwo = TestObjectFactory.getRMNode("localhost-two", 8800, Resource.newInstance(1024, 2));
+
+    sNodeOne = new FSSchedulerNode(nodeOne, false);
+    sNodeTwo = new FSSchedulerNode(nodeTwo, false);
+    nodeTaskOne = TestObjectFactory.getNodeTask("localhost-one", profileZero);
+    nodeTaskTwo = TestObjectFactory.getNodeTask("localhost-two", profileSmall);
+    
+    ConcurrentMap<NodeId, RMNode> rmNodes = new ConcurrentHashMap<NodeId, RMNode>();
+    rmNodes.put(nodeOne.getNodeID(), nodeOne);
+    rmNodes.put(nodeTwo.getNodeID(), nodeTwo);
+    context.setRMNodes(rmNodes);
+    
+    store = new NodeStore();
+    store.add(sNodeOne);
+    store.add(sNodeTwo);
+    
+    MyriadDriver driver = TestObjectFactory.getMyriadDriver(new MockSchedulerDriver());
+    olManager = new OfferLifecycleManager(store, driver);
+    
+    state = TestObjectFactory.getSchedulerState(new MyriadConfiguration(), "/tmp/nm-heartbeat-handler-test");
+    state.addNodes(Lists.newArrayList(nodeTaskOne, nodeTaskTwo));
+    MyriadFairScheduler scheduler = TestObjectFactory.getMyriadFairScheduler(context);
+   
+    scheduler.addNode(sNodeOne);
+    scheduler.addNode(sNodeTwo);
+    
+    manager = new YarnNodeCapacityManager(new CompositeInterceptor(), scheduler, 
+            context, driver, olManager, store, state, new TaskUtils(this.cfg));
+    handler = new NMHeartBeatHandler(new CompositeInterceptor(), scheduler, 
+            driver, manager, olManager, store, state, cfg.getNodeManagerConfiguration());
+  }
+
+  @Test
+  public void testZeroNodeStartedEvent() throws Exception {
+    NMHeartBeatHandler.CallBackFilter filter = handler.getCallBackFilter();
+    filter.allowCallBacksForNode(nodeOne.getNodeID());
+    RMNodeEvent event = new RMNodeEvent(nodeOne.getNodeID(), RMNodeEventType.STARTED);
+    handler.beforeRMNodeEventHandled(event, context);
+    assertEquals(0, nodeOne.getTotalCapability().getVirtualCores());
+    assertEquals(0, nodeOne.getTotalCapability().getMemory());
+  }
+  
+  @Test
+  public void testNonZeroNodeStartedEvent() throws Exception {
+    NMHeartBeatHandler.CallBackFilter filter = handler.getCallBackFilter();
+    filter.allowCallBacksForNode(nodeTwo.getNodeID());
+    RMNodeEvent event = new RMNodeEvent(nodeTwo.getNodeID(), RMNodeEventType.STARTED);
+    handler.beforeRMNodeEventHandled(event, context);
+    /*
+     * Confirm that, since fine-grained scaling does not work for non-zero nodes, the
+     * capacity is set to zero for cores and memory
+     */
+    assertEquals(0, nodeTwo.getTotalCapability().getVirtualCores());
+    assertEquals(0, nodeTwo.getTotalCapability().getMemory());
+  }
+  
+  @Test 
+  public void testOfferWithinResourceLimits() throws Exception {
+    Resource resourcesOne = Resource.newInstance(512, 1);
+    Resource offerOne = Resource.newInstance(1024, 2);
+    Resource offerTwo = Resource.newInstance(4096, 2);
+    Resource offerThree = Resource.newInstance(1024, 8);
+ 
+    assertTrue(handler.offerWithinResourceLimits(resourcesOne, offerOne));
+    assertFalse(handler.offerWithinResourceLimits(resourcesOne, offerTwo));
+    assertFalse(handler.offerWithinResourceLimits(resourcesOne, offerThree));
+  }
+  
+  @Test 
+  public void testGetNewResourcesOfferedByMesos() throws Exception {
+    Offer offerOne = TestObjectFactory.getOffer("localhost-one", "slave-one", "mock", "offer-one", 1.0, 512.0);
+    Offer offerTwo = TestObjectFactory.getOffer("localhost-two", "slave-two", "mock", "offer-two", 2.0, 1024.0);
+    olManager.addOffers(offerOne);
+    olManager.addOffers(offerTwo);
+    Resource resourcesOne = handler.getNewResourcesOfferedByMesos("localhost-one");
+    assertEquals(1.0, resourcesOne.getVirtualCores(), 0.0);
+    assertEquals(512.0, resourcesOne.getMemory(), 0.0);
+    Resource resourcesTwo = handler.getNewResourcesOfferedByMesos("localhost-two");
+    assertEquals(2.0, resourcesTwo.getVirtualCores(), 0.0);
+    assertEquals(1024.0, resourcesTwo.getMemory(), 0.0);
+  }
+  
+  @Test
+  public void testIncrementNodeCapacityUnderCapacity() throws Exception {
+    resetNodeTotalCapability(nodeOne, 0, 0);
+    resetNodeTotalCapability(nodeTwo, 2, 512);
+    Offer offerOne = TestObjectFactory.getOffer("localhost-one", "slave-one", "mock", "offer-one", 1.0, 512.0);
+    Offer offerTwo = TestObjectFactory.getOffer("localhost-two", "slave-two", "mock", "offer-two", 3.0, 1024.0);
+    olManager.addOffers(offerOne);
+    olManager.addOffers(offerTwo);
+        
+    RMNodeStatusEvent eventOne = getRMStatusEvent(nodeOne);
+    handler.beforeRMNodeEventHandled(eventOne, context);
+    RMNodeStatusEvent eventTwo = getRMStatusEvent(nodeTwo);
+    handler.beforeRMNodeEventHandled(eventTwo, context);
+    
+    assertEquals(512, nodeOne.getTotalCapability().getMemory());
+    assertEquals(1, nodeOne.getTotalCapability().getVirtualCores());
+    assertEquals(1024, nodeTwo.getTotalCapability().getMemory());
+    assertEquals(3, nodeTwo.getTotalCapability().getVirtualCores());
+  }
+  
+  @Test
+  public void testIncrementNodeCapacityOverCapacity() throws Exception {
+    resetNodeTotalCapability(nodeOne, 1, 512);
+    resetNodeTotalCapability(nodeTwo, 2, 2048);
+    
+    //Test over memory upper limit
+    Offer offerOne = TestObjectFactory.getOffer("localhost-one", "slave-one", "mock", "offer-one", 0.2, 3072.0);
+    //Test over CPU cores upper limit
+    Offer offerTwo = TestObjectFactory.getOffer("localhost-two", "slave-two", "mock", "offer-two", 8.0, 1024.0);
+    olManager.addOffers(offerOne);
+    olManager.addOffers(offerTwo);
+
+    RMNodeStatusEvent eventOne = getRMStatusEvent(nodeOne);  
+    handler.beforeRMNodeEventHandled(eventOne, context);
+    RMNodeStatusEvent eventTwo = getRMStatusEvent(nodeTwo);
+    handler.beforeRMNodeEventHandled(eventTwo, context);
+    
+    assertEquals(512, nodeOne.getTotalCapability().getMemory());
+    assertEquals(1, nodeOne.getTotalCapability().getVirtualCores()); 
+    assertEquals(2048, nodeTwo.getTotalCapability().getMemory());
+    assertEquals(2, nodeTwo.getTotalCapability().getVirtualCores());
+  }
+    
+  private RMNodeStatusEvent getRMStatusEvent(RMNode node) {
+    NodeId id = node.getNodeID();
+    NodeHealthStatus hStatus = NodeHealthStatusPBImpl.newInstance(true, "HEALTHY", System.currentTimeMillis());
+    List<ContainerStatus> cStatus = Lists.newArrayList(getContainerStatus(node));
+    List<ApplicationId> keepAliveIds = Lists.newArrayList(getApplicationId(node.getHttpPort()));
+    NodeHeartbeatResponse response = new NodeHeartbeatResponsePBImpl();
+    
+    return new RMNodeStatusEvent(id, hStatus, cStatus, keepAliveIds, response);
+  }
+  
+  private ContainerStatus getContainerStatus(RMNode node) {
+    ContainerStatus status = new ContainerStatusPBImpl();
+    return status;
+  }
+
+  private ApplicationId getApplicationId(int id) {
+    return ApplicationId.newInstance(System.currentTimeMillis(), id);
+  }
+  
+  private void resetNodeTotalCapability(RMNode node, int cpuCores, int memory) {
+    node.getTotalCapability().setVirtualCores(cpuCores);
+    node.getTotalCapability().setMemory(memory);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java
index f20e603..69002d8 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java
@@ -1,11 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.myriad.scheduler.fgs;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.myriad.TestObjectFactory;
 import org.junit.Test;
@@ -15,7 +32,7 @@ import org.junit.Test;
  */
 public class NodeStoreTest {
   NodeStore store = new NodeStore();
-  SchedulerNode sNode = TestObjectFactory.getSchedulerNode(NodeId.newInstance("0.0.0.0", 8888), 2, 4096);
+  SchedulerNode sNode = TestObjectFactory.getSchedulerNode("0.0.0.0", 8888, 2, 4096);
 
   @Test
   public void testAddNode() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeTest.java
new file mode 100644
index 0000000..bc74f6b
--- /dev/null
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.apache.myriad.BaseConfigurableTest;
+import org.apache.myriad.TestObjectFactory;
+import org.apache.myriad.state.MockRMContext;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for Node and FSSchedulerNode classes
+ */
+public class NodeTest extends BaseConfigurableTest {
+  private NodeStore store; 
+  private RMNode nodeOne, nodeTwo;
+  private FSSchedulerNode sNodeOne, sNodeTwo;
+  private MockRMContext context;
+  private RMContainer containerOne;
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    context = new MockRMContext();
+    context.setDispatcher(TestObjectFactory.getMockDispatcher());
+    context.setSystemMetricsPublisher(new SystemMetricsPublisher());
+    
+    nodeOne = TestObjectFactory.getRMNode("localhost-one", 8800, Resource.newInstance(1024, 2));
+    nodeTwo = TestObjectFactory.getRMNode("localhost-two", 8800, Resource.newInstance(2048, 4));
+    sNodeOne = new FSSchedulerNode(nodeOne, false);
+    sNodeTwo = new FSSchedulerNode(nodeTwo, false);
+
+    store = new NodeStore();
+    store.add(sNodeOne);
+    store.add(sNodeTwo);
+
+    containerOne = TestObjectFactory.getRMContainer(nodeOne, context, 1, 2, 1024);
+  }
+  
+  @Test
+  public void testAllocateAndReleaseContainer() throws Exception {
+    sNodeOne.allocateContainer(containerOne);
+    assertEquals(1, sNodeOne.getNumContainers());
+    sNodeOne.releaseContainer(containerOne.getContainer());
+    assertEquals(0, sNodeOne.getNumContainers());
+  }
+  
+  @Test
+  public void testTotalCapability() throws Exception {
+    assertEquals(1024, nodeOne.getTotalCapability().getMemory());
+    assertEquals(2, nodeOne.getTotalCapability().getVirtualCores());
+    assertEquals(2048, nodeTwo.getTotalCapability().getMemory());
+    assertEquals(4, nodeTwo.getTotalCapability().getVirtualCores());
+  }
+  
+  @Test
+  public void testGetAndRemoveContainerSnapshot() throws Exception {
+    sNodeOne.allocateContainer(containerOne);
+    store.getNode("localhost-one").snapshotRunningContainers();
+    assertEquals(1, store.getNode("localhost-one").getContainerSnapshot().size());
+    store.getNode("localhost-one").removeContainerSnapshot();
+    assertNull(store.getNode("localhost-one").getContainerSnapshot());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/OfferLifeCycleManagerTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/OfferLifeCycleManagerTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/OfferLifeCycleManagerTest.java
index a16e8e6..e8161ee 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/OfferLifeCycleManagerTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/OfferLifeCycleManagerTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.myriad.scheduler.fgs;
 
 import static org.junit.Assert.assertEquals;
@@ -38,13 +56,13 @@ public class OfferLifeCycleManagerTest {
   
   @Test
   public void testAddOffers() throws Exception {
-    manager.addOffers(TestObjectFactory.getOffer("localhost", "slave-1", "mock-framework", "offer-1"));
+    manager.addOffers(TestObjectFactory.getOffer("localhost", "slave-1", "mock-framework", "offer-1", 0.0, 0.0));
     assertNotNull(manager.getOfferFeed("localhost").poll());
   }
 
   @Test
   public void testMarkAsConsumed() throws Exception {
-    Offer offer = TestObjectFactory.getOffer("localhost-1", "slave-2", "mock-framework", "consumed-offer-1");
+    Offer offer = TestObjectFactory.getOffer("localhost-1", "slave-2", "mock-framework", "consumed-offer-1", 1.0, 1024.0);
     manager.addOffers(offer);
     manager.markAsConsumed(offer);
     ConsumedOffer cOffers = manager.getConsumedOffer("localhost-1");

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
deleted file mode 100644
index 4e3dc50..0000000
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler.fgs
-
-import org.apache.hadoop.yarn.api.records.ContainerState
-import org.apache.hadoop.yarn.api.records.ResourceOption
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent
-import org.apache.hadoop.yarn.util.resource.Resources
-import org.apache.mesos.Protos
-import org.apache.myriad.configuration.MyriadConfiguration
-import org.apache.myriad.configuration.NodeManagerConfiguration
-import org.apache.myriad.scheduler.TaskUtils
-import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry
-import org.apache.myriad.state.NodeTask
-import org.apache.myriad.state.SchedulerState
-
-/**
- *
- * Tests for YarnNodeCapacityManager
- *
- */
-class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec {
-
-    def "No Containers Allocated Due To Mesos Offers"() {
-        given:
-        def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
-
-        def host = "test_host"
-        def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build()
-        def zeroNM = getRMNode(0, 0, host, slaveId)
-
-        // have a mesos offer before HB
-        def offer = addOfferToFeed(slaveId, host, 4, 4096)
-        offerLifecycleManager.markAsConsumed(offer)
-
-        //  2 containers before HB.
-        def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING)
-        def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.RUNNING)
-        nodeStore.getNode(host).snapshotRunningContainers()
-
-        // Node's capacity set to match the size of 2 containers + mesos offers
-        yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(6144, 6))
-
-        // no new container allocations
-
-        when:
-        yarnNodeCapacityMgr.handleContainerAllocation(zeroNM)
-
-        then:
-        nodeStore.getNode(host).getNode().getRunningContainers().size() == 2  // 2 containers still running
-        1 * mesosDriver.declineOffer(offer.getId())   // offer rejected, as it's not used to allocate more containers
-        zeroNM.getTotalCapability().getVirtualCores() == 2 // capacity returns back to match size of running containers
-        zeroNM.getTotalCapability().getMemory() == 2048
-        nodeStore.getNode(host).getContainerSnapshot() == null // container snapshot is released
-    }
-
-    def "Containers Allocated Due To Mesos Offers"() {
-        given:
-        def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
-
-        def host = "test_host"
-        def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build()
-        def zeroNM = getRMNode(0, 0, host, slaveId)
-
-        // have a mesos offer before HB
-        def offer = addOfferToFeed(slaveId, host, 4, 4096)
-        offerLifecycleManager.markAsConsumed(offer)
-
-        //  2 containers before HB.
-        def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING)
-        def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.RUNNING)
-        nodeStore.getNode(host).snapshotRunningContainers()
-
-        // Node's capacity set to match the size of 2 running containers + mesos offers
-        yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(6144, 6))
-
-        // 2 new containers allocated after HB
-        def fgsContainer3 = getFGSContainer(zeroNM, 3, 1, 1024, ContainerState.NEW)
-        def fgsContainer4 = getFGSContainer(zeroNM, 4, 1, 1024, ContainerState.NEW)
-
-        when:
-        yarnNodeCapacityMgr.handleContainerAllocation(zeroNM)
-
-        then:
-        nodeStore.getNode(host).getNode().getRunningContainers().size() == 4  // 2 running + 2 new
-        1 * mesosDriver.launchTasks(_ as Collection<Protos.OfferID>, _ as List<Protos.TaskInfo>) // for place holder tasks
-        zeroNM.getTotalCapability().getVirtualCores() == 4 // capacity equals size of running + new containers
-        zeroNM.getTotalCapability().getMemory() == 4096
-        nodeStore.getNode(host).getContainerSnapshot() == null // container snapshot is released
-    }
-
-    def "Set Node Capacity"() {
-        given:
-        def zeroNM = getRMNode(0, 0, "test_host", null)
-        def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
-
-        when:
-        yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(2048, 2))
-
-        then:
-        zeroNM.getTotalCapability().getMemory() == 2048
-        zeroNM.getTotalCapability().getVirtualCores() == 2
-        1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent)
-    }
-	
-    YarnNodeCapacityManager getYarnNodeCapacityManager() {
-        def registry = Mock(InterceptorRegistry)
-        def executorInfo = Protos.ExecutorInfo.newBuilder()
-                .setExecutorId(Protos.ExecutorID.newBuilder().setValue("some_id"))
-                .setCommand(Protos.CommandInfo.newBuilder())
-                .build()
-        def nodeTask = Mock(NodeTask) {
-            getExecutorInfo() >> executorInfo
-        }
-        def state = Mock(SchedulerState) {
-            getNodeTask(_, NodeManagerConfiguration.NM_TASK_PREFIX) >> nodeTask
-        }
-        def cfg = Mock(MyriadConfiguration) {
-        	getFrameworkName() >> "MyriadTest"
-        }
-       
-        def taskUtils = new TaskUtils(cfg)
-        return new YarnNodeCapacityManager(registry, yarnScheduler, rmContext,
-                myriadDriver, offerLifecycleManager, nodeStore, state, taskUtils)
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java
index a4f4268..987add4 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java
@@ -1,93 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.myriad.scheduler.fgs;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Set;
-import java.util.TreeMap;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.mesos.Protos.TaskID;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.apache.mesos.Protos.Offer;
 import org.apache.myriad.BaseConfigurableTest;
 import org.apache.myriad.TestObjectFactory;
+import org.apache.myriad.configuration.MyriadConfiguration;
 import org.apache.myriad.scheduler.MockSchedulerDriver;
 import org.apache.myriad.scheduler.MyriadDriver;
-import org.apache.myriad.scheduler.ServiceResourceProfile;
 import org.apache.myriad.scheduler.TaskUtils;
-import org.apache.myriad.scheduler.constraints.Constraint;
-import org.apache.myriad.scheduler.constraints.LikeConstraint;
-import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
-import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
+import org.apache.myriad.scheduler.yarn.MyriadFairScheduler;
+import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
+import org.apache.myriad.state.MockRMContext;
 import org.apache.myriad.state.NodeTask;
 import org.apache.myriad.state.SchedulerState;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.Lists;
 
 /**
- * Unit tests for YarnNodeCapacityManager
+ * Contains test cases for YarnNodeCapacityManager
  */
 public class YarnNodeCapacityManagerTest extends BaseConfigurableTest {
-  YarnNodeCapacityManager manager;
-  NodeStore store;
-  NodeId zNodeId = NodeId.newInstance("0.0.0.1", 8041);
-  TaskID zTaskId = TaskID.newBuilder().setValue("nm").build();
-  NodeTask ntZero;
-  SchedulerNode zNode;
-  SchedulerState sState;
-  
+  private YarnNodeCapacityManager manager;
+  private NodeStore store; 
+  private RMNode nodeOne, nodeTwo;
+  private FSSchedulerNode sNodeOne, sNodeTwo;
+  private SchedulerState state;
+  private MockRMContext context;
+  private RMContainer containerOne;
+  private OfferLifecycleManager olManager;
+
+  @Override
   @Before
   public void setUp() throws Exception {
     super.setUp();
- 
-    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler();
-    InterceptorRegistry registry = TestObjectFactory.getInterceptorRegistry();
-    sState = TestObjectFactory.getSchedulerState(cfg);
-   
-    RMContext context = TestObjectFactory.getRMContext(new Configuration());
-    MyriadDriver driver = new MyriadDriver(new MockSchedulerDriver());
-    store = new NodeStore();
-    OfferLifecycleManager oManager = new OfferLifecycleManager(store, driver);
+    this.baseStateStoreDirectory = "/tmp/yarn-node-capacity-manager-test";
+    context = new MockRMContext();
+    context.setDispatcher(TestObjectFactory.getMockDispatcher());
+    context.setSystemMetricsPublisher(new SystemMetricsPublisher());
+    
+    nodeOne = TestObjectFactory.getRMNode("localhost-one", 8800, Resource.newInstance(2048, 4));
+    nodeTwo = TestObjectFactory.getRMNode("localhost-two", 8800, Resource.newInstance(1024, 2));
+    sNodeOne = new FSSchedulerNode(nodeOne, false);
+    sNodeTwo = new FSSchedulerNode(nodeTwo, false);
 
-    zNode  = TestObjectFactory.getSchedulerNode(zNodeId, 0, 0);
+    containerOne = TestObjectFactory.getRMContainer(nodeOne, context, 1, 2, 1024);
+    store = new NodeStore();
+    store.add(sNodeOne);
+    store.add(sNodeTwo);
     
-    manager = new YarnNodeCapacityManager(registry, scheduler, context, driver, oManager, store, sState, new TaskUtils(cfg));
+    MyriadDriver driver = TestObjectFactory.getMyriadDriver(new MockSchedulerDriver());
+    olManager = new OfferLifecycleManager(store, driver);
+    state = TestObjectFactory.getSchedulerState(new MyriadConfiguration(), "/tmp/yarn-node-capacity-manager-test");
+    MyriadFairScheduler scheduler = TestObjectFactory.getMyriadFairScheduler(context);
+    
+    scheduler.addNode(sNodeOne);
+    scheduler.addNode(sNodeTwo);
+    manager = new YarnNodeCapacityManager(new CompositeInterceptor(), scheduler, 
+              context, driver, olManager, store, state, new TaskUtils(this.cfg));
   }
-  
-  private Set<NodeTask> getNodeTasks() {
-    Constraint cZero = new LikeConstraint("0.0.0.1", "host-[0-9]*.example.com");
-    TreeMap<String, Long> ports = new TreeMap<>();
-    ServiceResourceProfile zProfile = new ServiceResourceProfile("zProfile", 0.0, 0.0, ports);
-
-    ntZero = new NodeTask(zProfile, cZero);
-    ntZero.setTaskPrefix("nm");
-    ntZero.setHostname("0.0.0.1");
- 
-    return Sets.newHashSet(ntZero);
+    
+  @Test
+  public void testIncrementNodeCapacity() throws Exception {
+    manager.incrementNodeCapacity(nodeTwo, Resource.newInstance(2048, 4));
+    assertEquals(3072, nodeTwo.getTotalCapability().getMemory());
+    assertEquals(6, nodeTwo.getTotalCapability().getVirtualCores());
   }
-
+  
   @Test
-  public void testAllowCallBacksForNode() throws Exception {
-    store.add(zNode);
-    sState.addNodes(getNodeTasks());
-    sState.addTask(zTaskId, ntZero);
-      
-    sState.makeTaskActive(zTaskId);
-    assertEquals(1, sState.getActiveTasks().size());
-
-    YarnSchedulerInterceptor.CallBackFilter filter = manager.getCallBackFilter();
-    assertTrue(filter.allowCallBacksForNode(zNodeId));
+  public void testDecrementNodeCapacity() throws Exception {
+    manager.decrementNodeCapacity(nodeOne, Resource.newInstance(1024, 2));
+    assertEquals(1024, nodeOne.getTotalCapability().getMemory());
+    assertEquals(2, nodeOne.getTotalCapability().getVirtualCores());
   }
   
-  public void testIncrementNodeCapacity() throws Exception {
-    manager.incrementNodeCapacity(zNode.getRMNode(), TestObjectFactory.getResource(2, 2048));
-    assertEquals(6, zNode.getTotalResource().getVirtualCores());
+  // Allocates containerOne on sNodeOne, consumes an offer, then checks the running-container snapshot.
+  @Test
+  public void testHandleContainerAllocation() throws Exception {
+    Offer offer = TestObjectFactory.getOffer("zero-localhost-one", "slave-one", "mock-framework", "offer-one", 0.1, 512.0);
+    sNodeOne.allocateContainer(containerOne);
+    NodeTask task = TestObjectFactory.getNodeTask("small", "localhost-one", 0.1, 512.0, 1L, 256L);
+    state.addNodes(Lists.newArrayList(task));
+    olManager.addOffers(offer);
+    olManager.markAsConsumed(offer);
+    manager.handleContainerAllocation(nodeOne);
+    store.getNode("localhost-one").snapshotRunningContainers();
+    assertEquals(1, store.getNode("localhost-one").getNode().getRunningContainers().size());
+    assertEquals(1, store.getNode("localhost-one").getContainerSnapshot().size());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/ClusterTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/ClusterTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/ClusterTest.java
index 09145a2..f68e458 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/ClusterTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/ClusterTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.state;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/MockDispatcher.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockDispatcher.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockDispatcher.java
index bc06441..680cad4 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockDispatcher.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockDispatcher.java
@@ -1,27 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.myriad.state;
 
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 
 /**
  * Mock Dispatcher implementation for unit tests
  */
 public class MockDispatcher implements Dispatcher {
-  EventHandler<RMAppEvent> handler = new MockEventHandler();
+  EventHandler<Event> handler = new MockEventHandler();
 
   /**
    * Mock EventHandler implementation for unit tests
    */
-  public static class MockEventHandler implements EventHandler<RMAppEvent> {
+  public static class MockEventHandler implements EventHandler<Event> {
     @Override
-    public void handle(RMAppEvent event) {
+    public void handle(Event event) {
       //noop
     }  
   }
 
   @Override
-  public EventHandler<RMAppEvent> getEventHandler() {
+  public EventHandler<Event> getEventHandler() {
     return handler;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/MockFuture.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockFuture.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockFuture.java
index fa4628f..6a888af 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockFuture.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockFuture.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.state;
 
 import java.util.concurrent.ExecutionException;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMApp.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMApp.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMApp.java
index 822c08a..ed8ffe1 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMApp.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMApp.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.state;
 
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMContext.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMContext.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMContext.java
index 42fa045..c089a3f 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMContext.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMContext.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.myriad.state;
 
 import java.nio.ByteBuffer;
@@ -63,6 +81,7 @@ public class MockRMContext implements RMContext {
   SystemMetricsPublisher systemMetricsPublisher;
   ConfigurationProvider configurationProvider;
   AdminService adminService;
+  ConcurrentMap<NodeId, RMNode> rmNodes;
   
   public void setApplicationMasterService(ApplicationMasterService applicationMasterService) {
     this.applicationMasterService = applicationMasterService;
@@ -189,9 +208,13 @@ public class MockRMContext implements RMContext {
 
   @Override
   public ConcurrentMap<NodeId, RMNode> getRMNodes() {
-    return null;
+    return this.rmNodes;
   }
 
+  public void setRMNodes(ConcurrentMap<NodeId, RMNode> rmNodes) {
+    this.rmNodes = rmNodes;
+  }
+  
   @Override
   public AMLivelinessMonitor getAMLivelinessMonitor() {
     return amLivelinessMonitor;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java
index f8cb700..eeb970c 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.state;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/MockState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockState.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockState.java
index ba29ff8..0da2dd8 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockState.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockState.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.state;
 
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/MockVariable.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockVariable.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockVariable.java
index 06655e3..e3468e0 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockVariable.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockVariable.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.state;
 
 import org.apache.mesos.state.Variable;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/MyriadStateTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MyriadStateTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MyriadStateTest.java
index e5a378f..01fa80e 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/MyriadStateTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MyriadStateTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.state;
 
 import org.apache.mesos.Protos.FrameworkID;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/NodeTaskTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/NodeTaskTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/NodeTaskTest.java
index 3f486c0..69caf99 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/NodeTaskTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/NodeTaskTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.state;
 
 import org.apache.myriad.scheduler.ServiceResourceProfile;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java
index cd4a12e..7a1b098 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java
@@ -1,66 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.myriad.state;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore;
-import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.TaskID;
+import org.apache.myriad.BaseConfigurableTest;
 import org.apache.myriad.TestObjectFactory;
-import org.apache.myriad.scheduler.ServiceResourceProfile;
-import org.apache.myriad.scheduler.constraints.LikeConstraint;
-import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
-
 /**
  * Unit tests for SchedulerState
  */
-public class SchedulerStateTest {
-  SchedulerState state;
+public class SchedulerStateTest extends BaseConfigurableTest {
+  private NodeTask taskOne;
+  private NodeTask taskTwo;
 
-  @Before
+  @Override
   public void setUp() throws Exception {
-    MyriadFileSystemRMStateStore store = TestObjectFactory.getStateStore(getConfiguration(), false);
-    state = new SchedulerState(store);    
+    super.setUp();
+    this.baseStateStoreDirectory = "/tmp/scheduler-state-test";
   }
 
+  // Returns the SchedulerState from TestObjectFactory for the absolute state-store directory set in setUp().
+  private SchedulerState initialize() throws Exception {
+    resetStoreState();
+    taskOne = TestObjectFactory.getNodeTask("zero", "localhost", 0.0, 0.0, Long.valueOf(1), Long.valueOf(2));
+    taskTwo = TestObjectFactory.getNodeTask("low", "localhost", 0.1, 1024.0, Long.valueOf(1), Long.valueOf(2));
+    return TestObjectFactory.getSchedulerState(this.cfg, "/tmp/scheduler-state-test");
+  }
+  
+  @Test 
+  public void testGetFrameworkID() throws Exception {
+    SchedulerState sState = initialize();
+    assertEquals("mock-framework", sState.getFrameworkID().get().getValue());
+  }
+  
   @Test
-  public void testSetFrameworkID() throws Exception {
-    state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
-    assertEquals("mock-framework", state.getFrameworkID().get().getValue());
+  public void testAddTask() throws Exception {
+    SchedulerState sState = initialize();
+    TaskID idOne = TaskID.newBuilder().setValue("Task1").build();
+    TaskID idTwo = TaskID.newBuilder().setValue("Task2").build();
+    sState.addTask(idOne, taskOne);
+    sState.addTask(idTwo, taskTwo);
+    assertEquals("zero", sState.getTask(idOne).getProfile().getName());
+    assertEquals("low", sState.getTask(idTwo).getProfile().getName());
   }
   
   @Test
-  public void testAddAndRemoveTask() throws Exception {
-    NodeTask task1 = new NodeTask(new ServiceResourceProfile("profile1", 0.1, 1024.0, new TreeMap<String, Long>()), new LikeConstraint("hostname", "host-[0-9]*.example.com"));
-    NodeTask task2 = new NodeTask(new ServiceResourceProfile("profile2", 0.1, 1024.0, new TreeMap<String, Long>()), new LikeConstraint("hostname", "host-[0-9]*.example.com"));
-    TaskID id1 = TaskID.newBuilder().setValue("mock-task-1").build();
-    TaskID id2 = TaskID.newBuilder().setValue("mock-task-2").build();
-    
-    Set<TaskID> taskIds = Sets.newHashSet(id1, id2);
-    state.addTask(id1, task1);
-    assertNotNull(state.getTask(id1));
-    state.addTask(id2, task2);
-    assertNotNull(state.getTask(id2));
-    assertEquals(2, state.getTasks(taskIds).size());
-    state.removeTask(id1);   
-    assertEquals(1, state.getTasks(taskIds).size());
-    assertNull(state.getTask(id1)); 
-    state.removeTask(id2);   
-    assertEquals(0, state.getTasks(taskIds).size());
-    assertNull(state.getTask(id2)); 
+  public void testMakeTestActive() throws Exception {
+    SchedulerState sState = initialize();
+    TaskID idOne = TaskID.newBuilder().setValue("Task1").build();
+    TaskID idTwo = TaskID.newBuilder().setValue("Task2").build();
+    sState.addTask(idOne, taskOne);
+    sState.addTask(idTwo, taskTwo);
+    sState.makeTaskActive(idOne);
+    sState.makeTaskActive(idTwo);
+    assertTrue(sState.getActiveTasks().contains(taskOne));
+    assertTrue(sState.getActiveTasks().contains(taskTwo));
+  }
+  
+  @Test 
+  public void testMakeTestPending() throws Exception {
+    SchedulerState sState = initialize();
+    TaskID idOne = TaskID.newBuilder().setValue("Task1").build();
+    TaskID idTwo = TaskID.newBuilder().setValue("Task2").build();
+    sState.makeTaskPending(idOne);
+    sState.makeTaskPending(idTwo);
+    assertEquals(2, sState.getPendingTaskIds().size());
+    assertTrue(sState.getPendingTaskIds().contains(idOne));
+    assertTrue(sState.getPendingTaskIds().contains(idTwo));    
+  }
+
+  @Test 
+  public void testMakeTestKillable() throws Exception {
+    SchedulerState sState = initialize();
+    TaskID idOne = TaskID.newBuilder().setValue("Task1").build();
+    TaskID idTwo = TaskID.newBuilder().setValue("Task2").build();
+    sState.makeTaskKillable(idOne);
+    sState.makeTaskKillable(idTwo);
+    assertEquals(2, sState.getKillableTaskIds().size());
+    assertTrue(sState.getKillableTaskIds().contains(idOne));  
+    assertTrue(sState.getKillableTaskIds().contains(idTwo));    
   }
 
-  private Configuration getConfiguration() {
-    Configuration conf = new Configuration();
-    conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/");
-    return conf;
+  @Test 
+  public void testMakeTestStaging() throws Exception {
+    SchedulerState sState = initialize();
+    TaskID idOne = TaskID.newBuilder().setValue("Task1").build();
+    TaskID idTwo = TaskID.newBuilder().setValue("Task2").build();
+    sState.addTask(idOne, taskOne);
+    sState.addTask(idTwo, taskTwo);
+    sState.makeTaskStaging(idOne);
+    sState.makeTaskStaging(idTwo);
+    assertEquals(2, sState.getStagingTasks().size());
+    assertTrue(sState.getStagingTasks().contains(taskOne));
+    assertTrue(sState.getStagingTasks().contains(taskTwo));    
+  }
+
+  @Test 
+  public void testMakeTestLost() throws Exception {
+    SchedulerState sState = initialize();
+    TaskID idOne = TaskID.newBuilder().setValue("Task1").build();
+    TaskID idTwo = TaskID.newBuilder().setValue("Task2").build();
+    sState.makeTaskLost(idOne);
+    sState.makeTaskLost(idTwo);
+    assertEquals(2, sState.getLostTaskIds().size());
+    assertTrue(sState.getLostTaskIds().contains(idOne));
+    assertTrue(sState.getLostTaskIds().contains(idTwo));    
+  }  
+
+  @Test
+  public void testRemoveTask() throws Exception {
+    SchedulerState sState = initialize();
+    TaskID idOne = TaskID.newBuilder().setValue("Task1").build();
+    TaskID idTwo = TaskID.newBuilder().setValue("Task2").build();
+    sState.removeTask(idOne);
+    assertNull(sState.getTask(idOne));
+    sState.removeTask(idTwo);
+    assertNull(sState.getTask(idTwo));
   }
 }
\ No newline at end of file



Mime
View raw message