hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [23/51] [partial] hadoop git commit: HDFS-13405. Ozone: Rename HDSL to HDDS. Contributed by Ajay Kumar, Elek Marton, Mukul Kumar Singh, Shashikant Banerjee and Anu Engineer.
Date Thu, 05 Apr 2018 18:33:20 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerAttribute.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerAttribute.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerAttribute.java
new file mode 100644
index 0000000..63cc9bf
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerAttribute.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.states;
+
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests for {@link ContainerAttribute}: inserting, querying, removing,
+ * clearing and moving {@link ContainerID}s between attribute keys.
+ */
+public class TestContainerAttribute {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * An inserted ID is visible under its key, and inserting an equal ID again
+   * does not add a second entry.
+   */
+  @Test
+  public void testInsert() throws SCMException {
+    ContainerAttribute<Integer> containerAttribute = new ContainerAttribute<>();
+    ContainerID id = new ContainerID(42);
+    containerAttribute.insert(1, id);
+    Assert.assertEquals(1,
+        containerAttribute.getCollection(1).size());
+    Assert.assertTrue(containerAttribute.getCollection(1).contains(id));
+
+    // Insert an equal ID again; the key must still map to a single entry.
+    ContainerID newId =
+        new ContainerID(42);
+    containerAttribute.insert(1, newId);
+    Assert.assertEquals(1,
+        containerAttribute.getCollection(1).size());
+    Assert.assertTrue(containerAttribute.getCollection(1).contains(newId));
+  }
+
+  /**
+   * hasKey reports a populated key, and hasContainerID reports exactly the
+   * IDs that were inserted under it.
+   */
+  @Test
+  public void testHasKey() throws SCMException {
+    ContainerAttribute<Integer> containerAttribute = new ContainerAttribute<>();
+
+    for (int x = 1; x < 42; x++) {
+      containerAttribute.insert(1, new ContainerID(x));
+    }
+    Assert.assertTrue(containerAttribute.hasKey(1));
+    for (int x = 1; x < 42; x++) {
+      Assert.assertTrue(containerAttribute.hasContainerID(1,
+          new ContainerID(x)));
+    }
+
+    // 42 was never inserted, so it must not be reported.
+    Assert.assertFalse(containerAttribute.hasContainerID(1,
+        new ContainerID(42)));
+  }
+
+  /**
+   * clearSet empties the collection stored under the given key.
+   */
+  @Test
+  public void testClearSet() throws SCMException {
+    List<String> keyslist = Arrays.asList("Key1", "Key2", "Key3");
+    ContainerAttribute<String> containerAttribute = new ContainerAttribute<>();
+    for (String k : keyslist) {
+      for (int x = 1; x < 101; x++) {
+        containerAttribute.insert(k, new ContainerID(x));
+      }
+    }
+    for (String k : keyslist) {
+      Assert.assertEquals(100,
+          containerAttribute.getCollection(k).size());
+    }
+    containerAttribute.clearSet("Key1");
+    Assert.assertEquals(0,
+        containerAttribute.getCollection("Key1").size());
+  }
+
+  /**
+   * Removing the odd IDs from one key leaves 50 entries there and leaves the
+   * other keys untouched.
+   */
+  @Test
+  public void testRemove() throws SCMException {
+
+    List<String> keyslist = Arrays.asList("Key1", "Key2", "Key3");
+    ContainerAttribute<String> containerAttribute = new ContainerAttribute<>();
+
+    for (String k : keyslist) {
+      for (int x = 1; x < 101; x++) {
+        containerAttribute.insert(k, new ContainerID(x));
+      }
+    }
+    for (int x = 1; x < 101; x += 2) {
+      containerAttribute.remove("Key1", new ContainerID(x));
+    }
+
+    for (int x = 1; x < 101; x += 2) {
+      Assert.assertFalse(containerAttribute.hasContainerID("Key1",
+          new ContainerID(x)));
+    }
+
+    Assert.assertEquals(100,
+        containerAttribute.getCollection("Key2").size());
+
+    Assert.assertEquals(100,
+        containerAttribute.getCollection("Key3").size());
+
+    Assert.assertEquals(50,
+        containerAttribute.getCollection("Key1").size());
+  }
+
+  /**
+   * update moves an ID from one key to another, and throws SCMException when
+   * the ID is not present under the source key.
+   */
+  @Test
+  public void testUpdate() throws SCMException {
+    String key1 = "Key1";
+    String key2 = "Key2";
+    String key3 = "Key3";
+
+    ContainerAttribute<String> containerAttribute = new ContainerAttribute<>();
+    ContainerID id = new ContainerID(42);
+
+    containerAttribute.insert(key1, id);
+    Assert.assertTrue(containerAttribute.hasContainerID(key1, id));
+    Assert.assertFalse(containerAttribute.hasContainerID(key2, id));
+
+    // This should move the id from key1 bucket to key2 bucket.
+    containerAttribute.update(key1, key2, id);
+    Assert.assertFalse(containerAttribute.hasContainerID(key1, id));
+    Assert.assertTrue(containerAttribute.hasContainerID(key2, id));
+
+    // This should fail since we cannot find this id in the key3 bucket.
+    thrown.expect(SCMException.class);
+    containerAttribute.update(key3, key1, id);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
new file mode 100644
index 0000000..ad50d97
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
+    .HEALTHY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for different container placement policy.
+ */
+public class TestContainerPlacement {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+  private static XceiverClientManager xceiverClientManager =
+      new XceiverClientManager(new OzoneConfiguration());
+
+  private ReportState reportState = ReportState.newBuilder()
+      .setState(ReportState.states.noContainerReports)
+      .setCount(0).build();
+
+  /**
+   * Returns a new copy of Configuration.
+   *
+   * @return Config
+   */
+  OzoneConfiguration getConf() {
+    return new OzoneConfiguration();
+  }
+
+  /**
+   * Creates a NodeManager.
+   *
+   * @param config - Config for the node manager.
+   * @return SCNNodeManager
+   * @throws IOException
+   */
+
+  SCMNodeManager createNodeManager(OzoneConfiguration config)
+      throws IOException {
+    SCMNodeManager nodeManager = new SCMNodeManager(config,
+        UUID.randomUUID().toString(), null);
+    assertFalse("Node manager should be in chill mode",
+        nodeManager.isOutOfChillMode());
+    return nodeManager;
+  }
+
+  ContainerMapping createContainerManager(Configuration config,
+      NodeManager scmNodeManager) throws IOException {
+    final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    return new ContainerMapping(config, scmNodeManager, cacheSize);
+
+  }
+
+  /**
+   * Test capacity based container placement policy with node reports.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testContainerPlacementCapacity() throws IOException,
+      InterruptedException, TimeoutException {
+    OzoneConfiguration conf = getConf();
+    final int nodeCount = 4;
+    final long capacity = 10L * OzoneConsts.GB;
+    final long used = 2L * OzoneConsts.GB;
+    final long remaining = capacity - used;
+
+    final File testDir = PathUtils.getTestDir(
+        TestContainerPlacement.class);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
+        testDir.getAbsolutePath());
+    conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+
+    SCMNodeManager nodeManager = createNodeManager(conf);
+    ContainerMapping containerManager =
+        createContainerManager(conf, nodeManager);
+    List<DatanodeDetails> datanodes =
+        TestUtils.getListOfRegisteredDatanodeDetails(nodeManager, nodeCount);
+    try {
+      for (DatanodeDetails datanodeDetails : datanodes) {
+        SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+        SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+        srb.setStorageUuid(UUID.randomUUID().toString());
+        srb.setCapacity(capacity).setScmUsed(used).
+            setRemaining(remaining).build();
+        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+            nrb.addStorageReport(srb).build(), reportState);
+      }
+
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
+          100, 4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(capacity * nodeCount,
+          (long) nodeManager.getStats().getCapacity().get());
+      assertEquals(used * nodeCount,
+          (long) nodeManager.getStats().getScmUsed().get());
+      assertEquals(remaining * nodeCount,
+          (long) nodeManager.getStats().getRemaining().get());
+
+      assertTrue(nodeManager.isOutOfChillMode());
+
+      String container1 = UUID.randomUUID().toString();
+      Pipeline pipeline1 = containerManager.allocateContainer(
+          xceiverClientManager.getType(),
+          xceiverClientManager.getFactor(), container1, "OZONE")
+          .getPipeline();
+      assertEquals(xceiverClientManager.getFactor().getNumber(),
+          pipeline1.getMachines().size());
+    } finally {
+      IOUtils.closeQuietly(containerManager);
+      IOUtils.closeQuietly(nodeManager);
+      FileUtil.fullyDelete(testDir);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
new file mode 100644
index 0000000..de6e30c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
@@ -0,0 +1,1179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_HB_COUNT_TO_PROCESS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
+    .HEALTHY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the Node Manager class.
+ */
+public class TestNodeManager {
+
+  private File testDir;
+
+  private ReportState reportState = ReportState.newBuilder()
+      .setState(ReportState.states.noContainerReports)
+      .setCount(0).build();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Class-level setup hook; the current body performs no work.
+   *
+   * @throws IOException not thrown by the current (empty) body.
+   */
+  @BeforeClass
+  public static void init() throws IOException {
+    // Nothing to initialise at class level at the moment.
+  }
+
+  /**
+   * Resolves the per-test directory that getConf() uses for
+   * OZONE_METADATA_DIRS.
+   */
+  @Before
+  public void setup() {
+    testDir = PathUtils.getTestDir(TestNodeManager.class);
+  }
+
+  /**
+   * Deletes the per-test directory and everything beneath it.
+   */
+  @After
+  public void cleanup() {
+    FileUtil.fullyDelete(testDir);
+  }
+
+  /**
+   * Returns a new copy of Configuration.
+   *
+   * @return Config
+   */
+  OzoneConfiguration getConf() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
+        testDir.getAbsolutePath());
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
+    return conf;
+  }
+
+  /**
+   * Creates a NodeManager.
+   *
+   * @param config - Config for the node manager.
+   * @return SCNNodeManager
+   * @throws IOException
+   */
+
+  SCMNodeManager createNodeManager(OzoneConfiguration config)
+      throws IOException {
+    SCMNodeManager nodeManager = new SCMNodeManager(config,
+        UUID.randomUUID().toString(), null);
+    assertFalse("Node manager should be in chill mode",
+        nodeManager.isOutOfChillMode());
+    return nodeManager;
+  }
+
+  /**
+   * Tests that Node manager handles heartbeats correctly, and comes out of
+   * chill Mode once the minimum number of distinct nodes has sent one.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmHeartbeat() throws IOException,
+      InterruptedException, TimeoutException {
+
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      // Send one heartbeat from each of getMinimumChillModeNodes() new nodes.
+      for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
+        DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
+            nodeManager);
+        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+            null, reportState);
+      }
+
+      // Wait for 4 seconds max, polling every 100 ms.
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
+          100, 4 * 1000);
+
+      assertTrue("Heartbeat thread should have picked up the " +
+              "scheduled heartbeats and transitioned out of chill mode.",
+          nodeManager.isOutOfChillMode());
+    }
+  }
+
+  /**
+   * Verifies that a node manager which receives no heartbeats at all stays
+   * in chill mode.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmNoHeartbeats() throws IOException,
+      InterruptedException, TimeoutException {
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      // Poll every 100 ms, for at most 4 s, until waitForHeartbeatProcessed()
+      // reports true.
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
+          100, 4 * 1000);
+      final boolean leftChillMode = nodeManager.isOutOfChillMode();
+      assertFalse("No heartbeats, Node manager should have been in" +
+          " chill mode.", leftChillMode);
+    }
+  }
+
+  /**
+   * Asserts that if we don't get enough unique nodes we stay in chillmode.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmNotEnoughHeartbeats() throws IOException,
+      InterruptedException, TimeoutException {
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+
+      // Need 100 nodes to come out of chill mode, only one node is sending HB.
+      nodeManager.setMinimumChillModeNodes(100);
+      nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager)
+          .getProtoBufMessage(),
+          null, reportState);
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
+          100, 4 * 1000);
+      assertFalse("Not enough heartbeats, Node manager should have " +
+          "been in chillmode.", nodeManager.isOutOfChillMode());
+    }
+  }
+
+  /**
+   * Asserts that many heartbeats from the same node are counted as a single
+   * node.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmSameNodeHeartbeats() throws IOException,
+      InterruptedException, TimeoutException {
+
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      nodeManager.setMinimumChillModeNodes(3);
+      DatanodeDetails datanodeDetails = TestUtils
+          .getDatanodeDetails(nodeManager);
+
+      // Send 10 heartbeat from same node, and assert we never leave chill mode.
+      for (int x = 0; x < 10; x++) {
+        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+            null, reportState);
+      }
+
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
+          100, 4 * 1000);
+      assertFalse("Not enough nodes have sent heartbeats to the node " +
+          "manager.", nodeManager.isOutOfChillMode());
+    }
+  }
+
+  /**
+   * Asserts that adding heartbeats after shutdown does not work. This implies
+   * that heartbeat thread has been shutdown safely by closing the node
+   * manager.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmShutdown() throws IOException, InterruptedException,
+      TimeoutException {
+    OzoneConfiguration conf = getConf();
+    // Set (not merely read) the 100 ms heartbeat processing interval.
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        100, TimeUnit.MILLISECONDS);
+    SCMNodeManager nodeManager = createNodeManager(conf);
+    DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(nodeManager);
+    nodeManager.close();
+
+    // These should never be processed.
+    nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+        null, reportState);
+
+    // Let us just wait for 2 seconds to prove that HBs are not processed.
+    Thread.sleep(2 * 1000);
+
+    assertEquals("Assert new HBs were never processed", 0,
+        nodeManager.getLastHBProcessedCount());
+  }
+
+  /**
+   * Asserts scm informs datanodes to re-register with the nodemanager
+   * on a restart.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScmHeartbeatAfterRestart() throws Exception {
+    OzoneConfiguration conf = getConf();
+    // Set (not merely read) the 100 ms heartbeat processing interval.
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        100, TimeUnit.MILLISECONDS);
+    DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
+    try (SCMNodeManager nodemanager = createNodeManager(conf)) {
+      nodemanager.register(datanodeDetails.getProtoBufMessage());
+      List<SCMCommand> command = nodemanager.sendHeartbeat(
+          datanodeDetails.getProtoBufMessage(),
+          null, reportState);
+      Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
+      Assert.assertTrue("On regular HB calls, SCM responses a "
+          + "datanode with an empty command list", command.isEmpty());
+    }
+
+    // Sends heartbeat without registering to SCM.
+    // This happens when SCM restarts.
+    try (SCMNodeManager nodemanager = createNodeManager(conf)) {
+      Assert.assertFalse(nodemanager
+          .getAllNodes().contains(datanodeDetails));
+      try {
+        // SCM handles heartbeat asynchronously.
+        // It may need more than one heartbeat processing to
+        // send the notification.
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override public Boolean get() {
+            List<SCMCommand> command =
+                nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+                    null, reportState);
+            return command.size() == 1 && command.get(0).getType()
+                .equals(SCMCmdType.reregisterCommand);
+          }
+        }, 100, 3 * 1000);
+      } catch (TimeoutException e) {
+        Assert.fail("Times out to verify that scm informs "
+            + "datanode to re-register itself.");
+      }
+    }
+  }
+
+  /**
+   * Asserts that we detect as many healthy nodes as we have generated heartbeat
+   * for.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmHealthyNodeCount() throws IOException,
+      InterruptedException, TimeoutException {
+    OzoneConfiguration conf = getConf();
+    final int count = 10;
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+
+      for (int x = 0; x < count; x++) {
+        DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
+            nodeManager);
+        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+            null, reportState);
+      }
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
+          100, 4 * 1000);
+      assertEquals(count, nodeManager.getNodeCount(HEALTHY));
+    }
+  }
+
+  /**
+   * Configures a stale-node interval (100 ms) that is below the accepted
+   * minimum and expects node manager construction to fail with an
+   * IllegalArgumentException. The expected message reports min = 500, which
+   * is five times the 100 ms heartbeat processing interval used here.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmSanityOfUserConfig1() throws IOException,
+      InterruptedException, TimeoutException {
+    final int interval = 100;
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
+        MILLISECONDS);
+    // Deliberately too small: equal to the processing interval instead of
+    // the required multiple of it.
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, interval, MILLISECONDS);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        startsWith("100 is not within min = 500 or max = 100000"));
+    createNodeManager(conf);
+  }
+
+  /**
+   * Configures a stale-node interval of 3 s, which is 30 times the 100 ms
+   * processing interval and 3 times the 1 s heartbeat interval, and checks
+   * that the node manager can be created and closed without an exception.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmSanityOfUserConfig2() throws IOException,
+      InterruptedException, TimeoutException {
+    final int interval = 100;
+    final OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, MILLISECONDS);
+
+    // Successful construction is the assertion.
+    final SCMNodeManager nodeManager = createNodeManager(conf);
+    nodeManager.close();
+  }
+
+  /**
+   * Asserts that a single node moves from Healthy to stale node, then from
+   * stale node to dead node if it misses enough heartbeats, while the other
+   * nodes keep heartbeating.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmDetectStaleAndDeadNode() throws IOException,
+      InterruptedException, TimeoutException {
+    final int interval = 100;
+    final int nodeCount = 10;
+
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
+        MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount,
+          "Node");
+
+      DatanodeDetails staleNode = TestUtils.getDatanodeDetails(nodeManager);
+
+      // Heartbeat once
+      nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(),
+          null, reportState);
+
+      // Heartbeat all other nodes.
+      for (DatanodeDetails dn : nodeList) {
+        nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+      }
+
+      // Wait for 2 seconds .. and heartbeat good nodes again.
+      Thread.sleep(2 * 1000);
+
+      for (DatanodeDetails dn : nodeList) {
+        nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+      }
+
+      // Wait for 2 seconds, wait a total of 4 seconds to make sure that the
+      // node moves into stale state.
+      Thread.sleep(2 * 1000);
+      List<DatanodeDetails> staleNodeList = nodeManager.getNodes(STALE);
+      assertEquals("Expected to find 1 stale node",
+          1, nodeManager.getNodeCount(STALE));
+      assertEquals("Expected to find 1 stale node",
+          1, staleNodeList.size());
+      assertEquals("Stale node is not the expected ID", staleNode
+          .getUuid(), staleNodeList.get(0).getUuid());
+      Thread.sleep(1000);
+
+      // heartbeat good nodes again.
+      for (DatanodeDetails dn : nodeList) {
+        nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+      }
+
+      //  6 seconds is the dead window for this test , so we wait a total of
+      // 7 seconds to make sure that the node moves into dead state.
+      Thread.sleep(2 * 1000);
+
+      // the stale node has been removed
+      staleNodeList = nodeManager.getNodes(STALE);
+      assertEquals("Expected to find 0 stale nodes",
+          0, nodeManager.getNodeCount(STALE));
+      assertEquals("Expected to find 0 stale nodes",
+          0, staleNodeList.size());
+
+      // Check for the dead node now.
+      List<DatanodeDetails> deadNodeList = nodeManager.getNodes(DEAD);
+      assertEquals("Expected to find 1 dead node", 1,
+          nodeManager.getNodeCount(DEAD));
+      assertEquals("Expected to find 1 dead node",
+          1, deadNodeList.size());
+      assertEquals("Dead node is not the expected ID", staleNode
+          .getUuid(), deadNodeList.get(0).getUuid());
+    }
+  }
+
+  /**
+   * Asserts that we log an error for null in datanode ID.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmLogErrorOnNullDatanode() throws IOException,
+      InterruptedException, TimeoutException {
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
+      try {
+        nodeManager.sendHeartbeat(null, null, reportState);
+      } finally {
+        // Stop capturing even if sendHeartbeat throws, so the capturer does
+        // not stay attached to SCMNodeManager.LOG for later tests.
+        logCapturer.stopCapturing();
+      }
+      assertThat(logCapturer.getOutput(),
+          containsString("Datanode ID in heartbeat is null"));
+    }
+  }
+
+  /**
+   * Asserts that a dead node, a stale node and a healthy node can co-exist,
+   * and that the node counts, node lists and node IDs match the expected
+   * state at each step.
+   * <p/>
+   * This test is pretty complicated because it explores all states of Node
+   * manager in a single test. Please read through the comments to get an idea
+   * of the current state of the node Manager.
+   * <p/>
+   * This test is written like a state machine to avoid threads and concurrency
+   * issues. This test is replicated below with the use of threads. Avoiding
+   * threads makes it easy to debug the state machine.
+   *
+   * @throws IOException if creating or closing the node manager fails
+   * @throws InterruptedException if interrupted while sleeping
+   * @throws TimeoutException declared for consistency with the other tests;
+   *         nothing in this body waits with a timeout
+   */
+  @Test
+  public void testScmClusterIsInExpectedState1() throws IOException,
+      InterruptedException, TimeoutException {
+    /**
+     * These values are very important. Here is what they mean so you don't
+     * have to look them up while reading this code.
+     *
+     *  OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL - This is the frequency of the
+     *  HB processing thread that is running in the SCM. This thread must run
+     *  for the SCM to process the Heartbeats. Set to 100 ms below.
+     *
+     *  OZONE_SCM_HEARTBEAT_INTERVAL - This is the frequency at which
+     *  datanodes will send heartbeats to SCM. Please note: This is the only
+     *  config value for node manager that is specified in seconds. We don't
+     *  want SCM heartbeat resolution to be more than in seconds.
+     *  In this test it is not used, but we are forced to set it because we
+     *  have validation code that checks Stale Node interval and Dead Node
+     *  interval is larger than the value of
+     *  OZONE_SCM_HEARTBEAT_INTERVAL.
+     *
+     *  OZONE_SCM_STALENODE_INTERVAL - This is the time that must elapse
+     *  from the last heartbeat for us to mark a node as stale. In this test
+     *  we set that to 3 seconds. That is, if a node has not sent a heartbeat
+     *  to SCM for the last 3 seconds we will mark it as stale.
+     *
+     *  OZONE_SCM_DEADNODE_INTERVAL - This is the time that must elapse
+     *  from the last heartbeat for a node to be marked dead. We have an
+     *  additional constraint that this must be at least 2 times bigger than
+     *  Stale node Interval. In this test we set it to 6 seconds.
+     *
+     *  With these we are trying to explore the state of this cluster with
+     *  various timeouts. Each section is commented so that you can keep
+     *  track of the state of the cluster nodes.
+     *
+     */
+
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
+        MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+
+
+    /**
+     * Cluster state: Healthy: All nodes are heartbeating like normal.
+     */
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      DatanodeDetails healthyNode =
+          TestUtils.getDatanodeDetails(nodeManager, "HealthyNode");
+      DatanodeDetails staleNode =
+          TestUtils.getDatanodeDetails(nodeManager, "StaleNode");
+      DatanodeDetails deadNode =
+          TestUtils.getDatanodeDetails(nodeManager, "DeadNode");
+      nodeManager.sendHeartbeat(
+          healthyNode.getProtoBufMessage(), null, reportState);
+      nodeManager.sendHeartbeat(
+          staleNode.getProtoBufMessage(), null, reportState);
+      nodeManager.sendHeartbeat(
+          deadNode.getProtoBufMessage(), null, reportState);
+
+      // Sleep so that heartbeat processing thread gets to run.
+      Thread.sleep(500);
+
+      //Assert all nodes are healthy.
+      assertEquals(3, nodeManager.getAllNodes().size());
+      assertEquals(3, nodeManager.getNodeCount(HEALTHY));
+
+      /**
+       * Cluster state: Quiesced: We are going to sleep for 3 seconds, which
+       * means that no node is heartbeating. All nodes should move to Stale.
+       */
+      Thread.sleep(3 * 1000);
+      assertEquals(3, nodeManager.getAllNodes().size());
+      assertEquals(3, nodeManager.getNodeCount(STALE));
+
+
+      /**
+       * Cluster State : Move healthy node back to healthy state, move other 2
+       * nodes to Stale State.
+       *
+       * We heartbeat all 3 nodes, heartbeat the healthy node again after 1.5
+       * seconds, and let the other 2 nodes elapse the 3 second window.
+       */
+
+      nodeManager.sendHeartbeat(
+          healthyNode.getProtoBufMessage(), null, reportState);
+      nodeManager.sendHeartbeat(
+          staleNode.getProtoBufMessage(), null, reportState);
+      nodeManager.sendHeartbeat(
+          deadNode.getProtoBufMessage(), null, reportState);
+
+      Thread.sleep(1500);
+      nodeManager.sendHeartbeat(
+          healthyNode.getProtoBufMessage(), null, reportState);
+      Thread.sleep(2 * 1000);
+      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+
+
+      // 3.5 seconds from last heartbeat for the stale and deadNode. So those
+      // 2 nodes must move to Stale state and the healthy node must
+      // remain in the healthy State.
+      List<DatanodeDetails> healthyList = nodeManager.getNodes(HEALTHY);
+      assertEquals("Expected one healthy node", 1, healthyList.size());
+      assertEquals("Healthy node is not the expected ID", healthyNode
+          .getUuid(), healthyList.get(0).getUuid());
+
+      assertEquals(2, nodeManager.getNodeCount(STALE));
+
+      /**
+       * Cluster State: Allow healthyNode to remain in healthy state and
+       * staleNode to move to stale state and deadNode to move to dead state.
+       * deadNode is deliberately not heartbeated in this phase.
+       */
+
+      nodeManager.sendHeartbeat(
+          healthyNode.getProtoBufMessage(), null, reportState);
+      nodeManager.sendHeartbeat(
+          staleNode.getProtoBufMessage(), null, reportState);
+      Thread.sleep(1500);
+      nodeManager.sendHeartbeat(
+          healthyNode.getProtoBufMessage(), null, reportState);
+      Thread.sleep(2 * 1000);
+
+      // 3.5 seconds have elapsed for stale node, so it moves into Stale.
+      // 7 seconds have elapsed for dead node, so it moves into dead.
+      // 2 Seconds have elapsed for healthy node, so it stays in healthy state.
+      healthyList = nodeManager.getNodes(HEALTHY);
+      List<DatanodeDetails> staleList = nodeManager.getNodes(STALE);
+      List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD);
+
+      assertEquals(3, nodeManager.getAllNodes().size());
+      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(1, nodeManager.getNodeCount(STALE));
+      assertEquals(1, nodeManager.getNodeCount(DEAD));
+
+      assertEquals("Expected one healthy node",
+          1, healthyList.size());
+      assertEquals("Healthy node is not the expected ID", healthyNode
+          .getUuid(), healthyList.get(0).getUuid());
+
+      assertEquals("Expected one stale node",
+          1, staleList.size());
+      assertEquals("Stale node is not the expected ID", staleNode
+          .getUuid(), staleList.get(0).getUuid());
+
+      assertEquals("Expected one dead node",
+          1, deadList.size());
+      assertEquals("Dead node is not the expected ID", deadNode
+          .getUuid(), deadList.get(0).getUuid());
+      /**
+       * Cluster State : let us heartbeat all the nodes and verify that we get
+       * back all the nodes in healthy state.
+       */
+      nodeManager.sendHeartbeat(
+          healthyNode.getProtoBufMessage(), null, reportState);
+      nodeManager.sendHeartbeat(
+          staleNode.getProtoBufMessage(), null, reportState);
+      nodeManager.sendHeartbeat(
+          deadNode.getProtoBufMessage(), null, reportState);
+      Thread.sleep(500);
+      //Assert all nodes are healthy.
+      assertEquals(3, nodeManager.getAllNodes().size());
+      assertEquals(3, nodeManager.getNodeCount(HEALTHY));
+    }
+  }
+
+  /**
+   * Keeps sending one heartbeat per node in {@code list}, pausing
+   * {@code sleepDuration} milliseconds after every full round, until the
+   * current thread's interrupt flag is observed.
+   *
+   * @param manager       - node manager that receives the heartbeats.
+   * @param list          - datanodes heartbeated on every round.
+   * @param sleepDuration - pause between rounds, in milliseconds.
+   * @throws InterruptedException if interrupted while pausing between rounds.
+   */
+  private void heartbeatNodeSet(SCMNodeManager manager,
+                                List<DatanodeDetails> list,
+                                int sleepDuration) throws InterruptedException {
+    for (;;) {
+      if (Thread.currentThread().isInterrupted()) {
+        return;
+      }
+      for (DatanodeDetails node : list) {
+        manager.sendHeartbeat(node.getProtoBufMessage(), null, reportState);
+      }
+      Thread.sleep(sleepDuration);
+    }
+  }
+
+  /**
+   * Builds {@code count} datanodes with {@code TestUtils.getDatanodeDetails},
+   * passing {@code prefix} followed by a zero-based index for each one so
+   * tests can recognise them later.
+   *
+   * @param nodeManager - node manager handed to TestUtils for every node.
+   * @param count       - number of nodes to create.
+   * @param prefix      - a prefix string that can be used in verification.
+   * @return the created nodes, in creation order.
+   */
+  private List<DatanodeDetails> createNodeSet(SCMNodeManager nodeManager,
+      int count, String prefix) {
+    List<DatanodeDetails> nodes = new LinkedList<>();
+    int index = 0;
+    while (index < count) {
+      nodes.add(TestUtils.getDatanodeDetails(nodeManager, prefix + index));
+      index++;
+    }
+    return nodes;
+  }
+
+  /**
+   * Checks whether the node manager currently reports exactly {@code count}
+   * nodes in the given state.
+   *
+   * @param nodeManager - node manager to query.
+   * @param count       - expected number of nodes in {@code state}.
+   * @param state       - node state to count (not limited to STALE).
+   * @return true if the node manager reports exactly {@code count} nodes in
+   *         {@code state}.
+   */
+  private boolean findNodes(NodeManager nodeManager, int count,
+      HddsProtos.NodeState state) {
+    return nodeManager.getNodeCount(state) == count;
+  }
+
+  /**
+   * Asserts that we can create a set of nodes that send their heartbeats from
+   * different threads and NodeManager behaves as expected: nodes heartbeating
+   * every 2 seconds stay healthy, nodes heartbeating every 4 seconds move
+   * between healthy and stale, and nodes that heartbeat only once end up dead.
+   *
+   * @throws IOException if creating or closing the node manager fails
+   * @throws InterruptedException if interrupted while sleeping
+   * @throws TimeoutException if the expected stale nodes are not seen in time
+   */
+  @Test
+  public void testScmClusterIsInExpectedState2() throws IOException,
+      InterruptedException, TimeoutException {
+    final int healthyCount = 5000;
+    final int staleCount = 100;
+    final int deadCount = 10;
+
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
+        MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+    conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000);
+
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeDetails> healthyNodeList = createNodeSet(nodeManager,
+          healthyCount, "Healthy");
+      List<DatanodeDetails> staleNodeList = createNodeSet(nodeManager,
+          staleCount, "Stale");
+      List<DatanodeDetails> deadNodeList = createNodeSet(nodeManager, deadCount,
+          "Dead");
+
+      Runnable healthyNodeTask = () -> {
+        try {
+          // 2 second heartbeat makes these nodes stay healthy.
+          heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000);
+        } catch (InterruptedException ignored) {
+          // Interrupt is the shutdown signal for this heartbeat thread.
+        }
+      };
+
+      Runnable staleNodeTask = () -> {
+        try {
+          // 4 second heartbeat makes these nodes go to stale and back to
+          // healthy again.
+          heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000);
+        } catch (InterruptedException ignored) {
+          // Interrupt is the shutdown signal for this heartbeat thread.
+        }
+      };
+
+
+      // No thread: heartbeat these nodes exactly once, so that they will be
+      // marked as dead nodes eventually.
+      for (DatanodeDetails dn : deadNodeList) {
+        nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+      }
+
+
+      Thread thread1 = new Thread(healthyNodeTask);
+      thread1.setDaemon(true);
+      thread1.start();
+
+
+      Thread thread2 = new Thread(staleNodeTask);
+      thread2.setDaemon(true);
+      thread2.start();
+
+      try {
+        Thread.sleep(10 * 1000);
+
+        // Assert all healthy nodes are healthy now, this has to be a greater
+        // than check since Stale nodes can be healthy when we check the state.
+
+        assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount);
+
+        assertEquals(deadCount, nodeManager.getNodeCount(DEAD));
+
+        List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD);
+
+        for (DatanodeDetails node : deadList) {
+          assertThat(node.getHostName(), CoreMatchers.startsWith("Dead"));
+        }
+
+        // Checking stale nodes is tricky since they have to move between
+        // healthy and stale to avoid becoming dead nodes. So we search for
+        // that state for a while, if we don't find that state waitFor will
+        // throw.
+        GenericTestUtils.waitFor(
+            () -> findNodes(nodeManager, staleCount, STALE), 500, 4 * 1000);
+      } finally {
+        // Stop the heartbeat threads even when an assertion fails, so they
+        // do not keep calling a node manager that is about to be closed.
+        thread1.interrupt();
+        thread2.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Asserts that we can handle 6000 nodes heartbeating SCM: 3000 nodes
+   * heartbeating every 2 seconds and 3000 nodes heartbeating every 4 seconds,
+   * the latter of which must be observed in the stale state.
+   *
+   * @throws IOException if creating or closing the node manager fails
+   * @throws InterruptedException if interrupted while sleeping
+   * @throws TimeoutException if the stale nodes are not observed in time
+   */
+  @Test
+  public void testScmCanHandleScale() throws IOException,
+      InterruptedException, TimeoutException {
+    final int healthyCount = 3000;
+    final int staleCount = 3000;
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
+        MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeDetails> healthyList = createNodeSet(nodeManager,
+          healthyCount, "h");
+      List<DatanodeDetails> staleList = createNodeSet(nodeManager,
+          staleCount, "s");
+
+      Runnable healthyNodeTask = () -> {
+        try {
+          heartbeatNodeSet(nodeManager, healthyList, 2 * 1000);
+        } catch (InterruptedException ignored) {
+          // Interrupt is the shutdown signal for this heartbeat thread.
+        }
+      };
+
+      Runnable staleNodeTask = () -> {
+        try {
+          heartbeatNodeSet(nodeManager, staleList, 4 * 1000);
+        } catch (InterruptedException ignored) {
+          // Interrupt is the shutdown signal for this heartbeat thread.
+        }
+      };
+
+      Thread thread1 = new Thread(healthyNodeTask);
+      thread1.setDaemon(true);
+      thread1.start();
+
+      Thread thread2 = new Thread(staleNodeTask);
+      thread2.setDaemon(true);
+      thread2.start();
+
+      try {
+        Thread.sleep(3 * 1000);
+
+        GenericTestUtils.waitFor(
+            () -> findNodes(nodeManager, staleCount, STALE), 500, 20 * 1000);
+        assertEquals("Node count mismatch",
+            healthyCount + staleCount, nodeManager.getAllNodes().size());
+      } finally {
+        // Stop the heartbeat threads even when the test fails, so they do
+        // not keep calling a node manager that is about to be closed.
+        thread1.interrupt();
+        thread2.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Asserts that SCM backs off from HB processing instead of going into an
+   * infinite loop if SCM is flooded with too many heartbeats. This may not be
+   * the best thing to do, but SCM tries to protect itself and logs an error
+   * saying that it is getting flooded with heartbeats. In the real world this
+   * can lead to many nodes becoming stale or dead because SCM is not able to
+   * keep up with heartbeat processing. This test just verifies that SCM logs
+   * that information.
+   *
+   * @throws IOException if creating or closing the node manager fails
+   * @throws InterruptedException if interrupted while waiting
+   * @throws TimeoutException if the flooding message is not logged in time
+   */
+  @Test
+  public void testScmLogsHeartbeatFlooding() throws IOException,
+      InterruptedException, TimeoutException {
+    final int healthyCount = 3000;
+
+    // Make the HB process thread run slower and cap how many heartbeats it
+    // processes, so that many nodes heartbeating without pause flood it.
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 500,
+        MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeDetails> healthyList = createNodeSet(nodeManager,
+          healthyCount, "h");
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
+      Runnable healthyNodeTask = () -> {
+        try {
+          // No wait in the HB sending loop.
+          heartbeatNodeSet(nodeManager, healthyList, 0);
+        } catch (InterruptedException ignored) {
+          // Interrupt is the shutdown signal for this heartbeat thread.
+        }
+      };
+      Thread thread1 = new Thread(healthyNodeTask);
+      thread1.setDaemon(true);
+      thread1.start();
+
+      try {
+        GenericTestUtils.waitFor(() -> logCapturer.getOutput()
+            .contains("SCM is being "
+                + "flooded by heartbeats. Not able to keep up"
+                + " with the heartbeat counts."),
+            500, 20 * 1000);
+      } finally {
+        // Stop flooding and detach the capturer even if the message never
+        // shows up, so neither leaks into later tests.
+        thread1.interrupt();
+        logCapturer.stopCapturing();
+      }
+    }
+  }
+
+  @Test
+  public void testScmEnterAndExitChillMode() throws IOException,
+      InterruptedException {
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
+        MILLISECONDS);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      nodeManager.setMinimumChillModeNodes(10);
+      DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
+          nodeManager);
+      nodeManager.sendHeartbeat(
+          datanodeDetails.getProtoBufMessage(), null, reportState);
+      String status = nodeManager.getChillModeStatus();
+      Assert.assertThat(status, containsString("Still in chill " +
+          "mode, waiting on nodes to report in."));
+
+      // Should not exit chill mode since 10 nodes have not heartbeat yet.
+      assertFalse(nodeManager.isOutOfChillMode());
+
+      // Force exit chill mode.
+      nodeManager.forceExitChillMode();
+      assertTrue(nodeManager.isOutOfChillMode());
+      status = nodeManager.getChillModeStatus();
+      Assert.assertThat(status,
+          containsString("Out of chill mode."));
+
+
+      // Enter back to into chill mode.
+      nodeManager.enterChillMode();
+      assertFalse(nodeManager.isOutOfChillMode());
+      status = nodeManager.getChillModeStatus();
+      Assert.assertThat(status,
+          containsString("Out of startup chill mode," +
+              " but in manual chill mode."));
+
+      // Assert that node manager force enter cannot be overridden by nodes HBs.
+      for (int x = 0; x < 20; x++) {
+        DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager);
+        nodeManager.sendHeartbeat(datanode.getProtoBufMessage(),
+            null, reportState);
+      }
+
+      Thread.sleep(500);
+      assertFalse(nodeManager.isOutOfChillMode());
+
+      // Make sure that once we exit out of manual chill mode, we fall back
+      // to the number of nodes to get out chill mode.
+      nodeManager.exitChillMode();
+      assertTrue(nodeManager.isOutOfChillMode());
+      status = nodeManager.getChillModeStatus();
+      Assert.assertThat(status,
+          containsString("Out of chill mode."));
+    }
+  }
+
+  /**
+   * Asserts that the aggregate SCM stats equal the sum of the storage
+   * reports that multiple nodes send with their initial heartbeats.
+   *
+   * @throws IOException if creating or closing the node manager fails
+   * @throws InterruptedException if interrupted while waiting
+   * @throws TimeoutException if the heartbeats are not processed in time
+   */
+  @Test
+  public void testScmStatsFromNodeReport() throws IOException,
+      InterruptedException, TimeoutException {
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
+        MILLISECONDS);
+    final int nodeCount = 10;
+    final long capacity = 2000;
+    final long used = 100;
+    final long remaining = capacity - used;
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      for (int x = 0; x < nodeCount; x++) {
+        DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
+            nodeManager);
+
+        SCMStorageReport.Builder srb = SCMStorageReport.newBuilder()
+            .setStorageUuid(UUID.randomUUID().toString())
+            .setCapacity(capacity)
+            .setScmUsed(used)
+            .setRemaining(remaining);
+        SCMNodeReport nodeReport = SCMNodeReport.newBuilder()
+            .addStorageReport(srb)
+            .build();
+        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+            nodeReport, reportState);
+      }
+      GenericTestUtils.waitFor(nodeManager::waitForHeartbeatProcessed,
+          100, 4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(capacity * nodeCount, (long) nodeManager.getStats()
+          .getCapacity().get());
+      assertEquals(used * nodeCount, (long) nodeManager.getStats()
+          .getScmUsed().get());
+      assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
+          .getRemaining().get());
+    }
+  }
+
+  /**
+   * Tests that a single node's stats follow its node reports as the node
+   * moves through healthy, stale and dead and back to healthy. The stats are
+   * kept while the node is stale and dropped while it is dead. They come back
+   * once a new report arrives.
+   *
+   * @throws IOException if creating or closing the node manager fails
+   * @throws InterruptedException if interrupted while sleeping or waiting
+   * @throws TimeoutException if an awaited state is not reached in time
+   */
+  @Test
+  public void testScmNodeReportUpdate() throws IOException,
+      InterruptedException, TimeoutException {
+    OzoneConfiguration conf = getConf();
+    final int heartbeatCount = 5;
+    final int nodeCount = 1;
+    final int interval = 100;
+
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
+        MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
+          nodeManager);
+      final long capacity = 2000;
+      final long usedPerHeartbeat = 100;
+
+      // Heartbeat x reports x * usedPerHeartbeat used space. The stats are
+      // expected to reflect only the last report (x = heartbeatCount - 1).
+      for (int x = 0; x < heartbeatCount; x++) {
+        SCMStorageReport.Builder srb = SCMStorageReport.newBuilder()
+            .setStorageUuid(UUID.randomUUID().toString())
+            .setCapacity(capacity)
+            .setScmUsed(x * usedPerHeartbeat)
+            .setRemaining(capacity - x * usedPerHeartbeat);
+        SCMNodeReport nodeReport = SCMNodeReport.newBuilder()
+            .addStorageReport(srb)
+            .build();
+
+        nodeManager.sendHeartbeat(
+            datanodeDetails.getProtoBufMessage(), nodeReport, reportState);
+        Thread.sleep(100);
+      }
+
+      final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1);
+      final long expectedRemaining = capacity - expectedScmUsed;
+
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
+          100, 4 * 1000);
+
+      long foundCapacity = nodeManager.getStats().getCapacity().get();
+      assertEquals(capacity, foundCapacity);
+
+      long foundScmUsed = nodeManager.getStats().getScmUsed().get();
+      assertEquals(expectedScmUsed, foundScmUsed);
+
+      long foundRemaining = nodeManager.getStats().getRemaining().get();
+      assertEquals(expectedRemaining, foundRemaining);
+
+      // Test NodeManager#getNodeStats
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+      long nodeCapacity = nodeManager.getNodeStat(datanodeDetails).get()
+          .getCapacity().get();
+      assertEquals(capacity, nodeCapacity);
+
+      foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed()
+          .get();
+      assertEquals(expectedScmUsed, foundScmUsed);
+
+      foundRemaining = nodeManager.getNodeStat(datanodeDetails).get()
+          .getRemaining().get();
+      assertEquals(expectedRemaining, foundRemaining);
+
+      // Compare the result from
+      // NodeManager#getNodeStats and NodeManager#getNodeStat
+      SCMNodeStat stat1 = nodeManager.getNodeStats().
+          get(datanodeDetails);
+      SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeDetails).get();
+      assertEquals(stat1, stat2);
+
+      // Wait up to 4 seconds for the node to become stale; its usage info
+      // should be unchanged.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(STALE) == 1, 100,
+          4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+
+      foundCapacity = nodeManager.getNodeStat(datanodeDetails).get()
+          .getCapacity().get();
+      assertEquals(capacity, foundCapacity);
+      foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get()
+          .getScmUsed().get();
+      assertEquals(expectedScmUsed, foundScmUsed);
+
+      foundRemaining = nodeManager.getNodeStat(datanodeDetails).get().
+          getRemaining().get();
+      assertEquals(expectedRemaining, foundRemaining);
+
+      // Wait up to 4 more seconds for the node to become dead; its usage
+      // should then be dropped from the aggregate stats.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(DEAD) == 1, 100,
+          4 * 1000);
+
+      assertEquals(0, nodeManager.getNodeStats().size());
+      foundCapacity = nodeManager.getStats().getCapacity().get();
+      assertEquals(0, foundCapacity);
+
+      foundScmUsed = nodeManager.getStats().getScmUsed().get();
+      assertEquals(0, foundScmUsed);
+
+      foundRemaining = nodeManager.getStats().getRemaining().get();
+      assertEquals(0, foundRemaining);
+
+      // Send a new report to bring the dead node back to healthy.
+      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder()
+          .setStorageUuid(UUID.randomUUID().toString())
+          .setCapacity(capacity)
+          .setScmUsed(expectedScmUsed)
+          .setRemaining(expectedRemaining);
+      SCMNodeReport nodeReport = SCMNodeReport.newBuilder()
+          .addStorageReport(srb)
+          .build();
+      nodeManager.sendHeartbeat(
+          datanodeDetails.getProtoBufMessage(), nodeReport, reportState);
+
+      // Wait up to 5 seconds for the dead node to become healthy, then
+      // verify that its usage info is restored.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(HEALTHY) == 1,
+          100, 5 * 1000);
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
+          100, 4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+      foundCapacity = nodeManager.getNodeStat(datanodeDetails).get()
+          .getCapacity().get();
+      assertEquals(capacity, foundCapacity);
+      foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed()
+          .get();
+      assertEquals(expectedScmUsed, foundScmUsed);
+      foundRemaining = nodeManager.getNodeStat(datanodeDetails).get()
+          .getRemaining().get();
+      assertEquals(expectedRemaining, foundRemaining);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java
new file mode 100644
index 0000000..8f412de
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for SCM node pool manager.
+ */
+public class TestSCMNodePoolManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSCMNodePoolManager.class);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private final File testDir = PathUtils.getTestDir(
+      TestSCMNodePoolManager.class);
+
+  SCMNodePoolManager createNodePoolManager(OzoneConfiguration conf)
+      throws IOException {
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
+        testDir.getAbsolutePath());
+    conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+    return new SCMNodePoolManager(conf);
+  }
+
+  /**
+   * Test default node pool.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testDefaultNodePool() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    try {
+      final String defaultPool = "DefaultPool";
+      NodePoolManager npMgr = createNodePoolManager(conf);
+
+      final int nodeCount = 4;
+      final List<DatanodeDetails> nodes = TestUtils
+          .getListOfDatanodeDetails(nodeCount);
+      assertEquals(0, npMgr.getNodePools().size());
+      for (DatanodeDetails node: nodes) {
+        npMgr.addNode(defaultPool, node);
+      }
+      List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+      assertEquals(nodeCount, nodesRetrieved.size());
+      assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+
+      DatanodeDetails nodeRemoved = nodes.remove(2);
+      npMgr.removeNode(defaultPool, nodeRemoved);
+      List<DatanodeDetails> nodesAfterRemove = npMgr.getNodes(defaultPool);
+      assertTwoDatanodeListsEqual(nodes, nodesAfterRemove);
+
+      List<DatanodeDetails> nonExistSet = npMgr.getNodes("NonExistSet");
+      assertEquals(0, nonExistSet.size());
+    } finally {
+      FileUtil.fullyDelete(testDir);
+    }
+  }
+
+
+  /**
+   * Test default node pool reload.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testDefaultNodePoolReload() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    final String defaultPool = "DefaultPool";
+    final int nodeCount = 4;
+    final List<DatanodeDetails> nodes = TestUtils
+        .getListOfDatanodeDetails(nodeCount);
+
+    try {
+      try {
+        SCMNodePoolManager npMgr = createNodePoolManager(conf);
+        assertEquals(0, npMgr.getNodePools().size());
+        for (DatanodeDetails node : nodes) {
+          npMgr.addNode(defaultPool, node);
+        }
+        List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+        assertEquals(nodeCount, nodesRetrieved.size());
+        assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+        npMgr.close();
+      } finally {
+        LOG.info("testDefaultNodePoolReload: Finish adding nodes to pool" +
+            " and close.");
+      }
+
+      // try reload with a new NodePoolManager instance
+      try {
+        SCMNodePoolManager npMgr = createNodePoolManager(conf);
+        List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+        assertEquals(nodeCount, nodesRetrieved.size());
+        assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+      } finally {
+        LOG.info("testDefaultNodePoolReload: Finish reloading node pool.");
+      }
+    } finally {
+      FileUtil.fullyDelete(testDir);
+    }
+  }
+
+  /**
+   * Compare and verify that two datanode lists are equal.
+   * @param list1 - datanode list 1.
+   * @param list2 - datanode list 2.
+   */
+  private void assertTwoDatanodeListsEqual(List<DatanodeDetails> list1,
+      List<DatanodeDetails> list2) {
+    assertEquals(list1.size(), list2.size());
+    Collections.sort(list1);
+    Collections.sort(list2);
+    assertTrue(ListUtils.isEqualList(list1, list2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/package-info.java
new file mode 100644
index 0000000..da05c59
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+/**
+ * Tests for the SCM server.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
new file mode 100644
index 0000000..433beb8
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.VersionInfo;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .HeartbeatEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .RegisterEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .VersionEndpointTask;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.scm.TestUtils.getDatanodeDetails;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState.states
+    .noContainerReports;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils
+    .createEndpoint;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+/**
+ * Tests the datanode endpoint state machine and its version, register and
+ * heartbeat tasks, plus container report delivery, against an in-process
+ * mock SCM RPC server ({@link ScmTestMock}).
+ */
+public class TestEndPoint {
+  /** Address the mock SCM RPC server is bound to. */
+  private static InetSocketAddress serverAddress;
+  private static RPC.Server scmServer;
+  /**
+   * Mock SCM implementation shared by every test. Tests that inject an RPC
+   * response delay must restore it, or later tests will see the delay.
+   */
+  private static ScmTestMock scmServerImpl;
+  private static File testDir;
+  /** Report state sent with heartbeats: no container reports, count 0. */
+  private static StorageContainerDatanodeProtocolProtos.ReportState
+      defaultReportState;
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (scmServer != null) {
+      scmServer.stop();
+    }
+    FileUtil.fullyDelete(testDir);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    serverAddress = SCMTestUtils.getReuseableAddress();
+    scmServerImpl = new ScmTestMock();
+    scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
+        scmServerImpl, serverAddress, 10);
+    testDir = PathUtils.getTestDir(TestEndPoint.class);
+    defaultReportState = StorageContainerDatanodeProtocolProtos.
+        ReportState.newBuilder().setState(noContainerReports).
+        setCount(0).build();
+  }
+
+  /**
+   * This test asserts that we are able to make a version call to SCM server
+   * and gets back the expected values.
+   */
+  @Test
+  public void testGetVersion() throws Exception {
+    try (EndpointStateMachine rpcEndPoint =
+             createEndpoint(SCMTestUtils.getConf(),
+                 serverAddress, 1000)) {
+      SCMVersionResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .getVersion(null);
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(VersionInfo.DESCRIPTION_KEY,
+          responseProto.getKeys(0).getKey());
+      Assert.assertEquals(VersionInfo.getLatestVersion().getDescription(),
+          responseProto.getKeys(0).getValue());
+    }
+  }
+
+  /**
+   * We make getVersion RPC call, but via the VersionEndpointTask which is
+   * how the state machine would make the call.
+   */
+  @Test
+  public void testGetVersionTask() throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+    try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
+        serverAddress, 1000)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+      // if version call worked the endpoint should automatically move to the
+      // next state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          newState);
+
+      // Now rpcEndpoint should remember the version it got from SCM
+      Assert.assertNotNull(rpcEndPoint.getVersion());
+    }
+  }
+
+  /**
+   * This test makes a call to end point where there is no SCM server. We
+   * expect that versionTask should be able to handle it.
+   */
+  @Test
+  public void testGetVersionToInvalidEndpoint() throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+    InetSocketAddress nonExistentServerAddress = SCMTestUtils
+        .getReuseableAddress();
+    try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
+        nonExistentServerAddress, 1000)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+      // This version call did NOT work, so endpoint should remain in the same
+      // state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+          newState);
+    }
+  }
+
+  /**
+   * This test makes a getVersion RPC call, but the mock SCM server is told
+   * to respond more slowly than the RPC timeout. We assert that the call
+   * returns within the timeout (plus a small tolerance) and that the endpoint
+   * is still in the GETVERSION state afterwards.
+   */
+  @Test
+  public void testGetVersionAssertRpcTimeOut() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 100;
+    Configuration conf = SCMTestUtils.getConf();
+
+    try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
+        serverAddress, (int) rpcTimeout)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+
+      // Respond slower than rpcTimeout so the call has to give up.
+      scmServerImpl.setRpcResponseDelay(1500);
+      long start = Time.monotonicNow();
+      EndpointStateMachine.EndPointStates newState;
+      long end;
+      try {
+        newState = versionTask.call();
+        end = Time.monotonicNow();
+      } finally {
+        // Always restore the shared mock, even if the call throws, so later
+        // tests are not slowed down or broken by the injected delay.
+        scmServerImpl.setRpcResponseDelay(0);
+      }
+      Assert.assertThat(end - start, lessThanOrEqualTo(rpcTimeout + tolerance));
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+          newState);
+    }
+  }
+
+  @Test
+  public void testRegister() throws Exception {
+    String[] scmAddressArray = new String[1];
+    scmAddressArray[0] = serverAddress.toString();
+    DatanodeDetails nodeToRegister = getDatanodeDetails();
+    try (EndpointStateMachine rpcEndPoint =
+             createEndpoint(
+                 SCMTestUtils.getConf(), serverAddress, 1000)) {
+      SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .register(nodeToRegister.getProtoBufMessage(), scmAddressArray);
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(nodeToRegister.getUuidString(),
+          responseProto.getDatanodeUUID());
+      Assert.assertNotNull(responseProto.getClusterID());
+    }
+  }
+
+  /**
+   * Creates an endpoint in the REGISTER state and runs a
+   * {@link RegisterEndpointTask} against it once.
+   *
+   * @param scmAddress - address of the SCM to register with.
+   * @param rpcTimeout - RPC timeout passed to the endpoint.
+   * @param clearDatanodeDetails - if true, no DatanodeDetails are set on the
+   *                             task; otherwise the task gets the details
+   *                             from {@code TestUtils.getDatanodeDetails()}.
+   * @return the endpoint after the task ran; the caller must close it.
+   * @throws Exception if endpoint creation or the task fails.
+   */
+  private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
+      int rpcTimeout, boolean clearDatanodeDetails) throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+    EndpointStateMachine rpcEndPoint =
+        createEndpoint(conf,
+            scmAddress, rpcTimeout);
+    boolean succeeded = false;
+    try {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
+      RegisterEndpointTask endpointTask =
+          new RegisterEndpointTask(rpcEndPoint, conf);
+      if (!clearDatanodeDetails) {
+        DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
+        HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
+            datanodeDetails.getProtoBufMessage();
+        endpointTask.setDatanodeDetailsProto(datanodeDetailsProto);
+      }
+      endpointTask.call();
+      succeeded = true;
+      return rpcEndPoint;
+    } finally {
+      // The caller only owns the endpoint if it is returned; otherwise close
+      // it here so a failing task does not leak the endpoint.
+      if (!succeeded) {
+        rpcEndPoint.close();
+      }
+    }
+  }
+
+  @Test
+  public void testRegisterTask() throws Exception {
+    try (EndpointStateMachine rpcEndpoint =
+             registerTaskHelper(serverAddress, 1000, false)) {
+      // Successful register should move us to Heartbeat state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterToInvalidEndpoint() throws Exception {
+    InetSocketAddress address = SCMTestUtils.getReuseableAddress();
+    try (EndpointStateMachine rpcEndpoint =
+             registerTaskHelper(address, 1000, false)) {
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterNoContainerID() throws Exception {
+    InetSocketAddress address = SCMTestUtils.getReuseableAddress();
+    try (EndpointStateMachine rpcEndpoint =
+             registerTaskHelper(address, 1000, true)) {
+      // No DatanodeDetails were set on the register task, so the endpoint is
+      // expected to tell the datanode to shut down.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterRpcTimeout() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 200;
+    // Respond slower than rpcTimeout so the register call has to give up.
+    scmServerImpl.setRpcResponseDelay(1500);
+    long start = Time.monotonicNow();
+    long end;
+    try {
+      registerTaskHelper(serverAddress, (int) rpcTimeout, false).close();
+      end = Time.monotonicNow();
+    } finally {
+      // Always restore the shared mock, even if registration throws.
+      scmServerImpl.setRpcResponseDelay(0);
+    }
+    Assert.assertThat(end - start, lessThanOrEqualTo(rpcTimeout + tolerance));
+  }
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    DatanodeDetails dataNode = getDatanodeDetails();
+    try (EndpointStateMachine rpcEndPoint =
+             createEndpoint(SCMTestUtils.getConf(),
+                 serverAddress, 1000)) {
+      SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+      srb.setStorageUuid(UUID.randomUUID().toString());
+      srb.setCapacity(2000).setScmUsed(500).setRemaining(1500);
+      nrb.addStorageReport(srb);
+      SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .sendHeartbeat(
+              dataNode.getProtoBufMessage(), nrb.build(), defaultReportState);
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(0, responseProto.getCommandsCount());
+    }
+  }
+
+  /**
+   * Runs a single {@link HeartbeatEndpointTask} against the given address
+   * and asserts that the endpoint stays in the HEARTBEAT state.
+   *
+   * @param scmAddress - address of the SCM to heartbeat to.
+   * @param rpcTimeout - RPC timeout passed to the endpoint.
+   * @throws Exception if creating the state machine or the task fails.
+   */
+  private void heartbeatTaskHelper(InetSocketAddress scmAddress,
+      int rpcTimeout) throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
+    conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    // Mini Ozone cluster will not come up if the port is not true, since
+    // Ratis will exit if the server port cannot be bound. We can remove this
+    // hard coding once we fix the Ratis default behaviour.
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
+
+    // Create a datanode state machine for the state context used by the
+    // endpoint task.
+    try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
+        TestUtils.getDatanodeDetails(), conf);
+        EndpointStateMachine rpcEndPoint =
+            createEndpoint(conf, scmAddress, rpcTimeout)) {
+      HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
+          getDatanodeDetails().getProtoBufMessage();
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
+
+      final StateContext stateContext =
+          new StateContext(conf, DatanodeStateMachine.DatanodeStates.RUNNING,
+              stateMachine);
+
+      HeartbeatEndpointTask endpointTask =
+          new HeartbeatEndpointTask(rpcEndPoint, conf, stateContext);
+      endpointTask.setDatanodeDetailsProto(datanodeDetailsProto);
+      endpointTask.call();
+      Assert.assertNotNull(endpointTask.getDatanodeDetailsProto());
+
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+          rpcEndPoint.getState());
+    }
+  }
+
+  @Test
+  public void testHeartbeatTask() throws Exception {
+    heartbeatTaskHelper(serverAddress, 1000);
+  }
+
+  @Test
+  public void testHeartbeatTaskToInvalidNode() throws Exception {
+    InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress();
+    heartbeatTaskHelper(invalidAddress, 1000);
+  }
+
+  @Test
+  public void testHeartbeatTaskRpcTimeOut() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 200;
+    // NOTE(review): the heartbeat below targets a freshly picked address,
+    // which (as in testGetVersionToInvalidEndpoint) presumably has no SCM
+    // listening, so this delay on the mock SCM is likely never exercised.
+    // The test then only bounds how long a heartbeat to an unreachable SCM
+    // takes. Confirm whether serverAddress was intended.
+    scmServerImpl.setRpcResponseDelay(1500);
+    long start = Time.monotonicNow();
+    long end;
+    try {
+      InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress();
+      heartbeatTaskHelper(invalidAddress, (int) rpcTimeout);
+      end = Time.monotonicNow();
+    } finally {
+      // Always restore the shared mock, even if the heartbeat throws.
+      scmServerImpl.setRpcResponseDelay(0);
+    }
+    Assert.assertThat(end - start,
+        lessThanOrEqualTo(rpcTimeout + tolerance));
+  }
+
+  /**
+   * Returns a new container report.
+   * @return a report named with a random UUID string and carrying the
+   *         SHA-256 hex digest of the fixed string "Random".
+   */
+  ContainerReport getRandomContainerReport() {
+    return new ContainerReport(UUID.randomUUID().toString(),
+        DigestUtils.sha256Hex("Random"));
+  }
+
+  /**
+   * Creates dummy container reports.
+   * @param count - The number of closed containers to create.
+   * @return ContainerReportsProto
+   */
+  StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto
+      createDummyContainerReports(int count) {
+    StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
+        reportsBuilder = StorageContainerDatanodeProtocolProtos
+        .ContainerReportsRequestProto.newBuilder();
+    for (int x = 0; x < count; x++) {
+      reportsBuilder.addReports(getRandomContainerReport()
+          .getProtoBufMessage());
+    }
+    reportsBuilder.setDatanodeDetails(getDatanodeDetails()
+        .getProtoBufMessage());
+    reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
+        .ContainerReportsRequestProto.reportType.fullReport);
+    return reportsBuilder.build();
+  }
+
+  /**
+   * Tests that rpcEndpoint sendContainerReport works as expected.
+   * @throws Exception
+   */
+  @Test
+  public void testContainerReportSend() throws Exception {
+    final int count = 1000;
+    scmServerImpl.reset();
+    try (EndpointStateMachine rpcEndPoint =
+             createEndpoint(SCMTestUtils.getConf(),
+                 serverAddress, 1000)) {
+      ContainerReportsResponseProto responseProto = rpcEndPoint
+          .getEndPoint().sendContainerReport(createDummyContainerReports(
+              count));
+      Assert.assertNotNull(responseProto);
+    }
+    Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
+    Assert.assertEquals(count, scmServerImpl.getContainerCount());
+  }
+
+  /**
+   * Tests that a container report carrying per-container statistics is
+   * received by the mock SCM and that its container count, total key count
+   * and total bytes used match what was sent.
+   * @throws Exception
+   */
+  @Test
+  public void testContainerReport() throws Exception {
+    final int count = 1000;
+    scmServerImpl.reset();
+    try (EndpointStateMachine rpcEndPoint =
+             createEndpoint(SCMTestUtils.getConf(),
+                 serverAddress, 1000)) {
+      ContainerReportsResponseProto responseProto = rpcEndPoint
+          .getEndPoint().sendContainerReport(createContainerReport(count));
+      Assert.assertNotNull(responseProto);
+    }
+    Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
+    Assert.assertEquals(count, scmServerImpl.getContainerCount());
+    // Each report in createContainerReport carries 1000 keys; use long
+    // arithmetic so a larger count cannot overflow int.
+    final long expectedKeyCount = count * 1000L;
+    Assert.assertEquals(expectedKeyCount, scmServerImpl.getKeyCount());
+    final long expectedBytesUsed = count * OzoneConsts.GB * 2;
+    Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed());
+  }
+
+  /**
+   * Builds a full container report with {@code count} entries, each carrying
+   * the same fixed statistics (1000 keys, 2 GB used).
+   *
+   * @param count - number of container entries to add.
+   * @return the report request.
+   */
+  private ContainerReportsRequestProto createContainerReport(int count) {
+    StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
+        reportsBuilder = StorageContainerDatanodeProtocolProtos
+        .ContainerReportsRequestProto.newBuilder();
+    for (int x = 0; x < count; x++) {
+      ContainerReport report = new ContainerReport(UUID.randomUUID().toString(),
+            DigestUtils.sha256Hex("Simulated"));
+      report.setKeyCount(1000);
+      report.setSize(OzoneConsts.GB * 5);
+      report.setBytesUsed(OzoneConsts.GB * 2);
+      report.setReadCount(100);
+      report.setReadBytes(OzoneConsts.GB * 1);
+      report.setWriteCount(50);
+      report.setWriteBytes(OzoneConsts.GB * 2);
+      // NOTE(review): every entry uses container ID 1, yet
+      // testContainerReport expects getContainerCount() == count, so the mock
+      // presumably does not dedupe by ID. Confirm this is intended.
+      report.setContainerID(1);
+
+      reportsBuilder.addReports(report.getProtoBufMessage());
+    }
+    reportsBuilder.setDatanodeDetails(getDatanodeDetails()
+        .getProtoBufMessage());
+    reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
+        .ContainerReportsRequestProto.reportType.fullReport);
+    return reportsBuilder.build();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message