hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From x...@apache.org
Subject [10/45] hadoop git commit: Revert "HDDS-194. Remove NodePoolManager and node pool handling from SCM. Contributed by Elek Marton"
Date Mon, 02 Jul 2018 20:32:27 GMT
Revert "HDDS-194. Remove NodePoolManager and node pool handling from SCM. Contributed by Elek Marton"

This reverts commit aaf03cc459a34af284f9735453aefd4ddb430d67.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d6fe5f3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d6fe5f3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d6fe5f3

Branch: refs/heads/HDDS-4
Commit: 0d6fe5f36be5b19aab89d995866e526c5feec758
Parents: aaf03cc
Author: Xiaoyu Yao <xyao@apache.org>
Authored: Wed Jun 27 13:25:45 2018 -0700
Committer: Xiaoyu Yao <xyao@apache.org>
Committed: Wed Jun 27 13:25:45 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |  26 ++
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   1 +
 .../common/src/main/resources/ozone-default.xml |  47 +++
 .../container/replication/ReplicationQueue.java |  78 -----
 .../replication/ReplicationReqMsg.java          | 107 ------
 .../container/replication/package-info.java     |  23 --
 .../replication/TestReplicationQueue.java       | 134 --------
 .../container/replication/package-info.java     |  23 --
 .../hdds/scm/container/ContainerMapping.java    |  10 +-
 .../replication/ContainerSupervisor.java        | 340 +++++++++++++++++++
 .../container/replication/InProgressPool.java   | 255 ++++++++++++++
 .../scm/container/replication/PeriodicPool.java | 119 +++++++
 .../scm/container/replication/package-info.java |  23 ++
 .../hadoop/hdds/scm/node/NodeManager.java       |   6 +
 .../hadoop/hdds/scm/node/NodePoolManager.java   |  71 ++++
 .../hadoop/hdds/scm/node/SCMNodeManager.java    |  23 ++
 .../hdds/scm/node/SCMNodePoolManager.java       | 269 +++++++++++++++
 .../hdds/scm/container/MockNodeManager.java     |   6 +
 .../hdds/scm/node/TestSCMNodePoolManager.java   | 160 +++++++++
 .../testutils/ReplicationNodeManagerMock.java   |   5 +
 .../ReplicationNodePoolManagerMock.java         | 133 ++++++++
 .../hadoop/ozone/scm/TestContainerSQLCli.java   |  31 ++
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java |  74 ++++
 23 files changed, 1596 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index df6fbf0..85407e6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -243,6 +243,32 @@ public final class ScmConfigKeys {
   public static final String
       OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
+  /**
+   * Don't start processing a pool if we have not had a minimum number of
+   * seconds from the last processing.
+   */
+  public static final String OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL =
+      "ozone.scm.container.report.processing.interval";
+  public static final String
+      OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s";
+
+  /**
+   * This determines the total number of pools to be processed in parallel.
+   */
+  public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS =
+      "ozone.scm.max.nodepool.processing.threads";
+  public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1;
+  /**
+   * These 2 settings control the number of threads in the executor pool and
+   * the timeout for the container reports from all nodes.
+   */
+  public static final String OZONE_SCM_MAX_CONTAINER_REPORT_THREADS =
+      "ozone.scm.max.container.report.threads";
+  public static final int OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT = 100;
+  public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT =
+      "ozone.scm.container.reports.wait.timeout";
+  public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
+      "5m";
 
   public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
       "ozone.scm.block.deletion.max.retry";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 08a5ffd..c40dc8e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -91,6 +91,7 @@ public final class OzoneConsts {
   public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX;
   public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
   public static final String BLOCK_DB = "block.db";
+  public static final String NODEPOOL_DB = "nodepool.db";
   public static final String OPEN_CONTAINERS_DB = "openContainers.db";
   public static final String DELETED_BLOCK_DB = "deletedBlock.db";
   public static final String KSM_DB_NAME = "ksm.db";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 25365c8..7a91610 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -572,6 +572,25 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.container.report.processing.interval</name>
+    <value>60s</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>Time interval for scm to process container reports
+      for a node pool. Scm handles node pool reports in a cyclic clock
+      manner; it fetches pools periodically with this time interval.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.container.reports.wait.timeout</name>
+    <value>300s</value>
+    <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
+    <description>Maximum time to wait in seconds for processing all container
+      reports from
+      a node pool. It determines the timeout for a
+      node pool report.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.container.size.gb</name>
     <value>5</value>
     <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
@@ -774,6 +793,17 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.max.container.report.threads</name>
+    <value>100</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      Maximum number of threads to process container reports in scm.
+      Each container report from a data node is processed by scm in a worker
+      thread, fetched from a thread pool. This property is used to control the
+      maximum size of the thread pool.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.max.hb.count.to.process</name>
     <value>5000</value>
     <tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
@@ -785,6 +815,14 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.max.nodepool.processing.threads</name>
+    <value>1</value>
+    <tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
+    <description>
+      Number of node pools to process in parallel.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.names</name>
     <value/>
     <tag>OZONE</tag>
@@ -806,6 +844,15 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.max.nodepool.processing.threads</name>
+    <value>1</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      Controls the number of node pools that can be processed in parallel by
+      Container Supervisor.
+    </description>
+  </property>
+  <property>
     <name>ozone.trace.enabled</name>
     <value>false</value>
     <tag>OZONE, DEBUG</tag>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
deleted file mode 100644
index b83ecf1..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
-/**
- * Priority queue to handle under-replicated and over replicated containers
- * in ozone. ReplicationManager will consume these messages and decide
- * accordingly.
- */
-public class ReplicationQueue {
-
-  private final Queue<ReplicationReqMsg> queue;
-
-  ReplicationQueue() {
-    queue = new PriorityQueue<>();
-  }
-
-  public synchronized boolean add(ReplicationReqMsg repObj) {
-    if (this.queue.contains(repObj)) {
-      // Remove the earlier message and insert this one
-      this.queue.remove(repObj);
-      return this.queue.add(repObj);
-    } else {
-      return this.queue.add(repObj);
-    }
-  }
-
-  public synchronized boolean remove(ReplicationReqMsg repObj) {
-    return queue.remove(repObj);
-  }
-
-  /**
-   * Retrieves, but does not remove, the head of this queue,
-   * or returns {@code null} if this queue is empty.
-   *
-   * @return the head of this queue, or {@code null} if this queue is empty
-   */
-  public synchronized ReplicationReqMsg peek() {
-    return queue.peek();
-  }
-
-  /**
-   * Retrieves and removes the head of this queue,
-   * or returns {@code null} if this queue is empty.
-   *
-   * @return the head of this queue, or {@code null} if this queue is empty
-   */
-  public synchronized ReplicationReqMsg poll() {
-    return queue.poll();
-  }
-
-  public synchronized boolean removeAll(List<ReplicationReqMsg> repObjs) {
-    return queue.removeAll(repObjs);
-  }
-
-  public int size() {
-    return queue.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java
deleted file mode 100644
index 8d26fc3..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.Serializable;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.math.NumberUtils;
-
-/**
- * Wrapper class for hdds replication queue. Implements its natural
- * ordering for priority queue.
- */
-public class ReplicationReqMsg implements Comparable<ReplicationReqMsg>,
-    Serializable {
-  private final long containerId;
-  private final short replicationCount;
-  private final short expecReplicationCount;
-  private final long timestamp;
-
-  public ReplicationReqMsg(long containerId, short replicationCount,
-      long timestamp, short expecReplicationCount) {
-    this.containerId = containerId;
-    this.replicationCount = replicationCount;
-    this.timestamp = timestamp;
-    this.expecReplicationCount = expecReplicationCount;
-  }
-
-  /**
-   * Compares this object with the specified object for order.  Returns a
-   * negative integer, zero, or a positive integer as this object is less
-   * than, equal to, or greater than the specified object.
-   * @param o the object to be compared.
-   * @return a negative integer, zero, or a positive integer as this object
-   * is less than, equal to, or greater than the specified object.
-   * @throws NullPointerException if the specified object is null
-   * @throws ClassCastException   if the specified object's type prevents it
-   *                              from being compared to this object.
-   */
-  @Override
-  public int compareTo(ReplicationReqMsg o) {
-    if (this == o) {
-      return 0;
-    }
-    if (o == null) {
-      return 1;
-    }
-    int retVal = NumberUtils
-        .compare(getReplicationCount() - getExpecReplicationCount(),
-            o.getReplicationCount() - o.getExpecReplicationCount());
-    if (retVal != 0) {
-      return retVal;
-    }
-    return NumberUtils.compare(getTimestamp(), o.getTimestamp());
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder(91, 1011)
-        .append(getContainerId())
-        .toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ReplicationReqMsg that = (ReplicationReqMsg) o;
-    return new EqualsBuilder().append(getContainerId(), that.getContainerId())
-        .isEquals();
-  }
-
-  public long getContainerId() {
-    return containerId;
-  }
-
-  public short getReplicationCount() {
-    return replicationCount;
-  }
-
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  public short getExpecReplicationCount() {
-    return expecReplicationCount;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
deleted file mode 100644
index 7f335e3..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.replication;
-
-/**
- * Ozone Container replicaton related classes.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
deleted file mode 100644
index 39c61d3..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.util.Random;
-import java.util.UUID;
-import org.apache.hadoop.util.Time;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test class for ReplicationQueue.
- */
-public class TestReplicationQueue {
-
-  private ReplicationQueue replicationQueue;
-  private Random random;
-
-  @Before
-  public void setUp() {
-    replicationQueue = new ReplicationQueue();
-    random = new Random();
-  }
-
-  @Test
-  public void testDuplicateAddOp() {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationReqMsg obj1, obj2, obj3;
-    long time = Time.monotonicNow();
-    obj1 = new ReplicationReqMsg(contId, (short) 2, time, (short) 3);
-    obj2 = new ReplicationReqMsg(contId, (short) 2, time + 1, (short) 3);
-    obj3 = new ReplicationReqMsg(contId, (short) 1, time+2, (short) 3);
-
-    replicationQueue.add(obj1);
-    replicationQueue.add(obj2);
-    replicationQueue.add(obj3);
-    Assert.assertEquals("Should add only 1 msg as second one is duplicate",
-        1, replicationQueue.size());
-    ReplicationReqMsg temp = replicationQueue.poll();
-    Assert.assertEquals(temp, obj3);
-  }
-
-  @Test
-  public void testPollOp() {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationReqMsg msg1, msg2, msg3, msg4, msg5;
-    msg1 = new ReplicationReqMsg(contId, (short) 1, Time.monotonicNow(),
-        (short) 3);
-    long time = Time.monotonicNow();
-    msg2 = new ReplicationReqMsg(contId + 1, (short) 4, time, (short) 3);
-    msg3 = new ReplicationReqMsg(contId + 2, (short) 0, time, (short) 3);
-    msg4 = new ReplicationReqMsg(contId, (short) 2, time, (short) 3);
-    // Replication message for same container but different nodeId
-    msg5 = new ReplicationReqMsg(contId + 1, (short) 2, time, (short) 3);
-
-    replicationQueue.add(msg1);
-    replicationQueue.add(msg2);
-    replicationQueue.add(msg3);
-    replicationQueue.add(msg4);
-    replicationQueue.add(msg5);
-    Assert.assertEquals("Should have 3 objects",
-        3, replicationQueue.size());
-
-    // Since Priority queue orders messages according to replication count,
-    // message with lowest replication should be first
-    ReplicationReqMsg temp;
-    temp = replicationQueue.poll();
-    Assert.assertEquals("Should have 2 objects",
-        2, replicationQueue.size());
-    Assert.assertEquals(temp, msg3);
-
-    temp = replicationQueue.poll();
-    Assert.assertEquals("Should have 1 objects",
-        1, replicationQueue.size());
-    Assert.assertEquals(temp, msg5);
-
-    // Message 2 should be ordered before message 5 as both have same replication
-    // number but message 2 has earlier timestamp.
-    temp = replicationQueue.poll();
-    Assert.assertEquals("Should have 0 objects",
-        replicationQueue.size(), 0);
-    Assert.assertEquals(temp, msg4);
-  }
-
-  @Test
-  public void testRemoveOp() {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationReqMsg obj1, obj2, obj3;
-    obj1 = new ReplicationReqMsg(contId, (short) 1, Time.monotonicNow(),
-        (short) 3);
-    obj2 = new ReplicationReqMsg(contId + 1, (short) 2, Time.monotonicNow(),
-        (short) 3);
-    obj3 = new ReplicationReqMsg(contId + 2, (short) 3, Time.monotonicNow(),
-        (short) 3);
-
-    replicationQueue.add(obj1);
-    replicationQueue.add(obj2);
-    replicationQueue.add(obj3);
-    Assert.assertEquals("Should have 3 objects",
-        3, replicationQueue.size());
-
-    replicationQueue.remove(obj3);
-    Assert.assertEquals("Should have 2 objects",
-        2, replicationQueue.size());
-
-    replicationQueue.remove(obj2);
-    Assert.assertEquals("Should have 1 objects",
-        1, replicationQueue.size());
-
-    replicationQueue.remove(obj1);
-    Assert.assertEquals("Should have 0 objects",
-        0, replicationQueue.size());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
deleted file mode 100644
index 5b1fd0f..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-/**
- * SCM Testing and Mocking Utils.
- */
-package org.apache.hadoop.ozone.container.replication;
-// Test classes for Replication functionality.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 9fd30f2..b563e90 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
@@ -79,6 +80,7 @@ public class ContainerMapping implements Mapping {
   private final PipelineSelector pipelineSelector;
   private final ContainerStateManager containerStateManager;
   private final LeaseManager<ContainerInfo> containerLeaseManager;
+  private final ContainerSupervisor containerSupervisor;
   private final float containerCloseThreshold;
   private final ContainerCloser closer;
   private final long size;
@@ -125,7 +127,9 @@ public class ContainerMapping implements Mapping {
         OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
     this.containerStateManager =
         new ContainerStateManager(conf, this);
-
+    this.containerSupervisor =
+        new ContainerSupervisor(conf, nodeManager,
+            nodeManager.getNodePoolManager());
     this.containerCloseThreshold = conf.getFloat(
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -403,8 +407,8 @@ public class ContainerMapping implements Mapping {
       throws IOException {
     List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
         containerInfos = reports.getReportsList();
-
-     for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
+    containerSupervisor.handleContainerReport(datanodeDetails, reports);
+    for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
         containerInfos) {
       byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
       lock.lock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
new file mode 100644
index 0000000..5bd0574
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+    .sleepUninterruptibly;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT;
+
+/**
+ * This class takes a set of container reports that belong to a pool and then
+ * computes the replication levels for each container.
+ */
+public class ContainerSupervisor implements Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerSupervisor.class);
+
+  private final NodePoolManager poolManager;
+  private final HashSet<String> poolNames;
+  private final PriorityQueue<PeriodicPool> poolQueue;
+  private final NodeManager nodeManager;
+  private final long containerProcessingLag;
+  private final AtomicBoolean runnable;
+  private final ExecutorService executorService;
+  private final long maxPoolWait;
+  private long poolProcessCount;
+  private final List<InProgressPool> inProgressPoolList;
+  private final AtomicInteger threadFaultCount;
+  private final int inProgressPoolMaxCount;
+
+  private final ReadWriteLock inProgressPoolListLock;
+
+  /**
+   * Returns the number of times we have processed pools.
+   * @return long
+   */
+  public long getPoolProcessCount() {
+    return poolProcessCount;
+  }
+
+
+  /**
+   * Constructs a class that computes Replication Levels.
+   *
+   * @param conf - OzoneConfiguration
+   * @param nodeManager - Node Manager
+   * @param poolManager - Pool Manager
+   */
+  public ContainerSupervisor(Configuration conf, NodeManager nodeManager,
+                             NodePoolManager poolManager) {
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(nodeManager);
+    this.containerProcessingLag =
+        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL,
+            OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT,
+            TimeUnit.SECONDS
+        ) * 1000;
+    int maxContainerReportThreads =
+        conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
+            OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT
+        );
+    this.maxPoolWait =
+        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT,
+            OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    this.inProgressPoolMaxCount = conf.getInt(
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
+    this.poolManager = poolManager;
+    this.nodeManager = nodeManager;
+    this.poolNames = new HashSet<>();
+    this.poolQueue = new PriorityQueue<>();
+    this.runnable = new AtomicBoolean(true);
+    this.threadFaultCount = new AtomicInteger(0);
+    this.executorService = newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Container Reports Processing Thread - %d")
+            .build(), maxContainerReportThreads);
+    this.inProgressPoolList = new LinkedList<>();
+    this.inProgressPoolListLock = new ReentrantReadWriteLock();
+
+    initPoolProcessThread();
+  }
+
+  private ExecutorService newCachedThreadPool(ThreadFactory threadFactory,
+      int maxThreads) {
+    return new HadoopThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), threadFactory);
+  }
+
+  /**
+   * Returns the number of pools that are under process right now.
+   * @return  int - Number of pools that are in process.
+   */
+  public int getInProgressPoolCount() {
+    return inProgressPoolList.size();
+  }
+
+  /**
+   * Exits the background thread.
+   */
+  public void setExit() {
+    this.runnable.set(false);
+  }
+
+  /**
+   * Adds or removes pools from names that we need to process.
+   *
+   * There are two different cases that we need to process:
+   * pools that have been added to the pool manager since the last refresh,
+   * and pools that have been removed from it.
+   */
+  private void refreshPools() {
+    List<String> pools = this.poolManager.getNodePools();
+    if (pools != null) {
+
+      HashSet<String> removedPools =
+          computePoolDifference(this.poolNames, new HashSet<>(pools));
+
+      HashSet<String> addedPools =
+          computePoolDifference(new HashSet<>(pools), this.poolNames);
+      // TODO: Support remove pool API in pool manager so that this code
+      // path can be tested. This never happens in the current code base.
+      // Use removeIf rather than calling poolQueue.remove() from inside a
+      // for-each over poolQueue: PriorityQueue iterators are fail-fast, so
+      // the structural modification could throw
+      // ConcurrentModificationException on the next iteration step.
+      poolQueue.removeIf(
+          periodicPool -> removedPools.contains(periodicPool.getPoolName()));
+
+      // Remove the pool names that we have in the list.
+      this.poolNames.removeAll(removedPools);
+
+      for (String poolName : addedPools) {
+        poolQueue.add(new PeriodicPool(poolName));
+      }
+
+      // Add to the pool names we are tracking.
+      poolNames.addAll(addedPools);
+    }
+
+  }
+
+  /**
+   * Returns the pools that are in newPools but not in oldPool.
+   *
+   * @param newPools - New Pools list
+   * @param oldPool - oldPool List.
+   */
+  private HashSet<String> computePoolDifference(HashSet<String> newPools,
+      Set<String> oldPool) {
+    Preconditions.checkNotNull(newPools);
+    Preconditions.checkNotNull(oldPool);
+    HashSet<String> newSet = new HashSet<>(newPools);
+    newSet.removeAll(oldPool);
+    return newSet;
+  }
+
+  private void initPoolProcessThread() {
+
+    /*
+     * Task that runs to check if we need to start a pool processing job.
+     * if so we create a pool reconciliation job and find out if all the
+     * expected containers are on the nodes.
+     */
+    Runnable processPools = () -> {
+      while (runnable.get()) {
+        // Make sure that we don't have any new pools.
+        refreshPools();
+        while (inProgressPoolList.size() < inProgressPoolMaxCount) {
+          PeriodicPool pool = poolQueue.poll();
+          if (pool != null) {
+            if (pool.getLastProcessedTime() + this.containerProcessingLag >
+                Time.monotonicNow()) {
+              LOG.debug("Not within the time window for processing: {}",
+                  pool.getPoolName());
+              // we might over sleep here, not a big deal.
+              sleepUninterruptibly(this.containerProcessingLag,
+                  TimeUnit.MILLISECONDS);
+            }
+            LOG.debug("Adding pool {} to container processing queue",
+                pool.getPoolName());
+            InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
+                pool, this.nodeManager, this.poolManager, this.executorService);
+            inProgressPool.startReconciliation();
+            inProgressPoolListLock.writeLock().lock();
+            try {
+              inProgressPoolList.add(inProgressPool);
+            } finally {
+              inProgressPoolListLock.writeLock().unlock();
+            }
+            poolProcessCount++;
+          } else {
+            break;
+          }
+        }
+        sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+        inProgressPoolListLock.readLock().lock();
+        try {
+          for (InProgressPool inProgressPool : inProgressPoolList) {
+            inProgressPool.finalizeReconciliation();
+            poolQueue.add(inProgressPool.getPool());
+          }
+        } finally {
+          inProgressPoolListLock.readLock().unlock();
+        }
+        inProgressPoolListLock.writeLock().lock();
+        try {
+          inProgressPoolList.clear();
+        } finally {
+          inProgressPoolListLock.writeLock().unlock();
+        }
+      }
+    };
+
+    // We will have only one thread for pool processing.
+    Thread poolProcessThread = new Thread(processPools);
+    poolProcessThread.setDaemon(true);
+    poolProcessThread.setName("Pool replica thread");
+    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // A dead Thread cannot be start()ed again; spawn a fresh one instead.
+      LOG.error("Critical Error : Pool replica thread encountered an " +
+          "error. Thread: {} Error Count : {}", t.toString(),
+          threadFaultCount.incrementAndGet(), e);
+      if (runnable.get()) {
+        initPoolProcessThread();
+      }
+      // TODO : Add a config to restrict restarts in a single session.
+    });
+    poolProcessThread.start();
+  }
+
+  /**
+   * Adds a container report to appropriate inProgress Pool.
+   * @param containerReport  -- Container report for a specific container from
+   * a datanode.
+   */
+  public void handleContainerReport(DatanodeDetails datanodeDetails,
+      ContainerReportsProto containerReport) {
+    inProgressPoolListLock.readLock().lock();
+    try {
+      String poolName = poolManager.getNodePool(datanodeDetails);
+      for (InProgressPool ppool : inProgressPoolList) {
+        if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
+          ppool.handleContainerReport(datanodeDetails, containerReport);
+          return;
+        }
+      }
+      // TODO: Decide if we can do anything else with this report.
+      LOG.debug("Discarding the container report for pool {}. " +
+              "That pool is not currently in the pool reconciliation process." +
+              " Container Name: {}", poolName, datanodeDetails);
+    } catch (SCMException e) {
+      LOG.warn("Skipping processing container report from datanode {}, "
+              + "cause: failed to get the corresponding node pool",
+          datanodeDetails.toString(), e);
+    } finally {
+      inProgressPoolListLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get in process pool list, used for testing.
+   * @return List of InProgressPool
+   */
+  @VisibleForTesting
+  public List<InProgressPool> getInProcessPoolList() {
+    return inProgressPoolList;
+  }
+
+  /**
+   * Shutdown the Container Replication Manager.
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    setExit();
+    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
new file mode 100644
index 0000000..4b54731
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * These are pools that are actively checking for replication status of the
+ * containers.
+ */
+public final class InProgressPool {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(InProgressPool.class);
+
+  private final PeriodicPool pool;
+  private final NodeManager nodeManager;
+  private final NodePoolManager poolManager;
+  private final ExecutorService executorService;
+  private final Map<Long, Integer> containerCountMap;
+  private final Map<UUID, Boolean> processedNodeSet;
+  private final long startTime;
+  private ProgressStatus status;
+  private AtomicInteger nodeCount;
+  private AtomicInteger nodeProcessed;
+  private AtomicInteger containerProcessedCount;
+  private long maxWaitTime;
+  /**
+   * Constructs a pool that is being processed.
+   *  @param maxWaitTime - Maximum wait time in milliseconds.
+   * @param pool - Pool that we are working against
+   * @param nodeManager - Nodemanager
+   * @param poolManager - pool manager
+   * @param executorService - Shared Executor service.
+   */
+  InProgressPool(long maxWaitTime, PeriodicPool pool,
+      NodeManager nodeManager, NodePoolManager poolManager,
+                 ExecutorService executorService) {
+    Preconditions.checkNotNull(pool);
+    Preconditions.checkNotNull(nodeManager);
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(executorService);
+    Preconditions.checkArgument(maxWaitTime > 0);
+    this.pool = pool;
+    this.nodeManager = nodeManager;
+    this.poolManager = poolManager;
+    this.executorService = executorService;
+    this.containerCountMap = new ConcurrentHashMap<>();
+    this.processedNodeSet = new ConcurrentHashMap<>();
+    this.maxWaitTime = maxWaitTime;
+    startTime = Time.monotonicNow();
+  }
+
+  /**
+   * Returns periodic pool.
+   *
+   * @return PeriodicPool
+   */
+  public PeriodicPool getPool() {
+    return pool;
+  }
+
+  /**
+   * We are done if we have got reports from all nodes or the maximum wait
+   * time has elapsed since this pool object was constructed.
+   * Only valid after startReconciliation() has initialized the counters.
+   * @return true if we are done, false otherwise.
+   */
+  public boolean isDone() {
+    return (nodeCount.get() == nodeProcessed.get()) ||
+        (Time.monotonicNow() - this.startTime) >= this.maxWaitTime;
+  }
+
+  /**
+   * Gets the number of containers processed.
+   *
+   * @return int
+   */
+  public int getContainerProcessedCount() {
+    return containerProcessedCount.get();
+  }
+
+  /**
+   * Returns the start time in milliseconds.
+   *
+   * @return - Start Time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Get the number of nodes in this pool.
+   *
+   * @return - node count
+   */
+  public int getNodeCount() {
+    return nodeCount.get();
+  }
+
+  /**
+   * Get the number of nodes that we have already processed container reports
+   * from.
+   *
+   * @return - Processed count.
+   */
+  public int getNodeProcessed() {
+    return nodeProcessed.get();
+  }
+
+  /**
+   * Returns the current status.
+   *
+   * @return Status
+   */
+  public ProgressStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Starts the reconciliation process for all the nodes in the pool.
+   */
+  public void startReconciliation() {
+    List<DatanodeDetails> datanodeDetailsList =
+        this.poolManager.getNodes(pool.getPoolName());
+    if (datanodeDetailsList.size() == 0) {
+      LOG.error("Datanode list for {} is Empty. Pool with no nodes ? ",
+          pool.getPoolName());
+      this.status = ProgressStatus.Error;
+      return;
+    }
+    // Expect one report from every node currently listed in the pool.
+    nodeProcessed = new AtomicInteger(0);
+    containerProcessedCount = new AtomicInteger(0);
+    nodeCount = new AtomicInteger(datanodeDetailsList.size());
+    this.status = ProgressStatus.InProgress;
+    this.getPool().setLastProcessedTime(Time.monotonicNow());
+  }
+
+  /**
+   * Queues a container Report for handling. This is done in a worker thread
+   * since decoding a container report might be compute intensive . We don't
+   * want to block since we have asked for bunch of container reports
+   * from a set of datanodes.
+   *
+   * @param containerReport - ContainerReport
+   */
+  public void handleContainerReport(DatanodeDetails datanodeDetails,
+      ContainerReportsProto containerReport) {
+    if (status == ProgressStatus.InProgress) {
+      executorService.submit(processContainerReport(datanodeDetails,
+          containerReport));
+    } else {
+      LOG.debug("Cannot handle container report when the pool is in {} status.",
+          status);
+    }
+  }
+
+  private Runnable processContainerReport(DatanodeDetails datanodeDetails,
+      ContainerReportsProto reports) {
+    return () -> {
+      // putIfAbsent returns null only for the first report from a node, so
+      // repeated reports from the same datanode are not counted twice.
+      if (processedNodeSet.putIfAbsent(datanodeDetails.getUuid(),
+          Boolean.TRUE) == null) {
+        nodeProcessed.incrementAndGet();
+        LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed,
+            datanodeDetails.getUuid());
+        for (ContainerInfo info : reports.getReportsList()) {
+          containerProcessedCount.incrementAndGet();
+          LOG.debug("Total Containers processed: {} Container Name: {}",
+              containerProcessedCount.get(), info.getContainerID());
+          // Add 1 to this container's replica count, or start it at 1.
+          // ConcurrentHashMap.merge performs the update atomically.
+          containerCountMap.merge(info.getContainerID(), 1, Integer::sum);
+        }
+      }
+    };
+  }
+
+  /**
+   * Filter the containers based on specific rules.
+   *
+   * @param predicate -- Predicate to filter by
+   * @return A list of map entries.
+   */
+  public List<Map.Entry<Long, Integer>> filterContainer(
+      Predicate<Map.Entry<Long, Integer>> predicate) {
+    return containerCountMap.entrySet().stream()
+        .filter(predicate).collect(Collectors.toList());
+  }
+
+  /**
+   * Used only for testing, calling this will abort container report
+   * processing. This is very dangerous call and should not be made by any users
+   */
+  @VisibleForTesting
+  public void setDoneProcessing() {
+    nodeProcessed.set(nodeCount.get());
+  }
+
+  /**
+   * Returns the pool name.
+   *
+   * @return Name of the pool.
+   */
+  String getPoolName() {
+    return pool.getPoolName();
+  }
+
+  public void finalizeReconciliation() {
+    status = ProgressStatus.Done;
+    //TODO: Add finalizing logic. This is where actual reconciliation happens.
+  }
+
+  /**
+   * Current status of the computing replication status.
+   */
+  public enum ProgressStatus {
+    InProgress, Done, Error
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
new file mode 100644
index 0000000..ef28aa7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Periodic pool is a pool with a time stamp, this allows us to process pools
+ * based on a cyclic clock.
+ */
+public class PeriodicPool implements Comparable<PeriodicPool> {
+  private final String poolName;
+  private long lastProcessedTime;
+  private AtomicLong totalProcessedCount;
+
+  /**
+   * Constructs a periodic pool.
+   *
+   * @param poolName - Name of the pool
+   */
+  public PeriodicPool(String poolName) {
+    this.poolName = poolName;
+    lastProcessedTime = 0;
+    totalProcessedCount = new AtomicLong(0);
+  }
+
+  /**
+   * Get pool Name.
+   * @return PoolName
+   */
+  public String getPoolName() {
+    return poolName;
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less
+   * than, equal to, or greater than the specified object.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(PeriodicPool o) {
+    return Long.compare(this.lastProcessedTime, o.lastProcessedTime);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    PeriodicPool that = (PeriodicPool) o;
+
+    return poolName.equals(that.poolName);
+  }
+
+  @Override
+  public int hashCode() {
+    return poolName.hashCode();
+  }
+
+  /**
+   * Returns the Total Times we have processed this pool.
+   *
+   * @return processed count.
+   */
+  public long getTotalProcessedCount() {
+    return totalProcessedCount.get();
+  }
+
+  /**
+   * Gets the last time we processed this pool.
+   * @return time in milliseconds
+   */
+  public long getLastProcessedTime() {
+    return this.lastProcessedTime;
+  }
+
+
+  /**
+   * Sets the last processed time.
+   *
+   * @param lastProcessedTime - Long in milliseconds.
+   */
+
+  public void setLastProcessedTime(long lastProcessedTime) {
+    this.lastProcessedTime = lastProcessedTime;
+  }
+
+  /*
+   * Increments the total processed count.
+   */
+  public void incTotalProcessedCount() {
+    this.totalProcessedCount.incrementAndGet();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
new file mode 100644
index 0000000..7bbe2ef
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+/*
+ This package contains routines that manage replication of a container. This
+ relies on container reports to understand the replication level of a
+ container - UnderReplicated, Replicated, OverReplicated -- and manages the
+ replication level based on that.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 72d7e94..4392633 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -124,6 +124,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails);
 
   /**
+   * Returns the NodePoolManager associated with the NodeManager.
+   * @return NodePoolManager
+   */
+  NodePoolManager getNodePoolManager();
+
+  /**
    * Wait for the heartbeat is processed by NodeManager.
    * @return true if heartbeat has been processed.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
new file mode 100644
index 0000000..46faf9ca
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface that defines SCM NodePoolManager.
+ */
+public interface NodePoolManager extends Closeable {
+
+  /**
+   * Add a node to a node pool.
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   */
+  void addNode(String pool, DatanodeDetails node) throws IOException;
+
+  /**
+   * Remove a node from a node pool.
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   * @throws SCMException
+   */
+  void removeNode(String pool, DatanodeDetails node)
+      throws SCMException;
+
+  /**
+   * Get a list of known node pools.
+   * @return a list of known node pool names or an empty list if no node pool
+   * is defined.
+   */
+  List<String> getNodePools();
+
+  /**
+   * Get all nodes of a node pool given the name of the node pool.
+   * @param pool - name of the node pool.
+   * @return a list of datanode ids or an empty list if the node pool was not
+   *  found.
+   */
+  List<DatanodeDetails> getNodes(String pool);
+
+  /**
+   * Get the node pool name if the node has been added to a node pool.
+   * @param datanodeDetails - datanode ID.
+   * @return node pool name if it has been assigned.
+   * null if the node has not been assigned to any node pool yet.
+   */
+  String getNodePool(DatanodeDetails datanodeDetails) throws SCMException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index adca8ea..fc8b013 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
+import com.google.protobuf.GeneratedMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,6 +159,7 @@ public class SCMNodeManager
   private ObjectName nmInfoBean;
 
   // Node pool manager.
+  private final SCMNodePoolManager nodePoolManager;
   private final StorageContainerManager scmManager;
 
   public static final Event<CommandForDatanode> DATANODE_COMMAND =
@@ -208,6 +210,7 @@ public class SCMNodeManager
 
     registerMXBean();
 
+    this.nodePoolManager = new SCMNodePoolManager(conf);
     this.scmManager = scmManager;
   }
 
@@ -679,6 +682,7 @@ public class SCMNodeManager
   @Override
   public void close() throws IOException {
     unregisterMXBean();
+    nodePoolManager.close();
     executorService.shutdown();
     try {
       if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -756,6 +760,20 @@ public class SCMNodeManager
       LOG.info("Leaving startup chill mode.");
     }
 
+    // TODO: define node pool policy for non-default node pool.
+    // For now, all nodes are added to the "DefaultNodePool" upon registration
+    // if it has not been added to any node pool yet.
+    try {
+      if (nodePoolManager.getNodePool(datanodeDetails) == null) {
+        nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL,
+            datanodeDetails);
+      }
+    } catch (IOException e) {
+      // TODO: make sure registration failure is handled correctly.
+      return RegisteredCommand.newBuilder()
+          .setErrorCode(ErrorCode.errorNodeNotPermitted)
+          .build();
+    }
     // Updating Node Report, as registration is successful
     updateNodeStat(datanodeDetails.getUuid(), nodeReport);
     LOG.info("Data node with ID: {} Registered.",
@@ -842,6 +860,11 @@ public class SCMNodeManager
   }
 
   @Override
+  public NodePoolManager getNodePoolManager() {
+    return nodePoolManager;
+  }
+
+  @Override
   public Map<String, Integer> getNodeCount() {
     Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
     for(NodeState state : NodeState.values()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java
new file mode 100644
index 0000000..faf330e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_FIND_NODE_IN_POOL;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_LOAD_NODEPOOL;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
+
+/**
+ * SCM node pool manager that manages node pools.
+ */
+public final class SCMNodePoolManager implements NodePoolManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMNodePoolManager.class);
+  private static final List<DatanodeDetails> EMPTY_NODE_LIST =
+      new ArrayList<>();
+  private static final List<String> EMPTY_NODEPOOL_LIST = new ArrayList<>();
+  public static final String DEFAULT_NODEPOOL = "DefaultNodePool";
+
+  // DB that saves the node to node pool mapping.
+  private MetadataStore nodePoolStore;
+
+  // In-memory node pool to nodes mapping
+  private HashMap<String, Set<DatanodeDetails>> nodePools;
+
+  // Read-write lock for nodepool operations
+  private ReadWriteLock lock;
+
+  /**
+   * Construct SCMNodePoolManager class that manages node to node pool mapping.
+   * @param conf - configuration.
+   * @throws IOException
+   */
+  public SCMNodePoolManager(final OzoneConfiguration conf)
+      throws IOException {
+    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    File metaDir = getOzoneMetaDirPath(conf);
+    String scmMetaDataDir = metaDir.getPath();
+    File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB);
+    nodePoolStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(nodePoolDBPath)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
+    nodePools = new HashMap<>();
+    lock = new ReentrantReadWriteLock();
+    init();
+  }
+
+  /**
+   * Initialize the in-memory store based on the persistent store from level db.
+   * No lock is needed as init() is only invoked by constructor.
+   * @throws SCMException
+   */
+  private void init() throws SCMException {
+    try {
+      nodePoolStore.iterate(null, (key, value) -> {
+        try {
+          DatanodeDetails nodeId = DatanodeDetails.getFromProtoBuf(
+              HddsProtos.DatanodeDetailsProto.PARSER.parseFrom(key));
+          String poolName = DFSUtil.bytes2String(value);
+
+          Set<DatanodeDetails> nodePool = null;
+          if (nodePools.containsKey(poolName)) {
+            nodePool = nodePools.get(poolName);
+          } else {
+            nodePool = new HashSet<>();
+            nodePools.put(poolName, nodePool);
+          }
+          nodePool.add(nodeId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding node: {} to node pool: {}",
+                nodeId, poolName);
+          }
+        } catch (IOException e) {
+          LOG.warn("Can't add a datanode to node pool, continue next...");
+        }
+        return true;
+      });
+    } catch (IOException e) {
+      LOG.error("Loading node pool error " + e);
+      throw new SCMException("Failed to load node pool",
+          FAILED_TO_LOAD_NODEPOOL);
+    }
+  }
+
+  /**
+   * Add a datanode to a node pool.
+   * @param pool - name of the node pool.
+   * @param node - datanode to add.
+   */
+  @Override
+  public void addNode(final String pool, final DatanodeDetails node)
+      throws IOException {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    Preconditions.checkNotNull(node, "node is null");
+    lock.writeLock().lock();
+    try {
+      // add to the persistent store
+      nodePoolStore.put(node.getProtoBufMessage().toByteArray(),
+          DFSUtil.string2Bytes(pool));
+
+      // add to the in-memory store
+      Set<DatanodeDetails> nodePool = null;
+      if (nodePools.containsKey(pool)) {
+        nodePool = nodePools.get(pool);
+      } else {
+        nodePool = new HashSet<DatanodeDetails>();
+        nodePools.put(pool, nodePool);
+      }
+      nodePool.add(node);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Remove a datanode from a node pool.
+   * @param pool - name of the node pool.
+   * @param node - datanode id.
+   * @throws SCMException
+   */
+  @Override
+  public void removeNode(final String pool, final DatanodeDetails node)
+      throws SCMException {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    Preconditions.checkNotNull(node, "node is null");
+    lock.writeLock().lock();
+    try {
+      // Remove from the persistent store
+      byte[] kName = node.getProtoBufMessage().toByteArray();
+      byte[] kData = nodePoolStore.get(kName);
+      if (kData == null) {
+        throw new SCMException(String.format("Unable to find node %s from" +
+            " pool %s in DB.", DFSUtil.bytes2String(kName), pool),
+            FAILED_TO_FIND_NODE_IN_POOL);
+      }
+      nodePoolStore.delete(kName);
+
+      // Remove from the in-memory store
+      if (nodePools.containsKey(pool)) {
+        Set<DatanodeDetails> nodePool = nodePools.get(pool);
+        nodePool.remove(node);
+      } else {
+        throw new SCMException(String.format("Unable to find node %s from" +
+            " pool %s in MAP.", DFSUtil.bytes2String(kName), pool),
+            FAILED_TO_FIND_NODE_IN_POOL);
+      }
+    } catch (IOException e) {
+      throw new SCMException("Failed to remove node " + node.toString()
+          + " from node pool " + pool, e,
+          SCMException.ResultCodes.IO_EXCEPTION);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Get all the node pools.
+   * @return all the node pools.
+   */
+  @Override
+  public List<String> getNodePools() {
+    lock.readLock().lock();
+    try {
+      if (!nodePools.isEmpty()) {
+        return nodePools.keySet().stream().collect(Collectors.toList());
+      } else {
+        return EMPTY_NODEPOOL_LIST;
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get all datanodes of a specific node pool.
+   * @param pool - name of the node pool.
+   * @return all datanodes of the specified node pool.
+   */
+  @Override
+  public List<DatanodeDetails> getNodes(final String pool) {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    if (nodePools.containsKey(pool)) {
+      return nodePools.get(pool).stream().collect(Collectors.toList());
+    } else {
+      return EMPTY_NODE_LIST;
+    }
+  }
+
+  /**
+   * Get the node pool name if the node has been added to a node pool.
+   * @param datanodeDetails - datanode ID.
+   * @return node pool name if it has been assigned.
+   * null if the node has not been assigned to any node pool yet.
+   * TODO: Put this in an in-memory map if performance is an issue.
+   */
+  @Override
+  public String getNodePool(final DatanodeDetails datanodeDetails)
+      throws SCMException {
+    Preconditions.checkNotNull(datanodeDetails, "node is null");
+    try {
+      byte[] result = nodePoolStore.get(
+          datanodeDetails.getProtoBufMessage().toByteArray());
+      return result == null ? null : DFSUtil.bytes2String(result);
+    } catch (IOException e) {
+      throw new SCMException("Failed to get node pool for node "
+          + datanodeDetails.toString(), e,
+          SCMException.ResultCodes.IO_EXCEPTION);
+    }
+  }
+
+  /**
+   * Close node pool level db store.
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    nodePoolStore.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 80b5d6e..8c59462 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -272,6 +273,11 @@ public class MockNodeManager implements NodeManager {
     return new SCMNodeMetric(nodeMetricMap.get(datanodeDetails.getUuid()));
   }
 
+  @Override
+  public NodePoolManager getNodePoolManager() {
+    return Mockito.mock(NodePoolManager.class);
+  }
+
   /**
    * Used for testing.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java
new file mode 100644
index 0000000..8f412de
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for SCM node pool manager.
+ */
+public class TestSCMNodePoolManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSCMNodePoolManager.class);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private final File testDir = PathUtils.getTestDir(
+      TestSCMNodePoolManager.class);
+
+  SCMNodePoolManager createNodePoolManager(OzoneConfiguration conf)
+      throws IOException {
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
+        testDir.getAbsolutePath());
+    conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+    return new SCMNodePoolManager(conf);
+  }
+
+  /**
+   * Test default node pool.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testDefaultNodePool() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    try {
+      final String defaultPool = "DefaultPool";
+      NodePoolManager npMgr = createNodePoolManager(conf);
+
+      final int nodeCount = 4;
+      final List<DatanodeDetails> nodes = TestUtils
+          .getListOfDatanodeDetails(nodeCount);
+      assertEquals(0, npMgr.getNodePools().size());
+      for (DatanodeDetails node: nodes) {
+        npMgr.addNode(defaultPool, node);
+      }
+      List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+      assertEquals(nodeCount, nodesRetrieved.size());
+      assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+
+      DatanodeDetails nodeRemoved = nodes.remove(2);
+      npMgr.removeNode(defaultPool, nodeRemoved);
+      List<DatanodeDetails> nodesAfterRemove = npMgr.getNodes(defaultPool);
+      assertTwoDatanodeListsEqual(nodes, nodesAfterRemove);
+
+      List<DatanodeDetails> nonExistSet = npMgr.getNodes("NonExistSet");
+      assertEquals(0, nonExistSet.size());
+    } finally {
+      FileUtil.fullyDelete(testDir);
+    }
+  }
+
+
+  /**
+   * Test default node pool reload.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testDefaultNodePoolReload() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    final String defaultPool = "DefaultPool";
+    final int nodeCount = 4;
+    final List<DatanodeDetails> nodes = TestUtils
+        .getListOfDatanodeDetails(nodeCount);
+
+    try {
+      try {
+        SCMNodePoolManager npMgr = createNodePoolManager(conf);
+        assertEquals(0, npMgr.getNodePools().size());
+        for (DatanodeDetails node : nodes) {
+          npMgr.addNode(defaultPool, node);
+        }
+        List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+        assertEquals(nodeCount, nodesRetrieved.size());
+        assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+        npMgr.close();
+      } finally {
+        LOG.info("testDefaultNodePoolReload: Finish adding nodes to pool" +
+            " and close.");
+      }
+
+      // try reload with a new NodePoolManager instance
+      try {
+        SCMNodePoolManager npMgr = createNodePoolManager(conf);
+        List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+        assertEquals(nodeCount, nodesRetrieved.size());
+        assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+      } finally {
+        LOG.info("testDefaultNodePoolReload: Finish reloading node pool.");
+      }
+    } finally {
+      FileUtil.fullyDelete(testDir);
+    }
+  }
+
+  /**
+   * Compare and verify that two datanode lists are equal.
+   * @param list1 - datanode list 1.
+   * @param list2 - datanode list 2.
+   */
+  private void assertTwoDatanodeListsEqual(List<DatanodeDetails> list1,
+      List<DatanodeDetails> list2) {
+    assertEquals(list1.size(), list2.size());
+    Collections.sort(list1);
+    Collections.sort(list2);
+    assertTrue(ListUtils.isEqualList(list1, list2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 1a4dcd7..072d821 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.node.CommandQueue;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
@@ -200,6 +201,10 @@ public class ReplicationNodeManagerMock implements NodeManager {
     return null;
   }
 
+  @Override
+  public NodePoolManager getNodePoolManager() {
+    return Mockito.mock(NodePoolManager.class);
+  }
 
   /**
    * Wait for the heartbeat is processed by NodeManager.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java
new file mode 100644
index 0000000..ffcd752
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.testutils;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Pool Manager replication mock.
+ */
+public class ReplicationNodePoolManagerMock implements NodePoolManager {
+
+  private final Map<DatanodeDetails, String> nodeMemberShip;
+
+  /**
+   * A node pool manager for testing.
+   */
+  public ReplicationNodePoolManagerMock() {
+    nodeMemberShip = new HashMap<>();
+  }
+
+  /**
+   * Add a node to a node pool.
+   *
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   */
+  @Override
+  public void addNode(String pool, DatanodeDetails node) {
+    nodeMemberShip.put(node, pool);
+  }
+
+  /**
+   * Remove a node from a node pool.
+   *
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   * @throws SCMException
+   */
+  @Override
+  public void removeNode(String pool, DatanodeDetails node)
+      throws SCMException {
+    nodeMemberShip.remove(node);
+
+  }
+
+  /**
+   * Get a list of known node pools.
+   *
+   * @return a list of known node pool names or an empty list if no node pool
+   * is defined.
+   */
+  @Override
+  public List<String> getNodePools() {
+    Set<String> poolSet = new HashSet<>();
+    for (Map.Entry<DatanodeDetails, String> entry : nodeMemberShip.entrySet()) {
+      poolSet.add(entry.getValue());
+    }
+    return new ArrayList<>(poolSet);
+
+  }
+
+  /**
+   * Get all nodes of a node pool given the name of the node pool.
+   *
+   * @param pool - name of the node pool.
+   * @return a list of datanode ids or an empty list if the node pool was not
+   * found.
+   */
+  @Override
+  public List<DatanodeDetails> getNodes(String pool) {
+    Set<DatanodeDetails> datanodeSet = new HashSet<>();
+    for (Map.Entry<DatanodeDetails, String> entry : nodeMemberShip.entrySet()) {
+      if (entry.getValue().equals(pool)) {
+        datanodeSet.add(entry.getKey());
+      }
+    }
+    return new ArrayList<>(datanodeSet);
+  }
+
+  /**
+   * Get the node pool name if the node has been added to a node pool.
+   *
+   * @param datanodeDetails DatanodeDetails.
+   * @return node pool name if it has been assigned. null if the node has not
+   * been assigned to any node pool yet.
+   */
+  @Override
+  public String getNodePool(DatanodeDetails datanodeDetails) {
+    return nodeMemberShip.get(datanodeDetails);
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   * <p>
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the
+   * close may fail require careful attention. It is strongly advised
+   * to relinquish the underlying resources and to internally
+   * <em>mark</em> the {@code Closeable} as closed, prior to throwing
+   * the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
index b4ed2b1..4d70af8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
@@ -51,9 +51,12 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.UUID;
 
+import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
 import static org.apache.hadoop.ozone.OzoneConsts.KB;
+import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * This class tests the CLI that transforms container into SQLite DB files.
@@ -174,6 +177,34 @@ public class TestContainerSQLCli {
   }
 
   @Test
+  public void testConvertNodepoolDB() throws Exception {
+    String dbOutPath = GenericTestUtils.getTempPath(
+        UUID.randomUUID() + "/out_sql.db");
+    String dbRootPath = conf.get(OzoneConfigKeys.OZONE_METADATA_DIRS);
+    String dbPath = dbRootPath + "/" + NODEPOOL_DB;
+    String[] args = {"-p", dbPath, "-o", dbOutPath};
+
+    cli.run(args);
+
+    // verify the sqlite db
+    HashMap<String, String> expectedPool = new HashMap<>();
+    for (DatanodeDetails dnid : nodeManager.getAllNodes()) {
+      expectedPool.put(dnid.getUuidString(), "DefaultNodePool");
+    }
+    Connection conn = connectDB(dbOutPath);
+    String sql = "SELECT * FROM nodePool";
+    ResultSet rs = executeQuery(conn, sql);
+    while(rs.next()) {
+      String datanodeUUID = rs.getString("datanodeUUID");
+      String poolName = rs.getString("poolName");
+      assertTrue(expectedPool.remove(datanodeUUID).equals(poolName));
+    }
+    assertEquals(0, expectedPool.size());
+
+    Files.delete(Paths.get(dbOutPath));
+  }
+
+  @Test
   public void testConvertContainerDB() throws Exception {
     String dbOutPath = GenericTestUtils.getTempPath(
         UUID.randomUUID() + "/out_sql.db");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message