hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From viraj...@apache.org
Subject [12/50] [abbrv] hadoop git commit: HDDS-175. Refactor ContainerInfo to remove Pipeline object from it. Contributed by Ajay Kumar.
Date Mon, 09 Jul 2018 18:26:04 GMT
HDDS-175. Refactor ContainerInfo to remove Pipeline object from it.
Contributed by Ajay Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ca4f0ce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ca4f0ce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ca4f0ce

Branch: refs/heads/HDFS-12090
Commit: 7ca4f0cefa220c752920822c8d16469ab3b09b37
Parents: 93ac01c
Author: Anu Engineer <aengineer@apache.org>
Authored: Tue Jul 3 13:30:19 2018 -0700
Committer: Anu Engineer <aengineer@apache.org>
Committed: Tue Jul 3 14:11:52 2018 -0700

----------------------------------------------------------------------
 .../scm/client/ContainerOperationClient.java    | 109 +++++++++---
 .../hadoop/hdds/scm/client/ScmClient.java       |  38 ++++-
 .../container/common/helpers/ContainerInfo.java | 167 +++++++++++++------
 .../common/helpers/ContainerWithPipeline.java   | 131 +++++++++++++++
 .../StorageContainerLocationProtocol.java       |  13 +-
 ...rLocationProtocolClientSideTranslatorPB.java |  26 ++-
 ...rLocationProtocolServerSideTranslatorPB.java |  25 ++-
 .../StorageContainerLocationProtocol.proto      |  15 +-
 hadoop-hdds/common/src/main/proto/hdds.proto    |   9 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java |  80 +++++----
 .../block/DatanodeDeletedBlockTransactions.java |  11 +-
 .../container/CloseContainerEventHandler.java   |  26 ++-
 .../hdds/scm/container/ContainerMapping.java    | 128 +++++++++++---
 .../scm/container/ContainerStateManager.java    |  30 +++-
 .../hadoop/hdds/scm/container/Mapping.java      |  26 ++-
 .../scm/container/closer/ContainerCloser.java   |  15 +-
 .../scm/container/states/ContainerStateMap.java |   7 +-
 .../hdds/scm/pipelines/PipelineManager.java     |  27 ++-
 .../hdds/scm/pipelines/PipelineSelector.java    |  16 ++
 .../scm/pipelines/ratis/RatisManagerImpl.java   |   1 +
 .../standalone/StandaloneManagerImpl.java       |   1 +
 .../scm/server/SCMClientProtocolServer.java     |  14 +-
 .../hdds/scm/block/TestDeletedBlockLog.java     |  15 +-
 .../TestCloseContainerEventHandler.java         |  31 ++--
 .../scm/container/TestContainerMapping.java     |  27 +--
 .../container/closer/TestContainerCloser.java   |  18 +-
 .../hdds/scm/node/TestContainerPlacement.java   |   6 +-
 .../cli/container/CloseContainerHandler.java    |  10 +-
 .../cli/container/DeleteContainerHandler.java   |   9 +-
 .../scm/cli/container/InfoContainerHandler.java |  11 +-
 .../ozone/client/io/ChunkGroupInputStream.java  |  15 +-
 .../ozone/client/io/ChunkGroupOutputStream.java |   9 +-
 .../hadoop/ozone/protocolPB/OzonePBHelper.java  |  30 ++++
 .../container/TestContainerStateManager.java    | 161 ++++++++++--------
 .../hadoop/ozone/TestContainerOperations.java   |  11 +-
 .../ozone/TestStorageContainerManager.java      |   6 +-
 .../TestStorageContainerManagerHelper.java      |  10 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java    |   4 +-
 .../TestCloseContainerByPipeline.java           |  21 +--
 .../ozone/ksm/TestContainerReportWithKeys.java  |   2 +-
 .../hadoop/ozone/scm/TestAllocateContainer.java |   6 +-
 .../ozone/scm/TestContainerSmallFile.java       |  36 ++--
 .../org/apache/hadoop/ozone/scm/TestSCMCli.java | 135 ++++++++-------
 .../ozone/scm/TestXceiverClientManager.java     |  62 ++++---
 .../ozone/scm/TestXceiverClientMetrics.java     |  14 +-
 .../genesis/BenchMarkContainerStateMap.java     |  16 +-
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java |  63 +++----
 47 files changed, 1139 insertions(+), 504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index 07f6cec..b04f8c4 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.client;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB
@@ -87,16 +88,17 @@ public class ContainerOperationClient implements ScmClient {
    * @inheritDoc
    */
   @Override
-  public ContainerInfo createContainer(String owner)
+  public ContainerWithPipeline createContainer(String owner)
       throws IOException {
     XceiverClientSpi client = null;
     try {
-      ContainerInfo container =
+      ContainerWithPipeline containerWithPipeline =
           storageContainerLocationClient.allocateContainer(
               xceiverClientManager.getType(),
               xceiverClientManager.getFactor(), owner);
-      Pipeline pipeline = container.getPipeline();
-      client = xceiverClientManager.acquireClient(pipeline, container.getContainerID());
+      Pipeline pipeline = containerWithPipeline.getPipeline();
+      client = xceiverClientManager.acquireClient(pipeline,
+          containerWithPipeline.getContainerInfo().getContainerID());
 
       // Allocated State means that SCM has allocated this pipeline in its
       // namespace. The client needs to create the pipeline on the machines
@@ -106,8 +108,9 @@ public class ContainerOperationClient implements ScmClient {
       if (pipeline.getLifeCycleState() == ALLOCATED) {
         createPipeline(client, pipeline);
       }
-      createContainer(client, container.getContainerID());
-      return container;
+      createContainer(client,
+          containerWithPipeline.getContainerInfo().getContainerID());
+      return containerWithPipeline;
     } finally {
       if (client != null) {
         xceiverClientManager.releaseClient(client);
@@ -197,17 +200,17 @@ public class ContainerOperationClient implements ScmClient {
    * @inheritDoc
    */
   @Override
-  public ContainerInfo createContainer(HddsProtos.ReplicationType type,
+  public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor factor, String owner) throws IOException {
     XceiverClientSpi client = null;
     try {
       // allocate container on SCM.
-      ContainerInfo container =
+      ContainerWithPipeline containerWithPipeline =
           storageContainerLocationClient.allocateContainer(type, factor,
               owner);
-      Pipeline pipeline = container.getPipeline();
+      Pipeline pipeline = containerWithPipeline.getPipeline();
       client = xceiverClientManager.acquireClient(pipeline,
-          container.getContainerID());
+          containerWithPipeline.getContainerInfo().getContainerID());
 
       // Allocated State means that SCM has allocated this pipeline in its
       // namespace. The client needs to create the pipeline on the machines
@@ -217,9 +220,10 @@ public class ContainerOperationClient implements ScmClient {
       }
       // connect to pipeline leader and allocate container on leader datanode.
       client = xceiverClientManager.acquireClient(pipeline,
-          container.getContainerID());
-      createContainer(client, container.getContainerID());
-      return container;
+          containerWithPipeline.getContainerInfo().getContainerID());
+      createContainer(client,
+          containerWithPipeline.getContainerInfo().getContainerID());
+      return containerWithPipeline;
     } finally {
       if (client != null) {
         xceiverClientManager.releaseClient(client);
@@ -256,24 +260,27 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   /**
-   * Delete the container, this will release any resource it uses.
-   * @param pipeline - Pipeline that represents the container.
-   * @param force - True to forcibly delete the container.
+   * Deletes an existing container.
+   *
+   * @param containerId - ID of the container.
+   * @param pipeline    - Pipeline that represents the container.
+   * @param force       - true to forcibly delete the container.
    * @throws IOException
    */
   @Override
-  public void deleteContainer(long containerID, Pipeline pipeline, boolean force)
-      throws IOException {
+  public void deleteContainer(long containerId, Pipeline pipeline,
+      boolean force) throws IOException {
     XceiverClientSpi client = null;
     try {
-      client = xceiverClientManager.acquireClient(pipeline, containerID);
+      client = xceiverClientManager.acquireClient(pipeline, containerId);
       String traceID = UUID.randomUUID().toString();
-      ContainerProtocolCalls.deleteContainer(client, containerID, force, traceID);
+      ContainerProtocolCalls
+          .deleteContainer(client, containerId, force, traceID);
       storageContainerLocationClient
-          .deleteContainer(containerID);
+          .deleteContainer(containerId);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Deleted container {}, leader: {}, machines: {} ",
-            containerID,
+            containerId,
             pipeline.getLeader(),
             pipeline.getMachines());
       }
@@ -285,6 +292,19 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   /**
+   * Delete the container, this will release any resource it uses.
+   * @param containerID - containerID.
+   * @param force - True to forcibly delete the container.
+   * @throws IOException
+   */
+  @Override
+  public void deleteContainer(long containerID, boolean force)
+      throws IOException {
+    ContainerWithPipeline info = getContainerWithPipeline(containerID);
+    deleteContainer(containerID, info.getPipeline(), force);
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
@@ -297,9 +317,9 @@ public class ContainerOperationClient implements ScmClient {
   /**
    * Get meta data from an existing container.
    *
-   * @param pipeline - pipeline that represents the container.
-   * @return ContainerInfo - a message of protobuf which has basic info
-   * of a container.
+   * @param containerID - ID of the container.
+   * @param pipeline    - Pipeline where the container is located.
+   * @return ContainerInfo
    * @throws IOException
    */
   @Override
@@ -326,6 +346,19 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   /**
+   * Get meta data from an existing container.
+   * @param containerID - ID of the container.
+   * @return ContainerInfo - a message of protobuf which has basic info
+   * of a container.
+   * @throws IOException
+   */
+  @Override
+  public ContainerData readContainer(long containerID) throws IOException {
+    ContainerWithPipeline info = getContainerWithPipeline(containerID);
+    return readContainer(containerID, info.getPipeline());
+  }
+
+  /**
    * Given an id, return the pipeline associated with the container.
    * @param containerId - String Container ID
    * @return Pipeline of the existing container, corresponding to the given id.
@@ -338,6 +371,19 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   /**
+   * Gets a container by Name -- Throws if the container does not exist.
+   *
+   * @param containerId - Container ID
+   * @return ContainerWithPipeline
+   * @throws IOException
+   */
+  @Override
+  public ContainerWithPipeline getContainerWithPipeline(long containerId)
+      throws IOException {
+    return storageContainerLocationClient.getContainerWithPipeline(containerId);
+  }
+
+  /**
    * Close a container.
    *
    * @param pipeline the container to be closed.
@@ -392,6 +438,19 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   /**
+   * Close a container.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void closeContainer(long containerId)
+      throws IOException {
+    ContainerWithPipeline info = getContainerWithPipeline(containerId);
+    Pipeline pipeline = info.getPipeline();
+    closeContainer(containerId, pipeline);
+  }
+
+  /**
    * Get the the current usage information.
    * @param containerID - ID of the container.
    * @return the size of the given container.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index b52819a..ecb2173 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.client;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -45,7 +46,7 @@ public interface ScmClient {
    * @return ContainerInfo
    * @throws IOException
    */
-  ContainerInfo createContainer(String owner) throws IOException;
+  ContainerWithPipeline createContainer(String owner) throws IOException;
 
   /**
    * Gets a container by Name -- Throws if the container does not exist.
@@ -56,6 +57,14 @@ public interface ScmClient {
   ContainerInfo getContainer(long containerId) throws IOException;
 
   /**
+   * Gets a container by Name -- Throws if the container does not exist.
+   * @param containerId - Container ID
+   * @return ContainerWithPipeline
+   * @throws IOException
+   */
+  ContainerWithPipeline getContainerWithPipeline(long containerId) throws IOException;
+
+  /**
    * Close a container.
    *
    * @param containerId - ID of the container.
@@ -65,6 +74,14 @@ public interface ScmClient {
   void closeContainer(long containerId, Pipeline pipeline) throws IOException;
 
   /**
+   * Close a container.
+   *
+   * @param containerId - ID of the container.
+   * @throws IOException
+   */
+  void closeContainer(long containerId) throws IOException;
+
+  /**
    * Deletes an existing container.
    * @param containerId - ID of the container.
    * @param pipeline - Pipeline that represents the container.
@@ -74,6 +91,14 @@ public interface ScmClient {
   void deleteContainer(long containerId, Pipeline pipeline, boolean force) throws IOException;
 
   /**
+   * Deletes an existing container.
+   * @param containerId - ID of the container.
+   * @param force - true to forcibly delete the container.
+   * @throws IOException
+   */
+  void deleteContainer(long containerId, boolean force) throws IOException;
+
+  /**
    * Lists a range of containers and get their info.
    *
    * @param startContainerID start containerID.
@@ -96,6 +121,15 @@ public interface ScmClient {
       throws IOException;
 
   /**
+   * Read meta data from an existing container.
+   * @param containerID - ID of the container.
+   * @return ContainerInfo
+   * @throws IOException
+   */
+  ContainerData readContainer(long containerID)
+      throws IOException;
+
+  /**
    * Gets the container size -- Computed by SCM from Container Reports.
    * @param containerID - ID of the container.
    * @return number of bytes used by this container.
@@ -110,7 +144,7 @@ public interface ScmClient {
    * @return ContainerInfo
    * @throws IOException - in case of error.
    */
-  ContainerInfo createContainer(HddsProtos.ReplicationType type,
+  ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor replicationFactor,
       String owner) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
index ee05c87..9593717 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
@@ -15,34 +15,39 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-
 package org.apache.hadoop.hdds.scm.container.common.helpers;
 
+import static java.lang.Math.max;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.Comparator;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.util.Time;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import static java.lang.Math.max;
-
 /**
  * Class wraps ozone container info.
  */
-public class ContainerInfo
-    implements Comparator<ContainerInfo>, Comparable<ContainerInfo> {
+public class ContainerInfo implements Comparator<ContainerInfo>,
+    Comparable<ContainerInfo>, Externalizable {
 
   private static final ObjectWriter WRITER;
+  private static final String SERIALIZATION_ERROR_MSG = "Java serialization not"
+      + " supported. Use protobuf instead.";
 
   static {
     ObjectMapper mapper = new ObjectMapper();
@@ -53,7 +58,9 @@ public class ContainerInfo
   }
 
   private HddsProtos.LifeCycleState state;
-  private Pipeline pipeline;
+  private String pipelineName;
+  private ReplicationFactor replicationFactor;
+  private ReplicationType replicationType;
   // Bytes allocated by SCM for clients.
   private long allocatedBytes;
   // Actual container usage, updated through heartbeat.
@@ -75,15 +82,17 @@ public class ContainerInfo
   ContainerInfo(
       long containerID,
       HddsProtos.LifeCycleState state,
-      Pipeline pipeline,
+      String pipelineName,
       long allocatedBytes,
       long usedBytes,
       long numberOfKeys,
       long stateEnterTime,
       String owner,
-      long deleteTransactionId) {
+      long deleteTransactionId,
+      ReplicationFactor replicationFactor,
+      ReplicationType repType) {
     this.containerID = containerID;
-    this.pipeline = pipeline;
+    this.pipelineName = pipelineName;
     this.allocatedBytes = allocatedBytes;
     this.usedBytes = usedBytes;
     this.numberOfKeys = numberOfKeys;
@@ -92,6 +101,8 @@ public class ContainerInfo
     this.stateEnterTime = stateEnterTime;
     this.owner = owner;
     this.deleteTransactionId = deleteTransactionId;
+    this.replicationFactor = replicationFactor;
+    this.replicationType = repType;
   }
 
   /**
@@ -102,16 +113,18 @@ public class ContainerInfo
 
   public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
     ContainerInfo.Builder builder = new ContainerInfo.Builder();
-    builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
-    builder.setAllocatedBytes(info.getAllocatedBytes());
-    builder.setUsedBytes(info.getUsedBytes());
-    builder.setNumberOfKeys(info.getNumberOfKeys());
-    builder.setState(info.getState());
-    builder.setStateEnterTime(info.getStateEnterTime());
-    builder.setOwner(info.getOwner());
-    builder.setContainerID(info.getContainerID());
-    builder.setDeleteTransactionId(info.getDeleteTransactionId());
-    return builder.build();
+    return builder.setPipelineName(info.getPipelineName())
+        .setAllocatedBytes(info.getAllocatedBytes())
+        .setUsedBytes(info.getUsedBytes())
+        .setNumberOfKeys(info.getNumberOfKeys())
+        .setState(info.getState())
+        .setStateEnterTime(info.getStateEnterTime())
+        .setOwner(info.getOwner())
+        .setContainerID(info.getContainerID())
+        .setDeleteTransactionId(info.getDeleteTransactionId())
+        .setReplicationFactor(info.getReplicationFactor())
+        .setReplicationType(info.getReplicationType())
+        .build();
   }
 
   public long getContainerID() {
@@ -130,8 +143,12 @@ public class ContainerInfo
     return stateEnterTime;
   }
 
-  public Pipeline getPipeline() {
-    return pipeline;
+  public ReplicationFactor getReplicationFactor() {
+    return replicationFactor;
+  }
+
+  public String getPipelineName() {
+    return pipelineName;
   }
 
   public long getAllocatedBytes() {
@@ -177,6 +194,10 @@ public class ContainerInfo
     return lastUsed;
   }
 
+  public ReplicationType getReplicationType() {
+    return replicationType;
+  }
+
   public void updateLastUsedTime() {
     lastUsed = Time.monotonicNow();
   }
@@ -190,19 +211,17 @@ public class ContainerInfo
   public HddsProtos.SCMContainerInfo getProtobuf() {
     HddsProtos.SCMContainerInfo.Builder builder =
         HddsProtos.SCMContainerInfo.newBuilder();
-    builder.setPipeline(getPipeline().getProtobufMessage());
-    builder.setAllocatedBytes(getAllocatedBytes());
-    builder.setUsedBytes(getUsedBytes());
-    builder.setNumberOfKeys(getNumberOfKeys());
-    builder.setState(state);
-    builder.setStateEnterTime(stateEnterTime);
-    builder.setContainerID(getContainerID());
-    builder.setDeleteTransactionId(deleteTransactionId);
-
-    if (getOwner() != null) {
-      builder.setOwner(getOwner());
-    }
-    return builder.build();
+    return builder.setAllocatedBytes(getAllocatedBytes())
+        .setContainerID(getContainerID())
+        .setUsedBytes(getUsedBytes())
+        .setNumberOfKeys(getNumberOfKeys()).setState(getState())
+        .setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID())
+        .setDeleteTransactionId(getDeleteTransactionId())
+        .setPipelineName(getPipelineName())
+        .setReplicationFactor(getReplicationFactor())
+        .setReplicationType(getReplicationType())
+        .setOwner(getOwner())
+        .build();
   }
 
   public String getOwner() {
@@ -217,7 +236,7 @@ public class ContainerInfo
   public String toString() {
     return "ContainerInfo{"
         + "state=" + state
-        + ", pipeline=" + pipeline
+        + ", pipelineName=" + pipelineName
         + ", stateEnterTime=" + stateEnterTime
         + ", owner=" + owner
         + '}';
@@ -252,9 +271,7 @@ public class ContainerInfo
   public int hashCode() {
     return new HashCodeBuilder(11, 811)
         .append(getContainerID())
-        .append(pipeline.getFactor())
-        .append(pipeline.getType())
-        .append(owner)
+        .append(getOwner())
         .toHashCode();
   }
 
@@ -327,12 +344,44 @@ public class ContainerInfo
       this.data = Arrays.copyOf(data, data.length);
     }
   }
+
+  /**
+   * Throws IOException as default java serialization is not supported. Use
+   * serialization via protobuf instead.
+   *
+   * @param out the stream to write the object to
+   * @throws IOException Includes any I/O exceptions that may occur
+   * @serialData Overriding methods should use this tag to describe
+   * the data layout of this Externalizable object.
+   * List the sequence of element types and, if possible,
+   * relate the element to a public/protected field and/or
+   * method of this Externalizable class.
+   */
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    throw new IOException(SERIALIZATION_ERROR_MSG);
+  }
+
+  /**
+   * Throws IOException as default java serialization is not supported. Use
+   * serialization via protobuf instead.
+   *
+   * @param in the stream to read data from in order to restore the object
+   * @throws IOException            if I/O errors occur
+   * @throws ClassNotFoundException If the class for an object being
+   *                                restored cannot be found.
+   */
+  @Override
+  public void readExternal(ObjectInput in)
+      throws IOException, ClassNotFoundException {
+    throw new IOException(SERIALIZATION_ERROR_MSG);
+  }
+
   /**
    * Builder class for ContainerInfo.
    */
   public static class Builder {
     private HddsProtos.LifeCycleState state;
-    private Pipeline pipeline;
     private long allocated;
     private long used;
     private long keys;
@@ -340,6 +389,25 @@ public class ContainerInfo
     private String owner;
     private long containerID;
     private long deleteTransactionId;
+    private String pipelineName;
+    private ReplicationFactor replicationFactor;
+    private ReplicationType replicationType;
+
+    public Builder setReplicationType(
+        ReplicationType replicationType) {
+      this.replicationType = replicationType;
+      return this;
+    }
+
+    public Builder setPipelineName(String pipelineName) {
+      this.pipelineName = pipelineName;
+      return this;
+    }
+
+    public Builder setReplicationFactor(ReplicationFactor repFactor) {
+      this.replicationFactor = repFactor;
+      return this;
+    }
 
     public Builder setContainerID(long id) {
       Preconditions.checkState(id >= 0);
@@ -352,11 +420,6 @@ public class ContainerInfo
       return this;
     }
 
-    public Builder setPipeline(Pipeline containerPipeline) {
-      this.pipeline = containerPipeline;
-      return this;
-    }
-
     public Builder setAllocatedBytes(long bytesAllocated) {
       this.allocated = bytesAllocated;
       return this;
@@ -388,9 +451,9 @@ public class ContainerInfo
     }
 
     public ContainerInfo build() {
-      return new
-          ContainerInfo(containerID, state, pipeline, allocated,
-              used, keys, stateEnterTime, owner, deleteTransactionId);
+      return new ContainerInfo(containerID, state, pipelineName, allocated,
+              used, keys, stateEnterTime, owner, deleteTransactionId,
+          replicationFactor, replicationType);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
new file mode 100644
index 0000000..e71d429
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import java.util.Comparator;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Class wraps ozone container info.
+ */
+public class ContainerWithPipeline
+    implements Comparator<ContainerWithPipeline>, Comparable<ContainerWithPipeline> {
+
+  private final ContainerInfo containerInfo;
+  private final Pipeline pipeline;
+
+  public ContainerWithPipeline(ContainerInfo containerInfo, Pipeline pipeline) {
+    this.containerInfo = containerInfo;
+    this.pipeline = pipeline;
+  }
+
+  public ContainerInfo getContainerInfo() {
+    return containerInfo;
+  }
+
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  public static ContainerWithPipeline fromProtobuf(HddsProtos.ContainerWithPipeline allocatedContainer) {
+    return new ContainerWithPipeline(
+        ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()),
+        Pipeline.getFromProtoBuf(allocatedContainer.getPipeline()));
+  }
+
+  public HddsProtos.ContainerWithPipeline getProtobuf() {
+    HddsProtos.ContainerWithPipeline.Builder builder =
+        HddsProtos.ContainerWithPipeline.newBuilder();
+    builder.setContainerInfo(getContainerInfo().getProtobuf())
+        .setPipeline(getPipeline().getProtobufMessage());
+
+    return builder.build();
+  }
+
+
+  @Override
+  public String toString() {
+    return containerInfo.toString() + " | " + pipeline.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ContainerWithPipeline that = (ContainerWithPipeline) o;
+
+    return new EqualsBuilder()
+        .append(getContainerInfo(), that.getContainerInfo())
+        .append(getPipeline(), that.getPipeline())
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(11, 811)
+        .append(getContainerInfo())
+        .append(getPipeline())
+        .toHashCode();
+  }
+
+  /**
+   * Compares its two arguments for order.  Returns a negative integer, zero, or
+   * a positive integer as the first argument is less than, equal to, or greater
+   * than the second.<p>
+   *
+   * @param o1 the first object to be compared.
+   * @param o2 the second object to be compared.
+   * @return a negative integer, zero, or a positive integer as the first
+   * argument is less than, equal to, or greater than the second.
+   * @throws NullPointerException if an argument is null and this comparator
+   *                              does not permit null arguments
+   * @throws ClassCastException   if the arguments' types prevent them from
+   *                              being compared by this comparator.
+   */
+  @Override
+  public int compare(ContainerWithPipeline o1, ContainerWithPipeline o2) {
+    return o1.getContainerInfo().compareTo(o2.getContainerInfo());
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less than,
+   * equal to, or greater than the specified object.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(ContainerWithPipeline o) {
+    return this.compare(this, o);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index e8d85e0..b787409 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.protocol;
 
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -38,7 +39,7 @@ public interface StorageContainerLocationProtocol {
    * set of datanodes that should be used creating this container.
    *
    */
-  ContainerInfo allocateContainer(HddsProtos.ReplicationType replicationType,
+  ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType replicationType,
       HddsProtos.ReplicationFactor factor, String owner)
       throws IOException;
 
@@ -54,6 +55,16 @@ public interface StorageContainerLocationProtocol {
   ContainerInfo getContainer(long containerID) throws IOException;
 
   /**
+   * Ask SCM the location of the container. SCM responds with a group of
+   * nodes where this container and its replicas are located.
+   *
+   * @param containerID - ID of the container.
+   * @return ContainerWithPipeline - the container info with the pipeline.
+   * @throws IOException
+   */
+  ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException;
+
+  /**
    * Ask SCM a list of containers with a range of container names
    * and the limit of count.
    * Search container names between start name(exclusive), and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index bba4e17..4b03d12 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -20,7 +20,10 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -95,7 +98,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
    * @throws IOException
    */
   @Override
-  public ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
+  public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor factor, String owner) throws IOException {
 
     ContainerRequestProto request = ContainerRequestProto.newBuilder()
@@ -114,7 +117,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
       throw new IOException(response.hasErrorMessage() ?
           response.getErrorMessage() : "Allocate container failed.");
     }
-    return ContainerInfo.fromProtobuf(response.getContainerInfo());
+    return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
   }
 
   public ContainerInfo getContainer(long containerID) throws IOException {
@@ -136,6 +139,25 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   /**
    * {@inheritDoc}
    */
+  public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
+    Preconditions.checkState(containerID >= 0,
+        "Container ID cannot be negative");
+    GetContainerWithPipelineRequestProto request = GetContainerWithPipelineRequestProto
+        .newBuilder()
+        .setContainerID(containerID)
+        .build();
+    try {
+      GetContainerWithPipelineResponseProto response =
+          rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request);
+      return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public List<ContainerInfo> listContainer(long startContainerID, int count)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 70a0e8a..d66919f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.ozone.protocolPB;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
@@ -82,10 +85,11 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   public ContainerResponseProto allocateContainer(RpcController unused,
       ContainerRequestProto request) throws ServiceException {
     try {
-      ContainerInfo container = impl.allocateContainer(request.getReplicationType(),
-          request.getReplicationFactor(), request.getOwner());
+      ContainerWithPipeline containerWithPipeline = impl
+          .allocateContainer(request.getReplicationType(),
+              request.getReplicationFactor(), request.getOwner());
       return ContainerResponseProto.newBuilder()
-          .setContainerInfo(container.getProtobuf())
+          .setContainerWithPipeline(containerWithPipeline.getProtobuf())
           .setErrorCode(ContainerResponseProto.Error.success)
           .build();
 
@@ -109,6 +113,21 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   }
 
   @Override
+  public GetContainerWithPipelineResponseProto getContainerWithPipeline(
+      RpcController controller, GetContainerWithPipelineRequestProto request)
+      throws ServiceException {
+    try {
+      ContainerWithPipeline container = impl
+          .getContainerWithPipeline(request.getContainerID());
+      return GetContainerWithPipelineResponseProto.newBuilder()
+          .setContainerWithPipeline(container.getProtobuf())
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public SCMListContainerResponseProto listContainer(RpcController controller,
       SCMListContainerRequestProto request) throws ServiceException {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 090e6eb..143c2ae 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -52,7 +52,7 @@ message ContainerResponseProto {
     errorContainerMissing = 3;
   }
   required Error errorCode = 1;
-  required SCMContainerInfo containerInfo = 2;
+  required ContainerWithPipeline containerWithPipeline = 2;
   optional string errorMessage = 3;
 }
 
@@ -64,6 +64,14 @@ message GetContainerResponseProto {
   required SCMContainerInfo containerInfo = 1;
 }
 
+message GetContainerWithPipelineRequestProto {
+  required int64 containerID = 1;
+}
+
+message GetContainerWithPipelineResponseProto {
+  required ContainerWithPipeline containerWithPipeline = 1;
+}
+
 message SCMListContainerRequestProto {
   required uint32 count = 1;
   optional uint64 startContainerID = 2;
@@ -171,6 +179,11 @@ service StorageContainerLocationProtocolService {
    */
   rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto);
 
+  /**
+   * Returns the pipeline for a given container.
+   */
+  rpc getContainerWithPipeline(GetContainerWithPipelineRequestProto) returns (GetContainerWithPipelineResponseProto);
+
   rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto);
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 816efa7..1c9ee19 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -132,7 +132,7 @@ enum LifeCycleEvent {
 message SCMContainerInfo {
     required int64 containerID = 1;
     required LifeCycleState state = 2;
-    required Pipeline pipeline = 3;
+    optional string pipelineName = 3;
     // This is not total size of container, but space allocated by SCM for
     // clients to write blocks
     required uint64 allocatedBytes = 4;
@@ -141,6 +141,13 @@ message SCMContainerInfo {
     optional int64 stateEnterTime = 7;
     required string owner = 8;
     optional int64 deleteTransactionId = 9;
+    required ReplicationFactor replicationFactor  = 10;
+    required ReplicationType replicationType  = 11;
+}
+
+message ContainerWithPipeline {
+  required SCMContainerInfo containerInfo = 1;
+  required Pipeline pipeline = 2;
 }
 
 message GetScmInfoRequestProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 7cfbdab..953f71e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -16,10 +16,12 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
+import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.Mapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -156,13 +158,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     lock.lock();
     try {
       for (int i = 0; i < count; i++) {
-        ContainerInfo containerInfo = null;
+        ContainerWithPipeline containerWithPipeline = null;
         try {
           // TODO: Fix this later when Ratis is made the Default.
-          containerInfo = containerManager.allocateContainer(type, factor,
+          containerWithPipeline = containerManager.allocateContainer(type, factor,
               owner);
 
-          if (containerInfo == null) {
+          if (containerWithPipeline == null) {
             LOG.warn("Unable to allocate container.");
             continue;
           }
@@ -231,30 +233,27 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
                can use different kind of policies.
       */
 
-      ContainerInfo containerInfo;
+      ContainerWithPipeline containerWithPipeline;
 
       // Look for ALLOCATED container that matches all other parameters.
-      containerInfo =
-          containerManager
-              .getStateManager()
-              .getMatchingContainer(
-                  size, owner, type, factor, HddsProtos.LifeCycleState
-                      .ALLOCATED);
-      if (containerInfo != null) {
-        containerManager.updateContainerState(containerInfo.getContainerID(),
+      containerWithPipeline = containerManager
+          .getMatchingContainerWithPipeline(size, owner, type, factor,
+              HddsProtos.LifeCycleState.ALLOCATED);
+      if (containerWithPipeline != null) {
+        containerManager.updateContainerState(
+            containerWithPipeline.getContainerInfo().getContainerID(),
             HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
+        return newBlock(containerWithPipeline,
+            HddsProtos.LifeCycleState.ALLOCATED);
       }
 
       // Since we found no allocated containers that match our criteria, let us
       // look for OPEN containers that match the criteria.
-      containerInfo =
-          containerManager
-              .getStateManager()
-              .getMatchingContainer(size, owner, type, factor, HddsProtos
-                  .LifeCycleState.OPEN);
-      if (containerInfo != null) {
-        return newBlock(containerInfo, HddsProtos.LifeCycleState.OPEN);
+      containerWithPipeline = containerManager
+          .getMatchingContainerWithPipeline(size, owner, type, factor,
+              HddsProtos.LifeCycleState.OPEN);
+      if (containerWithPipeline != null) {
+        return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.OPEN);
       }
 
       // We found neither ALLOCATED or OPEN Containers. This generally means
@@ -264,16 +263,15 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
       preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
 
       // Since we just allocated a set of containers this should work
-      containerInfo =
-          containerManager
-              .getStateManager()
-              .getMatchingContainer(
-                  size, owner, type, factor, HddsProtos.LifeCycleState
-                      .ALLOCATED);
-      if (containerInfo != null) {
-        containerManager.updateContainerState(containerInfo.getContainerID(),
+      containerWithPipeline = containerManager
+          .getMatchingContainerWithPipeline(size, owner, type, factor,
+              HddsProtos.LifeCycleState.ALLOCATED);
+      if (containerWithPipeline != null) {
+        containerManager.updateContainerState(
+            containerWithPipeline.getContainerInfo().getContainerID(),
             HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
+        return newBlock(containerWithPipeline,
+            HddsProtos.LifeCycleState.ALLOCATED);
       }
 
       // we have tried all strategies we know and but somehow we are not able
@@ -290,18 +288,28 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     }
   }
 
+  private String getChannelName(ReplicationType type) {
+    switch (type) {
+      case RATIS:
+        return "RA" + UUID.randomUUID().toString().substring(3);
+      case STAND_ALONE:
+        return "SA" + UUID.randomUUID().toString().substring(3);
+      default:
+        return "RA" + UUID.randomUUID().toString().substring(3);
+    }
+  }
+
   /**
    * newBlock - returns a new block assigned to a container.
    *
-   * @param containerInfo - Container Info.
+   * @param containerWithPipeline - Container Info.
    * @param state - Current state of the container.
    * @return AllocatedBlock
    */
-  private AllocatedBlock newBlock(
-      ContainerInfo containerInfo, HddsProtos.LifeCycleState state)
-      throws IOException {
-
-    if (containerInfo.getPipeline().getMachines().size() == 0) {
+  private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline,
+      HddsProtos.LifeCycleState state) throws IOException {
+    ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
+    if (containerWithPipeline.getPipeline().getDatanodes().size() == 0) {
       LOG.error("Pipeline Machine count is zero.");
       return null;
     }
@@ -317,7 +325,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     AllocatedBlock.Builder abb =
         new AllocatedBlock.Builder()
             .setBlockID(new BlockID(containerID, localID))
-            .setPipeline(containerInfo.getPipeline())
+            .setPipeline(containerWithPipeline.getPipeline())
             .setShouldCreateContainer(createContainer);
     LOG.trace("New block allocated : {} Container ID: {}", localID,
         containerID);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 32290cc..d71e7b0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.block;
 
 import com.google.common.collect.ArrayListMultimap;
 import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -29,6 +28,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 
 /**
  * A wrapper class to hold info about datanode and all deleted block
@@ -54,21 +54,22 @@ public class DatanodeDeletedBlockTransactions {
   }
 
   public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
-    ContainerInfo info = null;
+    Pipeline pipeline = null;
     try {
-      info = mappingService.getContainer(tx.getContainerID());
+      pipeline = mappingService.getContainerWithPipeline(tx.getContainerID())
+          .getPipeline();
     } catch (IOException e) {
       SCMBlockDeletingService.LOG.warn("Got container info error.", e);
     }
 
-    if (info == null) {
+    if (pipeline == null) {
       SCMBlockDeletingService.LOG.warn(
           "Container {} not found, continue to process next",
           tx.getContainerID());
       return;
     }
 
-    for (DatanodeDetails dd : info.getPipeline().getMachines()) {
+    for (DatanodeDetails dd : pipeline.getMachines()) {
       UUID dnID = dd.getUuid();
       if (transactions.containsKey(dnID)) {
         List<DeletedBlocksTransaction> txs = transactions.get(dnID);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 16e84a3..7b24538 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -16,9 +16,11 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import java.io.IOException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -54,22 +56,32 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
 
     LOG.info("Close container Event triggered for container : {}",
         containerID.getId());
-    ContainerStateManager stateManager = containerManager.getStateManager();
-    ContainerInfo info = stateManager.getContainer(containerID);
-    if (info == null) {
-      LOG.info("Container with id : {} does not exist", containerID.getId());
+    ContainerWithPipeline containerWithPipeline = null;
+    ContainerInfo info;
+    try {
+      containerWithPipeline = containerManager.getContainerWithPipeline(containerID.getId());
+      info = containerWithPipeline.getContainerInfo();
+      if (info == null) {
+        LOG.info("Failed to update the container state. Container with id : {} "
+            + "does not exist", containerID.getId());
+        return;
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to update the container state. Container with id : {} "
+          + "does not exist", containerID.getId());
       return;
     }
+
     if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
-      for (DatanodeDetails datanode : info.getPipeline().getMachines()) {
+      for (DatanodeDetails datanode : containerWithPipeline.getPipeline().getMachines()) {
         containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(),
             new CloseContainerCommand(containerID.getId(),
-                info.getPipeline().getType()));
+                info.getReplicationType()));
       }
       try {
         // Finalize event will make sure the state of the container transitions
         // from OPEN to CLOSING in containerStateManager.
-        stateManager
+        containerManager.getStateManager()
             .updateContainerState(info, HddsProtos.LifeCycleEvent.FINALIZE);
       } catch (SCMException ex) {
         LOG.error("Failed to update the container state for container : {}"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 9fd30f2..e25c5b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -21,6 +21,10 @@ import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@@ -167,6 +171,44 @@ public class ContainerMapping implements Mapping {
   }
 
   /**
+   * Returns the ContainerInfo from the container ID.
+   *
+   * @param containerID - ID of container.
+   * @return - ContainerWithPipeline such as creation state and the pipeline.
+   * @throws IOException
+   */
+  @Override
+  public ContainerWithPipeline getContainerWithPipeline(long containerID)
+      throws IOException {
+    ContainerInfo contInfo;
+    lock.lock();
+    try {
+      byte[] containerBytes = containerStore.get(
+          Longs.toByteArray(containerID));
+      if (containerBytes == null) {
+        throw new SCMException(
+            "Specified key does not exist. key : " + containerID,
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
+          .parseFrom(containerBytes);
+      contInfo = ContainerInfo.fromProtobuf(temp);
+      Pipeline pipeline = pipelineSelector
+          .getPipeline(contInfo.getPipelineName(),
+              contInfo.getReplicationType());
+
+      if(pipeline == null) {
+        pipeline = pipelineSelector
+            .getReplicationPipeline(contInfo.getReplicationType(),
+                contInfo.getReplicationFactor());
+      }
+      return new ContainerWithPipeline(contInfo, pipeline);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
@@ -208,13 +250,15 @@ public class ContainerMapping implements Mapping {
    * @throws IOException - Exception
    */
   @Override
-  public ContainerInfo allocateContainer(
+  public ContainerWithPipeline allocateContainer(
       ReplicationType type,
       ReplicationFactor replicationFactor,
       String owner)
       throws IOException {
 
     ContainerInfo containerInfo;
+    ContainerWithPipeline containerWithPipeline;
+
     if (!nodeManager.isOutOfChillMode()) {
       throw new SCMException(
           "Unable to create container while in chill mode",
@@ -223,9 +267,9 @@ public class ContainerMapping implements Mapping {
 
     lock.lock();
     try {
-      containerInfo =
-          containerStateManager.allocateContainer(
+      containerWithPipeline = containerStateManager.allocateContainer(
               pipelineSelector, type, replicationFactor, owner);
+      containerInfo = containerWithPipeline.getContainerInfo();
 
       byte[] containerIDBytes = Longs.toByteArray(
           containerInfo.getContainerID());
@@ -234,7 +278,7 @@ public class ContainerMapping implements Mapping {
     } finally {
       lock.unlock();
     }
-    return containerInfo;
+    return containerWithPipeline;
   }
 
   /**
@@ -381,6 +425,35 @@ public class ContainerMapping implements Mapping {
   }
 
   /**
+   * Return a container matching the attributes specified.
+   *
+   * @param size - Space needed in the Container.
+   * @param owner - Owner of the container - A specific nameservice.
+   * @param type - Replication Type {StandAlone, Ratis}
+   * @param factor - Replication Factor {ONE, THREE}
+   * @param state - State of the Container-- {Open, Allocated etc.}
+   * @return ContainerInfo, null if there is no match found.
+   */
+  public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
+      String owner, ReplicationType type, ReplicationFactor factor,
+      LifeCycleState state) throws IOException {
+    ContainerInfo containerInfo = getStateManager()
+        .getMatchingContainer(size, owner, type, factor, state);
+    if (containerInfo == null) {
+      return null;
+    }
+    Pipeline pipeline = pipelineSelector
+        .getPipeline(containerInfo.getPipelineName(),
+            containerInfo.getReplicationType());
+    if (pipeline == null) {
+      pipelineSelector
+          .getReplicationPipeline(containerInfo.getReplicationType(),
+              containerInfo.getReplicationFactor());
+    }
+    return new ContainerWithPipeline(containerInfo, pipeline);
+  }
+
+  /**
    * Process container report from Datanode.
    * <p>
    * Processing follows a very simple logic for time being.
@@ -415,7 +488,7 @@ public class ContainerMapping implements Mapping {
               HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
 
           HddsProtos.SCMContainerInfo newState =
-              reconcileState(datanodeState, knownState);
+              reconcileState(datanodeState, knownState, datanodeDetails);
 
           // FIX ME: This can be optimized, we write twice to memory, where a
           // single write would work well.
@@ -425,8 +498,14 @@ public class ContainerMapping implements Mapping {
           containerStore.put(dbKey, newState.toByteArray());
 
           // If the container is closed, then state is already written to SCM
+          Pipeline pipeline = pipelineSelector.getPipeline(newState.getPipelineName(), newState.getReplicationType());
+          if(pipeline == null) {
+            pipeline = pipelineSelector
+                .getReplicationPipeline(newState.getReplicationType(),
+                    newState.getReplicationFactor());
+          }
           // DB.TODO: So can we can write only once to DB.
-          if (closeContainerIfNeeded(newState)) {
+          if (closeContainerIfNeeded(newState, pipeline)) {
             LOG.info("Closing the Container: {}", newState.getContainerID());
           }
         } else {
@@ -447,15 +526,22 @@ public class ContainerMapping implements Mapping {
    *
    * @param datanodeState - State from the Datanode.
    * @param knownState - State inside SCM.
+   * @param dnDetails
    * @return new SCM State for this container.
    */
   private HddsProtos.SCMContainerInfo reconcileState(
       StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
-      HddsProtos.SCMContainerInfo knownState) {
+      SCMContainerInfo knownState, DatanodeDetails dnDetails) {
     HddsProtos.SCMContainerInfo.Builder builder =
         HddsProtos.SCMContainerInfo.newBuilder();
-    builder.setContainerID(knownState.getContainerID());
-    builder.setPipeline(knownState.getPipeline());
+    builder.setContainerID(knownState.getContainerID())
+        .setPipelineName(knownState.getPipelineName())
+        .setReplicationType(knownState.getReplicationType())
+        .setReplicationFactor(knownState.getReplicationFactor());
+
+    // TODO: If current state doesn't have this DN in list of DataNodes with replica
+    // then add it in list of replicas.
+
     // If used size is greater than allocated size, we will be updating
     // allocated size with used size. This update is done as a fallback
     // mechanism in case SCM crashes without properly updating allocated
@@ -464,13 +550,13 @@ public class ContainerMapping implements Mapping {
     long usedSize = datanodeState.getUsed();
     long allocated = knownState.getAllocatedBytes() > usedSize ?
         knownState.getAllocatedBytes() : usedSize;
-    builder.setAllocatedBytes(allocated);
-    builder.setUsedBytes(usedSize);
-    builder.setNumberOfKeys(datanodeState.getKeyCount());
-    builder.setState(knownState.getState());
-    builder.setStateEnterTime(knownState.getStateEnterTime());
-    builder.setContainerID(knownState.getContainerID());
-    builder.setDeleteTransactionId(knownState.getDeleteTransactionId());
+    builder.setAllocatedBytes(allocated)
+        .setUsedBytes(usedSize)
+        .setNumberOfKeys(datanodeState.getKeyCount())
+        .setState(knownState.getState())
+        .setStateEnterTime(knownState.getStateEnterTime())
+        .setContainerID(knownState.getContainerID())
+        .setDeleteTransactionId(knownState.getDeleteTransactionId());
     if (knownState.getOwner() != null) {
       builder.setOwner(knownState.getOwner());
     }
@@ -485,9 +571,11 @@ public class ContainerMapping implements Mapping {
    * one protobuf in one file and another definition in another file.
    *
    * @param newState - This is the state we maintain in SCM.
+   * @param pipeline
    * @throws IOException
    */
-  private boolean closeContainerIfNeeded(HddsProtos.SCMContainerInfo newState)
+  private boolean closeContainerIfNeeded(SCMContainerInfo newState,
+      Pipeline pipeline)
       throws IOException {
     float containerUsedPercentage = 1.0f *
         newState.getUsedBytes() / this.size;
@@ -498,7 +586,7 @@ public class ContainerMapping implements Mapping {
       // We will call closer till get to the closed state.
       // That is SCM will make this call repeatedly until we reach the closed
       // state.
-      closer.close(newState);
+      closer.close(newState, pipeline);
 
       if (shouldClose(scmInfo)) {
         // This event moves the Container from Open to Closing State, this is
@@ -598,10 +686,12 @@ public class ContainerMapping implements Mapping {
               .setAllocatedBytes(info.getAllocatedBytes())
               .setNumberOfKeys(oldInfo.getNumberOfKeys())
               .setOwner(oldInfo.getOwner())
-              .setPipeline(oldInfo.getPipeline())
+              .setPipelineName(oldInfo.getPipelineName())
               .setState(oldInfo.getState())
               .setUsedBytes(oldInfo.getUsedBytes())
               .setDeleteTransactionId(oldInfo.getDeleteTransactionId())
+              .setReplicationFactor(oldInfo.getReplicationFactor())
+              .setReplicationType(oldInfo.getReplicationType())
               .build();
           containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
         } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 08733bd..870ab1d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.container;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.states.ContainerState;
@@ -279,10 +280,10 @@ public class ContainerStateManager implements Closeable {
    * @param selector -- Pipeline selector class.
    * @param type -- Replication type.
    * @param replicationFactor - Replication replicationFactor.
-   * @return Container Info.
+   * @return ContainerWithPipeline
    * @throws IOException  on Failure.
    */
-  public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
+  public ContainerWithPipeline allocateContainer(PipelineSelector selector, HddsProtos
       .ReplicationType type, HddsProtos.ReplicationFactor replicationFactor,
       String owner) throws IOException {
 
@@ -295,7 +296,7 @@ public class ContainerStateManager implements Closeable {
 
     ContainerInfo containerInfo = new ContainerInfo.Builder()
         .setState(HddsProtos.LifeCycleState.ALLOCATED)
-        .setPipeline(pipeline)
+        .setPipelineName(pipeline.getPipelineName())
         // This is bytes allocated for blocks inside container, not the
         // container size
         .setAllocatedBytes(0)
@@ -305,11 +306,13 @@ public class ContainerStateManager implements Closeable {
         .setOwner(owner)
         .setContainerID(containerCount.incrementAndGet())
         .setDeleteTransactionId(0)
+        .setReplicationFactor(replicationFactor)
+        .setReplicationType(pipeline.getType())
         .build();
     Preconditions.checkNotNull(containerInfo);
     containers.addContainer(containerInfo);
     LOG.trace("New container allocated: {}", containerInfo);
-    return containerInfo;
+    return new ContainerWithPipeline(containerInfo, pipeline);
   }
 
   /**
@@ -432,8 +435,8 @@ public class ContainerStateManager implements Closeable {
         containerInfo.updateLastUsedTime();
 
         ContainerState key = new ContainerState(owner,
-            containerInfo.getPipeline().getType(),
-            containerInfo.getPipeline().getFactor());
+            containerInfo.getReplicationType(),
+            containerInfo.getReplicationFactor());
         lastUsedMap.put(key, containerInfo.containerID());
         return containerInfo;
       }
@@ -458,6 +461,20 @@ public class ContainerStateManager implements Closeable {
   }
 
   /**
+   * Returns the containerInfo with pipeline for the given container id.
+   * @param selector -- Pipeline selector class.
+   * @param containerID id of the container
+   * @return ContainerInfo containerInfo
+   * @throws IOException
+   */
+  public ContainerWithPipeline getContainer(PipelineSelector selector,
+      ContainerID containerID) throws IOException {
+    ContainerInfo info = containers.getContainerInfo(containerID.getId());
+    Pipeline pipeline = selector.getPipeline(info.getPipelineName(), info.getReplicationType());
+    return new ContainerWithPipeline(info, pipeline);
+  }
+
+  /**
    * Returns the containerInfo for the given container id.
    * @param containerID id of the container
    * @return ContainerInfo containerInfo
@@ -466,6 +483,7 @@ public class ContainerStateManager implements Closeable {
   public ContainerInfo getContainer(ContainerID containerID) {
     return containers.getContainerInfo(containerID.getId());
   }
+
   @Override
   public void close() throws IOException {
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index e77a4b6..f52eb05 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -17,6 +17,10 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
@@ -43,6 +47,16 @@ public interface Mapping extends Closeable {
   ContainerInfo getContainer(long containerID) throws IOException;
 
   /**
+   * Returns the ContainerInfo from the container ID.
+   *
+   * @param containerID - ID of container.
+   * @return - ContainerWithPipeline such as creation state and the pipeline.
+   * @throws IOException
+   */
+  ContainerWithPipeline getContainerWithPipeline(long containerID)
+      throws IOException;
+
+  /**
    * Returns containers under certain conditions.
    * Search container IDs from start ID(exclusive),
    * The max size of the searching range cannot exceed the
@@ -65,10 +79,10 @@ public interface Mapping extends Closeable {
    *
    * @param replicationFactor - replication factor of the container.
    * @param owner
-   * @return - Container Info.
+   * @return - ContainerWithPipeline.
    * @throws IOException
    */
-  ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
+  ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor replicationFactor, String owner)
       throws IOException;
 
@@ -120,4 +134,12 @@ public interface Mapping extends Closeable {
    * @return NodeManager
    */
   NodeManager getNodeManager();
+
+  /**
+   * Returns the ContainerWithPipeline.
+   * @return NodeManager
+   */
+  public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
+      String owner, ReplicationType type, ReplicationFactor factor,
+      LifeCycleState state) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
index cbb2ba7..3ca8ba9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -90,8 +92,10 @@ public class ContainerCloser {
    * lives.
    *
    * @param info - ContainerInfo.
+   * @param pipeline
    */
-  public void close(HddsProtos.SCMContainerInfo info) {
+  public void close(SCMContainerInfo info,
+      Pipeline pipeline) {
 
     if (commandIssued.containsKey(info.getContainerID())) {
       // We check if we issued a close command in last 3 * reportInterval secs.
@@ -126,13 +130,10 @@ public class ContainerCloser {
     // this queue can be emptied by a datanode after a close report is send
     // to SCM. In that case also, data node will ignore this command.
 
-    HddsProtos.Pipeline pipeline = info.getPipeline();
-    for (HddsProtos.DatanodeDetailsProto datanodeDetails :
-        pipeline.getMembersList()) {
-      nodeManager.addDatanodeCommand(
-          DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(),
+    for (DatanodeDetails datanodeDetails : pipeline.getMachines()) {
+      nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
           new CloseContainerCommand(info.getContainerID(),
-              pipeline.getType()));
+              info.getReplicationType()));
     }
     if (!commandIssued.containsKey(info.getContainerID())) {
       commandIssued.put(info.getContainerID(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 48c6423..3ada8fe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -116,7 +116,8 @@ public class ContainerStateMap {
   public void addContainer(ContainerInfo info)
       throws SCMException {
     Preconditions.checkNotNull(info, "Container Info cannot be null");
-    Preconditions.checkNotNull(info.getPipeline(), "Pipeline cannot be null");
+    Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
+        "ExpectedReplicaCount should be greater than 0");
 
     try (AutoCloseableLock lock = autoLock.acquire()) {
       ContainerID id = ContainerID.valueof(info.getContainerID());
@@ -129,8 +130,8 @@ public class ContainerStateMap {
 
       lifeCycleStateMap.insert(info.getState(), id);
       ownerMap.insert(info.getOwner(), id);
-      factorMap.insert(info.getPipeline().getFactor(), id);
-      typeMap.insert(info.getPipeline().getType(), id);
+      factorMap.insert(info.getReplicationFactor(), id);
+      typeMap.insert(info.getReplicationType(), id);
       LOG.trace("Created container with {} successfully.", id);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 48affa4..a1fbce6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -16,6 +16,9 @@
  */
 package org.apache.hadoop.hdds.scm.pipelines;
 
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.WeakHashMap;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -25,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -36,11 +38,13 @@ public abstract class PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineManager.class);
   private final List<Pipeline> activePipelines;
+  private final Map<String, Pipeline> activePipelineMap;
   private final AtomicInteger pipelineIndex;
 
   public PipelineManager() {
     activePipelines = new LinkedList<>();
     pipelineIndex = new AtomicInteger(0);
+    activePipelineMap = new WeakHashMap<>();
   }
 
   /**
@@ -76,6 +80,7 @@ public abstract class PipelineManager {
               "replicationType:{} replicationFactor:{}",
           pipeline.getPipelineName(), replicationType, replicationFactor);
       activePipelines.add(pipeline);
+      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
     } else {
       pipeline =
           findOpenPipeline(replicationType, replicationFactor);
@@ -94,6 +99,26 @@ public abstract class PipelineManager {
     }
   }
 
+  /**
+   * This function to get pipeline with given pipeline name.
+   *
+   * @param pipelineName
+   * @return a Pipeline.
+   */
+  public synchronized final Pipeline getPipeline(String pipelineName) {
+    Pipeline pipeline = null;
+
+    // 1. Check if pipeline channel already exists
+    if (activePipelineMap.containsKey(pipelineName)) {
+      pipeline = activePipelineMap.get(pipelineName);
+      LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
+      return pipeline;
+    } else {
+      LOG.debug("Unable to find pipeline for pipelineName:{}", pipelineName);
+    }
+    return pipeline;
+  }
+
   protected int getReplicationCount(ReplicationFactor factor) {
     switch (factor) {
     case ONE:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 508ca9b..3846a84 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipelines;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
@@ -177,6 +178,21 @@ public class PipelineSelector {
   }
 
   /**
+   * This function to return pipeline for given pipeline name and replication
+   * type.
+   */
+  public Pipeline getPipeline(String pipelineName,
+      ReplicationType replicationType) throws IOException {
+    if (pipelineName == null) {
+      return null;
+    }
+    PipelineManager manager = getPipelineManager(replicationType);
+    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+    LOG.debug("Getting replication pipeline forReplicationType {} :" +
+        " pipelineName:{}", replicationType, pipelineName);
+    return manager.getPipeline(pipelineName);
+  }
+  /**
    * Creates a pipeline from a specified set of Nodes.
    */
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index ace8758..189060e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipelines.ratis;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index e76027f..579a3a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.scm.pipelines.standalone;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d73cccd..e1d478f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -145,11 +146,12 @@ public class SCMClientProtocolServer implements
   }
 
   @Override
-  public ContainerInfo allocateContainer(HddsProtos.ReplicationType
+  public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
       replicationType, HddsProtos.ReplicationFactor factor,
       String owner) throws IOException {
     String remoteUser = getRpcRemoteUsername();
     getScm().checkAdminAccess(remoteUser);
+
     return scm.getScmContainerManager()
         .allocateContainer(replicationType, factor, owner);
   }
@@ -163,6 +165,14 @@ public class SCMClientProtocolServer implements
   }
 
   @Override
+  public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
+    String remoteUser = getRpcRemoteUsername();
+    getScm().checkAdminAccess(remoteUser);
+    return scm.getScmContainerManager()
+        .getContainerWithPipeline(containerID);
+  }
+
+  @Override
   public List<ContainerInfo> listContainer(long startContainerID,
       int count) throws IOException {
     return scm.getScmContainerManager().
@@ -248,7 +258,7 @@ public class SCMClientProtocolServer implements
       throws IOException {
     // TODO: will be addressed in future patch.
     // This is needed only for debugging purposes to make sure cluster is
-    // working correctly. 
+    // working correctly.
     return null;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message