hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sunc...@apache.org
Subject [11/50] [abbrv] hadoop git commit: HDDS-694. Plugin new Pipeline management code in SCM. Contributed by Lokesh Jain.
Date Wed, 31 Oct 2018 18:23:22 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index fc9afd6..d8b9958 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -63,9 +63,10 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -149,6 +150,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * State Managers of SCM.
    */
   private final NodeManager scmNodeManager;
+  private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
   private final BlockManager scmBlockManager;
   private final SCMStorage scmStorage;
@@ -201,8 +203,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
     scmNodeManager = new SCMNodeManager(
         conf, scmStorage.getClusterID(), this, eventQueue);
+    pipelineManager = new SCMPipelineManager(conf, scmNodeManager, eventQueue);
     containerManager = new SCMContainerManager(
-        conf, scmNodeManager, eventQueue);
+        conf, scmNodeManager, pipelineManager, eventQueue);
     scmBlockManager = new BlockManagerImpl(
         conf, scmNodeManager, containerManager, eventQueue);
 
@@ -213,14 +216,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     NodeReportHandler nodeReportHandler =
         new NodeReportHandler(scmNodeManager);
     PipelineReportHandler pipelineReportHandler =
-            new PipelineReportHandler(
-                    containerManager.getPipelineSelector());
+            new PipelineReportHandler(pipelineManager, conf);
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
 
     NewNodeHandler newNodeHandler = new NewNodeHandler(scmNodeManager);
     StaleNodeHandler staleNodeHandler =
-        new StaleNodeHandler(containerManager.getPipelineSelector());
+        new StaleNodeHandler(scmNodeManager, pipelineManager);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
         containerManager);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -231,11 +233,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new ContainerReportHandler(containerManager, scmNodeManager,
             replicationStatus);
 
-    PipelineActionEventHandler pipelineActionEventHandler =
-        new PipelineActionEventHandler();
-
-    PipelineCloseHandler pipelineCloseHandler =
-        new PipelineCloseHandler(containerManager.getPipelineSelector());
+    PipelineActionHandler pipelineActionHandler =
+        new PipelineActionHandler(pipelineManager);
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -294,10 +293,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
     eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
         (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
-    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
-        pipelineActionEventHandler);
-    eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
-
+    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, clientProtocolServer);
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
 
@@ -771,6 +767,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       LOG.error("SCM Event Queue stop failed", ex);
     }
     IOUtils.cleanupWithLogger(LOG, containerManager);
+    IOUtils.cleanupWithLogger(LOG, pipelineManager);
   }
 
   /**
@@ -815,6 +812,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     return scmNodeManager;
   }
 
+  /**
+   * Returns pipeline manager.
+   *
+   * @return - Pipeline Manager
+   */
+  @VisibleForTesting
+  public PipelineManager getPipelineManager() {
+    return pipelineManager;
+  }
+
   @VisibleForTesting
   public BlockManager getScmBlockManager() {
     return scmBlockManager;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index a9c6906..32e8640 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMStorage;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -58,6 +60,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
 public class TestBlockManager implements EventHandler<Boolean> {
   private static SCMContainerManager mapping;
   private static MockNodeManager nodeManager;
+  private static PipelineManager pipelineManager;
   private static BlockManagerImpl blockManager;
   private static File testDir;
   private final static long DEFAULT_BLOCK_SIZE = 128 * MB;
@@ -83,7 +86,10 @@ public class TestBlockManager implements EventHandler<Boolean> {
       throw new IOException("Unable to create test directory path");
     }
     nodeManager = new MockNodeManager(true, 10);
-    mapping = new SCMContainerManager(conf, nodeManager, eventQueue);
+    pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
+    mapping = new SCMContainerManager(conf, nodeManager, pipelineManager,
+        eventQueue);
     blockManager = new BlockManagerImpl(conf,
         nodeManager, mapping, eventQueue);
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
@@ -101,6 +107,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
   @After
   public void cleanup() throws IOException {
     blockManager.close();
+    pipelineManager.close();
     mapping.close();
     FileUtil.fullyDelete(testDir);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 5b76137..06f4f5e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -24,12 +24,11 @@ import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto
@@ -102,12 +101,13 @@ public class TestDeletedBlockLog {
 
     ContainerInfo containerInfo =
         new ContainerInfo.Builder().setContainerID(1).build();
-    Pipeline pipeline =
-        new Pipeline(null, LifeCycleState.CLOSED,
-            ReplicationType.RATIS, ReplicationFactor.THREE, null);
-    pipeline.addMember(dnList.get(0));
-    pipeline.addMember(dnList.get(1));
-    pipeline.addMember(dnList.get(2));
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setType(ReplicationType.RATIS)
+        .setFactor(ReplicationFactor.THREE)
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .setNodes(dnList)
+        .build();
     ContainerWithPipeline containerWithPipeline =
         new ContainerWithPipeline(containerInfo, pipeline);
     when(containerManager.getContainerWithPipeline(anyObject()))
@@ -383,11 +383,15 @@ public class TestDeletedBlockLog {
 
   private void mockContainerInfo(long containerID, DatanodeDetails dd)
       throws IOException {
-    Pipeline pipeline =
-        new Pipeline("fake", LifeCycleState.OPEN,
-            ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
-            PipelineID.randomId());
-    pipeline.addMember(dd);
+    List<DatanodeDetails> dns = new ArrayList<>();
+    dns.add(dd);
+    Pipeline pipeline = Pipeline.newBuilder()
+            .setType(ReplicationType.STAND_ALONE)
+            .setFactor(ReplicationFactor.ONE)
+            .setState(Pipeline.PipelineState.OPEN)
+            .setId(PipelineID.randomId())
+            .setNodes(dns)
+            .build();
 
     ContainerInfo.Builder builder = new ContainerInfo.Builder();
     builder.setPipelineID(pipeline.getId())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index f4ce102..8d36d29 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.hdds.scm.container;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 517bc67..44a8deb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
@@ -65,8 +67,11 @@ public class TestCloseContainerEventHandler {
     configuration
         .set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
-    containerManager = new SCMContainerManager(configuration, nodeManager,
-        new EventQueue());
+    PipelineManager pipelineManager =
+        new SCMPipelineManager(configuration, nodeManager, eventQueue);
+    containerManager = new
+        SCMContainerManager(configuration, nodeManager,
+        pipelineManager, new EventQueue());
     eventQueue = new EventQueue();
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(containerManager));
@@ -110,11 +115,12 @@ public class TestCloseContainerEventHandler {
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
         .captureLogs(CloseContainerEventHandler.LOG);
     ContainerWithPipeline containerWithPipeline = containerManager
-        .allocateContainer(HddsProtos.ReplicationType.STAND_ALONE,
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, "ozone");
     ContainerID id = new ContainerID(
         containerWithPipeline.getContainerInfo().getContainerID());
-    DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader();
+    DatanodeDetails datanode =
+        containerWithPipeline.getPipeline().getFirstNode();
     int closeCount = nodeManager.getCommandCount(datanode);
     eventQueue.fireEvent(CLOSE_CONTAINER, id);
     eventQueue.processAll(1000);
@@ -149,13 +155,13 @@ public class TestCloseContainerEventHandler {
     eventQueue.processAll(1000);
     int i = 0;
     for (DatanodeDetails details : containerWithPipeline.getPipeline()
-        .getMachines()) {
+        .getNodes()) {
       closeCount[i] = nodeManager.getCommandCount(details);
       i++;
     }
     i = 0;
     for (DatanodeDetails details : containerWithPipeline.getPipeline()
-        .getMachines()) {
+        .getNodes()) {
       Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details));
       i++;
     }
@@ -166,7 +172,7 @@ public class TestCloseContainerEventHandler {
     i = 0;
     // Make sure close is queued for each datanode on the pipeline
     for (DatanodeDetails details : containerWithPipeline.getPipeline()
-        .getMachines()) {
+        .getNodes()) {
       Assert.assertEquals(closeCount[i] + 1,
           nodeManager.getCommandCount(details));
       Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 7135173..861d241 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.container.replication
     .ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.Event;
@@ -73,8 +75,11 @@ public class TestContainerReportHandler implements EventPublisher {
     //GIVEN
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir);
+    EventQueue eventQueue = new EventQueue();
+    PipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     SCMContainerManager containerManager = new SCMContainerManager(
-        conf, nodeManager, new EventQueue());
+        conf, nodeManager, pipelineManager, eventQueue);
 
     ReplicationActivityStatus replicationActivityStatus =
         new ReplicationActivityStatus();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index 69a3b31..446eb58 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Set;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -25,9 +26,10 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -102,19 +104,20 @@ public class TestContainerStateManager {
 
   private ContainerInfo allocateContainer() throws IOException {
 
-    PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
+    PipelineManager pipelineManager = Mockito.mock(SCMPipelineManager.class);
 
-    Pipeline pipeline = new Pipeline("leader", HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.ReplicationType.STAND_ALONE,
-        HddsProtos.ReplicationFactor.THREE,
-        PipelineID.randomId());
+    Pipeline pipeline =
+        Pipeline.newBuilder().setState(Pipeline.PipelineState.CLOSED)
+            .setId(PipelineID.randomId())
+            .setType(HddsProtos.ReplicationType.STAND_ALONE)
+            .setFactor(HddsProtos.ReplicationFactor.THREE)
+            .setNodes(new ArrayList<>()).build();
 
-    when(pipelineSelector
-        .getReplicationPipeline(HddsProtos.ReplicationType.STAND_ALONE,
-            HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline);
+    when(pipelineManager.createPipeline(HddsProtos.ReplicationType.STAND_ALONE,
+        HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline);
 
-    return containerStateManager.allocateContainer(
-        pipelineSelector, HddsProtos.ReplicationType.STAND_ALONE,
+    return containerStateManager.allocateContainer(pipelineManager,
+        HddsProtos.ReplicationType.STAND_ALONE,
         HddsProtos.ReplicationFactor.THREE, "root");
 
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 75f8b8c..fa0f084 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -24,13 +24,15 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
@@ -59,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 public class TestSCMContainerManager {
   private static SCMContainerManager containerManager;
   private static MockNodeManager nodeManager;
+  private static PipelineManager pipelineManager;
   private static File testDir;
   private static XceiverClientManager xceiverClientManager;
   private static String containerOwner = "OZONE";
@@ -85,8 +88,10 @@ public class TestSCMContainerManager {
       throw new IOException("Unable to create test directory path");
     }
     nodeManager = new MockNodeManager(true, 10);
+    pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     containerManager = new SCMContainerManager(conf, nodeManager,
-        new EventQueue());
+        pipelineManager, new EventQueue());
     xceiverClientManager = new XceiverClientManager(conf);
     random = new Random();
   }
@@ -96,6 +101,9 @@ public class TestSCMContainerManager {
     if(containerManager != null) {
       containerManager.close();
     }
+    if (pipelineManager != null) {
+      pipelineManager.close();
+    }
     FileUtil.fullyDelete(testDir);
   }
 
@@ -130,7 +138,7 @@ public class TestSCMContainerManager {
 
       Assert.assertNotNull(containerInfo);
       Assert.assertNotNull(containerInfo.getPipeline());
-      pipelineList.add(containerInfo.getPipeline().getLeader()
+      pipelineList.add(containerInfo.getPipeline().getFirstNode()
           .getUuid());
     }
     Assert.assertTrue(pipelineList.size() > 5);
@@ -145,8 +153,8 @@ public class TestSCMContainerManager {
     Pipeline pipeline  = containerInfo.getPipeline();
     Assert.assertNotNull(pipeline);
     Pipeline newPipeline = containerInfo.getPipeline();
-    Assert.assertEquals(pipeline.getLeader().getUuid(),
-        newPipeline.getLeader().getUuid());
+    Assert.assertEquals(pipeline.getFirstNode().getUuid(),
+        newPipeline.getFirstNode().getUuid());
   }
 
   @Test
@@ -191,15 +199,15 @@ public class TestSCMContainerManager {
     contInfo = containerManager.getContainer(contInfo.containerID());
     Assert.assertEquals(contInfo.getState(), LifeCycleState.CLOSED);
     Pipeline pipeline = containerWithPipeline.getPipeline();
-    containerManager.getPipelineSelector().finalizePipeline(pipeline);
+    pipelineManager.finalizePipeline(pipeline.getId());
 
     ContainerWithPipeline containerWithPipeline2 = containerManager
         .getContainerWithPipeline(contInfo.containerID());
     pipeline = containerWithPipeline2.getPipeline();
     Assert.assertNotEquals(containerWithPipeline, containerWithPipeline2);
     Assert.assertNotNull("Pipeline should not be null", pipeline);
-    Assert.assertTrue(pipeline.getDatanodeHosts().contains(dn1.getHostName()));
-    Assert.assertTrue(pipeline.getDatanodeHosts().contains(dn2.getHostName()));
+    Assert.assertTrue(pipeline.getNodes().contains(dn1));
+    Assert.assertTrue(pipeline.getNodes().contains(dn2));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index b0951c8..571a5fb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -20,23 +20,22 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.IntStream;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationRequestToRepeat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -227,18 +226,16 @@ public class TestReplicationManager {
   public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
       throws IOException {
     Objects.requireNonNull(ids, "ids == null");
-    final Iterator<DatanodeDetails> i = ids.iterator();
-    Preconditions.checkArgument(i.hasNext());
-    final DatanodeDetails leader = i.next();
-    final Pipeline pipeline =
-        new Pipeline(leader.getUuidString(), LifeCycleState.OPEN,
-            ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
-            PipelineID.randomId());
-    pipeline.addMember(leader);
-    while (i.hasNext()) {
-      pipeline.addMember(i.next());
-    }
-    return pipeline;
+    Preconditions.checkArgument(ids.iterator().hasNext());
+    List<DatanodeDetails> dns = new ArrayList<>();
+    ids.forEach(dns::add);
+    return Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.OPEN)
+        .setId(PipelineID.randomId())
+        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setFactor(ReplicationFactor.ONE)
+        .setNodes(dns)
+        .build();
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index fb08ad2..e283732 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -102,7 +104,9 @@ public class TestContainerPlacement {
     EventQueue eventQueue = new EventQueue();
     final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-    return new SCMContainerManager(config, scmNodeManager,
+    PipelineManager pipelineManager =
+        new SCMPipelineManager(config, scmNodeManager, eventQueue);
+    return new SCMContainerManager(config, scmNodeManager, pipelineManager,
         eventQueue);
 
   }
@@ -156,7 +160,7 @@ public class TestContainerPlacement {
           xceiverClientManager.getType(),
           xceiverClientManager.getFactor(), "OZONE");
       assertEquals(xceiverClientManager.getFactor().getNumber(),
-          containerWithPipeline.getPipeline().getMachines().size());
+          containerWithPipeline.getPipeline().getNodes().size());
     } finally {
       IOUtils.closeQuietly(containerManager);
       IOUtils.closeQuietly(nodeManager);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index d971e68..985fa2c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -75,7 +77,10 @@ public class TestDeadNodeHandler {
     conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
     eventQueue = new EventQueue();
     nodeManager = new SCMNodeManager(conf, "cluster1", null, eventQueue);
-    containerManager = new SCMContainerManager(conf, nodeManager, eventQueue);
+    PipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
+    containerManager = new SCMContainerManager(conf, nodeManager,
+        pipelineManager, eventQueue);
     deadNodeHandler = new DeadNodeHandler(nodeManager, containerManager);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     publisher = Mockito.mock(EventPublisher.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
index ed95709..c899bda 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 390746f..b2ddf39 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.statemachine

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 069f1af..fbc3420 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -20,8 +20,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
index 0135df3..bf37718 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
@@ -82,10 +82,7 @@ public class InfoSubcommand implements Callable<Void> {
       LOG.info("Container Metadata: {}", metadataStr);
 
       // Print pipeline of an existing container.
-      LOG.info("LeaderID: {}", container.getPipeline()
-          .getLeader().getHostName());
-      String machinesStr = container.getPipeline()
-          .getMachines().stream().map(
+      String machinesStr = container.getPipeline().getNodes().stream().map(
               DatanodeDetails::getHostName).collect(Collectors.joining(","));
       LOG.info("Datanodes: [{}]", machinesStr);
       return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index 3772c59..0c09fc8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -282,7 +282,10 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
 
       // irrespective of the container state, we will always read via Standalone
       // protocol.
-      pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE);
+      if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+        pipeline = Pipeline.newBuilder(pipeline)
+            .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
+      }
       XceiverClientSpi xceiverClient = xceiverClientManager
           .acquireClient(pipeline);
       boolean success = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index 7a0fa5c..74cbea4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -24,9 +24,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.After;
@@ -50,7 +47,7 @@ public class TestNode2PipelineMap {
   private static StorageContainerManager scm;
   private static ContainerWithPipeline ratisContainer;
   private static ContainerManager containerManager;
-  private static PipelineSelector pipelineSelector;
+  private static PipelineManager pipelineManager;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -66,7 +63,7 @@ public class TestNode2PipelineMap {
     containerManager = scm.getContainerManager();
     ratisContainer = containerManager.allocateContainer(
         RATIS, THREE, "testOwner");
-    pipelineSelector = containerManager.getPipelineSelector();
+    pipelineManager = scm.getPipelineManager();
   }
 
   /**
@@ -83,15 +80,15 @@ public class TestNode2PipelineMap {
   @Test
   public void testPipelineMap() throws IOException {
 
-    Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline(
-        ratisContainer.getPipeline().getId());
+    Set<ContainerID> set = pipelineManager
+        .getContainersInPipeline(ratisContainer.getPipeline().getId());
 
     ContainerID cId = ratisContainer.getContainerInfo().containerID();
     Assert.assertEquals(1, set.size());
     set.forEach(containerID ->
             Assert.assertEquals(containerID, cId));
 
-    List<DatanodeDetails> dns = ratisContainer.getPipeline().getMachines();
+    List<DatanodeDetails> dns = ratisContainer.getPipeline().getNodes();
     Assert.assertEquals(3, dns.size());
 
     // get pipeline details by dnid
@@ -112,18 +109,14 @@ public class TestNode2PipelineMap {
         .updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
     containerManager
         .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
-    Set<ContainerID> set2 = pipelineSelector.getOpenContainerIDsByPipeline(
+    Set<ContainerID> set2 = pipelineManager.getContainersInPipeline(
         ratisContainer.getPipeline().getId());
     Assert.assertEquals(0, set2.size());
 
-    try {
-      pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
-          HddsProtos.LifeCycleEvent.CLOSE);
-      Assert.fail("closing of pipeline without finalize should fail");
-    } catch (Exception e) {
-      Assert.assertTrue(e instanceof SCMException);
-      Assert.assertEquals(((SCMException)e).getResult(),
-          SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE);
-    }
+    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
+    pipelineManager.removePipeline(ratisContainer.getPipeline().getId());
+    pipelines = scm.getScmNodeManager()
+        .getPipelineByDnID(dns.get(0).getUuid());
+    Assert.assertEquals(0, pipelines.size());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
index f3e1ece..45886c6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
@@ -20,12 +20,10 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -53,6 +51,7 @@ public class TestNodeFailure {
   private static ContainerWithPipeline ratisContainer1;
   private static ContainerWithPipeline ratisContainer2;
   private static ContainerManager containerManager;
+  private static PipelineManager pipelineManager;
   private static long timeForFailure;
 
   /**
@@ -76,6 +75,7 @@ public class TestNodeFailure {
     cluster.waitForClusterToBeReady();
     StorageContainerManager scm = cluster.getStorageContainerManager();
     containerManager = scm.getContainerManager();
+    pipelineManager = scm.getPipelineManager();
     ratisContainer1 = containerManager.allocateContainer(
         RATIS, THREE, "testOwner");
     ratisContainer2 = containerManager.allocateContainer(
@@ -102,21 +102,21 @@ public class TestNodeFailure {
   @Test
   public void testPipelineFail() throws InterruptedException, IOException,
       TimeoutException {
-    Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
-        HddsProtos.LifeCycleState.OPEN);
+    Assert.assertEquals(ratisContainer1.getPipeline().getPipelineState(),
+        Pipeline.PipelineState.OPEN);
     Pipeline pipelineToFail = ratisContainer1.getPipeline();
-    DatanodeDetails dnToFail = pipelineToFail.getMachines().get(0);
+    DatanodeDetails dnToFail = pipelineToFail.getFirstNode();
     cluster.shutdownHddsDatanode(dnToFail);
 
     // wait for sufficient time for the callback to be triggered
     Thread.sleep(3 * timeForFailure);
 
-    Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
-        ratisContainer1.getPipeline().getLifeCycleState());
-    Assert.assertEquals(HddsProtos.LifeCycleState.OPEN,
-        ratisContainer2.getPipeline().getLifeCycleState());
-    Assert.assertNull(containerManager.getPipelineSelector()
-        .getPipeline(pipelineToFail.getId()));
+    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
+        pipelineManager.getPipeline(ratisContainer1.getPipeline().getId())
+            .getPipelineState());
+    Assert.assertEquals(Pipeline.PipelineState.OPEN,
+        pipelineManager.getPipeline(ratisContainer2.getPipeline().getId())
+            .getPipelineState());
     // Now restart the datanode and make sure that a new pipeline is created.
     cluster.restartHddsDatanode(dnToFail);
     ContainerWithPipeline ratisContainer3 =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 52a493d..42d3063 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.AfterClass;
@@ -50,7 +48,7 @@ public class TestPipelineClose {
   private static ContainerWithPipeline ratisContainer1;
   private static ContainerWithPipeline ratisContainer2;
   private static ContainerManager containerManager;
-  private static PipelineSelector pipelineSelector;
+  private static PipelineManager pipelineManager;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -68,7 +66,7 @@ public class TestPipelineClose {
         .allocateContainer(RATIS, THREE, "testOwner");
     ratisContainer2 = containerManager
         .allocateContainer(RATIS, THREE, "testOwner");
-    pipelineSelector = containerManager.getPipelineSelector();
+    pipelineManager = scm.getPipelineManager();
     // At this stage, there should be 2 pipelines, each with 1 open container.
     // Try closing both the pipelines, one with a closed container and
     // the other with an open container.
@@ -87,8 +85,8 @@ public class TestPipelineClose {
 
   @Test
   public void testPipelineCloseWithClosedContainer() throws IOException {
-    Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline(
-        ratisContainer1.getPipeline().getId());
+    Set<ContainerID> set = pipelineManager
+        .getContainersInPipeline(ratisContainer1.getPipeline().getId());
 
     ContainerID cId = ratisContainer1.getContainerInfo().containerID();
     Assert.assertEquals(1, set.size());
@@ -105,17 +103,17 @@ public class TestPipelineClose {
     containerManager
         .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
 
-    Set<ContainerID> setClosed = pipelineSelector.getOpenContainerIDsByPipeline(
-        ratisContainer1.getPipeline().getId());
+    Set<ContainerID> setClosed = pipelineManager
+        .getContainersInPipeline(ratisContainer1.getPipeline().getId());
     Assert.assertEquals(0, setClosed.size());
 
-    pipelineSelector.finalizePipeline(ratisContainer1.getPipeline());
-    Pipeline pipeline1 = pipelineSelector
+    pipelineManager.finalizePipeline(ratisContainer1.getPipeline().getId());
+    Pipeline pipeline1 = pipelineManager
         .getPipeline(ratisContainer1.getPipeline().getId());
-    Assert.assertNull(pipeline1);
-    Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
-        HddsProtos.LifeCycleState.CLOSED);
-    for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
+    Assert.assertEquals(pipeline1.getPipelineState(),
+        Pipeline.PipelineState.CLOSED);
+    pipelineManager.removePipeline(pipeline1.getId());
+    for (DatanodeDetails dn : ratisContainer1.getPipeline().getNodes()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
       Assert.assertEquals(scm.getScmNodeManager().getPipelineByDnID(
           dn.getUuid()).size(), 0);
@@ -125,7 +123,7 @@ public class TestPipelineClose {
   @Test
   public void testPipelineCloseWithOpenContainer() throws IOException,
       TimeoutException, InterruptedException {
-    Set<ContainerID> setOpen = pipelineSelector.getOpenContainerIDsByPipeline(
+    Set<ContainerID> setOpen = pipelineManager.getContainersInPipeline(
         ratisContainer2.getPipeline().getId());
     Assert.assertEquals(1, setOpen.size());
 
@@ -134,12 +132,13 @@ public class TestPipelineClose {
         .updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATE);
     containerManager
         .updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATED);
-    pipelineSelector.finalizePipeline(ratisContainer2.getPipeline());
-    Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(),
-        HddsProtos.LifeCycleState.CLOSING);
-    Pipeline pipeline2 = pipelineSelector
+    pipelineManager.finalizePipeline(ratisContainer2.getPipeline().getId());
+    Assert.assertEquals(
+        pipelineManager.getPipeline(ratisContainer2.getPipeline().getId())
+            .getPipelineState(), Pipeline.PipelineState.CLOSED);
+    Pipeline pipeline2 = pipelineManager
         .getPipeline(ratisContainer2.getPipeline().getId());
-    Assert.assertEquals(pipeline2.getLifeCycleState(),
-        HddsProtos.LifeCycleState.CLOSING);
+    Assert.assertEquals(pipeline2.getPipelineState(),
+        Pipeline.PipelineState.CLOSED);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
index 49fb2bc..fd6f76b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
@@ -91,7 +91,7 @@ public class TestPipelineStateManager {
     }
 
     // verify pipeline returned is same
-    Pipeline pipeline1 = stateManager.getPipeline(pipeline.getID());
+    Pipeline pipeline1 = stateManager.getPipeline(pipeline.getId());
     Assert.assertTrue(pipeline == pipeline1);
 
     // clean up
@@ -102,15 +102,17 @@ public class TestPipelineStateManager {
   public void testGetPipelines() throws IOException {
     Set<Pipeline> pipelines = new HashSet<>();
     Pipeline pipeline = createDummyPipeline(1);
-    pipelines.add(pipeline);
     stateManager.addPipeline(pipeline);
-    pipeline = createDummyPipeline(1);
+    stateManager.openPipeline(pipeline.getId());
     pipelines.add(pipeline);
+    pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
+    stateManager.openPipeline(pipeline.getId());
+    pipelines.add(pipeline);
 
-    Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelinesByType(
+    Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelines(
         HddsProtos.ReplicationType.RATIS));
-    Assert.assertEquals(pipelines, pipelines1);
+    Assert.assertEquals(pipelines1.size(), pipelines.size());
     // clean up
     for (Pipeline pipeline1 : pipelines) {
       removePipeline(pipeline1);
@@ -131,16 +133,16 @@ public class TestPipelineStateManager {
           stateManager.addPipeline(pipeline);
           pipelines.add(pipeline);
 
-          // 5 pipelines in allocated state for each type and factor
+          // 5 pipelines in open state for each type and factor
           pipeline = createDummyPipeline(type, factor, factor.getNumber());
           stateManager.addPipeline(pipeline);
-          stateManager.openPipeline(pipeline.getID());
+          stateManager.openPipeline(pipeline.getId());
           pipelines.add(pipeline);
 
-          // 5 pipelines in allocated state for each type and factor
+          // 5 pipelines in closed state for each type and factor
           pipeline = createDummyPipeline(type, factor, factor.getNumber());
           stateManager.addPipeline(pipeline);
-          stateManager.finalizePipeline(pipeline.getID());
+          stateManager.finalizePipeline(pipeline.getId());
           pipelines.add(pipeline);
         }
       }
@@ -152,8 +154,8 @@ public class TestPipelineStateManager {
           .values()) {
         // verify pipelines received
         List<Pipeline> pipelines1 =
-            stateManager.getPipelinesByTypeAndFactor(type, factor);
-        Assert.assertEquals(5, pipelines1.size());
+            stateManager.getPipelines(type, factor);
+        Assert.assertEquals(15, pipelines1.size());
         pipelines1.stream().forEach(p -> {
           Assert.assertEquals(p.getType(), type);
           Assert.assertEquals(p.getFactor(), factor);
@@ -168,40 +170,79 @@ public class TestPipelineStateManager {
   }
 
   @Test
+  public void testGetPipelinesByTypeAndState() throws IOException {
+    Set<Pipeline> pipelines = new HashSet<>();
+    for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
+        .values()) {
+      HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+      for (int i = 0; i < 5; i++) {
+        // 5 pipelines in allocated state for each type and factor
+        Pipeline pipeline =
+            createDummyPipeline(type, factor, factor.getNumber());
+        stateManager.addPipeline(pipeline);
+        pipelines.add(pipeline);
+
+        // 5 pipelines in open state for each type and factor
+        pipeline = createDummyPipeline(type, factor, factor.getNumber());
+        stateManager.addPipeline(pipeline);
+        stateManager.openPipeline(pipeline.getId());
+        pipelines.add(pipeline);
+
+        // 5 pipelines in closed state for each type and factor
+        pipeline = createDummyPipeline(type, factor, factor.getNumber());
+        stateManager.addPipeline(pipeline);
+        stateManager.finalizePipeline(pipeline.getId());
+        pipelines.add(pipeline);
+      }
+    }
+
+    for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
+        .values()) {
+      // verify pipelines received
+      List<Pipeline> pipelines1 = stateManager
+          .getPipelines(type, Pipeline.PipelineState.OPEN);
+      Assert.assertEquals(5, pipelines1.size());
+      pipelines1.forEach(p -> {
+        Assert.assertEquals(p.getType(), type);
+        Assert.assertEquals(p.getPipelineState(), Pipeline.PipelineState.OPEN);
+      });
+
+      pipelines1 = stateManager
+          .getPipelines(type, Pipeline.PipelineState.OPEN,
+              Pipeline.PipelineState.CLOSED, Pipeline.PipelineState.ALLOCATED);
+      Assert.assertEquals(15, pipelines1.size());
+    }
+
+    //clean up
+    for (Pipeline pipeline : pipelines) {
+      removePipeline(pipeline);
+    }
+  }
+
+  @Test
   public void testAddAndGetContainer() throws IOException {
     long containerID = 0;
     Pipeline pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
-    pipeline = stateManager.getPipeline(pipeline.getID());
-
-    try {
-      stateManager.addContainerToPipeline(pipeline.getID(),
-          ContainerID.valueof(++containerID));
-      Assert.fail("Container should not have been added");
-    } catch (IOException e) {
-      // add container possible only in container with open state
-      Assert.assertTrue(e.getMessage().contains("is not in open state"));
-    }
+    pipeline = stateManager.getPipeline(pipeline.getId());
+    stateManager.addContainerToPipeline(pipeline.getId(),
+        ContainerID.valueof(++containerID));
 
     // move pipeline to open state
-    stateManager.openPipeline(pipeline.getID());
-
-    // add three containers
-    stateManager.addContainerToPipeline(pipeline.getID(),
-        ContainerID.valueof(containerID));
-    stateManager.addContainerToPipeline(pipeline.getID(),
+    stateManager.openPipeline(pipeline.getId());
+    stateManager.addContainerToPipeline(pipeline.getId(),
         ContainerID.valueof(++containerID));
-    stateManager.addContainerToPipeline(pipeline.getID(),
+    stateManager.addContainerToPipeline(pipeline.getId(),
         ContainerID.valueof(++containerID));
 
     //verify the number of containers returned
     Set<ContainerID> containerIDs =
-        stateManager.getContainers(pipeline.getID());
+        stateManager.getContainers(pipeline.getId());
     Assert.assertEquals(containerIDs.size(), containerID);
 
     removePipeline(pipeline);
     try {
-      stateManager.addContainerToPipeline(pipeline.getID(),
+      stateManager.addContainerToPipeline(pipeline.getId(),
           ContainerID.valueof(++containerID));
       Assert.fail("Container should not have been added");
     } catch (IOException e) {
@@ -215,12 +256,12 @@ public class TestPipelineStateManager {
     Pipeline pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
     // open the pipeline
-    stateManager.openPipeline(pipeline.getID());
+    stateManager.openPipeline(pipeline.getId());
     stateManager
-        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
+        .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1));
 
     try {
-      stateManager.removePipeline(pipeline.getID());
+      stateManager.removePipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been removed");
     } catch (IOException e) {
       // can not remove a pipeline which already has containers
@@ -228,10 +269,10 @@ public class TestPipelineStateManager {
     }
 
     // close the pipeline
-    stateManager.finalizePipeline(pipeline.getID());
+    stateManager.finalizePipeline(pipeline.getId());
 
     try {
-      stateManager.removePipeline(pipeline.getID());
+      stateManager.removePipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been removed");
     } catch (IOException e) {
       // can not remove a pipeline which already has containers
@@ -248,33 +289,33 @@ public class TestPipelineStateManager {
     Pipeline pipeline = createDummyPipeline(1);
     // create an open pipeline in stateMap
     stateManager.addPipeline(pipeline);
-    stateManager.openPipeline(pipeline.getID());
+    stateManager.openPipeline(pipeline.getId());
 
-    stateManager.addContainerToPipeline(pipeline.getID(),
+    stateManager.addContainerToPipeline(pipeline.getId(),
         ContainerID.valueof(containerID));
-    Assert.assertEquals(1, stateManager.getContainers(pipeline.getID()).size());
-    stateManager.removeContainerFromPipeline(pipeline.getID(),
+    Assert.assertEquals(1, stateManager.getContainers(pipeline.getId()).size());
+    stateManager.removeContainerFromPipeline(pipeline.getId(),
         ContainerID.valueof(containerID));
-    Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size());
+    Assert.assertEquals(0, stateManager.getContainers(pipeline.getId()).size());
 
     // add two containers in the pipeline
-    stateManager.addContainerToPipeline(pipeline.getID(),
+    stateManager.addContainerToPipeline(pipeline.getId(),
         ContainerID.valueof(++containerID));
-    stateManager.addContainerToPipeline(pipeline.getID(),
+    stateManager.addContainerToPipeline(pipeline.getId(),
         ContainerID.valueof(++containerID));
-    Assert.assertEquals(2, stateManager.getContainers(pipeline.getID()).size());
+    Assert.assertEquals(2, stateManager.getContainers(pipeline.getId()).size());
 
     // move pipeline to closing state
-    stateManager.finalizePipeline(pipeline.getID());
+    stateManager.finalizePipeline(pipeline.getId());
 
-    stateManager.removeContainerFromPipeline(pipeline.getID(),
+    stateManager.removeContainerFromPipeline(pipeline.getId(),
         ContainerID.valueof(containerID));
-    stateManager.removeContainerFromPipeline(pipeline.getID(),
+    stateManager.removeContainerFromPipeline(pipeline.getId(),
         ContainerID.valueof(--containerID));
-    Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size());
+    Assert.assertEquals(0, stateManager.getContainers(pipeline.getId()).size());
 
     // clean up
-    stateManager.removePipeline(pipeline.getID());
+    stateManager.removePipeline(pipeline.getId());
   }
 
   @Test
@@ -282,30 +323,30 @@ public class TestPipelineStateManager {
     Pipeline pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
     // finalize on ALLOCATED pipeline
-    stateManager.finalizePipeline(pipeline.getID());
+    stateManager.finalizePipeline(pipeline.getId());
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+        stateManager.getPipeline(pipeline.getId()).getPipelineState());
     // clean up
     removePipeline(pipeline);
 
     pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
-    stateManager.openPipeline(pipeline.getID());
+    stateManager.openPipeline(pipeline.getId());
     // finalize on OPEN pipeline
-    stateManager.finalizePipeline(pipeline.getID());
+    stateManager.finalizePipeline(pipeline.getId());
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+        stateManager.getPipeline(pipeline.getId()).getPipelineState());
     // clean up
     removePipeline(pipeline);
 
     pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
-    stateManager.openPipeline(pipeline.getID());
-    stateManager.finalizePipeline(pipeline.getID());
+    stateManager.openPipeline(pipeline.getId());
+    stateManager.finalizePipeline(pipeline.getId());
     // finalize should work on already closed pipeline
-    stateManager.finalizePipeline(pipeline.getID());
+    stateManager.finalizePipeline(pipeline.getId());
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+        stateManager.getPipeline(pipeline.getId()).getPipelineState());
     // clean up
     removePipeline(pipeline);
   }
@@ -315,25 +356,25 @@ public class TestPipelineStateManager {
     Pipeline pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
     // open on ALLOCATED pipeline
-    stateManager.openPipeline(pipeline.getID());
+    stateManager.openPipeline(pipeline.getId());
     Assert.assertEquals(Pipeline.PipelineState.OPEN,
-        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+        stateManager.getPipeline(pipeline.getId()).getPipelineState());
 
-    stateManager.openPipeline(pipeline.getID());
+    stateManager.openPipeline(pipeline.getId());
     // open should work on already open pipeline
     Assert.assertEquals(Pipeline.PipelineState.OPEN,
-        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+        stateManager.getPipeline(pipeline.getId()).getPipelineState());
     // clean up
     removePipeline(pipeline);
   }
 
   private void removePipeline(Pipeline pipeline) throws IOException {
-    stateManager.finalizePipeline(pipeline.getID());
+    stateManager.finalizePipeline(pipeline.getId());
     Set<ContainerID> containerIDs =
-        stateManager.getContainers(pipeline.getID());
+        stateManager.getContainers(pipeline.getId());
     for (ContainerID containerID : containerIDs) {
-      stateManager.removeContainerFromPipeline(pipeline.getID(), containerID);
+      stateManager.removeContainerFromPipeline(pipeline.getId(), containerID);
     }
-    stateManager.removePipeline(pipeline.getID());
+    stateManager.removePipeline(pipeline.getId());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 184143a..0025c2e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -58,7 +58,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.ALLOCATED);
+        Pipeline.PipelineState.OPEN);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
@@ -71,7 +71,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
-        Pipeline.PipelineState.ALLOCATED);
+        Pipeline.PipelineState.OPEN);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -86,19 +86,20 @@ public class TestRatisPipelineProvider {
   @Test
   public void testCreatePipelineWithNodes() throws IOException {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
-    Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber()));
+    Pipeline pipeline =
+        provider.create(factor, createListOfNodes(factor.getNumber()));
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(
-        pipeline.getPipelineState(), Pipeline.PipelineState.ALLOCATED);
+        pipeline.getPipelineState(), Pipeline.PipelineState.OPEN);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
-    pipeline = provider.create(createListOfNodes(factor.getNumber()));
+    pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.ALLOCATED);
+        Pipeline.PipelineState.OPEN);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 0f9ad55..dab7fb6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.container.TestSCMContainerManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -54,7 +53,7 @@ public class TestSCMPipelineManager {
   public static void setUp() throws Exception {
     conf = new OzoneConfiguration();
     testDir = GenericTestUtils
-        .getTestDir(TestSCMContainerManager.class.getSimpleName());
+        .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     boolean folderExisted = testDir.exists() || testDir.mkdirs();
     if (!folderExisted) {
@@ -83,16 +82,18 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should be able to load the pipelines from the db
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager,
-            new EventQueue());
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+    for (Pipeline p : pipelines) {
+      pipelineManager.openPipeline(p.getId());
+    }
     List<Pipeline> pipelineList =
-        pipelineManager.getPipelinesByType(HddsProtos.ReplicationType.RATIS);
+        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipelines, new HashSet<>(pipelineList));
 
     // clean up
     for (Pipeline pipeline : pipelines) {
-      pipelineManager.finalizePipeline(pipeline.getID());
-      pipelineManager.removePipeline(pipeline.getID());
+      pipelineManager.finalizePipeline(pipeline.getId());
+      pipelineManager.removePipeline(pipeline.getId());
     }
     pipelineManager.close();
   }
@@ -104,13 +105,13 @@ public class TestSCMPipelineManager {
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
-    pipelineManager.openPipeline(pipeline.getID());
+    pipelineManager.openPipeline(pipeline.getId());
     pipelineManager
-        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
-    pipelineManager.finalizePipeline(pipeline.getID());
+        .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1));
+    pipelineManager.finalizePipeline(pipeline.getId());
     pipelineManager
-        .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(1));
-    pipelineManager.removePipeline(pipeline.getID());
+        .removeContainerFromPipeline(pipeline.getId(), ContainerID.valueof(1));
+    pipelineManager.removePipeline(pipeline.getId());
     pipelineManager.close();
 
     // new pipeline manager should not be able to load removed pipelines
@@ -118,7 +119,7 @@ public class TestSCMPipelineManager {
         new SCMPipelineManager(conf, nodeManager,
             new EventQueue());
     try {
-      pipelineManager.getPipeline(pipeline.getID());
+      pipelineManager.getPipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been retrieved");
     } catch (IOException e) {
       Assert.assertTrue(e.getMessage().contains("not found"));
@@ -138,36 +139,36 @@ public class TestSCMPipelineManager {
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
     Assert
-        .assertFalse(pipelineManager.getPipeline(pipeline.getID()).isHealthy());
+        .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
     Assert
-        .assertFalse(pipelineManager.getPipeline(pipeline.getID()).isOpen());
+        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
     // get pipeline report from each dn in the pipeline
     PipelineReportHandler pipelineReportHandler =
         new PipelineReportHandler(pipelineManager, conf);
     for (DatanodeDetails dn: pipeline.getNodes()) {
       PipelineReportFromDatanode pipelineReportFromDatanode =
-          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID());
+          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getId());
       // pipeline is not healthy until all dns report
       Assert.assertFalse(
-          pipelineManager.getPipeline(pipeline.getID()).isHealthy());
+          pipelineManager.getPipeline(pipeline.getId()).isHealthy());
       pipelineReportHandler
           .onMessage(pipelineReportFromDatanode, new EventQueue());
     }
 
     // pipeline is healthy when all dns report
     Assert
-        .assertTrue(pipelineManager.getPipeline(pipeline.getID()).isHealthy());
+        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
     // pipeline should now move to open state
     Assert
-        .assertTrue(pipelineManager.getPipeline(pipeline.getID()).isOpen());
+        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
     // close the pipeline
-    pipelineManager.finalizePipeline(pipeline.getID());
+    pipelineManager.finalizePipeline(pipeline.getId());
 
     for (DatanodeDetails dn: pipeline.getNodes()) {
       PipelineReportFromDatanode pipelineReportFromDatanode =
-          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID());
+          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getId());
       // pipeline report for a closed pipeline should destroy the pipeline
       // and remove it from the pipeline manager
       pipelineReportHandler
@@ -175,7 +176,7 @@ public class TestSCMPipelineManager {
     }
 
     try {
-      pipelineManager.getPipeline(pipeline.getID());
+      pipelineManager.getPipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been retrieved");
     } catch (IOException e) {
       Assert.assertTrue(e.getMessage().contains("not found"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index bac4022..0fa8649 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -19,9 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.AfterClass;
@@ -48,6 +46,7 @@ public class TestSCMRestart {
   private static Pipeline ratisPipeline2;
   private static ContainerManager containerManager;
   private static ContainerManager newContainerManager;
+  private static PipelineManager pipelineManager;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -65,6 +64,7 @@ public class TestSCMRestart {
     cluster.waitForClusterToBeReady();
     StorageContainerManager scm = cluster.getStorageContainerManager();
     containerManager = scm.getContainerManager();
+    pipelineManager = scm.getPipelineManager();
     ratisPipeline1 = containerManager.allocateContainer(
         RATIS, THREE, "Owner1").getPipeline();
     ratisPipeline2 = containerManager.allocateContainer(
@@ -75,6 +75,7 @@ public class TestSCMRestart {
     cluster.restartStorageContainerManager();
     newContainerManager = cluster.getStorageContainerManager()
         .getContainerManager();
+    pipelineManager = cluster.getStorageContainerManager().getPipelineManager();
   }
 
   /**
@@ -90,25 +91,15 @@ public class TestSCMRestart {
   @Test
   public void testPipelineWithScmRestart() throws IOException {
     // After restart make sure that the pipeline are still present
-    Pipeline ratisPipeline1AfterRestart = newContainerManager
-        .getPipelineSelector().getPipeline(ratisPipeline1.getId());
-    Pipeline ratisPipeline2AfterRestart = newContainerManager
-        .getPipelineSelector().getPipeline(ratisPipeline2.getId());
+    Pipeline ratisPipeline1AfterRestart =
+        pipelineManager.getPipeline(ratisPipeline1.getId());
+    Pipeline ratisPipeline2AfterRestart =
+        pipelineManager.getPipeline(ratisPipeline2.getId());
     Assert.assertNotSame(ratisPipeline1AfterRestart, ratisPipeline1);
     Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2);
     Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1);
     Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2);
 
-    for (DatanodeDetails dn : ratisPipeline1.getMachines()) {
-      Assert.assertEquals(dn, ratisPipeline1AfterRestart.getDatanodes()
-              .get(dn.getUuidString()));
-    }
-
-    for (DatanodeDetails dn : ratisPipeline2.getMachines()) {
-      Assert.assertEquals(dn, ratisPipeline2AfterRestart.getDatanodes()
-              .get(dn.getUuidString()));
-    }
-
     // Try creating a new ratis pipeline, it should be from the same pipeline
     // as was before restart
     Pipeline newRatisPipeline = newContainerManager

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
index b44dbef..22fd95b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
@@ -57,7 +57,7 @@ public class TestSimplePipelineProvider {
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.ALLOCATED);
+        Pipeline.PipelineState.OPEN);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
@@ -67,7 +67,7 @@ public class TestSimplePipelineProvider {
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
-        Pipeline.PipelineState.ALLOCATED);
+        Pipeline.PipelineState.OPEN);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -82,21 +82,22 @@ public class TestSimplePipelineProvider {
   @Test
   public void testCreatePipelineWithNodes() throws IOException {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
-    Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber()));
+    Pipeline pipeline =
+        provider.create(factor, createListOfNodes(factor.getNumber()));
     Assert.assertEquals(pipeline.getType(),
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.ALLOCATED);
+        Pipeline.PipelineState.OPEN);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
-    pipeline = provider.create(createListOfNodes(factor.getNumber()));
+    pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
     Assert.assertEquals(pipeline.getType(),
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.ALLOCATED);
+        Pipeline.PipelineState.OPEN);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index a83c16e..871f389 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
index c69a94c..78a8511 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.
     ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index bf6a189..e616eef 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.ozone;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
@@ -29,7 +29,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.TestGenericTestUtils;
 import org.junit.AfterClass;
@@ -40,6 +40,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 
@@ -91,18 +92,20 @@ public class TestMiniOzoneCluster {
     assertEquals(numberOfNodes, datanodes.size());
     for(HddsDatanodeService dn : datanodes) {
       // Create a single member pipe line
-      DatanodeDetails datanodeDetails = dn.getDatanodeDetails();
-      final Pipeline pipeline =
-          new Pipeline(datanodeDetails.getUuidString(),
-              HddsProtos.LifeCycleState.OPEN,
-              HddsProtos.ReplicationType.STAND_ALONE,
-              HddsProtos.ReplicationFactor.ONE, PipelineID.randomId());
-      pipeline.addMember(datanodeDetails);
+      List<DatanodeDetails> dns = new ArrayList<>();
+      dns.add(dn.getDatanodeDetails());
+      Pipeline pipeline = Pipeline.newBuilder()
+          .setState(Pipeline.PipelineState.OPEN)
+          .setId(PipelineID.randomId())
+          .setType(HddsProtos.ReplicationType.STAND_ALONE)
+          .setFactor(HddsProtos.ReplicationFactor.ONE)
+          .setNodes(dns)
+          .build();
 
       // Verify client is able to connect to the container
       try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
         client.connect();
-        assertTrue(client.isConnected(pipeline.getLeader()));
+        assertTrue(client.isConnected(pipeline.getFirstNode()));
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message