hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ehi...@apache.org
Subject [46/50] [abbrv] hadoop git commit: HDDS-245. Handle ContainerReports in the SCM. Contributed by Elek Marton.
Date Fri, 10 Aug 2018 11:35:40 GMT
HDDS-245. Handle ContainerReports in the SCM. Contributed by Elek Marton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f5dbbfe2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f5dbbfe2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f5dbbfe2

Branch: refs/heads/HDFS-12090
Commit: f5dbbfe2e97a8c11e3df0f95ae4a493f11fdbc28
Parents: b2517dd
Author: Xiaoyu Yao <xyao@apache.org>
Authored: Thu Aug 9 16:55:13 2018 -0700
Committer: Xiaoyu Yao <xyao@apache.org>
Committed: Thu Aug 9 16:55:39 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdds/server/events/EventQueue.java   |   7 +-
 .../scm/container/ContainerReportHandler.java   | 107 +++++-
 .../replication/ReplicationActivityStatus.java  |  86 +++++
 .../ReplicationActivityStatusMXBean.java        |  28 ++
 .../replication/ReplicationRequest.java         |  28 +-
 .../hadoop/hdds/scm/events/SCMEvents.java       |   9 +
 .../hdds/scm/node/states/Node2ContainerMap.java |  10 +-
 .../hdds/scm/node/states/ReportResult.java      |  18 +-
 .../scm/server/StorageContainerManager.java     |  27 +-
 .../container/TestContainerReportHandler.java   | 228 +++++++++++++
 .../scm/node/states/Node2ContainerMapTest.java  | 308 -----------------
 .../scm/node/states/TestNode2ContainerMap.java  | 328 +++++++++++++++++++
 12 files changed, 859 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index f93c54b..b2b0df2 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -147,7 +147,12 @@ public class EventQueue implements EventPublisher, AutoCloseable {
 
         for (EventHandler handler : executorAndHandlers.getValue()) {
           queuedCount.incrementAndGet();
-
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Delivering event {} to executor/handler {}: {}",
+                event.getName(),
+                executorAndHandlers.getKey().getName(),
+                payload);
+          }
           executorAndHandlers.getKey()
               .onMessage(handler, payload, this);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 486162e..b26eed2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -18,30 +18,131 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Handles container reports from datanode.
  */
 public class ContainerReportHandler implements
     EventHandler<ContainerReportFromDatanode> {
 
-  private final Mapping containerMapping;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportHandler.class);
+
   private final Node2ContainerMap node2ContainerMap;
 
+  private final Mapping containerMapping;
+
+  private ContainerStateManager containerStateManager;
+
+  private ReplicationActivityStatus replicationStatus;
+
+
   public ContainerReportHandler(Mapping containerMapping,
-                                Node2ContainerMap node2ContainerMap) {
+      Node2ContainerMap node2ContainerMap,
+      ReplicationActivityStatus replicationActivityStatus) {
+    Preconditions.checkNotNull(containerMapping);
+    Preconditions.checkNotNull(node2ContainerMap);
+    Preconditions.checkNotNull(replicationActivityStatus);
     this.containerMapping = containerMapping;
     this.node2ContainerMap = node2ContainerMap;
+    this.containerStateManager = containerMapping.getStateManager();
+    this.replicationStatus = replicationActivityStatus;
   }
 
   @Override
   public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
                         EventPublisher publisher) {
-    // TODO: process container report.
+
+    DatanodeDetails datanodeOrigin =
+        containerReportFromDatanode.getDatanodeDetails();
+
+    ContainerReportsProto containerReport =
+        containerReportFromDatanode.getReport();
+    try {
+
+      //update state in container db and trigger close container events
+      containerMapping.processContainerReports(datanodeOrigin, containerReport);
+
+      Set<ContainerID> containerIds = containerReport.getReportsList().stream()
+          .map(containerProto -> containerProto.getContainerID())
+          .map(ContainerID::new)
+          .collect(Collectors.toSet());
+
+      ReportResult reportResult = node2ContainerMap
+          .processReport(datanodeOrigin.getUuid(), containerIds);
+
+      //we have the report, so we can update the states for the next iteration.
+      node2ContainerMap
+          .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
+
+      for (ContainerID containerID : reportResult.getMissingContainers()) {
+        containerStateManager
+            .removeContainerReplica(containerID, datanodeOrigin);
+        emitReplicationRequestEvent(containerID, publisher);
+      }
+
+      for (ContainerID containerID : reportResult.getNewContainers()) {
+        containerStateManager.addContainerReplica(containerID, datanodeOrigin);
+
+        emitReplicationRequestEvent(containerID, publisher);
+      }
+
+    } catch (IOException e) {
+      //TODO: stop all the replication?
+      LOG.error("Error on processing container report from datanode {}",
+          datanodeOrigin, e);
+    }
+
+  }
+
+  private void emitReplicationRequestEvent(ContainerID containerID,
+      EventPublisher publisher) throws SCMException {
+    ContainerInfo container = containerStateManager.getContainer(containerID);
+
+    if (container == null) {
+      //warning unknown container
+      LOG.warn(
+          "Container is missing from containerStateManager. Can't request "
+              + "replication. {}",
+          containerID);
+    }
+    if (replicationStatus.isReplicationEnabled()) {
+
+      int existingReplicas =
+          containerStateManager.getContainerReplicas(containerID).size();
+
+      int expectedReplicas = container.getReplicationFactor().getNumber();
+
+      if (existingReplicas != expectedReplicas) {
+
+        publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+            new ReplicationRequest(containerID.getId(), existingReplicas,
+                container.getReplicationFactor().getNumber()));
+      }
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
new file mode 100644
index 0000000..4a9888c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import javax.management.ObjectName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.metrics2.util.MBeans;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event listener to track the current state of replication.
+ */
+public class ReplicationActivityStatus
+    implements EventHandler<Boolean>, ReplicationActivityStatusMXBean,
+    Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationActivityStatus.class);
+
+  private AtomicBoolean replicationEnabled = new AtomicBoolean();
+
+  private ObjectName jmxObjectName;
+
+  public boolean isReplicationEnabled() {
+    return replicationEnabled.get();
+  }
+
+  @VisibleForTesting
+  public void setReplicationEnabled(boolean enabled) {
+    replicationEnabled.set(enabled);
+  }
+
+  @VisibleForTesting
+  public void enableReplication() {
+    replicationEnabled.set(true);
+  }
+
+  /**
+   * The replication status could be set by async events.
+   */
+  @Override
+  public void onMessage(Boolean enabled, EventPublisher publisher) {
+    replicationEnabled.set(enabled);
+  }
+
+  public void start() {
+    try {
+      this.jmxObjectName =
+          MBeans.register(
+              "StorageContainerManager", "ReplicationActivityStatus", this);
+    } catch (Exception ex) {
+      LOG.error("JMX bean for ReplicationActivityStatus can't be registered",
+          ex);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.jmxObjectName != null) {
+      MBeans.unregister(jmxObjectName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
new file mode 100644
index 0000000..164bd24
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+/**
+ * JMX interface to monitor replication status.
+ */
+public interface ReplicationActivityStatusMXBean {
+
+  boolean isReplicationEnabled();
+
+  void setReplicationEnabled(boolean enabled);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
index ef7c546..d40cd9c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
@@ -29,18 +29,24 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 public class ReplicationRequest implements Comparable<ReplicationRequest>,
     Serializable {
   private final long containerId;
-  private final short replicationCount;
-  private final short expecReplicationCount;
+  private final int replicationCount;
+  private final int expecReplicationCount;
   private final long timestamp;
 
-  public ReplicationRequest(long containerId, short replicationCount,
-      long timestamp, short expecReplicationCount) {
+  public ReplicationRequest(long containerId, int replicationCount,
+      long timestamp, int expecReplicationCount) {
     this.containerId = containerId;
     this.replicationCount = replicationCount;
     this.timestamp = timestamp;
     this.expecReplicationCount = expecReplicationCount;
   }
 
+  public ReplicationRequest(long containerId, int replicationCount,
+      int expecReplicationCount) {
+    this(containerId, replicationCount, System.currentTimeMillis(),
+        expecReplicationCount);
+  }
+
   /**
    * Compares this object with the specified object for order.  Returns a
    * negative integer, zero, or a positive integer as this object is less
@@ -93,7 +99,7 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>,
     return containerId;
   }
 
-  public short getReplicationCount() {
+  public int getReplicationCount() {
     return replicationCount;
   }
 
@@ -101,7 +107,17 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>,
     return timestamp;
   }
 
-  public short getExpecReplicationCount() {
+  public int getExpecReplicationCount() {
     return expecReplicationCount;
   }
+
+  @Override
+  public String toString() {
+    return "ReplicationRequest{" +
+        "containerId=" + containerId +
+        ", replicationCount=" + replicationCount +
+        ", expecReplicationCount=" + expecReplicationCount +
+        ", timestamp=" + timestamp +
+        '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index d49dd4f..70b1e96 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -174,6 +174,15 @@ public final class SCMEvents {
       new TypedEvent<>(ReplicationCompleted.class);
 
   /**
+   * Signal for all the components (but especially for the replication
+   * manager and container report handler) that the replication could be
+   * started. Should be send only if (almost) all the container state are
+   * available from the datanodes.
+   */
+  public static final TypedEvent<Boolean> START_REPLICATION =
+      new TypedEvent<>(Boolean.class);
+
+  /**
    * Private Ctor. Never Constructed.
    */
   private SCMEvents() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
index 1960604..8ed6d59 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -68,7 +69,8 @@ public class Node2ContainerMap {
       throws SCMException {
     Preconditions.checkNotNull(containerIDs);
     Preconditions.checkNotNull(datanodeID);
-    if(dn2ContainerMap.putIfAbsent(datanodeID, containerIDs) != null) {
+    if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
+        != null) {
       throw new SCMException("Node already exists in the map",
                   DUPLICATE_DATANODE);
     }
@@ -82,11 +84,13 @@ public class Node2ContainerMap {
    * @throws SCMException - if we don't know about this datanode, for new DN
    *                      use insertNewDatanode.
    */
-  public void updateDatanodeMap(UUID datanodeID, Set<ContainerID> containers)
+  public void setContainersForDatanode(UUID datanodeID, Set<ContainerID> containers)
       throws SCMException {
     Preconditions.checkNotNull(datanodeID);
     Preconditions.checkNotNull(containers);
-    if(dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> v) == null){
+    if (dn2ContainerMap
+        .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
+        == null) {
       throw new SCMException("No such datanode", NO_SUCH_DATANODE);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
index cb06cb3..2697629 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
@@ -21,10 +21,13 @@ package org.apache.hadoop.hdds.scm.node.states;
 
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 
+import java.util.Collections;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
+
 /**
- * A Container Report gets processsed by the Node2Container and returns the
+ * A Container Report gets processsed by the Node2Container and returns
  * Report Result class.
  */
 public class ReportResult {
@@ -36,6 +39,8 @@ public class ReportResult {
       Set<ContainerID> missingContainers,
       Set<ContainerID> newContainers) {
     this.status = status;
+    Preconditions.checkNotNull(missingContainers);
+    Preconditions.checkNotNull(newContainers);
     this.missingContainers = missingContainers;
     this.newContainers = newContainers;
   }
@@ -80,7 +85,16 @@ public class ReportResult {
     }
 
     ReportResult build() {
-      return new ReportResult(status, missingContainers, newContainers);
+
+      Set<ContainerID> nullSafeMissingContainers = this.missingContainers;
+      Set<ContainerID> nullSafeNewContainers = this.newContainers;
+      if (nullSafeNewContainers == null) {
+        nullSafeNewContainers = Collections.emptySet();
+      }
+      if (nullSafeMissingContainers == null) {
+        nullSafeMissingContainers = Collections.emptySet();
+      }
+      return new ReportResult(status, nullSafeMissingContainers, nullSafeNewContainers);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9cb1318..47a9100 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -164,9 +166,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * Key = DatanodeUuid, value = ContainerStat.
    */
   private Cache<String, ContainerStat> containerReportCache;
+
   private final ReplicationManager replicationManager;
+
   private final LeaseManager<Long> commandWatcherLeaseManager;
 
+  private final ReplicationActivityStatus replicationStatus;
+
   /**
    * Creates a new StorageContainerManager. Configuration will be updated
    * with information on the
@@ -199,19 +205,26 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
     Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
 
+    replicationStatus = new ReplicationActivityStatus();
+
     CloseContainerEventHandler closeContainerHandler =
         new CloseContainerEventHandler(scmContainerManager);
     NodeReportHandler nodeReportHandler =
         new NodeReportHandler(scmNodeManager);
-    ContainerReportHandler containerReportHandler =
-        new ContainerReportHandler(scmContainerManager, node2ContainerMap);
+
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
+
     NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
     StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
 
+    ContainerReportHandler containerReportHandler =
+        new ContainerReportHandler(scmContainerManager, node2ContainerMap,
+            replicationStatus);
+
+
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
     eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
@@ -221,6 +234,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
+    eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -580,6 +594,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         "server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
     getDatanodeProtocolServer().start();
 
+    replicationStatus.start();
     httpServer.start();
     scmBlockManager.start();
     replicationManager.start();
@@ -592,6 +607,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   public void stop() {
 
     try {
+      LOG.info("Stopping Replication Activity Status tracker.");
+      replicationStatus.close();
+    } catch (Exception ex) {
+      LOG.error("Replication Activity Status tracker stop failed.", ex);
+    }
+
+
+    try {
       LOG.info("Stopping Replication Manager Service.");
       replicationManager.stop();
     } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
new file mode 100644
index 0000000..363db99
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
+    .Builder;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyLong;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the behaviour of the ContainerReportHandler.
+ */
+public class TestContainerReportHandler implements EventPublisher {
+
+  private List<Object> publishedEvents = new ArrayList<>();
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerReportHandler.class);
+
+  @Before
+  public void resetEventCollector() {
+    publishedEvents.clear();
+  }
+
+  @Test
+  public void test() throws IOException {
+
+    //given
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
+    Mapping mapping = Mockito.mock(Mapping.class);
+
+    when(mapping.getContainer(anyLong()))
+        .thenAnswer(
+            (Answer<ContainerInfo>) invocation ->
+                new Builder()
+                    .setReplicationFactor(ReplicationFactor.THREE)
+                    .setContainerID((Long) invocation.getArguments()[0])
+                    .build()
+        );
+
+    ContainerStateManager containerStateManager =
+        new ContainerStateManager(conf, mapping);
+
+    when(mapping.getStateManager()).thenReturn(containerStateManager);
+
+    ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
+
+    ContainerReportHandler reportHandler =
+        new ContainerReportHandler(mapping, node2ContainerMap,
+            replicationActivityStatus);
+
+    DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dn3 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dn4 = TestUtils.randomDatanodeDetails();
+    node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>());
+    node2ContainerMap.insertNewDatanode(dn2.getUuid(), new HashSet<>());
+    node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>());
+    node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>());
+    PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
+
+    Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED,
+        ReplicationType.STAND_ALONE, ReplicationFactor.THREE, "pipeline1");
+
+    when(pipelineSelector.getReplicationPipeline(ReplicationType.STAND_ALONE,
+        ReplicationFactor.THREE)).thenReturn(pipeline);
+
+    long c1 = containerStateManager
+        .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE,
+            ReplicationFactor.THREE, "root").getContainerInfo()
+        .getContainerID();
+
+    long c2 = containerStateManager
+        .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE,
+            ReplicationFactor.THREE, "root").getContainerInfo()
+        .getContainerID();
+
+    //when
+
+    //initial reports before replication is enabled. 2 containers w 3 replicas.
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn1,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn3,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn4,
+            createContainerReport(new long[] {})), this);
+
+    Assert.assertEquals(0, publishedEvents.size());
+
+    replicationActivityStatus.enableReplication();
+
+    //no problem here
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn1,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    Assert.assertEquals(0, publishedEvents.size());
+
+    //container is missing from d2
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1})), this);
+
+    Assert.assertEquals(1, publishedEvents.size());
+    ReplicationRequest replicationRequest =
+        (ReplicationRequest) publishedEvents.get(0);
+
+    Assert.assertEquals(c2, replicationRequest.getContainerId());
+    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+    Assert.assertEquals(2, replicationRequest.getReplicationCount());
+
+    //container was replicated to dn4
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn4,
+            createContainerReport(new long[] {c2})), this);
+
+    //no more event, everything is perfect
+    Assert.assertEquals(1, publishedEvents.size());
+
+    //c2 was found at dn2 (it was missing before, magic)
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    //c2 is over replicated (dn1,dn2,dn3,dn4)
+    Assert.assertEquals(2, publishedEvents.size());
+
+    replicationRequest =
+        (ReplicationRequest) publishedEvents.get(1);
+
+    Assert.assertEquals(c2, replicationRequest.getContainerId());
+    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+    Assert.assertEquals(4, replicationRequest.getReplicationCount());
+
+  }
+
+  private ContainerReportsProto createContainerReport(long[] containerIds) {
+
+    ContainerReportsProto.Builder crBuilder =
+        ContainerReportsProto.newBuilder();
+
+    for (long containerId : containerIds) {
+      org.apache.hadoop.hdds.protocol.proto
+          .StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder
+          ciBuilder = org.apache.hadoop.hdds.protocol.proto
+          .StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
+      ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
+          .setSize(5368709120L)
+          .setUsed(2000000000L)
+          .setKeyCount(100000000L)
+          .setReadCount(100000000L)
+          .setWriteCount(100000000L)
+          .setReadBytes(2000000000L)
+          .setWriteBytes(2000000000L)
+          .setContainerID(containerId)
+          .setDeleteTransactionId(0);
+
+      crBuilder.addReports(ciBuilder.build());
+    }
+
+    return crBuilder.build();
+  }
+
+  @Override
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
+      EVENT_TYPE event, PAYLOAD payload) {
+    LOG.info("Event is published: {}", payload);
+    publishedEvents.add(payload);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java
deleted file mode 100644
index 79f1b40..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Test classes for Node2ContainerMap.
- */
-public class Node2ContainerMapTest {
-  private final static int DATANODE_COUNT = 300;
-  private final static int CONTAINER_COUNT = 1000;
-  private final Map<UUID, TreeSet<ContainerID>> testData = new
-      ConcurrentHashMap<>();
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  private void generateData() {
-    for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) {
-      TreeSet<ContainerID> currentSet = new TreeSet<>();
-      for (int cnIndex = 1; cnIndex <= CONTAINER_COUNT; cnIndex++) {
-        long currentCnIndex = (dnIndex * CONTAINER_COUNT) + cnIndex;
-        currentSet.add(new ContainerID(currentCnIndex));
-      }
-      testData.put(UUID.randomUUID(), currentSet);
-    }
-  }
-
-  private UUID getFirstKey() {
-    return testData.keySet().iterator().next();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    generateData();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public void testIsKnownDatanode() throws SCMException {
-    Node2ContainerMap map = new Node2ContainerMap();
-    UUID knownNode = getFirstKey();
-    UUID unknownNode = UUID.randomUUID();
-    Set<ContainerID> containerIDs = testData.get(knownNode);
-    map.insertNewDatanode(knownNode, containerIDs);
-    Assert.assertTrue("Not able to detect a known node",
-        map.isKnownDatanode(knownNode));
-    Assert.assertFalse("Unknown node detected",
-        map.isKnownDatanode(unknownNode));
-  }
-
-  @Test
-  public void testInsertNewDatanode() throws SCMException {
-    Node2ContainerMap map = new Node2ContainerMap();
-    UUID knownNode = getFirstKey();
-    Set<ContainerID> containerIDs = testData.get(knownNode);
-    map.insertNewDatanode(knownNode, containerIDs);
-    Set<ContainerID> readSet = map.getContainers(knownNode);
-
-    // Assert that all elements are present in the set that we read back from
-    // node map.
-    Set newSet = new TreeSet((readSet));
-    Assert.assertTrue(newSet.removeAll(containerIDs));
-    Assert.assertTrue(newSet.size() == 0);
-
-    thrown.expect(SCMException.class);
-    thrown.expectMessage("already exists");
-    map.insertNewDatanode(knownNode, containerIDs);
-
-    map.removeDatanode(knownNode);
-    map.insertNewDatanode(knownNode, containerIDs);
-
-  }
-
-  @Test
-  public void testProcessReportCheckOneNode() throws SCMException {
-    UUID key = getFirstKey();
-    Set<ContainerID> values = testData.get(key);
-    Node2ContainerMap map = new Node2ContainerMap();
-    map.insertNewDatanode(key, values);
-    Assert.assertTrue(map.isKnownDatanode(key));
-    ReportResult result = map.processReport(key, values);
-    Assert.assertEquals(result.getStatus(),
-        Node2ContainerMap.ReportStatus.ALL_IS_WELL);
-  }
-
-  @Test
-  public void testProcessReportInsertAll() throws SCMException {
-    Node2ContainerMap map = new Node2ContainerMap();
-
-    for (Map.Entry<UUID, TreeSet<ContainerID>> keyEntry : testData.entrySet()) {
-      map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue());
-    }
-    // Assert all Keys are known datanodes.
-    for (UUID key : testData.keySet()) {
-      Assert.assertTrue(map.isKnownDatanode(key));
-    }
-  }
-
-  /*
-  For ProcessReport we have to test the following scenarios.
-
-  1. New Datanode - A new datanode appears and we have to add that to the
-  SCM's Node2Container Map.
-
-  2.  New Container - A Datanode exists, but a new container is added to that
-   DN. We need to detect that and return a list of added containers.
-
-  3. Missing Container - A Datanode exists, but one of the expected container
-   on that datanode is missing. We need to detect that.
-
-   4. We get a container report that has both the missing and new containers.
-    We need to return separate lists for these.
-   */
-
-  /**
-   * Assert that we are able to detect the addition of a new datanode.
-   *
-   * @throws SCMException
-   */
-  @Test
-  public void testProcessReportDetectNewDataNode() throws SCMException {
-    Node2ContainerMap map = new Node2ContainerMap();
-    // If we attempt to process a node that is not present in the map,
-    // we get a result back that says, NEW_NODE_FOUND.
-    UUID key = getFirstKey();
-    TreeSet<ContainerID> values = testData.get(key);
-    ReportResult result = map.processReport(key, values);
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND,
-        result.getStatus());
-    Assert.assertEquals(result.getNewContainers().size(), values.size());
-  }
-
-  /**
-   * This test asserts that processReport is able to detect new containers
-   * when it is added to a datanode. For that we populate the DN with a list
-   * of containerIDs and then add few more containers and make sure that we
-   * are able to detect them.
-   *
-   * @throws SCMException
-   */
-  @Test
-  public void testProcessReportDetectNewContainers() throws SCMException {
-    Node2ContainerMap map = new Node2ContainerMap();
-    UUID key = getFirstKey();
-    TreeSet<ContainerID> values = testData.get(key);
-    map.insertNewDatanode(key, values);
-
-    final int newCount = 100;
-    // This is not a mistake, the treeset seems to be reverse sorted.
-    ContainerID last = values.pollFirst();
-    TreeSet<ContainerID> addedContainers = new TreeSet<>();
-    for (int x = 1; x <= newCount; x++) {
-      long cTemp = last.getId() + x;
-      addedContainers.add(new ContainerID(cTemp));
-    }
-
-    // This set is the super set of existing containers and new containers.
-    TreeSet<ContainerID> newContainersSet = new TreeSet<>(values);
-    newContainersSet.addAll(addedContainers);
-
-    ReportResult result = map.processReport(key, newContainersSet);
-
-    //Assert that expected size of missing container is same as addedContainers
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND,
-        result.getStatus());
-
-    Assert.assertEquals(addedContainers.size(),
-        result.getNewContainers().size());
-
-    // Assert that the Container IDs are the same as we added new.
-    Assert.assertTrue("All objects are not removed.",
-        result.getNewContainers().removeAll(addedContainers));
-  }
-
-  /**
-   * This test asserts that processReport is able to detect missing containers
-   * if they are misssing from a list.
-   *
-   * @throws SCMException
-   */
-  @Test
-  public void testProcessReportDetectMissingContainers() throws SCMException {
-    Node2ContainerMap map = new Node2ContainerMap();
-    UUID key = getFirstKey();
-    TreeSet<ContainerID> values = testData.get(key);
-    map.insertNewDatanode(key, values);
-
-    final int removeCount = 100;
-    Random r = new Random();
-
-    ContainerID first = values.pollLast();
-    TreeSet<ContainerID> removedContainers = new TreeSet<>();
-
-    // Pick a random container to remove it is ok to collide no issues.
-    for (int x = 0; x < removeCount; x++) {
-      int startBase = (int) first.getId();
-      long cTemp = r.nextInt(values.size());
-      removedContainers.add(new ContainerID(cTemp + startBase));
-    }
-
-    // This set is a new set with some containers removed.
-    TreeSet<ContainerID> newContainersSet = new TreeSet<>(values);
-    newContainersSet.removeAll(removedContainers);
-
-    ReportResult result = map.processReport(key, newContainersSet);
-
-
-    //Assert that expected size of missing container is same as addedContainers
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS,
-        result.getStatus());
-    Assert.assertEquals(removedContainers.size(),
-        result.getMissingContainers().size());
-
-    // Assert that the Container IDs are the same as we added new.
-    Assert.assertTrue("All missing containers not found.",
-        result.getMissingContainers().removeAll(removedContainers));
-  }
-
-  @Test
-  public void testProcessReportDetectNewAndMissingContainers() throws
-      SCMException {
-    Node2ContainerMap map = new Node2ContainerMap();
-    UUID key = getFirstKey();
-    TreeSet<ContainerID> values = testData.get(key);
-    map.insertNewDatanode(key, values);
-
-    Set<ContainerID> insertedSet = new TreeSet<>();
-    // Insert nodes from 1..30
-    for (int x = 1; x <= 30; x++) {
-      insertedSet.add(new ContainerID(x));
-    }
-
-
-    final int removeCount = 100;
-    Random r = new Random();
-
-    ContainerID first = values.pollLast();
-    TreeSet<ContainerID> removedContainers = new TreeSet<>();
-
-    // Pick a random container to remove it is ok to collide no issues.
-    for (int x = 0; x < removeCount; x++) {
-      int startBase = (int) first.getId();
-      long cTemp = r.nextInt(values.size());
-      removedContainers.add(new ContainerID(cTemp + startBase));
-    }
-
-    Set<ContainerID> newSet = new TreeSet<>(values);
-    newSet.addAll(insertedSet);
-    newSet.removeAll(removedContainers);
-
-    ReportResult result = map.processReport(key, newSet);
-
-
-    Assert.assertEquals(
-        Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND,
-        result.getStatus());
-    Assert.assertEquals(removedContainers.size(),
-        result.getMissingContainers().size());
-
-
-    // Assert that the Container IDs are the same as we added new.
-    Assert.assertTrue("All missing containers not found.",
-        result.getMissingContainers().removeAll(removedContainers));
-
-    Assert.assertEquals(insertedSet.size(),
-        result.getNewContainers().size());
-
-    // Assert that the Container IDs are the same as we added new.
-    Assert.assertTrue("All inserted containers are not found.",
-        result.getNewContainers().removeAll(insertedSet));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
new file mode 100644
index 0000000..633653b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test classes for Node2ContainerMap.
+ */
+public class TestNode2ContainerMap {
+  private final static int DATANODE_COUNT = 300;
+  private final static int CONTAINER_COUNT = 1000;
+  private final Map<UUID, TreeSet<ContainerID>> testData = new
+      ConcurrentHashMap<>();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private void generateData() {
+    for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) {
+      TreeSet<ContainerID> currentSet = new TreeSet<>();
+      for (int cnIndex = 1; cnIndex <= CONTAINER_COUNT; cnIndex++) {
+        long currentCnIndex = (dnIndex * CONTAINER_COUNT) + cnIndex;
+        currentSet.add(new ContainerID(currentCnIndex));
+      }
+      testData.put(UUID.randomUUID(), currentSet);
+    }
+  }
+
+  private UUID getFirstKey() {
+    return testData.keySet().iterator().next();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    generateData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testIsKnownDatanode() throws SCMException {
+    Node2ContainerMap map = new Node2ContainerMap();
+    UUID knownNode = getFirstKey();
+    UUID unknownNode = UUID.randomUUID();
+    Set<ContainerID> containerIDs = testData.get(knownNode);
+    map.insertNewDatanode(knownNode, containerIDs);
+    Assert.assertTrue("Not able to detect a known node",
+        map.isKnownDatanode(knownNode));
+    Assert.assertFalse("Unknown node detected",
+        map.isKnownDatanode(unknownNode));
+  }
+
+  @Test
+  public void testInsertNewDatanode() throws SCMException {
+    Node2ContainerMap map = new Node2ContainerMap();
+    UUID knownNode = getFirstKey();
+    Set<ContainerID> containerIDs = testData.get(knownNode);
+    map.insertNewDatanode(knownNode, containerIDs);
+    Set<ContainerID> readSet = map.getContainers(knownNode);
+
+    // Assert that all elements are present in the set that we read back from
+    // node map.
+    Set newSet = new TreeSet((readSet));
+    Assert.assertTrue(newSet.removeAll(containerIDs));
+    Assert.assertTrue(newSet.size() == 0);
+
+    thrown.expect(SCMException.class);
+    thrown.expectMessage("already exists");
+    map.insertNewDatanode(knownNode, containerIDs);
+
+    map.removeDatanode(knownNode);
+    map.insertNewDatanode(knownNode, containerIDs);
+
+  }
+
+  @Test
+  public void testProcessReportCheckOneNode() throws SCMException {
+    UUID key = getFirstKey();
+    Set<ContainerID> values = testData.get(key);
+    Node2ContainerMap map = new Node2ContainerMap();
+    map.insertNewDatanode(key, values);
+    Assert.assertTrue(map.isKnownDatanode(key));
+    ReportResult result = map.processReport(key, values);
+    Assert.assertEquals(result.getStatus(),
+        Node2ContainerMap.ReportStatus.ALL_IS_WELL);
+  }
+
+  @Test
+  public void testUpdateDatanodeMap() throws SCMException {
+    UUID datanodeId = getFirstKey();
+    Set<ContainerID> values = testData.get(datanodeId);
+    Node2ContainerMap map = new Node2ContainerMap();
+    map.insertNewDatanode(datanodeId, values);
+    Assert.assertTrue(map.isKnownDatanode(datanodeId));
+    Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size());
+
+    //remove one container
+    values.remove(values.iterator().next());
+    Assert.assertEquals(CONTAINER_COUNT - 1, values.size());
+    Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size());
+
+    map.setContainersForDatanode(datanodeId, values);
+
+    Assert.assertEquals(values.size(), map.getContainers(datanodeId).size());
+    Assert.assertEquals(values, map.getContainers(datanodeId));
+  }
+
+  @Test
+  public void testProcessReportInsertAll() throws SCMException {
+    Node2ContainerMap map = new Node2ContainerMap();
+
+    for (Map.Entry<UUID, TreeSet<ContainerID>> keyEntry : testData.entrySet()) {
+      map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue());
+    }
+    // Assert all Keys are known datanodes.
+    for (UUID key : testData.keySet()) {
+      Assert.assertTrue(map.isKnownDatanode(key));
+    }
+  }
+
+  /*
+  For ProcessReport we have to test the following scenarios.
+
+  1. New Datanode - A new datanode appears and we have to add that to the
+  SCM's Node2Container Map.
+
+  2.  New Container - A Datanode exists, but a new container is added to that
+   DN. We need to detect that and return a list of added containers.
+
+  3. Missing Container - A Datanode exists, but one of the expected container
+   on that datanode is missing. We need to detect that.
+
+   4. We get a container report that has both the missing and new containers.
+    We need to return separate lists for these.
+   */
+
+  /**
+   * Assert that we are able to detect the addition of a new datanode.
+   *
+   * @throws SCMException
+   */
+  @Test
+  public void testProcessReportDetectNewDataNode() throws SCMException {
+    Node2ContainerMap map = new Node2ContainerMap();
+    // If we attempt to process a node that is not present in the map,
+    // we get a result back that says, NEW_NODE_FOUND.
+    UUID key = getFirstKey();
+    TreeSet<ContainerID> values = testData.get(key);
+    ReportResult result = map.processReport(key, values);
+    Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND,
+        result.getStatus());
+    Assert.assertEquals(result.getNewContainers().size(), values.size());
+  }
+
+  /**
+   * This test asserts that processReport is able to detect new containers
+   * when it is added to a datanode. For that we populate the DN with a list
+   * of containerIDs and then add few more containers and make sure that we
+   * are able to detect them.
+   *
+   * @throws SCMException
+   */
+  @Test
+  public void testProcessReportDetectNewContainers() throws SCMException {
+    Node2ContainerMap map = new Node2ContainerMap();
+    UUID key = getFirstKey();
+    TreeSet<ContainerID> values = testData.get(key);
+    map.insertNewDatanode(key, values);
+
+    final int newCount = 100;
+    // This is not a mistake, the treeset seems to be reverse sorted.
+    ContainerID last = values.first();
+    TreeSet<ContainerID> addedContainers = new TreeSet<>();
+    for (int x = 1; x <= newCount; x++) {
+      long cTemp = last.getId() + x;
+      addedContainers.add(new ContainerID(cTemp));
+    }
+
+    // This set is the super set of existing containers and new containers.
+    TreeSet<ContainerID> newContainersSet = new TreeSet<>(values);
+    newContainersSet.addAll(addedContainers);
+
+    ReportResult result = map.processReport(key, newContainersSet);
+
+    //Assert that expected size of missing container is same as addedContainers
+    Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND,
+        result.getStatus());
+
+    Assert.assertEquals(addedContainers.size(),
+        result.getNewContainers().size());
+
+    // Assert that the Container IDs are the same as we added new.
+    Assert.assertTrue("All objects are not removed.",
+        result.getNewContainers().removeAll(addedContainers));
+  }
+
+  /**
+   * This test asserts that processReport is able to detect missing containers
+   * if they are misssing from a list.
+   *
+   * @throws SCMException
+   */
+  @Test
+  public void testProcessReportDetectMissingContainers() throws SCMException {
+    Node2ContainerMap map = new Node2ContainerMap();
+    UUID key = getFirstKey();
+    TreeSet<ContainerID> values = testData.get(key);
+    map.insertNewDatanode(key, values);
+
+    final int removeCount = 100;
+    Random r = new Random();
+
+    ContainerID first = values.last();
+    TreeSet<ContainerID> removedContainers = new TreeSet<>();
+
+    // Pick a random container to remove it is ok to collide no issues.
+    for (int x = 0; x < removeCount; x++) {
+      int startBase = (int) first.getId();
+      long cTemp = r.nextInt(values.size());
+      removedContainers.add(new ContainerID(cTemp + startBase));
+    }
+
+    // This set is a new set with some containers removed.
+    TreeSet<ContainerID> newContainersSet = new TreeSet<>(values);
+    newContainersSet.removeAll(removedContainers);
+
+    ReportResult result = map.processReport(key, newContainersSet);
+
+
+    //Assert that expected size of missing container is same as addedContainers
+    Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS,
+        result.getStatus());
+    Assert.assertEquals(removedContainers.size(),
+        result.getMissingContainers().size());
+
+    // Assert that the Container IDs are the same as we added new.
+    Assert.assertTrue("All missing containers not found.",
+        result.getMissingContainers().removeAll(removedContainers));
+  }
+
+  @Test
+  public void testProcessReportDetectNewAndMissingContainers() throws
+      SCMException {
+    Node2ContainerMap map = new Node2ContainerMap();
+    UUID key = getFirstKey();
+    TreeSet<ContainerID> values = testData.get(key);
+    map.insertNewDatanode(key, values);
+
+    Set<ContainerID> insertedSet = new TreeSet<>();
+    // Insert nodes from 1..30
+    for (int x = 1; x <= 30; x++) {
+      insertedSet.add(new ContainerID(x));
+    }
+
+
+    final int removeCount = 100;
+    Random r = new Random();
+
+    ContainerID first = values.last();
+    TreeSet<ContainerID> removedContainers = new TreeSet<>();
+
+    // Pick a random container to remove it is ok to collide no issues.
+    for (int x = 0; x < removeCount; x++) {
+      int startBase = (int) first.getId();
+      long cTemp = r.nextInt(values.size());
+      removedContainers.add(new ContainerID(cTemp + startBase));
+    }
+
+    Set<ContainerID> newSet = new TreeSet<>(values);
+    newSet.addAll(insertedSet);
+    newSet.removeAll(removedContainers);
+
+    ReportResult result = map.processReport(key, newSet);
+
+
+    Assert.assertEquals(
+        Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND,
+        result.getStatus());
+    Assert.assertEquals(removedContainers.size(),
+        result.getMissingContainers().size());
+
+
+    // Assert that the Container IDs are the same as we added new.
+    Assert.assertTrue("All missing containers not found.",
+        result.getMissingContainers().removeAll(removedContainers));
+
+    Assert.assertEquals(insertedSet.size(),
+        result.getNewContainers().size());
+
+    // Assert that the Container IDs are the same as we added new.
+    Assert.assertTrue("All inserted containers are not found.",
+        result.getNewContainers().removeAll(insertedSet));
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message