hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From na...@apache.org
Subject [4/4] hadoop git commit: HDDS-662. Introduce ContainerReplicaState in StorageContainerManager. Contributed by Nanda kumar.
Date Wed, 17 Oct 2018 12:15:56 GMT
HDDS-662. Introduce ContainerReplicaState in StorageContainerManager. Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/50715c06
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/50715c06
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/50715c06

Branch: refs/heads/trunk
Commit: 50715c0699b2603622223c40ef0729c83ac26cf0
Parents: a9a63ae
Author: Nandakumar <nanda@apache.org>
Authored: Wed Oct 17 17:45:35 2018 +0530
Committer: Nandakumar <nanda@apache.org>
Committed: Wed Oct 17 17:45:35 2018 +0530

----------------------------------------------------------------------
 .../scm/client/ContainerOperationClient.java    |   2 +-
 .../hadoop/hdds/scm/client/ScmClient.java       |   2 +-
 .../hdds/scm/container/ContainerException.java  |  46 ++
 .../hadoop/hdds/scm/container/ContainerID.java  |  28 +-
 .../hdds/scm/container/ContainerInfo.java       | 449 +++++++++++++++
 .../container/ContainerNotFoundException.java   |  44 ++
 .../ContainerReplicaNotFoundException.java      |  45 ++
 .../container/common/helpers/ContainerInfo.java | 482 ----------------
 .../common/helpers/ContainerWithPipeline.java   |   1 +
 .../StorageContainerLocationProtocol.java       |   2 +-
 ...rLocationProtocolClientSideTranslatorPB.java |   2 +-
 ...rLocationProtocolServerSideTranslatorPB.java |   2 +-
 hadoop-hdds/common/src/main/proto/hdds.proto    |  17 +-
 .../report/CommandStatusReportPublisher.java    |   2 +-
 .../common/report/TestReportPublisher.java      |  13 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java |  14 +-
 .../block/DatanodeDeletedBlockTransactions.java |   6 +-
 .../hdds/scm/block/DeletedBlockLogImpl.java     |   5 +-
 .../container/CloseContainerEventHandler.java   |  31 +-
 .../scm/container/CloseContainerWatcher.java    |   3 +-
 .../hdds/scm/container/ContainerManager.java    |  70 ++-
 .../hdds/scm/container/ContainerReplica.java    | 197 +++++++
 .../scm/container/ContainerReportHandler.java   |  60 +-
 .../scm/container/ContainerStateManager.java    | 242 ++++----
 .../hdds/scm/container/SCMContainerManager.java | 566 ++++++++-----------
 .../replication/ReplicationManager.java         |  38 +-
 .../scm/container/states/ContainerStateMap.java | 267 +++++----
 .../hadoop/hdds/scm/node/DeadNodeHandler.java   |  74 ++-
 .../hdds/scm/server/SCMChillModeManager.java    |   2 +-
 .../scm/server/SCMClientProtocolServer.java     |  32 +-
 .../scm/server/SCMDatanodeProtocolServer.java   |   2 +-
 .../scm/server/StorageContainerManager.java     |  22 +-
 .../apache/hadoop/hdds/scm/HddsTestUtils.java   |   2 +-
 .../org/apache/hadoop/hdds/scm/TestUtils.java   |  48 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java |   2 +-
 .../hdds/scm/block/TestDeletedBlockLog.java     |  14 +-
 .../TestCloseContainerEventHandler.java         |  22 +-
 .../container/TestContainerReportHandler.java   |  66 +--
 .../container/TestContainerStateManager.java    |  60 +-
 .../scm/container/TestSCMContainerManager.java  | 117 ++--
 .../replication/TestReplicationManager.java     |  38 +-
 .../hdds/scm/node/TestContainerPlacement.java   |   2 +-
 .../hdds/scm/node/TestDeadNodeHandler.java      | 195 ++++---
 .../scm/server/TestSCMChillModeManager.java     |   2 +-
 .../container/TestCloseContainerWatcher.java    |  12 +-
 .../org/apache/hadoop/hdds/scm/cli/SCMCLI.java  |   2 +-
 .../hdds/scm/cli/container/ListSubcommand.java  |   4 +-
 .../ozone/client/io/ChunkGroupOutputStream.java |   2 +-
 .../TestContainerStateManagerIntegration.java   | 219 ++++---
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |   7 +-
 .../hdds/scm/pipeline/TestPipelineClose.java    |  10 +-
 .../org/apache/hadoop/ozone/OzoneTestUtils.java |  10 +-
 .../ozone/TestStorageContainerManager.java      |   4 +-
 .../ozone/client/rest/TestOzoneRestClient.java  |   4 +-
 .../rpc/TestCloseContainerHandlingByClient.java |  10 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java    |   6 +-
 .../commandhandler/TestBlockDeletion.java       |   3 +-
 .../TestCloseContainerByPipeline.java           |  10 +-
 .../TestCloseContainerHandler.java              |   4 +-
 .../ozone/om/TestContainerReportWithKeys.java   |   2 +-
 .../hadoop/ozone/om/TestScmChillMode.java       |  14 +-
 .../apache/hadoop/ozone/om/KeyManagerImpl.java  |   4 -
 .../genesis/BenchMarkContainerStateMap.java     |  14 +-
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java |   8 +-
 .../hadoop/ozone/scm/TestContainerSQLCli.java   |   3 +-
 65 files changed, 1949 insertions(+), 1739 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index c2bfb42..c635df4 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 184c547..c37f42c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.client;
 
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerData;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerException.java
new file mode 100644
index 0000000..9d37dfb
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+
+/**
+ * Signals that a container exception of some sort has occurred. This is the
+ * parent of all the exceptions thrown by ContainerManager.
+ */
+public class ContainerException extends IOException {
+
+  /**
+   * Constructs a {@code ContainerException} with {@code null}
+   * as its error detail message.
+   */
+  public ContainerException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code ContainerException} with the specified detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public ContainerException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
index 49af297..e7ac350 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
 import org.apache.commons.lang3.builder.CompareToBuilder;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -29,18 +30,17 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
  * We are creating a specific type for this to avoid mixing this with
  * normal integers in code.
  */
-public class ContainerID implements Comparable {
+public final class ContainerID implements Comparable<ContainerID> {
 
   private final long id;
 
+  // TODO: make this private.
   /**
    * Constructs ContainerID.
    *
    * @param id int
    */
   public ContainerID(long id) {
-    Preconditions.checkState(id > 0,
-        "Container ID should be a positive long. "+ id);
     this.id = id;
   }
 
@@ -49,7 +49,9 @@ public class ContainerID implements Comparable {
    * @param containerID  long
    * @return ContainerID.
    */
-  public static ContainerID valueof(long containerID) {
+  public static ContainerID valueof(final long containerID) {
+    Preconditions.checkState(containerID > 0,
+        "Container ID should be a positive long. "+ containerID);
     return new ContainerID(containerID);
   }
 
@@ -62,8 +64,12 @@ public class ContainerID implements Comparable {
     return id;
   }
 
+  public byte[] getBytes() {
+    return Longs.toByteArray(id);
+  }
+
   @Override
-  public boolean equals(Object o) {
+  public boolean equals(final Object o) {
     if (this == o) {
       return true;
     }
@@ -72,7 +78,7 @@ public class ContainerID implements Comparable {
       return false;
     }
 
-    ContainerID that = (ContainerID) o;
+    final ContainerID that = (ContainerID) o;
 
     return new EqualsBuilder()
         .append(getId(), that.getId())
@@ -87,14 +93,8 @@ public class ContainerID implements Comparable {
   }
 
   @Override
-  public int compareTo(Object o) {
-    Preconditions.checkNotNull(o);
-    if(getClass() != o.getClass()) {
-      throw new ClassCastException("ContainerID class expected. found:" +
-          o.getClass().toString());
-    }
-
-    ContainerID that = (ContainerID) o;
+  public int compareTo(final ContainerID that) {
+    Preconditions.checkNotNull(that);
     return new CompareToBuilder()
         .append(this.getId(), that.getId())
         .build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
new file mode 100644
index 0000000..a5ea3e3
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import static java.lang.Math.max;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.Comparator;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Class wraps ozone container info.
+ */
+public class ContainerInfo implements Comparator<ContainerInfo>,
+    Comparable<ContainerInfo>, Externalizable {
+
+  private static final ObjectWriter WRITER;
+  private static final String SERIALIZATION_ERROR_MSG = "Java serialization not"
+      + " supported. Use protobuf instead.";
+
+  static {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+    mapper
+        .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE);
+    WRITER = mapper.writer();
+  }
+
+  private HddsProtos.LifeCycleState state;
+  @JsonIgnore
+  private PipelineID pipelineID;
+  private ReplicationFactor replicationFactor;
+  private ReplicationType replicationType;
+  private long usedBytes;
+  private long numberOfKeys;
+  private long lastUsed;
+  // The wall-clock ms since the epoch at which the current state enters.
+  private long stateEnterTime;
+  private String owner;
+  private long containerID;
+  private long deleteTransactionId;
+  /**
+   * Allows you to maintain private data on ContainerInfo. This is not
+   * serialized via protobuf, just allows us to maintain some private data.
+   */
+  @JsonIgnore
+  private byte[] data;
+
+  ContainerInfo(
+      long containerID,
+      HddsProtos.LifeCycleState state,
+      PipelineID pipelineID,
+      long usedBytes,
+      long numberOfKeys,
+      long stateEnterTime,
+      String owner,
+      long deleteTransactionId,
+      ReplicationFactor replicationFactor,
+      ReplicationType repType) {
+    this.containerID = containerID;
+    this.pipelineID = pipelineID;
+    this.usedBytes = usedBytes;
+    this.numberOfKeys = numberOfKeys;
+    this.lastUsed = Time.monotonicNow();
+    this.state = state;
+    this.stateEnterTime = stateEnterTime;
+    this.owner = owner;
+    this.deleteTransactionId = deleteTransactionId;
+    this.replicationFactor = replicationFactor;
+    this.replicationType = repType;
+  }
+
+  public ContainerInfo(ContainerInfo info) {
+    this(info.getContainerID(), info.getState(), info.getPipelineID(),
+        info.getUsedBytes(), info.getNumberOfKeys(),
+        info.getStateEnterTime(), info.getOwner(),
+        info.getDeleteTransactionId(), info.getReplicationFactor(),
+        info.getReplicationType());
+  }
+  /**
+   * Needed for serialization findbugs.
+   */
+  public ContainerInfo() {
+  }
+
+  public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
+    ContainerInfo.Builder builder = new ContainerInfo.Builder();
+    return builder.setPipelineID(
+        PipelineID.getFromProtobuf(info.getPipelineID()))
+        .setUsedBytes(info.getUsedBytes())
+        .setNumberOfKeys(info.getNumberOfKeys())
+        .setState(info.getState())
+        .setStateEnterTime(info.getStateEnterTime())
+        .setOwner(info.getOwner())
+        .setContainerID(info.getContainerID())
+        .setDeleteTransactionId(info.getDeleteTransactionId())
+        .setReplicationFactor(info.getReplicationFactor())
+        .setReplicationType(info.getReplicationType())
+        .build();
+  }
+
+  public long getContainerID() {
+    return containerID;
+  }
+
+  public HddsProtos.LifeCycleState getState() {
+    return state;
+  }
+
+  public void setState(HddsProtos.LifeCycleState state) {
+    this.state = state;
+  }
+
+  public long getStateEnterTime() {
+    return stateEnterTime;
+  }
+
+  public ReplicationFactor getReplicationFactor() {
+    return replicationFactor;
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+
+  public long getUsedBytes() {
+    return usedBytes;
+  }
+
+  public long getNumberOfKeys() {
+    return numberOfKeys;
+  }
+
+  public long getDeleteTransactionId() {
+    return deleteTransactionId;
+  }
+
+  public void updateDeleteTransactionId(long transactionId) {
+    deleteTransactionId = max(transactionId, deleteTransactionId);
+  }
+
+  public ContainerID containerID() {
+    return new ContainerID(getContainerID());
+  }
+
+  /**
+   * Gets the last used time from SCM's perspective.
+   *
+   * @return time in milliseconds.
+   */
+  public long getLastUsed() {
+    return lastUsed;
+  }
+
+  public ReplicationType getReplicationType() {
+    return replicationType;
+  }
+
+  public void updateLastUsedTime() {
+    lastUsed = Time.monotonicNow();
+  }
+
+  public HddsProtos.SCMContainerInfo getProtobuf() {
+    HddsProtos.SCMContainerInfo.Builder builder =
+        HddsProtos.SCMContainerInfo.newBuilder();
+    Preconditions.checkState(containerID > 0);
+    return builder.setContainerID(getContainerID())
+        .setUsedBytes(getUsedBytes())
+        .setNumberOfKeys(getNumberOfKeys()).setState(getState())
+        .setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID())
+        .setDeleteTransactionId(getDeleteTransactionId())
+        .setPipelineID(getPipelineID().getProtobuf())
+        .setReplicationFactor(getReplicationFactor())
+        .setReplicationType(getReplicationType())
+        .setOwner(getOwner())
+        .build();
+  }
+
+  public String getOwner() {
+    return owner;
+  }
+
+  public void setOwner(String owner) {
+    this.owner = owner;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerInfo{"
+        + "id=" + containerID
+        + ", state=" + state
+        + ", pipelineID=" + pipelineID
+        + ", stateEnterTime=" + stateEnterTime
+        + ", owner=" + owner
+        + '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ContainerInfo that = (ContainerInfo) o;
+
+    return new EqualsBuilder()
+        .append(getContainerID(), that.getContainerID())
+
+        // TODO : Fix this later. If we add these factors some tests fail.
+        // So Commenting this to continue and will enforce this with
+        // Changes in pipeline where we remove Container Name to
+        // SCMContainerinfo from Pipeline.
+        // .append(pipeline.getFactor(), that.pipeline.getFactor())
+        // .append(pipeline.getType(), that.pipeline.getType())
+        .append(owner, that.owner)
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(11, 811)
+        .append(getContainerID())
+        .append(getOwner())
+        .toHashCode();
+  }
+
+  /**
+   * Compares its two arguments for order.  Returns a negative integer, zero, or
+   * a positive integer as the first argument is less than, equal to, or greater
+   * than the second.<p>
+   *
+   * @param o1 the first object to be compared.
+   * @param o2 the second object to be compared.
+   * @return a negative integer, zero, or a positive integer as the first
+   * argument is less than, equal to, or greater than the second.
+   * @throws NullPointerException if an argument is null and this comparator
+   *                              does not permit null arguments
+   * @throws ClassCastException   if the arguments' types prevent them from
+   *                              being compared by this comparator.
+   */
+  @Override
+  public int compare(ContainerInfo o1, ContainerInfo o2) {
+    return Long.compare(o1.getLastUsed(), o2.getLastUsed());
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less than,
+   * equal to, or greater than the specified object.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(ContainerInfo o) {
+    return this.compare(this, o);
+  }
+
+  /**
+   * Returns a JSON string of this object.
+   *
+   * @return String - json string
+   * @throws IOException
+   */
+  public String toJsonString() throws IOException {
+    return WRITER.writeValueAsString(this);
+  }
+
+  /**
+   * Returns private data that is set on this containerInfo.
+   *
+   * @return blob, the user can interpret it any way they like.
+   */
+  public byte[] getData() {
+    if (this.data != null) {
+      return Arrays.copyOf(this.data, this.data.length);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Set private data on ContainerInfo object.
+   *
+   * @param data -- private data.
+   */
+  public void setData(byte[] data) {
+    if (data != null) {
+      this.data = Arrays.copyOf(data, data.length);
+    }
+  }
+
+  /**
+   * Throws IOException as default java serialization is not supported. Use
+   * serialization via protobuf instead.
+   *
+   * @param out the stream to write the object to
+   * @throws IOException Includes any I/O exceptions that may occur
+   * @serialData Overriding methods should use this tag to describe
+   * the data layout of this Externalizable object.
+   * List the sequence of element types and, if possible,
+   * relate the element to a public/protected field and/or
+   * method of this Externalizable class.
+   */
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    throw new IOException(SERIALIZATION_ERROR_MSG);
+  }
+
+  /**
+   * Throws IOException as default java serialization is not supported. Use
+   * serialization via protobuf instead.
+   *
+   * @param in the stream to read data from in order to restore the object
+   * @throws IOException            if I/O errors occur
+   * @throws ClassNotFoundException If the class for an object being
+   *                                restored cannot be found.
+   */
+  @Override
+  public void readExternal(ObjectInput in)
+      throws IOException, ClassNotFoundException {
+    throw new IOException(SERIALIZATION_ERROR_MSG);
+  }
+
+  /**
+   * Builder class for ContainerInfo.
+   */
+  public static class Builder {
+    private HddsProtos.LifeCycleState state;
+    private long used;
+    private long keys;
+    private long stateEnterTime;
+    private String owner;
+    private long containerID;
+    private long deleteTransactionId;
+    private PipelineID pipelineID;
+    private ReplicationFactor replicationFactor;
+    private ReplicationType replicationType;
+
+    public Builder setReplicationType(
+        ReplicationType repType) {
+      this.replicationType = repType;
+      return this;
+    }
+
+    public Builder setPipelineID(PipelineID pipelineId) {
+      this.pipelineID = pipelineId;
+      return this;
+    }
+
+    public Builder setReplicationFactor(ReplicationFactor repFactor) {
+      this.replicationFactor = repFactor;
+      return this;
+    }
+
+    public Builder setContainerID(long id) {
+      Preconditions.checkState(id >= 0);
+      this.containerID = id;
+      return this;
+    }
+
+    public Builder setState(HddsProtos.LifeCycleState lifeCycleState) {
+      this.state = lifeCycleState;
+      return this;
+    }
+
+    public Builder setUsedBytes(long bytesUsed) {
+      this.used = bytesUsed;
+      return this;
+    }
+
+    public Builder setNumberOfKeys(long keyCount) {
+      this.keys = keyCount;
+      return this;
+    }
+
+    public Builder setStateEnterTime(long time) {
+      this.stateEnterTime = time;
+      return this;
+    }
+
+    public Builder setOwner(String containerOwner) {
+      this.owner = containerOwner;
+      return this;
+    }
+
+    public Builder setDeleteTransactionId(long deleteTransactionID) {
+      this.deleteTransactionId = deleteTransactionID;
+      return this;
+    }
+
+    public ContainerInfo build() {
+      return new ContainerInfo(containerID, state, pipelineID,
+              used, keys, stateEnterTime, owner, deleteTransactionId,
+          replicationFactor, replicationType);
+    }
+  }
+
+  /**
+   * Check if a container is in open state, this will check if the
+   * container is either open, allocated, creating or closing.
+   * Any containers in these states is managed as an open container by SCM.
+   */
+  public boolean isOpen() {
+    return state == HddsProtos.LifeCycleState.ALLOCATED ||
+        state == HddsProtos.LifeCycleState.CREATING ||
+        state == HddsProtos.LifeCycleState.OPEN ||
+        state == HddsProtos.LifeCycleState.CLOSING;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerNotFoundException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerNotFoundException.java
new file mode 100644
index 0000000..3eebcce
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerNotFoundException.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+/**
+ * Signals that a container is missing from ContainerManager.
+ */
+public class ContainerNotFoundException extends ContainerException {
+
+  /**
+   * Constructs a {@code ContainerNotFoundException} with {@code null}
+   * as its error detail message.
+   */
+  public ContainerNotFoundException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code ContainerNotFoundException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public ContainerNotFoundException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaNotFoundException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaNotFoundException.java
new file mode 100644
index 0000000..fdbc18b
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaNotFoundException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+/**
+ * Signals that a ContainerReplica is missing from the Container in
+ * ContainerManager.
+ */
+public class ContainerReplicaNotFoundException extends ContainerException {
+
+  /**
+   * Constructs a {@code ContainerReplicaNotFoundException} with {@code null}
+   * as its error detail message.
+   */
+  public ContainerReplicaNotFoundException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code ContainerReplicaNotFoundException} with the
+   * specified detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public ContainerReplicaNotFoundException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
deleted file mode 100644
index 5abcd14..0000000
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.common.helpers;
-
-import static java.lang.Math.max;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.google.common.base.Preconditions;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Arrays;
-import java.util.Comparator;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.util.Time;
-
-/**
- * Class wraps ozone container info.
- */
-public class ContainerInfo implements Comparator<ContainerInfo>,
-    Comparable<ContainerInfo>, Externalizable {
-
-  private static final ObjectWriter WRITER;
-  private static final String SERIALIZATION_ERROR_MSG = "Java serialization not"
-      + " supported. Use protobuf instead.";
-
-  static {
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
-    mapper
-        .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE);
-    WRITER = mapper.writer();
-  }
-
-  private HddsProtos.LifeCycleState state;
-  @JsonIgnore
-  private PipelineID pipelineID;
-  private ReplicationFactor replicationFactor;
-  private ReplicationType replicationType;
-  // Bytes allocated by SCM for clients.
-  private long allocatedBytes;
-  // Actual container usage, updated through heartbeat.
-  private long usedBytes;
-  private long numberOfKeys;
-  private long lastUsed;
-  // The wall-clock ms since the epoch at which the current state enters.
-  private long stateEnterTime;
-  private String owner;
-  private long containerID;
-  private long deleteTransactionId;
-  /**
-   * Allows you to maintain private data on ContainerInfo. This is not
-   * serialized via protobuf, just allows us to maintain some private data.
-   */
-  @JsonIgnore
-  private byte[] data;
-
-  ContainerInfo(
-      long containerID,
-      HddsProtos.LifeCycleState state,
-      PipelineID pipelineID,
-      long allocatedBytes,
-      long usedBytes,
-      long numberOfKeys,
-      long stateEnterTime,
-      String owner,
-      long deleteTransactionId,
-      ReplicationFactor replicationFactor,
-      ReplicationType repType) {
-    this.containerID = containerID;
-    this.pipelineID = pipelineID;
-    this.allocatedBytes = allocatedBytes;
-    this.usedBytes = usedBytes;
-    this.numberOfKeys = numberOfKeys;
-    this.lastUsed = Time.monotonicNow();
-    this.state = state;
-    this.stateEnterTime = stateEnterTime;
-    this.owner = owner;
-    this.deleteTransactionId = deleteTransactionId;
-    this.replicationFactor = replicationFactor;
-    this.replicationType = repType;
-  }
-
-  public ContainerInfo(ContainerInfo info) {
-    this(info.getContainerID(), info.getState(), info.getPipelineID(),
-        info.getAllocatedBytes(), info.getUsedBytes(), info.getNumberOfKeys(),
-        info.getStateEnterTime(), info.getOwner(),
-        info.getDeleteTransactionId(), info.getReplicationFactor(),
-        info.getReplicationType());
-  }
-  /**
-   * Needed for serialization findbugs.
-   */
-  public ContainerInfo() {
-  }
-
-  public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
-    ContainerInfo.Builder builder = new ContainerInfo.Builder();
-    return builder.setPipelineID(
-        PipelineID.getFromProtobuf(info.getPipelineID()))
-        .setAllocatedBytes(info.getAllocatedBytes())
-        .setUsedBytes(info.getUsedBytes())
-        .setNumberOfKeys(info.getNumberOfKeys())
-        .setState(info.getState())
-        .setStateEnterTime(info.getStateEnterTime())
-        .setOwner(info.getOwner())
-        .setContainerID(info.getContainerID())
-        .setDeleteTransactionId(info.getDeleteTransactionId())
-        .setReplicationFactor(info.getReplicationFactor())
-        .setReplicationType(info.getReplicationType())
-        .build();
-  }
-
-  public long getContainerID() {
-    return containerID;
-  }
-
-  public HddsProtos.LifeCycleState getState() {
-    return state;
-  }
-
-  public void setState(HddsProtos.LifeCycleState state) {
-    this.state = state;
-  }
-
-  public long getStateEnterTime() {
-    return stateEnterTime;
-  }
-
-  public ReplicationFactor getReplicationFactor() {
-    return replicationFactor;
-  }
-
-  public PipelineID getPipelineID() {
-    return pipelineID;
-  }
-
-  public long getAllocatedBytes() {
-    return allocatedBytes;
-  }
-
-  /**
-   * Set Allocated bytes.
-   *
-   * @param size - newly allocated bytes -- negative size is case of deletes
-   * can be used.
-   */
-  public void updateAllocatedBytes(long size) {
-    this.allocatedBytes += size;
-  }
-
-  public long getUsedBytes() {
-    return usedBytes;
-  }
-
-  public long getNumberOfKeys() {
-    return numberOfKeys;
-  }
-
-  public long getDeleteTransactionId() {
-    return deleteTransactionId;
-  }
-
-  public void updateDeleteTransactionId(long transactionId) {
-    deleteTransactionId = max(transactionId, deleteTransactionId);
-  }
-
-  public ContainerID containerID() {
-    return new ContainerID(getContainerID());
-  }
-
-  /**
-   * Gets the last used time from SCM's perspective.
-   *
-   * @return time in milliseconds.
-   */
-  public long getLastUsed() {
-    return lastUsed;
-  }
-
-  public ReplicationType getReplicationType() {
-    return replicationType;
-  }
-
-  public void updateLastUsedTime() {
-    lastUsed = Time.monotonicNow();
-  }
-
-  public void allocate(long size) {
-    // should we also have total container size in ContainerInfo
-    // and check before allocating?
-    allocatedBytes += size;
-  }
-
-  public HddsProtos.SCMContainerInfo getProtobuf() {
-    HddsProtos.SCMContainerInfo.Builder builder =
-        HddsProtos.SCMContainerInfo.newBuilder();
-    Preconditions.checkState(containerID > 0);
-    return builder.setAllocatedBytes(getAllocatedBytes())
-        .setContainerID(getContainerID())
-        .setUsedBytes(getUsedBytes())
-        .setNumberOfKeys(getNumberOfKeys()).setState(getState())
-        .setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID())
-        .setDeleteTransactionId(getDeleteTransactionId())
-        .setPipelineID(getPipelineID().getProtobuf())
-        .setReplicationFactor(getReplicationFactor())
-        .setReplicationType(getReplicationType())
-        .setOwner(getOwner())
-        .build();
-  }
-
-  public String getOwner() {
-    return owner;
-  }
-
-  public void setOwner(String owner) {
-    this.owner = owner;
-  }
-
-  @Override
-  public String toString() {
-    return "ContainerInfo{"
-        + "id=" + containerID
-        + ", state=" + state
-        + ", pipelineID=" + pipelineID
-        + ", stateEnterTime=" + stateEnterTime
-        + ", owner=" + owner
-        + '}';
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    ContainerInfo that = (ContainerInfo) o;
-
-    return new EqualsBuilder()
-        .append(getContainerID(), that.getContainerID())
-
-        // TODO : Fix this later. If we add these factors some tests fail.
-        // So Commenting this to continue and will enforce this with
-        // Changes in pipeline where we remove Container Name to
-        // SCMContainerinfo from Pipeline.
-        // .append(pipeline.getFactor(), that.pipeline.getFactor())
-        // .append(pipeline.getType(), that.pipeline.getType())
-        .append(owner, that.owner)
-        .isEquals();
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder(11, 811)
-        .append(getContainerID())
-        .append(getOwner())
-        .toHashCode();
-  }
-
-  /**
-   * Compares its two arguments for order.  Returns a negative integer, zero, or
-   * a positive integer as the first argument is less than, equal to, or greater
-   * than the second.<p>
-   *
-   * @param o1 the first object to be compared.
-   * @param o2 the second object to be compared.
-   * @return a negative integer, zero, or a positive integer as the first
-   * argument is less than, equal to, or greater than the second.
-   * @throws NullPointerException if an argument is null and this comparator
-   *                              does not permit null arguments
-   * @throws ClassCastException   if the arguments' types prevent them from
-   *                              being compared by this comparator.
-   */
-  @Override
-  public int compare(ContainerInfo o1, ContainerInfo o2) {
-    return Long.compare(o1.getLastUsed(), o2.getLastUsed());
-  }
-
-  /**
-   * Compares this object with the specified object for order.  Returns a
-   * negative integer, zero, or a positive integer as this object is less than,
-   * equal to, or greater than the specified object.
-   *
-   * @param o the object to be compared.
-   * @return a negative integer, zero, or a positive integer as this object is
-   * less than, equal to, or greater than the specified object.
-   * @throws NullPointerException if the specified object is null
-   * @throws ClassCastException   if the specified object's type prevents it
-   *                              from being compared to this object.
-   */
-  @Override
-  public int compareTo(ContainerInfo o) {
-    return this.compare(this, o);
-  }
-
-  /**
-   * Returns a JSON string of this object.
-   *
-   * @return String - json string
-   * @throws IOException
-   */
-  public String toJsonString() throws IOException {
-    return WRITER.writeValueAsString(this);
-  }
-
-  /**
-   * Returns private data that is set on this containerInfo.
-   *
-   * @return blob, the user can interpret it any way they like.
-   */
-  public byte[] getData() {
-    if (this.data != null) {
-      return Arrays.copyOf(this.data, this.data.length);
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Set private data on ContainerInfo object.
-   *
-   * @param data -- private data.
-   */
-  public void setData(byte[] data) {
-    if (data != null) {
-      this.data = Arrays.copyOf(data, data.length);
-    }
-  }
-
-  /**
-   * Throws IOException as default java serialization is not supported. Use
-   * serialization via protobuf instead.
-   *
-   * @param out the stream to write the object to
-   * @throws IOException Includes any I/O exceptions that may occur
-   * @serialData Overriding methods should use this tag to describe
-   * the data layout of this Externalizable object.
-   * List the sequence of element types and, if possible,
-   * relate the element to a public/protected field and/or
-   * method of this Externalizable class.
-   */
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    throw new IOException(SERIALIZATION_ERROR_MSG);
-  }
-
-  /**
-   * Throws IOException as default java serialization is not supported. Use
-   * serialization via protobuf instead.
-   *
-   * @param in the stream to read data from in order to restore the object
-   * @throws IOException            if I/O errors occur
-   * @throws ClassNotFoundException If the class for an object being
-   *                                restored cannot be found.
-   */
-  @Override
-  public void readExternal(ObjectInput in)
-      throws IOException, ClassNotFoundException {
-    throw new IOException(SERIALIZATION_ERROR_MSG);
-  }
-
-  /**
-   * Builder class for ContainerInfo.
-   */
-  public static class Builder {
-    private HddsProtos.LifeCycleState state;
-    private long allocated;
-    private long used;
-    private long keys;
-    private long stateEnterTime;
-    private String owner;
-    private long containerID;
-    private long deleteTransactionId;
-    private PipelineID pipelineID;
-    private ReplicationFactor replicationFactor;
-    private ReplicationType replicationType;
-
-    public Builder setReplicationType(
-        ReplicationType repType) {
-      this.replicationType = repType;
-      return this;
-    }
-
-    public Builder setPipelineID(PipelineID pipelineId) {
-      this.pipelineID = pipelineId;
-      return this;
-    }
-
-    public Builder setReplicationFactor(ReplicationFactor repFactor) {
-      this.replicationFactor = repFactor;
-      return this;
-    }
-
-    public Builder setContainerID(long id) {
-      Preconditions.checkState(id >= 0);
-      this.containerID = id;
-      return this;
-    }
-
-    public Builder setState(HddsProtos.LifeCycleState lifeCycleState) {
-      this.state = lifeCycleState;
-      return this;
-    }
-
-    public Builder setAllocatedBytes(long bytesAllocated) {
-      this.allocated = bytesAllocated;
-      return this;
-    }
-
-    public Builder setUsedBytes(long bytesUsed) {
-      this.used = bytesUsed;
-      return this;
-    }
-
-    public Builder setNumberOfKeys(long keyCount) {
-      this.keys = keyCount;
-      return this;
-    }
-
-    public Builder setStateEnterTime(long time) {
-      this.stateEnterTime = time;
-      return this;
-    }
-
-    public Builder setOwner(String containerOwner) {
-      this.owner = containerOwner;
-      return this;
-    }
-
-    public Builder setDeleteTransactionId(long deleteTransactionID) {
-      this.deleteTransactionId = deleteTransactionID;
-      return this;
-    }
-
-    public ContainerInfo build() {
-      return new ContainerInfo(containerID, state, pipelineID, allocated,
-              used, keys, stateEnterTime, owner, deleteTransactionId,
-          replicationFactor, replicationType);
-    }
-  }
-
-  /**
-   * Check if a container is in open state, this will check if the
-   * container is either open, allocated, creating or creating.
-   * Any containers in these states is managed as an open container by SCM.
-   */
-  public boolean isContainerOpen() {
-    return state == HddsProtos.LifeCycleState.ALLOCATED ||
-        state == HddsProtos.LifeCycleState.CREATING ||
-        state == HddsProtos.LifeCycleState.OPEN ||
-        state == HddsProtos.LifeCycleState.CLOSING;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
index 64f42b3..af74a7d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 
 /**
  * Class wraps ozone container info.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index e38077f..712fb7e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.protocol;
 
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 16819e9..8e723e6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InChillModeResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index d2723f0..e2a4ee0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 6525134..9e813af 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -135,16 +135,13 @@ message SCMContainerInfo {
     required int64 containerID = 1;
     required LifeCycleState state = 2;
     optional PipelineID pipelineID = 3;
-    // This is not total size of container, but space allocated by SCM for
-    // clients to write blocks
-    required uint64 allocatedBytes = 4;
-    required uint64 usedBytes = 5;
-    required uint64 numberOfKeys = 6;
-    optional int64 stateEnterTime = 7;
-    required string owner = 8;
-    optional int64 deleteTransactionId = 9;
-    required ReplicationFactor replicationFactor  = 10;
-    required ReplicationType replicationType  = 11;
+    required uint64 usedBytes = 4;
+    required uint64 numberOfKeys = 5;
+    optional int64 stateEnterTime = 6;
+    required string owner = 7;
+    optional int64 deleteTransactionId = 8;
+    required ReplicationFactor replicationFactor  = 9;
+    required ReplicationType replicationType  = 10;
 }
 
 message ContainerWithPipeline {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
index 4736857..f52387b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -75,10 +75,10 @@ public class CommandStatusReportPublisher extends
 
     iterator.forEachRemaining(key -> {
       CommandStatus cmdStatus = map.get(key);
-      builder.addCmdStatus(cmdStatus.getProtoBufMessage());
       // If status is still pending then don't remove it from map as
       // CommandHandler will change its status when it works on this command.
       if (!cmdStatus.getStatus().equals(Status.PENDING)) {
+        builder.addCmdStatus(cmdStatus.getProtoBufMessage());
         map.remove(key);
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 1e82326..b632e02 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -153,19 +153,10 @@ public class TestReportPublisher {
         .build();
     cmdStatusMap.put(obj1.getCmdId(), obj1);
     cmdStatusMap.put(obj2.getCmdId(), obj2);
-    Assert.assertEquals("Should publish report with 2 status objects", 2,
+    // We are not sending the commands whose status is PENDING.
+    Assert.assertEquals("Should publish report with 1 status object", 1,
         ((CommandStatusReportPublisher) publisher).getReport()
             .getCmdStatusCount());
-    Assert.assertEquals(
-        "Next report should have 1 status objects as command status o"
-            + "bjects are still in Pending state",
-        1, ((CommandStatusReportPublisher) publisher).getReport()
-            .getCmdStatusCount());
-    Assert.assertTrue(
-        "Next report should have 1 status objects as command status "
-            + "objects are still in Pending state",
-        ((CommandStatusReportPublisher) publisher).getReport()
-            .getCmdStatusList().get(0).getStatus().equals(Status.PENDING));
     executorService.shutdown();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 30740c7..b791aad 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -69,7 +69,6 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
   // Currently only user of the block service is Ozone, CBlock manages blocks
   // by itself and does not rely on the Block service offered by SCM.
 
-  private final NodeManager nodeManager;
   private final ContainerManager containerManager;
 
   private final long containerSize;
@@ -95,7 +94,6 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
       final NodeManager nodeManager, final ContainerManager containerManager,
       EventPublisher eventPublisher)
       throws IOException {
-    this.nodeManager = nodeManager;
     this.containerManager = containerManager;
 
     this.containerSize = (long)conf.getStorageSize(
@@ -226,8 +224,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
     // USER of the containers. So there might be cases where a different
     // USER has few containers in ALLOCATED state, which will result in
     // false positive.
-    if (!containerManager.getStateManager().getContainerStateMap()
-        .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+    if (!containerManager.getContainers(HddsProtos.LifeCycleState.ALLOCATED)
         .isEmpty()) {
       // Since the above check can result in false positive, we have to do
       // the actual check and find out if there are containers in ALLOCATED
@@ -242,7 +239,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
                 HddsProtos.LifeCycleState.ALLOCATED);
         if (containerWithPipeline != null) {
           containerManager.updateContainerState(
-              containerWithPipeline.getContainerInfo().getContainerID(),
+              containerWithPipeline.getContainerInfo().containerID(),
               HddsProtos.LifeCycleEvent.CREATE);
           return newBlock(containerWithPipeline,
               HddsProtos.LifeCycleState.ALLOCATED);
@@ -268,8 +265,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
     // state, we have to check again as we only hold a read lock.
     // Some other thread might have pre-allocated container in meantime.
     synchronized (this) {
-      if (!containerManager.getStateManager().getContainerStateMap()
-          .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+      if (!containerManager.getContainers(HddsProtos.LifeCycleState.ALLOCATED)
           .isEmpty()) {
         containerWithPipeline = containerManager
             .getMatchingContainerWithPipeline(size, owner, type, factor,
@@ -285,7 +281,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
 
       if (containerWithPipeline != null) {
         containerManager.updateContainerState(
-            containerWithPipeline.getContainerInfo().getContainerID(),
+            containerWithPipeline.getContainerInfo().containerID(),
             HddsProtos.LifeCycleEvent.CREATE);
         return newBlock(containerWithPipeline,
             HddsProtos.LifeCycleState.ALLOCATED);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index c86f9cd..5c112a0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import com.google.common.collect.ArrayListMultimap;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
@@ -60,8 +61,9 @@ public class DatanodeDeletedBlockTransactions {
     Pipeline pipeline = null;
     try {
       ContainerWithPipeline containerWithPipeline =
-          containerManager.getContainerWithPipeline(tx.getContainerID());
-      if (containerWithPipeline.getContainerInfo().isContainerOpen()
+          containerManager.getContainerWithPipeline(
+              ContainerID.valueof(tx.getContainerID()));
+      if (containerWithPipeline.getContainerInfo().isOpen()
           || containerWithPipeline.getPipeline().isEmpty()) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 5d3afd5..51790be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.scm.command
     .CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -258,8 +259,8 @@ public class DeletedBlockLogImpl
 
           dnsWithCommittedTxn.add(dnID);
           Pipeline pipeline =
-              containerManager.getContainerWithPipeline(containerId)
-                  .getPipeline();
+              containerManager.getContainerWithPipeline(
+                  ContainerID.valueof(containerId)).getPipeline();
           Collection<DatanodeDetails> containerDnsDetails =
               pipeline.getDatanodes().values();
           // The delete entry can be safely removed from the log if all the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 8be7803..74edbc2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -61,7 +60,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
     ContainerInfo info;
     try {
       containerWithPipeline =
-          containerManager.getContainerWithPipeline(containerID.getId());
+          containerManager.getContainerWithPipeline(containerID);
       info = containerWithPipeline.getContainerInfo();
       if (info == null) {
         LOG.error("Failed to update the container state. Container with id : {}"
@@ -81,8 +80,8 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
         // We cannot close a container in ALLOCATED state, moving the
         // container to CREATING state, this should eventually
         // timeout and the container will be moved to DELETING state.
-        LOG.debug("Closing container {} in {} state", containerID, state);
-        containerManager.updateContainerState(containerID.getId(),
+        LOG.debug("Closing container #{} in {} state", containerID, state);
+        containerManager.updateContainerState(containerID,
             HddsProtos.LifeCycleEvent.CREATE);
         break;
       case CREATING:
@@ -91,7 +90,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
         LOG.debug("Closing container {} in {} state", containerID, state);
         break;
       case OPEN:
-        containerManager.updateContainerState(containerID.getId(),
+        containerManager.updateContainerState(containerID,
             HddsProtos.LifeCycleEvent.FINALIZE);
         fireCloseContainerEvents(containerWithPipeline, info, publisher);
         break;
@@ -101,16 +100,15 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
       case CLOSED:
       case DELETING:
       case DELETED:
-        LOG.info(
-            "container with id : {} is in {} state and need not be closed.",
-            containerID.getId(), info.getState());
+        LOG.info("Cannot close container #{}, it is already in {} state.",
+            containerID.getId(), state);
         break;
       default:
-        throw new IOException(
-            "Invalid container state for container " + containerID);
+        throw new IOException("Invalid container state for container #"
+            + containerID);
       }
     } catch (IOException ex) {
-      LOG.error("Failed to update the container state for" + "container : {}"
+      LOG.error("Failed to update the container state for container #{}"
           + containerID, ex);
     }
   }
@@ -125,13 +123,14 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
             info.getReplicationType(), info.getPipelineID());
 
     Pipeline pipeline = containerWithPipeline.getPipeline();
-    pipeline.getMachines().stream().map(
-        datanode -> new CommandForDatanode<>(datanode.getUuid(),
-            closeContainerCommand)).forEach((command) -> {
-              publisher.fireEvent(DATANODE_COMMAND, command);
-            });
+    pipeline.getMachines().stream()
+        .map(node ->
+            new CommandForDatanode<>(node.getUuid(), closeContainerCommand))
+        .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, command));
+
     publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ,
         new CloseContainerRetryableReq(containerID));
+
     LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand,
         pipeline, containerID);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
index 7b94bd2..4593c1f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
@@ -88,7 +88,8 @@ public class CloseContainerWatcher extends
       publisher) {
     try {
       // Check if container is still open
-      if (containerManager.getContainer(containerID).isContainerOpen()) {
+      if (containerManager.getContainer(
+          ContainerID.valueof(containerID)).isOpen()) {
         publisher.fireEvent(SCMEvents.CLOSE_CONTAINER,
             ContainerID.valueof(containerID));
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index e586f3e..5dba8fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@@ -31,13 +30,31 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+// TODO: Write extensive java doc.
+// This is the main interface of ContainerManager.
 /**
  * ContainerManager class contains the mapping from a name to a pipeline
  * mapping. This is used by SCM when allocating new locations and when
  * looking up a key.
  */
 public interface ContainerManager extends Closeable {
+
+  /**
+   * Returns all the containers managed by ContainerManager.
+   *
+   * @return List of ContainerInfo
+   */
+  List<ContainerInfo> getContainers();
+
+  /**
+   * Returns all the containers which are in the specified state.
+   *
+   * @return List of ContainerInfo
+   */
+  List<ContainerInfo> getContainers(HddsProtos.LifeCycleState state);
+
   /**
    * Returns the ContainerInfo from the container ID.
    *
@@ -45,7 +62,8 @@ public interface ContainerManager extends Closeable {
    * @return - ContainerInfo such as creation state and the pipeline.
    * @throws IOException
    */
-  ContainerInfo getContainer(long containerID) throws IOException;
+  ContainerInfo getContainer(ContainerID containerID)
+      throws ContainerNotFoundException;
 
   /**
    * Returns the ContainerInfo from the container ID.
@@ -54,8 +72,8 @@ public interface ContainerManager extends Closeable {
    * @return - ContainerWithPipeline such as creation state and the pipeline.
    * @throws IOException
    */
-  ContainerWithPipeline getContainerWithPipeline(long containerID)
-      throws IOException;
+  ContainerWithPipeline getContainerWithPipeline(ContainerID containerID)
+      throws ContainerNotFoundException;
 
   /**
    * Returns containers under certain conditions.
@@ -72,8 +90,7 @@ public interface ContainerManager extends Closeable {
    * @return a list of container.
    * @throws IOException
    */
-  List<ContainerInfo> listContainer(long startContainerID, int count)
-      throws IOException;
+  List<ContainerInfo> listContainer(ContainerID startContainerID, int count);
 
   /**
    * Allocates a new container for a given keyName and replication factor.
@@ -93,7 +110,7 @@ public interface ContainerManager extends Closeable {
    * @param containerID - Container ID
    * @throws IOException
    */
-  void deleteContainer(long containerID) throws IOException;
+  void deleteContainer(ContainerID containerID) throws IOException;
 
   /**
    * Update container state.
@@ -102,23 +119,44 @@ public interface ContainerManager extends Closeable {
    * @return - new container state
    * @throws IOException
    */
-  HddsProtos.LifeCycleState updateContainerState(long containerID,
+  HddsProtos.LifeCycleState updateContainerState(ContainerID containerID,
       HddsProtos.LifeCycleEvent event) throws IOException;
 
   /**
-   * Returns the container State Manager.
-   * @return ContainerStateManager
-   */
-  ContainerStateManager getStateManager();
-
-  /**
    * Process container report from Datanode.
    *
    * @param reports Container report
    */
   void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports, boolean isRegisterCall)
-      throws IOException;
+      ContainerReportsProto reports) throws IOException;
+
+  /**
+   * Returns the latest set of replicas for the given containerID.
+   *
+   * @param containerID Container ID
+   * @return Set of ContainerReplica
+   */
+  Set<ContainerReplica> getContainerReplicas(ContainerID containerID)
+      throws ContainerNotFoundException;
+
+  /**
+   * Adds or updates a container Replica for the given Container.
+   *
+   * @param containerID Container ID
+   * @param replica ContainerReplica
+   */
+  void updateContainerReplica(ContainerID containerID, ContainerReplica replica)
+      throws ContainerNotFoundException;
+
+  /**
+   * Remove a container Replica from a given Container.
+   *
+   * @param containerID Container ID
+   * @param replica ContainerReplica
+   * @throws ContainerReplicaNotFoundException if the replica is not found.
+   */
+  void removeContainerReplica(ContainerID containerID, ContainerReplica replica)
+      throws ContainerNotFoundException, ContainerReplicaNotFoundException;
 
   /**
    * Update deleteTransactionId according to deleteTransactionMap.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
new file mode 100644
index 0000000..f2e80f4
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.builder.CompareToBuilder;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * In-memory state of a container replica.
+ */
+public final class ContainerReplica implements Comparable<ContainerReplica> {
+
+  final private ContainerID containerID;
+  final private DatanodeDetails datanodeDetails;
+  final private UUID placeOfBirth;
+
+  private Long sequenceId;
+
+
+  private ContainerReplica(ContainerID containerID, DatanodeDetails datanode,
+      UUID originNodeId) {
+    this.containerID = containerID;
+    this.datanodeDetails = datanode;
+    this.placeOfBirth = originNodeId;
+  }
+
+  private void setSequenceId(Long seqId) {
+    sequenceId = seqId;
+  }
+
+  /**
+   * Returns the DatanodeDetails to which this replica belongs.
+   *
+   * @return DatanodeDetails
+   */
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+  /**
+   * Returns the UUID of Datanode where this replica originated.
+   *
+   * @return UUID
+   */
+  public UUID getOriginDatanodeId() {
+    return placeOfBirth;
+  }
+
+  /**
+   * Returns the Sequence Id of this replica.
+   *
+   * @return Sequence Id
+   */
+  public Long getSequenceId() {
+    return sequenceId;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(61, 71)
+        .append(containerID)
+        .append(datanodeDetails)
+        .toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final ContainerReplica that = (ContainerReplica) o;
+
+    return new EqualsBuilder()
+        .append(containerID, that.containerID)
+        .append(datanodeDetails, that.datanodeDetails)
+        .isEquals();
+  }
+
+  @Override
+  public int compareTo(ContainerReplica that) {
+    Preconditions.checkNotNull(that);
+    return new CompareToBuilder()
+        .append(this.containerID, that.containerID)
+        .append(this.datanodeDetails, that.datanodeDetails)
+        .build();
+  }
+
+  /**
+   * Returns a new Builder to construct ContainerReplica.
+   *
+   * @return ContainerReplicaBuilder
+   */
+  public static ContainerReplicaBuilder newBuilder() {
+    return new ContainerReplicaBuilder();
+  }
+
+  /**
+   * Used for building ContainerReplica instance.
+   */
+  public static class ContainerReplicaBuilder {
+
+    private ContainerID containerID;
+    private DatanodeDetails datanode;
+    private UUID placeOfBirth;
+    private Long sequenceId;
+
+    /**
+     * Set Container Id.
+     *
+     * @param containerId ContainerID
+     * @return ContainerReplicaBuilder
+     */
+    public ContainerReplicaBuilder setContainerID(
+        final ContainerID containerId) {
+      containerID = containerId;
+      return this;
+    }
+
+    /**
+     * Set DatanodeDetails.
+     *
+     * @param datanodeDetails DatanodeDetails
+     * @return ContainerReplicaBuilder
+     */
+    public ContainerReplicaBuilder setDatanodeDetails(
+        DatanodeDetails datanodeDetails) {
+      datanode = datanodeDetails;
+      return this;
+    }
+
+    /**
+     * Set replica origin node id.
+     *
+     * @param originNodeId origin node UUID
+     * @return ContainerReplicaBuilder
+     */
+    public ContainerReplicaBuilder setOriginNodeId(UUID originNodeId) {
+      placeOfBirth = originNodeId;
+      return this;
+    }
+
+    /**
+     * Set sequence Id of the replica.
+     *
+     * @param seqId container sequence Id
+     * @return ContainerReplicaBuilder
+     */
+    public ContainerReplicaBuilder setSequenceId(long seqId) {
+      sequenceId = seqId;
+      return this;
+    }
+
+    /**
+     * Constructs a new ContainerReplica from the builder parameters.
+     *
+     * @return ContainerReplica
+     */
+    public ContainerReplica build() {
+      Preconditions.checkNotNull(containerID,
+          "Container Id can't be null");
+      Preconditions.checkNotNull(datanode,
+          "DatanodeDetails can't be null");
+      ContainerReplica replica = new ContainerReplica(containerID, datanode,
+          Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid()));
+      Optional.ofNullable(sequenceId).ifPresent(replica::setSequenceId);
+      return replica;
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 0f824a0..5885d959 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -24,11 +24,9 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
@@ -49,11 +47,7 @@ public class ContainerReportHandler implements
       LoggerFactory.getLogger(ContainerReportHandler.class);
 
   private final NodeManager nodeManager;
-
   private final ContainerManager containerManager;
-
-  private ContainerStateManager containerStateManager;
-
   private ReplicationActivityStatus replicationStatus;
 
   public ContainerReportHandler(ContainerManager containerManager,
@@ -62,7 +56,6 @@ public class ContainerReportHandler implements
     Preconditions.checkNotNull(containerManager);
     Preconditions.checkNotNull(nodeManager);
     Preconditions.checkNotNull(replicationActivityStatus);
-    this.containerStateManager = containerManager.getStateManager();
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationStatus = replicationActivityStatus;
@@ -81,7 +74,7 @@ public class ContainerReportHandler implements
 
       //update state in container db and trigger close container events
       containerManager
-          .processContainerReports(datanodeOrigin, containerReport, false);
+          .processContainerReports(datanodeOrigin, containerReport);
 
       Set<ContainerID> containerIds = containerReport.getReportsList().stream()
           .map(StorageContainerDatanodeProtocolProtos
@@ -97,13 +90,21 @@ public class ContainerReportHandler implements
           .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
 
       for (ContainerID containerID : reportResult.getMissingEntries()) {
-        containerStateManager
-            .removeContainerReplica(containerID, datanodeOrigin);
+        final ContainerReplica replica = ContainerReplica.newBuilder()
+            .setContainerID(containerID)
+            .setDatanodeDetails(datanodeOrigin)
+            .build();
+        containerManager
+            .removeContainerReplica(containerID, replica);
         checkReplicationState(containerID, publisher);
       }
 
       for (ContainerID containerID : reportResult.getNewEntries()) {
-        containerStateManager.addContainerReplica(containerID, datanodeOrigin);
+        final ContainerReplica replica = ContainerReplica.newBuilder()
+            .setContainerID(containerID)
+            .setDatanodeDetails(datanodeOrigin)
+            .build();
+        containerManager.updateContainerReplica(containerID, replica);
         checkReplicationState(containerID, publisher);
       }
 
@@ -116,35 +117,30 @@ public class ContainerReportHandler implements
   }
 
   private void checkReplicationState(ContainerID containerID,
-      EventPublisher publisher)
-      throws SCMException {
-    ContainerInfo container = containerStateManager.getContainer(containerID);
-
-    if (container == null) {
-      //warning unknown container
+      EventPublisher publisher) {
+    try {
+      ContainerInfo container = containerManager.getContainer(containerID);
+      replicateIfNeeded(container, publisher);
+    } catch (ContainerNotFoundException ex) {
       LOG.warn(
           "Container is missing from containerStateManager. Can't request "
               + "replication. {}",
           containerID);
-      return;
-    }
-    if (container.isContainerOpen()) {
-      return;
     }
 
-    ReplicationRequest replicationState =
-        containerStateManager.checkReplicationState(containerID);
-    if (replicationState != null) {
-      if (replicationStatus.isReplicationEnabled()) {
+  }
+
+  private void replicateIfNeeded(ContainerInfo container,
+      EventPublisher publisher) throws ContainerNotFoundException {
+    if (!container.isOpen() && replicationStatus.isReplicationEnabled()) {
+      final int existingReplicas = containerManager
+          .getContainerReplicas(container.containerID()).size();
+      final int expectedReplicas = container.getReplicationFactor().getNumber();
+      if (existingReplicas != expectedReplicas) {
         publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-            replicationState);
-      } else {
-        LOG.warn(
-            "Over/under replicated container but the replication is not "
-                + "(yet) enabled: "
-                + replicationState.toString());
+            new ReplicationRequest(container.getContainerID(),
+                existingReplicas, expectedReplicas));
       }
     }
-
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message