hadoop-common-commits mailing list archives

From: viraj...@apache.org
Subject: [15/50] [abbrv] hadoop git commit: HDDS-212. Introduce NodeStateManager to manage the state of Datanodes in SCM. Contributed by Nanda kumar.
Date: Mon, 09 Jul 2018 18:26:07 GMT
HDDS-212. Introduce NodeStateManager to manage the state of Datanodes in SCM. Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/71df8c27
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/71df8c27
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/71df8c27

Branch: refs/heads/HDFS-12090
Commit: 71df8c27c9a0e326232d3baf16414a63b5ea5a4b
Parents: 3b63715
Author: Nanda kumar <nanda@apache.org>
Authored: Thu Jul 5 02:11:10 2018 +0530
Committer: Nanda kumar <nanda@apache.org>
Committed: Thu Jul 5 02:11:10 2018 +0530

----------------------------------------------------------------------
 .../scm/client/ContainerOperationClient.java    |   8 +-
 .../hadoop/hdds/protocol/DatanodeDetails.java   |  13 +-
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |   4 -
 .../hadoop/hdds/scm/client/ScmClient.java       |   5 +-
 .../StorageContainerLocationProtocol.java       |   5 +-
 ...rLocationProtocolClientSideTranslatorPB.java |   8 +-
 ...rLocationProtocolServerSideTranslatorPB.java |   8 +-
 .../StorageContainerLocationProtocol.proto      |  19 +-
 hadoop-hdds/common/src/main/proto/hdds.proto    |  13 +-
 .../common/src/main/resources/ozone-default.xml |  11 -
 .../apache/hadoop/hdds/scm/HddsServerUtil.java  |  11 -
 .../protocol/StorageContainerNodeProtocol.java  |   4 +-
 .../hadoop/hdds/scm/node/DatanodeInfo.java      | 109 ++++
 .../hdds/scm/node/HeartbeatQueueItem.java       |  98 ----
 .../hadoop/hdds/scm/node/NodeManager.java       |  16 +-
 .../hadoop/hdds/scm/node/NodeStateManager.java  | 575 +++++++++++++++++++
 .../hadoop/hdds/scm/node/SCMNodeManager.java    | 506 ++--------------
 .../node/states/NodeAlreadyExistsException.java |  45 ++
 .../hdds/scm/node/states/NodeException.java     |  44 ++
 .../scm/node/states/NodeNotFoundException.java  |  49 ++
 .../hdds/scm/node/states/NodeStateMap.java      | 281 +++++++++
 .../scm/server/SCMClientProtocolServer.java     |  60 +-
 .../server/SCMDatanodeHeartbeatDispatcher.java  |   2 +-
 .../scm/server/SCMDatanodeProtocolServer.java   |   2 +-
 .../hdds/scm/container/MockNodeManager.java     |  58 +-
 .../hdds/scm/node/TestContainerPlacement.java   |  10 +-
 .../hadoop/hdds/scm/node/TestNodeManager.java   | 176 ++----
 .../testutils/ReplicationNodeManagerMock.java   |  37 +-
 .../ozone/TestStorageContainerManager.java      |   4 +-
 .../hadoop/ozone/scm/node/TestQueryNode.java    |  19 +-
 .../hadoop/ozone/ksm/KeySpaceManager.java       |   6 +-
 31 files changed, 1288 insertions(+), 918 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index b04f8c4..e7bdaf0 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.UUID;
 
@@ -234,14 +233,14 @@ public class ContainerOperationClient implements ScmClient {
   /**
   * Returns a set of Nodes that meet the query criteria.
    *
-   * @param nodeStatuses - A set of criteria that we want the node to have.
+   * @param nodeStatuses - Criteria that we want the node to have.
    * @param queryScope - Query scope - Cluster or pool.
   * @param poolName - if the scope is pool, a pool name is required.
    * @return A set of nodes that meet the requested criteria.
    * @throws IOException
    */
   @Override
-  public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
+  public List<HddsProtos.Node> queryNode(HddsProtos.NodeState
       nodeStatuses, HddsProtos.QueryScope queryScope, String poolName)
       throws IOException {
     return storageContainerLocationClient.queryNode(nodeStatuses, queryScope,
@@ -458,7 +457,8 @@ public class ContainerOperationClient implements ScmClient {
    */
   @Override
   public long getContainerSize(long containerID) throws IOException {
-    // TODO : Fix this, it currently returns the capacity but not the current usage.
+    // TODO : Fix this, it currently returns the capacity
+    // but not the current usage.
     long size = getContainerSizeB();
     if (size == -1) {
       throw new IOException("Container size unknown!");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index c373e22..bae22a2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -35,7 +35,7 @@ import java.util.UUID;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class DatanodeDetails implements Comparable<DatanodeDetails> {
+public class DatanodeDetails implements Comparable<DatanodeDetails> {
 
   /**
    * DataNode's unique identifier in the cluster.
@@ -63,6 +63,13 @@ public final class DatanodeDetails implements Comparable<DatanodeDetails> {
     this.ports = ports;
   }
 
+  protected DatanodeDetails(DatanodeDetails datanodeDetails) {
+    this.uuid = datanodeDetails.uuid;
+    this.ipAddress = datanodeDetails.ipAddress;
+    this.hostName = datanodeDetails.hostName;
+    this.ports = datanodeDetails.ports;
+  }
+
   /**
    * Returns the DataNode UUID.
    *
@@ -238,7 +245,7 @@ public final class DatanodeDetails implements Comparable<DatanodeDetails> {
   /**
    * Builder class for building DatanodeDetails.
    */
-  public static class Builder {
+  public static final class Builder {
     private String id;
     private String ipAddress;
     private String hostName;
@@ -324,7 +331,7 @@ public final class DatanodeDetails implements Comparable<DatanodeDetails> {
   /**
    * Container to hold DataNode Port details.
    */
-  public static class Port {
+  public static final class Port {
 
     /**
      * Ports that are supported in DataNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index df6fbf0..ad326dc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -165,10 +165,6 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_DEADNODE_INTERVAL_DEFAULT =
       "10m";
 
-  public static final String OZONE_SCM_MAX_HB_COUNT_TO_PROCESS =
-      "ozone.scm.max.hb.count.to.process";
-  public static final int OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT = 5000;
-
   public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL =
       "ozone.scm.heartbeat.thread.interval";
   public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index ecb2173..7955179 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.List;
 
 /**
@@ -150,13 +149,13 @@ public interface ScmClient {
 
   /**
    * Returns a set of Nodes that meet a query criteria.
-   * @param nodeStatuses - A set of criteria that we want the node to have.
+   * @param nodeStatuses - Criteria that we want the node to have.
    * @param queryScope - Query scope - Cluster or pool.
   * @param poolName - if the scope is pool, a pool name is required.
    * @return A set of nodes that meet the requested criteria.
    * @throws IOException
    */
-  HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> nodeStatuses,
+  List<HddsProtos.Node> queryNode(HddsProtos.NodeState nodeStatuses,
       HddsProtos.QueryScope queryScope, String poolName) throws IOException;
 
   /**

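The queryNode API now takes a single NodeState and returns a List of HddsProtos.Node instead of a NodePool. A minimal usage sketch against the new interface follows; the scmClient variable and the proto accessors (getNodeID, getHostName) are assumptions for illustration, not part of this commit:

    // Query all HEALTHY datanodes cluster-wide. poolName is unused for
    // cluster-scoped queries (see the TODO in the client-side translator
    // below).
    List<HddsProtos.Node> healthy = scmClient.queryNode(
        HddsProtos.NodeState.HEALTHY,
        HddsProtos.QueryScope.CLUSTER,
        "");
    for (HddsProtos.Node node : healthy) {
      // Assumed accessors on the generated proto classes.
      System.out.println(node.getNodeID().getHostName());
    }
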
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index b787409..581fbd0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
 
 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.List;
 
 /**
@@ -94,10 +93,10 @@ public interface StorageContainerLocationProtocol {
 
   /**
    *  Queries a list of Node Statuses.
-   * @param nodeStatuses
+   * @param state
    * @return List of Datanodes.
    */
-  HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> nodeStatuses,
+  List<HddsProtos.Node> queryNode(HddsProtos.NodeState state,
       HddsProtos.QueryScope queryScope, String poolName) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 4b03d12..ac12ea2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.ipc.RPC;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 
 /**
@@ -215,20 +214,19 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
    * @return List of Datanodes.
    */
   @Override
-  public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
+  public List<HddsProtos.Node> queryNode(HddsProtos.NodeState
       nodeStatuses, HddsProtos.QueryScope queryScope, String poolName)
       throws IOException {
     // TODO : We support only cluster-wide queries right now, so we skip
     // checking queryScope and poolName.
     Preconditions.checkNotNull(nodeStatuses);
-    Preconditions.checkState(nodeStatuses.size() > 0);
     NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder()
-        .addAllQuery(nodeStatuses)
+        .setState(nodeStatuses)
         .setScope(queryScope).setPoolName(poolName).build();
     try {
       NodeQueryResponseProto response =
           rpcProxy.queryNode(NULL_RPC_CONTROLLER, request);
-      return response.getDatanodes();
+      return response.getDatanodesList();
     } catch (ServiceException e) {
       throw  ProtobufHelper.getRemoteException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index d66919f..9175ebf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
 
 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.List;
 
 /**
@@ -171,13 +170,12 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
       StorageContainerLocationProtocolProtos.NodeQueryRequestProto request)
       throws ServiceException {
     try {
-      EnumSet<HddsProtos.NodeState> nodeStateEnumSet = EnumSet.copyOf(request
-          .getQueryList());
-      HddsProtos.NodePool datanodes = impl.queryNode(nodeStateEnumSet,
+      HddsProtos.NodeState nodeState = request.getState();
+      List<HddsProtos.Node> datanodes = impl.queryNode(nodeState,
           request.getScope(), request.getPoolName());
       return StorageContainerLocationProtocolProtos
           .NodeQueryResponseProto.newBuilder()
-          .setDatanodes(datanodes)
+          .addAllDatanodes(datanodes)
           .build();
     } catch (Exception e) {
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 143c2ae..68cc35f 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -118,26 +118,13 @@ message ObjectStageChangeResponseProto {
  match the NodeState that we are requesting.
 */
 message NodeQueryRequestProto {
-
-
-  // Repeated, So we can specify more than one status type.
-  // These NodeState types are additive for now, in the sense that
-  // if you specify HEALTHY and FREE_NODE members --
-  // Then you get all healthy node which are not raft members.
-  //
-  // if you specify all healthy and dead nodes, you will get nothing
-  // back. Server is not going to dictate what combinations make sense,
-  // it is entirely up to the caller.
-  // TODO: Support operators like OR and NOT. Currently it is always an
-  // implied AND.
-
-  repeated NodeState query = 1;
+  required NodeState state = 1;
   required QueryScope scope = 2;
   optional string poolName = 3; // if scope is pool, then pool name is needed.
 }
 
 message NodeQueryResponseProto {
-  required NodePool datanodes = 1;
+  repeated Node datanodes = 1;
 }
 
 /**
@@ -194,7 +181,7 @@ service StorageContainerLocationProtocolService {
   /**
  * Returns a set of Nodes that meet the given criteria.
   */
-  rpc queryNode(NodeQueryRequestProto)  returns (NodeQueryResponseProto);
+  rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
 
   /**
  * Notification from the client when container or pipeline operations begin or finish on datanodes.

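Since NodeQueryRequestProto now carries exactly one required state, callers that previously passed several states must issue one query per state. Note that the removed comment above describes the old repeated states as an implied AND, so a per-state loop like the hypothetical helper below returns a union, not an intersection:

    // Hypothetical helper, not part of this commit: approximates a
    // multi-state query on top of the new single-state RPC.
    static List<HddsProtos.Node> queryNodes(ScmClient client,
        EnumSet<HddsProtos.NodeState> states) throws IOException {
      List<HddsProtos.Node> result = new ArrayList<>();
      for (HddsProtos.NodeState state : states) {
        result.addAll(client.queryNode(state,
            HddsProtos.QueryScope.CLUSTER, ""));
      }
      return result;
    }
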
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 1c9ee19..b9def2a 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -69,14 +69,11 @@ enum NodeType {
  * and getNodeCount.
  */
 enum NodeState {
-    HEALTHY             = 1;
-    STALE               = 2;
-    DEAD                = 3;
-    DECOMMISSIONING     = 4;
-    DECOMMISSIONED      = 5;
-    RAFT_MEMBER         = 6;
-    FREE_NODE           = 7; // Not a member in raft.
-    INVALID             = 8;
+    HEALTHY = 1;
+    STALE = 2;
+    DEAD = 3;
+    DECOMMISSIONING = 4;
+    DECOMMISSIONED = 5;
 }
 
 enum QueryScope {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 25365c8..568d267 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -774,17 +774,6 @@
     </description>
   </property>
   <property>
-    <name>ozone.scm.max.hb.count.to.process</name>
-    <value>5000</value>
-    <tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
-    <description>
-      The maximum number of heartbeat to process per loop of the
-      heartbeat process thread. Please see
-      ozone.scm.heartbeat.thread.interval
-      for more info.
-    </description>
-  </property>
-  <property>
     <name>ozone.scm.names</name>
     <value/>
     <tag>OZONE</tag>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
index c734d9b..cc7adbf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
@@ -259,17 +259,6 @@ public final class HddsServerUtil {
   }
 
   /**
-   * Returns the maximum number of heartbeat to process per loop of the process
-   * thread.
-   * @param conf Configuration
-   * @return - int -- Number of HBs to process
-   */
-  public static int getMaxHBToProcessPerLoop(Configuration conf) {
-    return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
-        ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
-  }
-
-  /**
    * Timeout value for the RPC from Datanode to SCM, primarily used for
    * Heartbeats and container reports.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index 790f58a..c9ef43f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -59,10 +59,8 @@ public interface StorageContainerNodeProtocol {
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.
    * @param datanodeDetails - Datanode ID.
-   * @param nodeReport - node report.
   * @return SCM heartbeat response list
    */
-  List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails,
-      NodeReportProto nodeReport);
+  List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails);
 
 }

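With the node report removed from the heartbeat path, an implementation of processHeartbeat only has to refresh liveness and hand back queued commands. A minimal sketch, assuming the NodeStateManager and a CommandQueue as used elsewhere in this commit; the re-register fallback for unknown nodes is an assumption, not a quote from the patch:

    @Override
    public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
      Preconditions.checkNotNull(datanodeDetails,
          "Heartbeat is missing DatanodeDetails.");
      try {
        // Liveness is now derived from the datanode identity alone.
        nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
      } catch (NodeNotFoundException e) {
        // Unknown node: ask it to register again (assumed fallback).
        commandQueue.addCommand(datanodeDetails.getUuid(),
            new ReregisterCommand());
      }
      return commandQueue.getCommand(datanodeDetails.getUuid());
    }
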
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
new file mode 100644
index 0000000..51465ee
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.util.Time;
+
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This class extends the primary identifier of a Datanode with ephemeral
+ * state, e.g. last reported time, usage information, etc.
+ */
+public class DatanodeInfo extends DatanodeDetails {
+
+  private final ReadWriteLock lock;
+
+  private volatile long lastHeartbeatTime;
+  private long lastStatsUpdatedTime;
+
+  // If required we can dissect StorageReportProto and store the raw data
+  private List<StorageReportProto> storageReports;
+
+  /**
+   * Constructs DatanodeInfo from DatanodeDetails.
+   *
+   * @param datanodeDetails Details about the datanode
+   */
+  public DatanodeInfo(DatanodeDetails datanodeDetails) {
+    super(datanodeDetails);
+    lock = new ReentrantReadWriteLock();
+    lastHeartbeatTime = Time.monotonicNow();
+  }
+
+  /**
+   * Updates the last heartbeat time with current time.
+   */
+  public void updateLastHeartbeatTime() {
+    try {
+      lock.writeLock().lock();
+      lastHeartbeatTime = Time.monotonicNow();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the last heartbeat time.
+   *
+   * @return last heartbeat time.
+   */
+  public long getLastHeartbeatTime() {
+    try {
+      lock.readLock().lock();
+      return lastHeartbeatTime;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Updates the datanode storage reports.
+   *
+   * @param reports list of storage report
+   */
+  public void updateStorageReports(List<StorageReportProto> reports) {
+    try {
+      lock.writeLock().lock();
+      lastStatsUpdatedTime = Time.monotonicNow();
+      storageReports = reports;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the storage reports associated with this datanode.
+   *
+   * @return list of storage report
+   */
+  public List<StorageReportProto> getStorageReports() {
+    try {
+      lock.readLock().lock();
+      return storageReports;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+}

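A short usage sketch of the new class: writers (the heartbeat and report paths) take the write lock, while readers get a consistent snapshot. The builder method names below are assumptions based on the Builder fields in the DatanodeDetails diff above:

    DatanodeDetails details = DatanodeDetails.newBuilder()
        .setUuid(UUID.randomUUID().toString())
        .setIpAddress("10.0.0.1")
        .setHostName("dn1.example.com")
        .build();
    DatanodeInfo info = new DatanodeInfo(details);

    info.updateLastHeartbeatTime();                      // heartbeat path
    info.updateStorageReports(Collections.emptyList()); // report path
    long lastSeen = info.getLastHeartbeatTime();         // reader snapshot
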
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
deleted file mode 100644
index 04658bd..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.node;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-/**
- * This class represents the item in SCM heartbeat queue.
- */
-public class HeartbeatQueueItem {
-  private DatanodeDetails datanodeDetails;
-  private long recvTimestamp;
-  private NodeReportProto nodeReport;
-
-  /**
-   *
-   * @param datanodeDetails - datanode ID of the heartbeat.
-   * @param recvTimestamp - heartbeat receive timestamp.
-   * @param nodeReport - node report associated with the heartbeat if any.
-   */
-  HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp,
-      NodeReportProto nodeReport) {
-    this.datanodeDetails = datanodeDetails;
-    this.recvTimestamp = recvTimestamp;
-    this.nodeReport = nodeReport;
-  }
-
-  /**
-   * @return datanode ID.
-   */
-  public DatanodeDetails getDatanodeDetails() {
-    return datanodeDetails;
-  }
-
-  /**
-   * @return node report.
-   */
-  public NodeReportProto getNodeReport() {
-    return nodeReport;
-  }
-
-  /**
-   * @return heartbeat receive timestamp.
-   */
-  public long getRecvTimestamp() {
-    return recvTimestamp;
-  }
-
-  /**
-   * Builder for HeartbeatQueueItem.
-   */
-  public static class Builder {
-    private DatanodeDetails datanodeDetails;
-    private NodeReportProto nodeReport;
-    private long recvTimestamp = monotonicNow();
-
-    public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
-      this.datanodeDetails = dnDetails;
-      return this;
-    }
-
-    public Builder setNodeReport(NodeReportProto report) {
-      this.nodeReport = report;
-      return this;
-    }
-
-    @VisibleForTesting
-    public Builder setRecvTimestamp(long recvTime) {
-      this.recvTimestamp = recvTime;
-      return this;
-    }
-
-    public HeartbeatQueueItem build() {
-      return new HeartbeatQueueItem(datanodeDetails, recvTimestamp, nodeReport);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 72d7e94..c13c37c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.hdds.scm.node;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
@@ -54,14 +53,14 @@ import java.util.UUID;
 * list, by calling removeNode. We will throw away this node's info soon.
  */
 public interface NodeManager extends StorageContainerNodeProtocol,
-    NodeManagerMXBean, Closeable, Runnable {
+    NodeManagerMXBean, Closeable {
   /**
    * Removes a data node from the management of this Node Manager.
    *
    * @param node - DataNode.
-   * @throws UnregisteredNodeException
+   * @throws NodeNotFoundException
    */
-  void removeNode(DatanodeDetails node) throws UnregisteredNodeException;
+  void removeNode(DatanodeDetails node) throws NodeNotFoundException;
 
   /**
   * Gets all Live Datanodes that are currently communicating with SCM.
@@ -124,13 +123,6 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails);
 
   /**
-   * Wait for the heartbeat is processed by NodeManager.
-   * @return true if heartbeat has been processed.
-   */
-  @VisibleForTesting
-  boolean waitForHeartbeatProcessed();
-
-  /**
    * Returns the node state of a specific node.
    * @param datanodeDetails DatanodeDetails
    * @return Healthy/Stale/Dead.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
new file mode 100644
index 0000000..5543c04
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.node.states.NodeStateMap;
+import org.apache.hadoop.ozone.common.statemachine
+    .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * NodeStateManager maintains the state of all the datanodes in the cluster.
+ * All node state changes should happen only via NodeStateManager. It also
+ * runs a heartbeat thread which periodically updates the node state.
+ * <p>
+ * The getNode(byState) functions make a copy of the node maps and then create
+ * a list based on that. It should be assumed that these get functions always
+ * report *stale* information. For example, getting the deadNodeCount followed
+ * by getNodes(DEAD) could very well produce a totally different count. Also
+ * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCount(STALE) is not
+ * guaranteed to add up to the total number of nodes that we know of. Please
+ * treat all get functions in this file as a snapshot of information that is
+ * inconsistent as soon as you read it.
+ */
+public class NodeStateManager implements Runnable, Closeable {
+
+  /**
+   * Node's life cycle events.
+   */
+  private enum NodeLifeCycleEvent {
+    TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED
+  }
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(NodeStateManager.class);
+
+  /**
+   * StateMachine for node lifecycle.
+   */
+  private final StateMachine<NodeState, NodeLifeCycleEvent> stateMachine;
+  /**
+   * This is the map which maintains the current state of all datanodes.
+   */
+  private final NodeStateMap nodeStateMap;
+  /**
+   * ExecutorService used for scheduling heartbeat processing thread.
+   */
+  private final ScheduledExecutorService executorService;
+  /**
+   * The frequency at which the heartbeat processing thread runs.
+   */
+  private final long heartbeatCheckerIntervalMs;
+  /**
+   * The timeout value which will be used for marking a datanode as stale.
+   */
+  private final long staleNodeIntervalMs;
+  /**
+   * The timeout value which will be used for marking a datanode as dead.
+   */
+  private final long deadNodeIntervalMs;
+
+  /**
+   * Constructs a NodeStateManager instance with the given configuration.
+   *
+   * @param conf Configuration
+   */
+  public NodeStateManager(Configuration conf) {
+    nodeStateMap = new NodeStateMap();
+    Set<NodeState> finalStates = new HashSet<>();
+    finalStates.add(NodeState.DECOMMISSIONED);
+    this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates);
+    initializeStateMachine();
+    heartbeatCheckerIntervalMs = HddsServerUtil
+        .getScmheartbeatCheckerInterval(conf);
+    staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf);
+    deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf);
+    Preconditions.checkState(heartbeatCheckerIntervalMs > 0,
+        OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + " should be greater than 0.");
+    Preconditions.checkState(staleNodeIntervalMs < deadNodeIntervalMs,
+        OZONE_SCM_STALENODE_INTERVAL + " should be less than " +
+            OZONE_SCM_DEADNODE_INTERVAL);
+    executorService = HadoopExecutors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
+    executorService.schedule(this, heartbeatCheckerIntervalMs,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /*
+   *
+   * Node and State Transition Mapping:
+   *
+   * State: HEALTHY         -------------------> STALE
+   * Event:                       TIMEOUT
+   *
+   * State: STALE           -------------------> DEAD
+   * Event:                       TIMEOUT
+   *
+   * State: STALE           -------------------> HEALTHY
+   * Event:                       RESTORE
+   *
+   * State: DEAD            -------------------> HEALTHY
+   * Event:                       RESURRECT
+   *
+   * State: HEALTHY         -------------------> DECOMMISSIONING
+   * Event:                     DECOMMISSION
+   *
+   * State: STALE           -------------------> DECOMMISSIONING
+   * Event:                     DECOMMISSION
+   *
+   * State: DEAD            -------------------> DECOMMISSIONING
+   * Event:                     DECOMMISSION
+   *
+   * State: DECOMMISSIONING -------------------> DECOMMISSIONED
+   * Event:                     DECOMMISSIONED
+   *
+   *  Node State Flow
+   *
+   *  +--------------------------------------------------------+
+   *  |                                     (RESURRECT)        |
+   *  |   +--------------------------+                         |
+   *  |   |      (RESTORE)           |                         |
+   *  |   |                          |                         |
+   *  V   V                          |                         |
+   * [HEALTHY]------------------->[STALE]------------------->[DEAD]
+   *    |         (TIMEOUT)          |         (TIMEOUT)       |
+   *    |                            |                         |
+   *    |                            |                         |
+   *    |                            |                         |
+   *    |                            |                         |
+   *    | (DECOMMISSION)             | (DECOMMISSION)          | (DECOMMISSION)
+   *    |                            V                         |
+   *    +------------------->[DECOMMISSIONING]<----------------+
+   *                                 |
+   *                                 | (DECOMMISSIONED)
+   *                                 |
+   *                                 V
+   *                          [DECOMMISSIONED]
+   *
+   */
+
+  /**
+   * Initializes the lifecycle of node state machine.
+   */
+  private void initializeStateMachine() {
+    stateMachine.addTransition(
+        NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT);
+    stateMachine.addTransition(
+        NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT);
+    stateMachine.addTransition(
+        NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE);
+    stateMachine.addTransition(
+        NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT);
+    stateMachine.addTransition(
+        NodeState.HEALTHY, NodeState.DECOMMISSIONING,
+        NodeLifeCycleEvent.DECOMMISSION);
+    stateMachine.addTransition(
+        NodeState.STALE, NodeState.DECOMMISSIONING,
+        NodeLifeCycleEvent.DECOMMISSION);
+    stateMachine.addTransition(
+        NodeState.DEAD, NodeState.DECOMMISSIONING,
+        NodeLifeCycleEvent.DECOMMISSION);
+    stateMachine.addTransition(
+        NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
+        NodeLifeCycleEvent.DECOMMISSIONED);
+
+  }
+
+  /**
+   * Adds a new node to the state manager.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @throws NodeAlreadyExistsException if the node is already present
+   */
+  public void addNode(DatanodeDetails datanodeDetails)
+      throws NodeAlreadyExistsException {
+    nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState());
+  }
+
+  /**
+   * Get information about the node.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @return DatanodeInfo
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public DatanodeInfo getNode(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    return nodeStateMap.getNodeInfo(datanodeDetails.getUuid());
+  }
+
+  /**
+   * Updates the last heartbeat time of the node.
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public void updateLastHeartbeatTime(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    nodeStateMap.getNodeInfo(datanodeDetails.getUuid())
+        .updateLastHeartbeatTime();
+  }
+
+  /**
+   * Returns the current state of the node.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @return NodeState
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public NodeState getNodeState(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    return nodeStateMap.getNodeState(datanodeDetails.getUuid());
+  }
+
+  /**
+   * Returns all the nodes which are in healthy state.
+   *
+   * @return list of healthy nodes
+   */
+  public List<DatanodeDetails> getHealthyNodes() {
+    return getNodes(NodeState.HEALTHY);
+  }
+
+  /**
+   * Returns all the nodes which are in stale state.
+   *
+   * @return list of stale nodes
+   */
+  public List<DatanodeDetails> getStaleNodes() {
+    return getNodes(NodeState.STALE);
+  }
+
+  /**
+   * Returns all the nodes which are in dead state.
+   *
+   * @return list of dead nodes
+   */
+  public List<DatanodeDetails> getDeadNodes() {
+    return getNodes(NodeState.DEAD);
+  }
+
+  /**
+   * Returns all the nodes which are in the specified state.
+   *
+   * @param state NodeState
+   *
+   * @return list of nodes
+   */
+  public List<DatanodeDetails> getNodes(NodeState state) {
+    List<DatanodeDetails> nodes = new LinkedList<>();
+    nodeStateMap.getNodes(state).forEach(
+        uuid -> {
+          try {
+            nodes.add(nodeStateMap.getNodeDetails(uuid));
+          } catch (NodeNotFoundException e) {
+            // This should not happen unless something other than
+            // NodeStateManager is directly modifying NodeStateMap and removed
+            // the node entry after we got the list of UUIDs.
+            LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
+          }
+        });
+    return nodes;
+  }
+
+  /**
+   * Returns all the nodes which have registered to NodeStateManager.
+   *
+   * @return all the managed nodes
+   */
+  public List<DatanodeDetails> getAllNodes() {
+    List<DatanodeDetails> nodes = new LinkedList<>();
+    nodeStateMap.getAllNodes().forEach(
+        uuid -> {
+          try {
+            nodes.add(nodeStateMap.getNodeDetails(uuid));
+          } catch (NodeNotFoundException e) {
+            // This should not happen unless something other than
+            // NodeStateManager is directly modifying NodeStateMap and removed
+            // the node entry after we got the list of UUIDs.
+            LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
+          }
+        });
+    return nodes;
+  }
+
+  /**
+   * Returns the count of healthy nodes.
+   *
+   * @return healthy node count
+   */
+  public int getHealthyNodeCount() {
+    return getNodeCount(NodeState.HEALTHY);
+  }
+
+  /**
+   * Returns the count of stale nodes.
+   *
+   * @return stale node count
+   */
+  public int getStaleNodeCount() {
+    return getNodeCount(NodeState.STALE);
+  }
+
+  /**
+   * Returns the count of dead nodes.
+   *
+   * @return dead node count
+   */
+  public int getDeadNodeCount() {
+    return getNodeCount(NodeState.DEAD);
+  }
+
+  /**
+   * Returns the count of nodes in specified state.
+   *
+   * @param state NodeState
+   *
+   * @return node count
+   */
+  public int getNodeCount(NodeState state) {
+    return nodeStateMap.getNodeCount(state);
+  }
+
+  /**
+   * Returns the count of all nodes managed by NodeStateManager.
+   *
+   * @return node count
+   */
+  public int getTotalNodeCount() {
+    return nodeStateMap.getTotalNodeCount();
+  }
+
+  /**
+   * Removes a node from NodeStateManager.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public void removeNode(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    nodeStateMap.removeNode(datanodeDetails.getUuid());
+  }
+
+  /**
+   * Moves stale or dead nodes back to healthy if we got a heartbeat from them.
+   * Moves healthy nodes to stale if needed.
+   * Moves stale nodes to dead if needed.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+
+    /*
+     *
+     *          staleNodeDeadline                healthyNodeDeadline
+     *                 |                                  |
+     *      Dead       |             Stale                |     Healthy
+     *      Node       |             Node                 |     Node
+     *      Window     |             Window               |     Window
+     * ----------------+----------------------------------+------------------->
+     *                      >>-->> time-line >>-->>
+     *
+     * Here is the logic of computing the health of a node.
+     *
+     * 1. We get the current time and look back to the time when we last
+     *    got a heartbeat from the node.
+     *
+     * 2. If the last heartbeat was within the healthy-node window, we mark
+     *    the node as healthy.
+     *
+     * 3. If the last HB timestamp is older and falls within the stale-node
+     *    window, we mark the node as stale.
+     *
+     * 4. If the last HB time is older than the stale window, the node is
+     *    marked as dead.
+     *
+     * Processing starts from the current time and looks backwards in time.
+     */
+    long processingStartTime = Time.monotonicNow();
+    // After this time a node is considered to be stale.
+    long healthyNodeDeadline = processingStartTime - staleNodeIntervalMs;
+    // After this time a node is considered to be dead.
+    long staleNodeDeadline = processingStartTime - deadNodeIntervalMs;
+
+    Predicate<Long> healthyNodeCondition =
+        (lastHbTime) -> lastHbTime >= healthyNodeDeadline;
+    // staleNodeCondition is superset of stale and dead node
+    Predicate<Long> staleNodeCondition =
+        (lastHbTime) -> lastHbTime < healthyNodeDeadline;
+    Predicate<Long> deadNodeCondition =
+        (lastHbTime) -> lastHbTime < staleNodeDeadline;
+    try {
+      for (NodeState state : NodeState.values()) {
+        List<UUID> nodes = nodeStateMap.getNodes(state);
+        for (UUID id : nodes) {
+          DatanodeInfo node = nodeStateMap.getNodeInfo(id);
+          switch (state) {
+          case HEALTHY:
+            // Move the node to STALE if the last heartbeat is older than
+            // the configured stale-node interval.
+            updateNodeState(node, staleNodeCondition, state,
+                  NodeLifeCycleEvent.TIMEOUT);
+            break;
+          case STALE:
+            // Move the node to DEAD if the last heartbeat is older than
+            // the configured dead-node interval.
+            updateNodeState(node, deadNodeCondition, state,
+                NodeLifeCycleEvent.TIMEOUT);
+            // Restore the node if we have received a heartbeat within the
+            // configured stale-node interval.
+            updateNodeState(node, healthyNodeCondition, state,
+                NodeLifeCycleEvent.RESTORE);
+            break;
+          case DEAD:
+            // Resurrect the node if we have received a heartbeat within
+            // the configured stale-node interval.
+            updateNodeState(node, healthyNodeCondition, state,
+                NodeLifeCycleEvent.RESURRECT);
+            break;
+            // We don't do anything for DECOMMISSIONING and DECOMMISSIONED in
+            // heartbeat processing.
+          case DECOMMISSIONING:
+          case DECOMMISSIONED:
+          default:
+          }
+        }
+      }
+    } catch (NodeNotFoundException e) {
+      // This should not happen unless something other than
+      // NodeStateManager is directly modifying NodeStateMap and removed
+      // the node entry after we got the list of UUIDs.
+      LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
+    }
+    long processingEndTime = Time.monotonicNow();
+    //If we have taken too much time for HB processing, log that information.
+    if ((processingEndTime - processingStartTime) >
+        heartbeatCheckerIntervalMs) {
+      LOG.error("Total time spend processing datanode HB's is greater than " +
+              "configured values for datanode heartbeats. Please adjust the" +
+              " heartbeat configs. Time Spend on HB processing: {} seconds " +
+              "Datanode heartbeat Interval: {} seconds.",
+          TimeUnit.MILLISECONDS
+              .toSeconds(processingEndTime - processingStartTime),
+          heartbeatCheckerIntervalMs);
+    }
+
+    // We purposefully make this non-deterministic. Instead of using
+    // scheduleAtFixedRate we just go to sleep
+    // and wake up at the next rendezvous point, which is currentTime +
+    // heartbeatCheckerIntervalMs. This leads to the issue that we are now
+    // heartbeating not at a fixed cadence, but at clock tick + time taken
+    // to work.
+    //
+    // This time taken to work can skew the heartbeat processor thread.
+    // We don't care, for the following reasons:
+    //
+    // 1. checkerInterval is generally many magnitudes faster than the
+    // datanode HB frequency.
+    //
+    // 2. if we have too many nodes, the SCM would be doing only HB
+    // processing; this could lead to SCM's CPU starvation. With this
+    // approach we always guarantee that the HB thread sleeps for a little
+    // while.
+    //
+    // 3. It is possible that we will never finish processing the HBs in the
+    // thread. But that means we have a misconfigured system. We will warn
+    // the users by logging that information.
+    //
+    // 4. And the most important reason: heartbeats are not blocked even if
+    // this thread does not run; they will go into the processing queue.
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      executorService.schedule(this, heartbeatCheckerIntervalMs,
+          TimeUnit.MILLISECONDS);
+    } else {
+      LOG.info("Current Thread is interrupted, shutting down HB processing " +
+          "thread for Node Manager.");
+    }
+
+  }
+
+  /**
+   * Updates the node state if the condition satisfies.
+   *
+   * @param node DatanodeInfo
+   * @param condition condition to check
+   * @param state current state of node
+   * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition
+   *                       matches
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  private void updateNodeState(DatanodeInfo node, Predicate<Long> condition,
+      NodeState state, NodeLifeCycleEvent lifeCycleEvent)
+      throws NodeNotFoundException {
+    try {
+      if (condition.test(node.getLastHeartbeatTime())) {
+        NodeState newState = stateMachine.getNextState(state, lifeCycleEvent);
+        nodeStateMap.updateNodeState(node.getUuid(), state, newState);
+      }
+    } catch (InvalidStateTransitionException e) {
+      LOG.warn("Invalid state transition of node {}." +
+              " Current state: {}, life cycle event: {}",
+          node, state, lifeCycleEvent);
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown NodeStateManager properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+}

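To make the window arithmetic concrete: with a stale interval of, say, 90 seconds and the 10-minute dead interval shown above, a checker pass at time t computes healthyNodeDeadline = t - 90s and staleNodeDeadline = t - 600s. A node last heard from 2 minutes ago is stale (older than the healthy deadline but newer than the stale deadline), one last heard from 11 minutes ago is dead, and a fresh heartbeat restores either. A minimal end-to-end sketch of the lifecycle, with exception handling simplified:

    // Hedged sketch; dn is a registered datanode's details.
    void trackNode(DatanodeDetails dn) throws Exception {
      NodeStateManager nsm = new NodeStateManager(new OzoneConfiguration());
      nsm.addNode(dn);                  // node starts in HEALTHY
      nsm.updateLastHeartbeatTime(dn);  // called on every heartbeat
      // The scheduled run() pass applies TIMEOUT/RESTORE/RESURRECT
      // transitions based on the deadlines computed above.
      NodeState state = nsm.getNodeState(dn);
      System.out.println(dn.getUuid() + " is " + state);
      nsm.close();
    }
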
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index adca8ea..15ac3f2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
@@ -50,8 +49,6 @@ import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,39 +60,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
-    .HEALTHY;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
-    .INVALID;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static org.apache.hadoop.util.Time.monotonicNow;
 
 /**
  * Maintains information about the Datanodes on SCM side.
  * <p>
 * Heartbeats under SCM are very simple compared to the HDFS HeartbeatManager.
  * <p>
- * Here we maintain 3 maps, and we propagate a node from healthyNodesMap to
- * staleNodesMap to deadNodesMap. This moving of a node from one map to another
- * is controlled by 4 configuration variables. These variables define how many
- * heartbeats must go missing for the node to move from one map to another.
- * <p>
- * Each heartbeat that SCMNodeManager receives is  put into heartbeatQueue. The
- * worker thread wakes up and grabs that heartbeat from the queue. The worker
- * thread will lookup the healthynodes map and set the timestamp if the entry
- * is there. if not it will look up stale and deadnodes map.
- * <p>
  * The getNodes(byState) functions make a copy of the node maps and then create
  * a list based on that. It should be assumed that these get functions always report
  * *stale* information. For example, getting the deadNodeCount followed by
@@ -113,33 +86,18 @@ public class SCMNodeManager
   static final Logger LOG =
       LoggerFactory.getLogger(SCMNodeManager.class);
 
-  /**
-   * Key = NodeID, value = timestamp.
-   */
-  private final ConcurrentHashMap<UUID, Long> healthyNodes;
-  private final ConcurrentHashMap<UUID, Long> staleNodes;
-  private final ConcurrentHashMap<UUID, Long> deadNodes;
-  private final Queue<HeartbeatQueueItem> heartbeatQueue;
-  private final ConcurrentHashMap<UUID, DatanodeDetails> nodes;
+
+  private final NodeStateManager nodeStateManager;
   // Individual live node stats
+  // TODO: NodeStat should be moved to NodeStateManager (NodeStateMap)
   private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
+  // Should we maintain aggregated stats? If this is not frequently used, we
+  // can always calculate it from nodeStats whenever required.
   // Aggregated node stats
   private SCMNodeStat scmStat;
-  // TODO: expose nodeStats and scmStat as metrics
-  private final AtomicInteger healthyNodeCount;
-  private final AtomicInteger staleNodeCount;
-  private final AtomicInteger deadNodeCount;
-  private final AtomicInteger totalNodes;
-  private long staleNodeIntervalMs;
-  private final long deadNodeIntervalMs;
-  private final long heartbeatCheckerIntervalMs;
-  private final long datanodeHBIntervalSeconds;
-  private final ScheduledExecutorService executorService;
-  private long lastHBcheckStart;
-  private long lastHBcheckFinished = 0;
-  private long lastHBProcessedCount;
+  // Should we create ChillModeManager and extract all the chill mode logic
+  // into a new class?
   private int chillModeNodeCount;
-  private final int maxHBToProcessPerLoop;
   private final String clusterID;
   private final VersionInfo version;
   /**
@@ -168,47 +126,19 @@ public class SCMNodeManager
    */
   public SCMNodeManager(OzoneConfiguration conf, String clusterID,
       StorageContainerManager scmManager) throws IOException {
-    heartbeatQueue = new ConcurrentLinkedQueue<>();
-    healthyNodes = new ConcurrentHashMap<>();
-    deadNodes = new ConcurrentHashMap<>();
-    staleNodes = new ConcurrentHashMap<>();
-    nodes = new ConcurrentHashMap<>();
-    nodeStats = new ConcurrentHashMap<>();
-    scmStat = new SCMNodeStat();
-
-    healthyNodeCount = new AtomicInteger(0);
-    staleNodeCount = new AtomicInteger(0);
-    deadNodeCount = new AtomicInteger(0);
-    totalNodes = new AtomicInteger(0);
+    this.nodeStateManager = new NodeStateManager(conf);
+    this.nodeStats = new ConcurrentHashMap<>();
+    this.scmStat = new SCMNodeStat();
     this.clusterID = clusterID;
     this.version = VersionInfo.getLatestVersion();
-    commandQueue = new CommandQueue();
-
+    this.commandQueue = new CommandQueue();
     // TODO: Support this value as a Percentage of known machines.
-    chillModeNodeCount = 1;
-
-    staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf);
-    deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf);
-    heartbeatCheckerIntervalMs =
-        HddsServerUtil.getScmheartbeatCheckerInterval(conf);
-    datanodeHBIntervalSeconds = HddsServerUtil.getScmHeartbeatInterval(conf);
-    maxHBToProcessPerLoop = HddsServerUtil.getMaxHBToProcessPerLoop(conf);
-
-    executorService = HadoopExecutors.newScheduledThreadPool(1,
-        new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
-
-    LOG.info("Entering startup chill mode.");
+    this.chillModeNodeCount = 1;
     this.inStartupChillMode = new AtomicBoolean(true);
     this.inManualChillMode = new AtomicBoolean(false);
-
-    Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
-    executorService.schedule(this, heartbeatCheckerIntervalMs,
-        TimeUnit.MILLISECONDS);
-
-    registerMXBean();
-
     this.scmManager = scmManager;
+    LOG.info("Entering startup chill mode.");
+    registerMXBean();
   }
 
   private void registerMXBean() {
@@ -227,12 +157,11 @@ public class SCMNodeManager
    * Removes a data node from the management of this Node Manager.
    *
    * @param node - DataNode.
-   * @throws UnregisteredNodeException
+   * @throws NodeNotFoundException
    */
   @Override
-  public void removeNode(DatanodeDetails node) {
-    // TODO : Fix me when adding the SCM CLI.
-
+  public void removeNode(DatanodeDetails node) throws NodeNotFoundException {
+    nodeStateManager.removeNode(node);
   }
 
   /**
@@ -244,31 +173,8 @@ public class SCMNodeManager
    * @return List of Datanodes that are known to SCM in the requested state.
    */
   @Override
-  public List<DatanodeDetails> getNodes(NodeState nodestate)
-      throws IllegalArgumentException {
-    Map<UUID, Long> set;
-    switch (nodestate) {
-    case HEALTHY:
-      synchronized (this) {
-        set = Collections.unmodifiableMap(new HashMap<>(healthyNodes));
-      }
-      break;
-    case STALE:
-      synchronized (this) {
-        set = Collections.unmodifiableMap(new HashMap<>(staleNodes));
-      }
-      break;
-    case DEAD:
-      synchronized (this) {
-        set = Collections.unmodifiableMap(new HashMap<>(deadNodes));
-      }
-      break;
-    default:
-      throw new IllegalArgumentException("Unknown node state requested.");
-    }
-
-    return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
-        .collect(Collectors.toList());
+  public List<DatanodeDetails> getNodes(NodeState nodestate) {
+    return nodeStateManager.getNodes(nodestate);
   }
 
   /**
@@ -278,12 +184,7 @@ public class SCMNodeManager
    */
   @Override
   public List<DatanodeDetails> getAllNodes() {
-    Map<UUID, DatanodeDetails> set;
-    synchronized (this) {
-      set = Collections.unmodifiableMap(new HashMap<>(nodes));
-    }
-    return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
-        .collect(Collectors.toList());
+    return nodeStateManager.getAllNodes();
   }
 
   /**
@@ -315,14 +216,16 @@ public class SCMNodeManager
     if (inStartupChillMode.get()) {
       return "Still in chill mode, waiting on nodes to report in." +
              String.format(" %d nodes reported, minimum %d nodes required.",
-              totalNodes.get(), getMinimumChillModeNodes());
+              nodeStateManager.getTotalNodeCount(), getMinimumChillModeNodes());
     }
     if (inManualChillMode.get()) {
       return "Out of startup chill mode, but in manual chill mode." +
-          String.format(" %d nodes have reported in.", totalNodes.get());
+          String.format(" %d nodes have reported in.",
+              nodeStateManager.getTotalNodeCount());
     }
     return "Out of chill mode." +
-        String.format(" %d nodes have reported in.", totalNodes.get());
+        String.format(" %d nodes have reported in.",
+            nodeStateManager.getTotalNodeCount());
   }
 
   /**
@@ -376,33 +279,7 @@ public class SCMNodeManager
    */
   @Override
   public int getNodeCount(NodeState nodestate) {
-    switch (nodestate) {
-    case HEALTHY:
-      return healthyNodeCount.get();
-    case STALE:
-      return staleNodeCount.get();
-    case DEAD:
-      return deadNodeCount.get();
-    case INVALID:
-      // This is unknown because some nodes can be in transit between the
-      // other states, so returning a count for it is not possible. We keep
-      // such a state to deal with the fact that this information is not
-      // always consistent.
-      return 0;
-    default:
-      return 0;
-    }
-  }
-
-  /**
-   * Used for testing.
-   *
-   * @return true if the HB check is done.
-   */
-  @VisibleForTesting
-  @Override
-  public boolean waitForHeartbeatProcessed() {
-    return lastHBcheckFinished != 0;
+    return nodeStateManager.getNodeCount(nodestate);
   }
 
   /**
@@ -413,236 +290,14 @@ public class SCMNodeManager
    */
   @Override
   public NodeState getNodeState(DatanodeDetails datanodeDetails) {
-    // There is a subtle race condition here, hence we also support
-    // the NodeState.UNKNOWN. It is possible that just before we check the
-    // healthyNodes, we have removed the node from the healthy list but still
-    // not added it to the stale nodes list.
-    // We could fix that by adding the node to the stale list before removing
-    // it, but then the node would briefly be in two states. Instead we just
-    // deal with the possibility of getting a state called unknown.
-
-    UUID id = datanodeDetails.getUuid();
-    if(healthyNodes.containsKey(id)) {
-      return HEALTHY;
-    }
-
-    if(staleNodes.containsKey(id)) {
-      return STALE;
-    }
-
-    if(deadNodes.containsKey(id)) {
-      return DEAD;
-    }
-
-    return INVALID;
-  }
-
-  /**
-   * This is the real worker thread that processes the HB queue. We do the
-   * following things in this thread.
-   * <p>
- * Process the heartbeats that are in the HB queue. Move a stale or dead node
- * to healthy if we got a heartbeat from it. Move stale nodes to the dead node
- * table if needed. Move healthy nodes to stale if needed.
- * <p>
- * If it is a new node, we call registerNode and add it to the list of nodes.
- * This will be replaced when we support registration of a node in SCM.
-   *
-   * @see Thread#run()
-   */
-  @Override
-  public void run() {
-    lastHBcheckStart = monotonicNow();
-    lastHBProcessedCount = 0;
-
-    // Process the whole queue.
-    while (!heartbeatQueue.isEmpty() &&
-        (lastHBProcessedCount < maxHBToProcessPerLoop)) {
-      HeartbeatQueueItem hbItem = heartbeatQueue.poll();
-      synchronized (this) {
-        handleHeartbeat(hbItem);
-      }
-      // We are shutting down or something similar; give up processing the
-      // rest of the HBs. This will terminate the HB processing thread.
-      if (Thread.currentThread().isInterrupted()) {
-        LOG.info("Current thread is interrupted, shutting down HB " +
-            "processing thread for Node Manager.");
-        return;
-      }
-    }
-
-    if (lastHBProcessedCount >= maxHBToProcessPerLoop) {
-      LOG.error("SCM is being flooded by heartbeats. Not able to keep up with" +
-          " the heartbeat counts. Processed {} heartbeats. Breaking out of" +
-          " loop. Leaving rest to be processed later. ", lastHBProcessedCount);
-    }
-
-    // Iterate over the Stale nodes and decide if we need to move any node to
-    // dead State.
-    long currentTime = monotonicNow();
-    for (Map.Entry<UUID, Long> entry : staleNodes.entrySet()) {
-      if (currentTime - entry.getValue() > deadNodeIntervalMs) {
-        synchronized (this) {
-          moveStaleNodeToDead(entry);
-        }
-      }
-    }
-
-    // Iterate over the healthy nodes and decide if we need to move any node to
-    // Stale State.
-    currentTime = monotonicNow();
-    for (Map.Entry<UUID, Long> entry : healthyNodes.entrySet()) {
-      if (currentTime - entry.getValue() > staleNodeIntervalMs) {
-        synchronized (this) {
-          moveHealthyNodeToStale(entry);
-        }
-      }
-    }
-    lastHBcheckFinished = monotonicNow();
-
-    monitorHBProcessingTime();
-
-    // We purposefully make this non-deterministic. Instead of using
-    // scheduleAtFixedRate we will just go to sleep and wake up at the next
-    // rendezvous point, which is currentTime + heartbeatCheckerIntervalMs.
-    // This means we are no longer heartbeat-checking at a fixed cadence, but
-    // at clock tick + time taken to work.
-    //
-    // This time taken to work can skew the heartbeat processor thread.
-    // We do not care, for the following reasons:
-    //
-    // 1. checkerInterval is generally many orders of magnitude faster than
-    // the datanode HB frequency.
-    //
-    // 2. if we have too many nodes, the SCM would be doing only HB
-    // processing, which could lead to SCM CPU starvation. With this approach
-    // we always guarantee that the HB thread sleeps for a little while.
-    //
-    // 3. It is possible that we will never finish processing the HBs in the
-    // thread. But that means we have a misconfigured system. We will warn
-    // the users by logging that information.
-    //
-    // 4. And most importantly, heartbeats are not blocked even if this
-    // thread does not run; they will go into the processing queue.
-
-    if (!Thread.currentThread().isInterrupted() &&
-        !executorService.isShutdown()) {
-      executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit
-          .MILLISECONDS);
-    } else {
-      LOG.info("Current Thread is interrupted, shutting down HB processing " +
-          "thread for Node Manager.");
-    }
-  }
-
-  /**
-   * If we have taken too much time for HB processing, log that information.
-   */
-  private void monitorHBProcessingTime() {
-    if (TimeUnit.MILLISECONDS.toSeconds(lastHBcheckFinished -
-        lastHBcheckStart) > datanodeHBIntervalSeconds) {
-      LOG.error("Total time spent processing datanode HBs is greater than " +
-              "configured values for datanode heartbeats. Please adjust the" +
-              " heartbeat configs. Time spent on HB processing: {} seconds, " +
-              "datanode heartbeat interval: {} seconds, heartbeats " +
-              "processed: {}",
-          TimeUnit.MILLISECONDS
-              .toSeconds(lastHBcheckFinished - lastHBcheckStart),
-          datanodeHBIntervalSeconds, lastHBProcessedCount);
-    }
-  }
-
-  /**
-   * Moves a Healthy node to a Stale node state.
-   *
-   * @param entry - Map Entry
-   */
-  private void moveHealthyNodeToStale(Map.Entry<UUID, Long> entry) {
-    LOG.trace("Moving healthy node to stale: {}", entry.getKey());
-    healthyNodes.remove(entry.getKey());
-    healthyNodeCount.decrementAndGet();
-    staleNodes.put(entry.getKey(), entry.getValue());
-    staleNodeCount.incrementAndGet();
-
-    if (scmManager != null) {
-      // remove stale node's container report
-      scmManager.removeContainerReport(entry.getKey().toString());
+    try {
+      return nodeStateManager.getNodeState(datanodeDetails);
+    } catch (NodeNotFoundException e) {
+      // TODO: should we throw NodeNotFoundException?
+      return null;
     }
   }
 
-  /**
-   * Moves a Stale node to a dead node state.
-   *
-   * @param entry - Map Entry
-   */
-  private void moveStaleNodeToDead(Map.Entry<UUID, Long> entry) {
-    LOG.trace("Moving stale node to dead: {}", entry.getKey());
-    staleNodes.remove(entry.getKey());
-    staleNodeCount.decrementAndGet();
-    deadNodes.put(entry.getKey(), entry.getValue());
-    deadNodeCount.incrementAndGet();
-
-    // Update SCM node stats
-    SCMNodeStat deadNodeStat = nodeStats.get(entry.getKey());
-    scmStat.subtract(deadNodeStat);
-    nodeStats.remove(entry.getKey());
-  }
-
-  /**
-   * Handles a single heartbeat from a datanode.
-   *
-   * @param hbItem - heartbeat item from a datanode.
-   */
-  private void handleHeartbeat(HeartbeatQueueItem hbItem) {
-    lastHBProcessedCount++;
-
-    DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails();
-    UUID datanodeUuid = datanodeDetails.getUuid();
-    NodeReportProto nodeReport = hbItem.getNodeReport();
-    long recvTimestamp = hbItem.getRecvTimestamp();
-    long processTimestamp = Time.monotonicNow();
-    if (LOG.isTraceEnabled()) {
-      //TODO: add average queue time of heartbeat request as metrics
-      LOG.trace("Processing Heartbeat from datanode {}: queueing time {}",
-          datanodeUuid, processTimestamp - recvTimestamp);
-    }
-
-    // If this node is already in the list of known and healthy nodes
-    // just set the last timestamp and return.
-    if (healthyNodes.containsKey(datanodeUuid)) {
-      healthyNodes.put(datanodeUuid, processTimestamp);
-      updateNodeStat(datanodeUuid, nodeReport);
-      return;
-    }
-
-    // A stale node has heartbeated us; we need to remove the node from the
-    // stale list and move it to the healthy list.
-    if (staleNodes.containsKey(datanodeUuid)) {
-      staleNodes.remove(datanodeUuid);
-      healthyNodes.put(datanodeUuid, processTimestamp);
-      healthyNodeCount.incrementAndGet();
-      staleNodeCount.decrementAndGet();
-      updateNodeStat(datanodeUuid, nodeReport);
-      return;
-    }
-
-    // A dead node has heartbeated us; we need to remove that node from the
-    // dead node list and move it to the healthy list.
-    if (deadNodes.containsKey(datanodeUuid)) {
-      deadNodes.remove(datanodeUuid);
-      healthyNodes.put(datanodeUuid, processTimestamp);
-      deadNodeCount.decrementAndGet();
-      healthyNodeCount.incrementAndGet();
-      updateNodeStat(datanodeUuid, nodeReport);
-      return;
-    }
-
-    LOG.warn("SCM received heartbeat from unregistered datanode {}",
-        datanodeUuid);
-    this.commandQueue.addCommand(datanodeUuid,
-        new ReregisterCommand());
-  }
 
   private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
     SCMNodeStat stat = nodeStats.get(dnId);
@@ -679,24 +334,6 @@ public class SCMNodeManager
   @Override
   public void close() throws IOException {
     unregisterMXBean();
-    executorService.shutdown();
-    try {
-      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        executorService.shutdownNow();
-      }
-
-      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        LOG.error("Unable to shutdown NodeManager properly.");
-      }
-    } catch (InterruptedException e) {
-      executorService.shutdownNow();
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @VisibleForTesting
-  long getLastHBProcessedCount() {
-    return lastHBProcessedCount;
   }
 
   /**
@@ -739,27 +376,22 @@ public class SCMNodeManager
       datanodeDetails.setHostName(hostname);
       datanodeDetails.setIpAddress(ip);
     }
-    RegisteredCommand responseCommand = verifyDatanodeUUID(datanodeDetails);
-    if (responseCommand != null) {
-      return responseCommand;
-    }
     UUID dnId = datanodeDetails.getUuid();
-    nodes.put(dnId, datanodeDetails);
-    totalNodes.incrementAndGet();
-    healthyNodes.put(dnId, monotonicNow());
-    healthyNodeCount.incrementAndGet();
-    nodeStats.put(dnId, new SCMNodeStat());
-
-    if(inStartupChillMode.get() &&
-        totalNodes.get() >= getMinimumChillModeNodes()) {
-      inStartupChillMode.getAndSet(false);
-      LOG.info("Leaving startup chill mode.");
+    try {
+      nodeStateManager.addNode(datanodeDetails);
+      nodeStats.put(dnId, new SCMNodeStat());
+      if(inStartupChillMode.get() &&
+          nodeStateManager.getTotalNodeCount() >= getMinimumChillModeNodes()) {
+        inStartupChillMode.getAndSet(false);
+        LOG.info("Leaving startup chill mode.");
+      }
+      // Update node stats from the node report, as registration succeeded
+      updateNodeStat(datanodeDetails.getUuid(), nodeReport);
+      LOG.info("Data node with ID: {} registered.", datanodeDetails.getUuid());
+    } catch (NodeAlreadyExistsException e) {
+      LOG.trace("Datanode is already registered. Datanode: {}",
+          datanodeDetails.toString());
     }
-
-    // Updating Node Report, as registration is successful
-    updateNodeStat(datanodeDetails.getUuid(), nodeReport);
-    LOG.info("Data node with ID: {} Registered.",
-        datanodeDetails.getUuid());
     RegisteredCommand.Builder builder =
         RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
             .setDatanodeUUID(datanodeDetails.getUuidString())
@@ -771,45 +403,24 @@ public class SCMNodeManager
   }
 
   /**
-   * Verifies the datanode does not have a valid UUID already.
-   *
-   * @param datanodeDetails - Datanode Details.
-   * @return SCMCommand
-   */
-  private RegisteredCommand verifyDatanodeUUID(
-      DatanodeDetails datanodeDetails) {
-    if (datanodeDetails.getUuid() != null &&
-        nodes.containsKey(datanodeDetails.getUuid())) {
-      LOG.trace("Datanode is already registered. Datanode: {}",
-          datanodeDetails.toString());
-      return RegisteredCommand.newBuilder()
-          .setErrorCode(ErrorCode.success)
-          .setClusterID(this.clusterID)
-          .setDatanodeUUID(datanodeDetails.getUuidString())
-          .build();
-    }
-    return null;
-  }
-
-  /**
   * Process a heartbeat to indicate that the datanode is alive and doing well.
   *
   * @param datanodeDetails - Datanode details.
-   * @param nodeReport - node report.
   * @return SCM heartbeat response.
    * @throws IOException
    */
   @Override
-  public List<SCMCommand> sendHeartbeat(
-      DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
-
+  public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
     Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " +
         "DatanodeDetails.");
-    heartbeatQueue.add(
-        new HeartbeatQueueItem.Builder()
-            .setDatanodeDetails(datanodeDetails)
-            .setNodeReport(nodeReport)
-            .build());
+    try {
+      nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
+    } catch (NodeNotFoundException e) {
+      LOG.warn("SCM received heartbeat from unregistered datanode {}",
+          datanodeDetails);
+      commandQueue.addCommand(datanodeDetails.getUuid(),
+          new ReregisterCommand());
+    }
     return commandQueue.getCommand(datanodeDetails.getUuid());
   }
 
@@ -855,11 +466,6 @@ public class SCMNodeManager
     this.commandQueue.addCommand(dnId, command);
   }
 
-  @VisibleForTesting
-  public void setStaleNodeIntervalMs(long interval) {
-    this.staleNodeIntervalMs = interval;
-  }
-
   @Override
   public void onMessage(CommandForDatanode commandForDatanode,
       EventPublisher publisher) {
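
Taken together, the SCMNodeManager hunks above remove the heartbeat queue, the
three per-state maps, and the scheduled checker thread; that bookkeeping now
lives behind the new NodeStateManager. As a rough mental model only (a minimal
sketch with stand-in names, not the committed NodeStateManager API, which still
drives state transitions from its own scheduler), a node's state reduces to the
age of its last heartbeat:

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch; class and method names are hypothetical stand-ins.
public class NodeStateSketch {

  enum State { HEALTHY, STALE, DEAD }

  // Key = node UUID, value = last heartbeat time in ms. This single map
  // replaces the separate healthy/stale/dead maps the old code maintained.
  private final ConcurrentHashMap<UUID, Long> lastHeartbeat =
      new ConcurrentHashMap<>();
  private final long staleIntervalMs;
  private final long deadIntervalMs;

  public NodeStateSketch(long staleIntervalMs, long deadIntervalMs) {
    this.staleIntervalMs = staleIntervalMs;
    this.deadIntervalMs = deadIntervalMs;
  }

  // Equivalent of updateLastHeartbeatTime: a heartbeat just stamps the
  // clock; no queue and no worker thread moving entries between maps.
  public void heartbeat(UUID node) {
    lastHeartbeat.put(node, System.currentTimeMillis());
  }

  // State is derived from the age of the last heartbeat. The real code
  // throws NodeNotFoundException for unknown nodes; the sketch returns DEAD.
  public State getState(UUID node) {
    Long last = lastHeartbeat.get(node);
    if (last == null) {
      return State.DEAD;
    }
    long age = System.currentTimeMillis() - last;
    if (age > deadIntervalMs) {
      return State.DEAD;
    }
    return age > staleIntervalMs ? State.STALE : State.HEALTHY;
  }
}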

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
new file mode 100644
index 0000000..aa5c382
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+/**
+ * Thrown when a node with the same UUID has already been added to
+ * NodeStateMap.
+ */
+public class NodeAlreadyExistsException extends NodeException {
+
+  /**
+   * Constructs a {@code NodeAlreadyExistsException} with {@code null}
+   * as its error detail message.
+   */
+  public NodeAlreadyExistsException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code NodeAlreadyExistsException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public NodeAlreadyExistsException(String message) {
+    super(message);
+  }
+}
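
The register() hunk earlier in this diff shows the intended use of this
exception: addNode throws on a duplicate UUID, and the caller logs at trace
level and still returns a success RegisteredCommand. A self-contained sketch of
that pattern, with stand-in types (RegistrySketch is hypothetical, and the
exception is redeclared locally only so the sketch compiles on its own):

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Local stand-in for the real exception defined above.
class NodeAlreadyExistsException extends Exception {
  NodeAlreadyExistsException(String message) {
    super(message);
  }
}

public class RegistrySketch {

  private final Set<UUID> nodes = new HashSet<>();

  // Mirrors NodeStateManager.addNode: duplicates are rejected with a checked
  // exception rather than a null or sentinel return value.
  public synchronized void addNode(UUID id) throws NodeAlreadyExistsException {
    if (!nodes.add(id)) {
      throw new NodeAlreadyExistsException("Node UUID: " + id);
    }
  }

  // Mirrors the register() hunk above: a duplicate registration is logged
  // and otherwise treated as success.
  public void register(UUID id) {
    try {
      addNode(id);
      System.out.println("Datanode " + id + " registered.");
    } catch (NodeAlreadyExistsException e) {
      System.out.println("Datanode already registered: " + id);
    }
  }

  public static void main(String[] args) {
    RegistrySketch sketch = new RegistrySketch();
    UUID id = UUID.randomUUID();
    sketch.register(id); // registers
    sketch.register(id); // hits the duplicate path, still no failure
  }
}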

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
new file mode 100644
index 0000000..c67b55d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+/**
+ * Base class for all node-related exceptions raised from NodeStateMap.
+ */
+public class NodeException extends Exception {
+
+  /**
+   * Constructs a {@code NodeException} with {@code null}
+   * as its error detail message.
+   */
+  public NodeException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code NodeException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public NodeException(String message) {
+    super(message);
+  }
+}
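
Because NodeAlreadyExistsException and NodeNotFoundException both extend this
class, a caller that does not care which condition occurred can catch
NodeException once. A small illustrative sketch (BaseCatchSketch and NodeOp are
hypothetical, not code from this commit; the exceptions are redeclared locally
so the sketch is self-contained):

// Local stand-ins for the classes in this commit.
class NodeException extends Exception {
  NodeException(String message) {
    super(message);
  }
}

class NodeNotFoundException extends NodeException {
  NodeNotFoundException(String message) {
    super(message);
  }
}

public class BaseCatchSketch {

  // Runnable cannot throw a checked NodeException, so declare our own
  // functional interface for node-state operations.
  interface NodeOp {
    void run() throws NodeException;
  }

  static void apply(NodeOp op) {
    try {
      op.run();
    } catch (NodeException e) {
      // One recovery path for both "already exists" and "not found".
      System.err.println("Node state operation failed: " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    apply(() -> {
      throw new NodeNotFoundException("no such node");
    });
  }
}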

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java
new file mode 100644
index 0000000..52a352e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+/**
+ * Thrown when the node being accessed does not exist in NodeStateMap.
+ */
+public class NodeNotFoundException extends NodeException {
+
+  /**
+   * Constructs a {@code NodeNotFoundException} with {@code null}
+   * as its error detail message.
+   */
+  public NodeNotFoundException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code NodeNotFoundException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public NodeNotFoundException(String message) {
+    super(message);
+  }
+
+}
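
This is the exception that drives the new processHeartbeat path shown earlier:
a heartbeat from an unknown datanode fails fast, and the datanode is queued a
ReregisterCommand instead of the RPC erroring out. A self-contained sketch of
that flow, with simplified stand-in types (HeartbeatSketch is hypothetical; the
real method returns the node's pending SCMCommands, a string stands in here):

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class HeartbeatSketch {

  // Local stand-in for the exception above, kept nested so the sketch
  // compiles on its own.
  static class NodeNotFoundException extends Exception {
    NodeNotFoundException(String message) {
      super(message);
    }
  }

  private final ConcurrentHashMap<UUID, Long> lastSeen =
      new ConcurrentHashMap<>();

  public void addNode(UUID node) {
    lastSeen.put(node, System.currentTimeMillis());
  }

  // Mirrors NodeStateManager.updateLastHeartbeatTime: unknown nodes fail
  // with a checked exception instead of being silently created.
  public void updateLastHeartbeatTime(UUID node) throws NodeNotFoundException {
    if (lastSeen.computeIfPresent(node,
        (k, v) -> System.currentTimeMillis()) == null) {
      throw new NodeNotFoundException("Node UUID: " + node);
    }
  }

  // Mirrors the processHeartbeat hunk above: an unknown datanode is asked
  // to re-register rather than failing the call.
  public String processHeartbeat(UUID node) {
    try {
      updateLastHeartbeatTime(node);
      return "OK";
    } catch (NodeNotFoundException e) {
      return "REREGISTER";
    }
  }

  public static void main(String[] args) {
    HeartbeatSketch sketch = new HeartbeatSketch();
    UUID known = UUID.randomUUID();
    sketch.addNode(known);
    System.out.println(sketch.processHeartbeat(known));             // OK
    System.out.println(sketch.processHeartbeat(UUID.randomUUID())); // REREGISTER
  }
}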


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

