hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tmarqua...@apache.org
Subject [10/50] [abbrv] hadoop git commit: HDDS-399. Persist open pipeline information across SCM restart. Contributed by Mukul Kumar Singh.
Date Mon, 17 Sep 2018 22:01:32 GMT
HDDS-399. Persist open pipeline information across SCM restart. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84693669
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84693669
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84693669

Branch: refs/heads/HADOOP-15407
Commit: 846936698b2c8c50662e43534ac999df82066a8b
Parents: 9a265fa
Author: Nanda kumar <nanda@apache.org>
Authored: Mon Sep 17 21:51:54 2018 +0530
Committer: Nanda kumar <nanda@apache.org>
Committed: Mon Sep 17 21:51:54 2018 +0530

----------------------------------------------------------------------
 .../scm/container/common/helpers/Pipeline.java  |  24 ++
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   2 +
 .../hdds/scm/container/ContainerMapping.java    |  24 +-
 .../scm/container/ContainerStateManager.java    |  25 +-
 .../scm/container/states/ContainerStateMap.java |  38 ---
 .../hdds/scm/pipelines/PipelineManager.java     | 148 +++++------
 .../hdds/scm/pipelines/PipelineSelector.java    | 249 +++++++++----------
 .../scm/pipelines/PipelineStateManager.java     | 136 ++++++++++
 .../scm/pipelines/ratis/RatisManagerImpl.java   |   8 +-
 .../standalone/StandaloneManagerImpl.java       |   8 +-
 .../container/TestContainerReportHandler.java   |   3 +-
 .../container/TestContainerStateManager.java    |   4 +-
 .../hdds/scm/node/TestDeadNodeHandler.java      |   4 +-
 .../TestContainerStateManagerIntegration.java   |  10 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |  22 +-
 .../hdds/scm/pipeline/TestPipelineClose.java    |  15 +-
 .../hdds/scm/pipeline/TestSCMRestart.java       | 101 ++++++++
 .../apache/hadoop/ozone/MiniOzoneCluster.java   |   5 +-
 .../hadoop/ozone/MiniOzoneClusterImpl.java      |   8 +-
 19 files changed, 510 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 6757262..ef148e5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -86,6 +86,30 @@ public class Pipeline {
     datanodes = new TreeMap<>();
   }
 
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    Pipeline that = (Pipeline) o;
+
+    return id.equals(that.id)
+            && factor.equals(that.factor)
+            && type.equals(that.type)
+            && lifeCycleState.equals(that.lifeCycleState)
+            && leaderID.equals(that.leaderID);
+
+  }
+
   /**
    * Gets pipeline object from protobuf.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index bf4508b..0a15ec8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -90,7 +90,9 @@ public final class OzoneConsts {
    * level DB names used by SCM and data nodes.
    */
   public static final String CONTAINER_DB_SUFFIX = "container.db";
+  public static final String PIPELINE_DB_SUFFIX = "pipeline.db";
   public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX;
+  public static final String SCM_PIPELINE_DB = "scm-" + PIPELINE_DB_SUFFIX;
   public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
   public static final String DELETED_BLOCK_DB = "deletedBlock.db";
   public static final String OM_DB_NAME = "om.db";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 5678205..11cc9ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -130,12 +130,13 @@ public class ContainerMapping implements Mapping {
 
     size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
         OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-    this.containerStateManager =
-        new ContainerStateManager(conf, this);
-    LOG.trace("Container State Manager created.");
 
     this.pipelineSelector = new PipelineSelector(nodeManager,
-        containerStateManager, conf, eventPublisher);
+            conf, eventPublisher, cacheSizeMB);
+
+    this.containerStateManager =
+        new ContainerStateManager(conf, this, pipelineSelector);
+    LOG.trace("Container State Manager created.");
 
     this.eventPublisher = eventPublisher;
 
@@ -202,11 +203,6 @@ public class ContainerMapping implements Mapping {
       if (contInfo.isContainerOpen()) {
         // If pipeline with given pipeline Id already exist return it
         pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
-        if (pipeline == null) {
-          pipeline = pipelineSelector
-              .getReplicationPipeline(contInfo.getReplicationType(),
-                  contInfo.getReplicationFactor());
-        }
       } else {
         // For close containers create pipeline from datanodes with replicas
         Set<DatanodeDetails> dnWithReplicas = containerStateManager
@@ -392,9 +388,8 @@ public class ContainerMapping implements Mapping {
       ContainerInfo updatedContainer = containerStateManager
           .updateContainerState(containerInfo, event);
       if (!updatedContainer.isContainerOpen()) {
-        Pipeline pipeline = pipelineSelector
-            .getPipeline(containerInfo.getPipelineID());
-        pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
+        pipelineSelector.removeContainerFromPipeline(
+                containerInfo.getPipelineID(), containerID);
       }
       containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
       return updatedContainer.getState();
@@ -474,11 +469,6 @@ public class ContainerMapping implements Mapping {
     }
     Pipeline pipeline = pipelineSelector
         .getPipeline(containerInfo.getPipelineID());
-    if (pipeline == null) {
-      pipeline = pipelineSelector
-          .getReplicationPipeline(containerInfo.getReplicationType(),
-              containerInfo.getReplicationFactor());
-    }
     return new ContainerWithPipeline(containerInfo, pipeline);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7989c55..930c098 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.container.states.ContainerState;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
@@ -137,7 +136,7 @@ public class ContainerStateManager implements Closeable {
    */
   @SuppressWarnings("unchecked")
   public ContainerStateManager(Configuration configuration,
-      Mapping containerMapping) {
+      Mapping containerMapping, PipelineSelector pipelineSelector) {
 
     // Initialize the container state machine.
     Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
@@ -159,10 +158,11 @@ public class ContainerStateManager implements Closeable {
     lastUsedMap = new ConcurrentHashMap<>();
     containerCount = new AtomicLong(0);
     containers = new ContainerStateMap();
-    loadExistingContainers(containerMapping);
+    loadExistingContainers(containerMapping, pipelineSelector);
   }
 
-  private void loadExistingContainers(Mapping containerMapping) {
+  private void loadExistingContainers(Mapping containerMapping,
+                                      PipelineSelector pipelineSelector) {
 
     List<ContainerInfo> containerList;
     try {
@@ -184,6 +184,8 @@ public class ContainerStateManager implements Closeable {
       long maxID = 0;
       for (ContainerInfo container : containerList) {
         containers.addContainer(container);
+        pipelineSelector.addContainerToPipeline(
+                container.getPipelineID(), container.getContainerID());
 
         if (maxID < container.getContainerID()) {
           maxID = container.getContainerID();
@@ -303,6 +305,7 @@ public class ContainerStateManager implements Closeable {
         + "replication=%s couldn't be found for the new container. "
         + "Do you have enough nodes?", type, replicationFactor);
 
+    long containerID = containerCount.incrementAndGet();
     ContainerInfo containerInfo = new ContainerInfo.Builder()
         .setState(HddsProtos.LifeCycleState.ALLOCATED)
         .setPipelineID(pipeline.getId())
@@ -313,11 +316,12 @@ public class ContainerStateManager implements Closeable {
         .setNumberOfKeys(0)
         .setStateEnterTime(Time.monotonicNow())
         .setOwner(owner)
-        .setContainerID(containerCount.incrementAndGet())
+        .setContainerID(containerID)
         .setDeleteTransactionId(0)
         .setReplicationFactor(replicationFactor)
         .setReplicationType(pipeline.getType())
         .build();
+    selector.addContainerToPipeline(pipeline.getId(), containerID);
     Preconditions.checkNotNull(containerInfo);
     containers.addContainer(containerInfo);
     LOG.trace("New container allocated: {}", containerInfo);
@@ -471,17 +475,6 @@ public class ContainerStateManager implements Closeable {
   }
 
   /**
-   * Returns a set of open ContainerIDs that reside on a pipeline.
-   *
-   * @param pipelineID PipelineID of the Containers.
-   * @return Set of containers that match the specific query parameters.
-   */
-  public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(PipelineID
-      pipelineID) {
-    return containers.getOpenContainerIDsByPipeline(pipelineID);
-  }
-
-  /**
    * Returns the containerInfo with pipeline for the given container id.
    * @param selector -- Pipeline selector class.
    * @param containerID id of the container

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 9657594..880a715 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -25,7 +25,6 @@ import java.util.Set;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -75,9 +74,6 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
  * Replica and THREE Replica. User can specify how many copies should be made
  * for a ozone key.
  * <p>
- * 5.Pipeline - The pipeline constitute the set of Datanodes on which the
- * open container resides physically.
- * <p>
  * The most common access pattern of this class is to select a container based
  * on all these parameters, for example, when allocating a block we will
  * select a container that belongs to user1, with Ratis replication which can
@@ -92,14 +88,6 @@ public class ContainerStateMap {
   private final ContainerAttribute<String> ownerMap;
   private final ContainerAttribute<ReplicationFactor> factorMap;
   private final ContainerAttribute<ReplicationType> typeMap;
-  // This map constitutes the pipeline to open container mappings.
-  // This map will be queried for the list of open containers on a particular
-  // pipeline and issue a close on corresponding containers in case of
-  // following events:
-  //1. Dead datanode.
-  //2. Datanode out of space.
-  //3. Volume loss or volume out of space.
-  private final ContainerAttribute<PipelineID> openPipelineMap;
 
   private final Map<ContainerID, ContainerInfo> containerMap;
   // Map to hold replicas of given container.
@@ -121,7 +109,6 @@ public class ContainerStateMap {
     ownerMap = new ContainerAttribute<>();
     factorMap = new ContainerAttribute<>();
     typeMap = new ContainerAttribute<>();
-    openPipelineMap = new ContainerAttribute<>();
     containerMap = new HashMap<>();
     lock = new ReentrantReadWriteLock();
     contReplicaMap = new HashMap<>();
@@ -158,9 +145,6 @@ public class ContainerStateMap {
       ownerMap.insert(info.getOwner(), id);
       factorMap.insert(info.getReplicationFactor(), id);
       typeMap.insert(info.getReplicationType(), id);
-      if (info.isContainerOpen()) {
-        openPipelineMap.insert(info.getPipelineID(), id);
-      }
 
       // Flush the cache of this container type, will be added later when
       // get container queries are executed.
@@ -391,11 +375,6 @@ public class ContainerStateMap {
         throw new SCMException("Updating the container map failed.", ex,
             FAILED_TO_CHANGE_CONTAINER_STATE);
       }
-      // In case the container is set to closed state, it needs to be removed
-      // from the pipeline Map.
-      if (!info.isContainerOpen()) {
-        openPipelineMap.remove(info.getPipelineID(), id);
-      }
     } finally {
       lock.writeLock().unlock();
     }
@@ -434,23 +413,6 @@ public class ContainerStateMap {
   }
 
   /**
-   * Returns Open containers in the SCM by the Pipeline.
-   *
-   * @param pipelineID - Pipeline id.
-   * @return NavigableSet<ContainerID>
-   */
-  public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
-      PipelineID pipelineID) {
-    Preconditions.checkNotNull(pipelineID);
-    lock.readLock().lock();
-    try {
-      return openPipelineMap.getCollection(pipelineID);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
    * Returns Containers by replication factor.
    *
    * @param factor - Replication Factor.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 102df8a..07ff2b0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -16,11 +16,10 @@
  */
 package org.apache.hadoop.hdds.scm.pipelines;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.Map;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
@@ -37,17 +36,53 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineManager.class);
-  private final List<PipelineID> activePipelines;
-  private final Map<PipelineID, Pipeline> pipelineMap;
-  private final AtomicInteger pipelineIndex;
-  private final Node2PipelineMap node2PipelineMap;
-
-  public PipelineManager(Node2PipelineMap map,
-      Map<PipelineID, Pipeline> pipelineMap) {
-    activePipelines = new LinkedList<>();
-    pipelineIndex = new AtomicInteger(0);
-    this.pipelineMap = pipelineMap;
-    this.node2PipelineMap = map;
+  private final ArrayList<ActivePipelines> activePipelines;
+
+  public PipelineManager() {
+    activePipelines = new ArrayList<>();
+    for (ReplicationFactor factor : ReplicationFactor.values()) {
+      activePipelines.add(factor.ordinal(), new ActivePipelines());
+    }
+  }
+
+  private static class ActivePipelines {
+    private final List<PipelineID> activePipelines;
+    private final AtomicInteger pipelineIndex;
+
+    ActivePipelines() {
+      activePipelines = new LinkedList<>();
+      pipelineIndex = new AtomicInteger(0);
+    }
+
+    void addPipeline(PipelineID pipelineID) {
+      activePipelines.add(pipelineID);
+    }
+
+    void removePipeline(PipelineID pipelineID) {
+      activePipelines.remove(pipelineID);
+    }
+
+    /**
+     * Find a Pipeline that is operational.
+     *
+     * @return - Pipeline or null
+     */
+    PipelineID findOpenPipeline() {
+      if (activePipelines.size() == 0) {
+        LOG.error("No Operational pipelines found. Returning null.");
+        return null;
+      }
+      return activePipelines.get(getNextIndex());
+    }
+
+    /**
+     * gets the next index of the Pipeline to get.
+     *
+     * @return index in the link list to get.
+     */
+    private int getNextIndex() {
+      return pipelineIndex.incrementAndGet() % activePipelines.size();
+    }
   }
 
   /**
@@ -59,44 +94,30 @@ public abstract class PipelineManager {
    * @param replicationFactor - Replication Factor
    * @return a Pipeline.
    */
-  public synchronized final Pipeline getPipeline(
+  public synchronized final PipelineID getPipeline(
       ReplicationFactor replicationFactor, ReplicationType replicationType) {
-    Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
-    if (pipeline != null) {
+    PipelineID id =
+        activePipelines.get(replicationFactor.ordinal()).findOpenPipeline();
+    if (id != null) {
       LOG.debug("re-used pipeline:{} for container with " +
               "replicationType:{} replicationFactor:{}",
-          pipeline.getId(), replicationType, replicationFactor);
+          id, replicationType, replicationFactor);
     }
-    if (pipeline == null) {
+    if (id == null) {
       LOG.error("Get pipeline call failed. We are not able to find" +
               " operational pipeline.");
       return null;
     } else {
-      return pipeline;
+      return id;
     }
   }
 
-  /**
-   * This function to get pipeline with given pipeline name.
-   *
-   * @param id
-   * @return a Pipeline.
-   */
-  public synchronized final Pipeline getPipeline(PipelineID id) {
-    Pipeline pipeline = null;
-
-    // 1. Check if pipeline already exists
-    if (pipelineMap.containsKey(id)) {
-      pipeline = pipelineMap.get(id);
-      LOG.debug("Returning pipeline for pipelineName:{}", id);
-      return pipeline;
-    } else {
-      LOG.debug("Unable to find pipeline for pipelineName:{}", id);
-    }
-    return pipeline;
+  void addOpenPipeline(Pipeline pipeline) {
+    activePipelines.get(pipeline.getFactor().ordinal())
+            .addPipeline(pipeline.getId());
   }
 
-  protected int getReplicationCount(ReplicationFactor factor) {
+  protected static int getReplicationCount(ReplicationFactor factor) {
     switch (factor) {
     case ONE:
       return 1;
@@ -117,46 +138,6 @@ public abstract class PipelineManager {
   public abstract void initializePipeline(Pipeline pipeline) throws IOException;
 
   /**
-   * Find a Pipeline that is operational.
-   *
-   * @return - Pipeline or null
-   */
-  private Pipeline findOpenPipeline(
-      ReplicationType type, ReplicationFactor factor) {
-    Pipeline pipeline = null;
-    final int sentinal = -1;
-    if (activePipelines.size() == 0) {
-      LOG.error("No Operational pipelines found. Returning null.");
-      return null;
-    }
-    int startIndex = getNextIndex();
-    int nextIndex = sentinal;
-    for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
-      // Just walk the list in a circular way.
-      PipelineID id =
-          activePipelines
-              .get(nextIndex != sentinal ? nextIndex : startIndex);
-      Pipeline temp = pipelineMap.get(id);
-      // if we find an operational pipeline just return that.
-      if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
-          (temp.getFactor() == factor) && (temp.getType() == type)) {
-        pipeline = temp;
-        break;
-      }
-    }
-    return pipeline;
-  }
-
-  /**
-   * gets the next index of the Pipeline to get.
-   *
-   * @return index in the link list to get.
-   */
-  private int getNextIndex() {
-    return pipelineIndex.incrementAndGet() % activePipelines.size();
-  }
-
-  /**
    * Creates a pipeline with a specified replication factor and type.
    * @param replicationFactor - Replication Factor.
    * @param replicationType - Replication Type.
@@ -168,9 +149,6 @@ public abstract class PipelineManager {
       LOG.debug("created new pipeline:{} for container with "
               + "replicationType:{} replicationFactor:{}",
           pipeline.getId(), replicationType, replicationFactor);
-      activePipelines.add(pipeline.getId());
-      pipelineMap.put(pipeline.getId(), pipeline);
-      node2PipelineMap.addPipeline(pipeline);
     }
     return pipeline;
   }
@@ -180,17 +158,15 @@ public abstract class PipelineManager {
    * @param pipeline pipeline to be finalized
    */
   public synchronized void finalizePipeline(Pipeline pipeline) {
-    activePipelines.remove(pipeline.getId());
+    activePipelines.get(pipeline.getFactor().ordinal())
+            .removePipeline(pipeline.getId());
   }
 
   /**
    *
    * @param pipeline
    */
-  public void closePipeline(Pipeline pipeline) throws IOException {
-    pipelineMap.remove(pipeline.getId());
-    node2PipelineMap.removePipeline(pipeline);
-  }
+  public abstract void closePipeline(Pipeline pipeline) throws IOException;
 
   /**
    * list members in the pipeline.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 63afbaa..c9f51f7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -39,30 +38,34 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.common.statemachine
-    .InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseException;
 import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashSet;
 import java.util.List;
-import java.util.NavigableSet;
+import java.util.HashMap;
 import java.util.Set;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .FAILED_TO_CHANGE_PIPELINE_STATE;
+import static org.apache.hadoop.hdds.server
+        .ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone
+        .OzoneConsts.SCM_PIPELINE_DB;
 
 /**
  * Sends the request to the right pipeline manager.
@@ -73,16 +76,16 @@ public class PipelineSelector {
   private final ContainerPlacementPolicy placementPolicy;
   private final NodeManager nodeManager;
   private final Configuration conf;
-  private final ContainerStateManager containerStateManager;
   private final EventPublisher eventPublisher;
   private final RatisManagerImpl ratisManager;
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
+  private final MetadataStore pipelineStore;
+  private final PipelineStateManager stateManager;
   private final Node2PipelineMap node2PipelineMap;
+  private final Map<PipelineID, HashSet<ContainerID>> pipeline2ContainerMap;
   private final Map<PipelineID, Pipeline> pipelineMap;
   private final LeaseManager<Pipeline> pipelineLeaseManager;
-  private final StateMachine<LifeCycleState,
-      HddsProtos.LifeCycleEvent> stateMachine;
 
   /**
    * Constructs a pipeline Selector.
@@ -90,9 +93,8 @@ public class PipelineSelector {
    * @param nodeManager - node manager
    * @param conf - Ozone Config
    */
-  public PipelineSelector(NodeManager nodeManager,
-      ContainerStateManager containerStateManager, Configuration conf,
-      EventPublisher eventPublisher) {
+  public PipelineSelector(NodeManager nodeManager, Configuration conf,
+      EventPublisher eventPublisher, int cacheSizeMB) throws IOException {
     this.nodeManager = nodeManager;
     this.conf = conf;
     this.eventPublisher = eventPublisher;
@@ -105,79 +107,66 @@ public class PipelineSelector {
     pipelineMap = new ConcurrentHashMap<>();
     this.standaloneManager =
         new StandaloneManagerImpl(this.nodeManager, placementPolicy,
-            containerSize, node2PipelineMap, pipelineMap);
+            containerSize);
     this.ratisManager =
         new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
-            conf, node2PipelineMap, pipelineMap);
-    // Initialize the container state machine.
-    Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
+            conf);
     long pipelineCreationLeaseTimeout = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
-    this.containerStateManager = containerStateManager;
     pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
         pipelineCreationLeaseTimeout);
     pipelineLeaseManager.start();
 
-    // These are the steady states of a container.
-    finalStates.add(HddsProtos.LifeCycleState.OPEN);
-    finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+    stateManager = new PipelineStateManager();
+    pipeline2ContainerMap = new HashMap<>();
 
-    this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
-        finalStates);
-    initializeStateMachine();
+    // Write the container name to pipeline mapping.
+    File metaDir = getOzoneMetaDirPath(conf);
+    File containerDBPath = new File(metaDir, SCM_PIPELINE_DB);
+    pipelineStore = MetadataStoreBuilder.newBuilder()
+            .setConf(conf)
+            .setDbFile(containerDBPath)
+            .setCacheSize(cacheSizeMB * OzoneConsts.MB)
+            .build();
+
+    reloadExistingPipelines();
   }
 
-  /**
-   * Event and State Transition Mapping.
-   *
-   * State: ALLOCATED ---------------> CREATING
-   * Event:                CREATE
-   *
-   * State: CREATING  ---------------> OPEN
-   * Event:               CREATED
-   *
-   * State: OPEN      ---------------> CLOSING
-   * Event:               FINALIZE
-   *
-   * State: CLOSING   ---------------> CLOSED
-   * Event:                CLOSE
-   *
-   * State: CREATING  ---------------> CLOSED
-   * Event:               TIMEOUT
-   *
-   *
-   * Container State Flow:
-   *
-   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
-   *            (CREATE)     | (CREATED)     (FINALIZE)   |
-   *                         |                            |
-   *                         |                            |
-   *                         |(TIMEOUT)                   |(CLOSE)
-   *                         |                            |
-   *                         +--------> [CLOSED] <--------+
-   */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
-        HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleEvent.CREATE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleState.OPEN,
-        HddsProtos.LifeCycleEvent.CREATED);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
-        HddsProtos.LifeCycleState.CLOSING,
-        HddsProtos.LifeCycleEvent.FINALIZE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
-        HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.LifeCycleEvent.TIMEOUT);
+  private void reloadExistingPipelines() throws IOException {
+    if (pipelineStore.isEmpty()) {
+      // Nothing to do just return
+      return;
+    }
+
+    List<Map.Entry<byte[], byte[]>> range =
+            pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
+
+    // Transform the values into the pipelines.
+    // TODO: filter by pipeline state
+    for (Map.Entry<byte[], byte[]> entry : range) {
+      Pipeline pipeline = Pipeline.getFromProtoBuf(
+                HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
+      Preconditions.checkNotNull(pipeline);
+      addExistingPipeline(pipeline);
+    }
+  }
+
+  public Set<ContainerID> getOpenContainerIDsByPipeline(PipelineID pipelineID) {
+    return pipeline2ContainerMap.get(pipelineID);
+  }
+
+  public void addContainerToPipeline(PipelineID pipelineID, long containerID) {
+    pipeline2ContainerMap.get(pipelineID)
+            .add(ContainerID.valueof(containerID));
+  }
+
+  public void removeContainerFromPipeline(PipelineID pipelineID,
+                                          long containerID) throws IOException {
+    pipeline2ContainerMap.get(pipelineID)
+            .remove(ContainerID.valueof(containerID));
+    closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
   }
 
   /**
@@ -294,8 +283,14 @@ public class PipelineSelector {
         manager.createPipeline(replicationFactor, replicationType);
     if (pipeline == null) {
       // try to return a pipeline from already allocated pipelines
-      pipeline = manager.getPipeline(replicationFactor, replicationType);
+      PipelineID pipelineId =
+              manager.getPipeline(replicationFactor, replicationType);
+      pipeline = pipelineMap.get(pipelineId);
+      Preconditions.checkArgument(pipeline.getLifeCycleState() ==
+              LifeCycleState.OPEN);
     } else {
+      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
+              pipeline.getProtobufMessage().toByteArray());
       // if a new pipeline is created, initialize its state machine
       updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
 
@@ -343,8 +338,8 @@ public class PipelineSelector {
     if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
       return;
     }
-    NavigableSet<ContainerID> containerIDS = containerStateManager
-        .getMatchingContainerIDsByPipeline(pipeline.getId());
+    HashSet<ContainerID> containerIDS =
+            pipeline2ContainerMap.get(pipeline.getId());
     if (containerIDS.size() == 0) {
       updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
       LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
@@ -358,56 +353,58 @@ public class PipelineSelector {
     PipelineManager manager = getPipelineManager(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
-    NavigableSet<ContainerID> containers =
-        containerStateManager
-            .getMatchingContainerIDsByPipeline(pipeline.getId());
+    HashSet<ContainerID> containers =
+            pipeline2ContainerMap.get(pipeline.getId());
     Preconditions.checkArgument(containers.size() == 0);
     manager.closePipeline(pipeline);
   }
 
-  private void closeContainersByPipeline(Pipeline pipeline) {
-    NavigableSet<ContainerID> containers =
-        containerStateManager
-            .getMatchingContainerIDsByPipeline(pipeline.getId());
-    for (ContainerID id : containers) {
-      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
-    }
-  }
-
   /**
-   * list members in the pipeline .
+   * Add the given open pipeline to its pipeline manager.
    */
-
-  public List<DatanodeDetails> getDatanodes(ReplicationType replicationType,
-      PipelineID pipelineID) throws IOException {
-    PipelineManager manager = getPipelineManager(replicationType);
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Getting data nodes from pipeline : {}", pipelineID);
-    return manager.getMembers(pipelineID);
-  }
-
-  /**
-   * Update the datanodes in the list of the pipeline.
-   */
-
-  public void updateDatanodes(ReplicationType replicationType, PipelineID
-      pipelineID, List<DatanodeDetails> newDatanodes) throws IOException {
-    PipelineManager manager = getPipelineManager(replicationType);
+  private void addOpenPipeline(Pipeline pipeline) {
+    PipelineManager manager = getPipelineManager(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Updating pipeline: {} with new nodes:{}", pipelineID,
-        newDatanodes.stream().map(DatanodeDetails::toString)
-            .collect(Collectors.joining(",")));
-    manager.updatePipeline(pipelineID, newDatanodes);
+    LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId());
+    manager.addOpenPipeline(pipeline);
   }
 
-  public Node2PipelineMap getNode2PipelineMap() {
-    return node2PipelineMap;
+  private void closeContainersByPipeline(Pipeline pipeline) {
+    HashSet<ContainerID> containers =
+            pipeline2ContainerMap.get(pipeline.getId());
+    for (ContainerID id : containers) {
+      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
+    }
   }
 
   public Set<PipelineID> getPipelineId(UUID dnId) {
     return node2PipelineMap.getPipelines(dnId);
   }
 
+  private void addExistingPipeline(Pipeline pipeline) throws IOException {
+    LifeCycleState state = pipeline.getLifeCycleState();
+    switch (state) {
+    case ALLOCATED:
+      // A pipeline in ALLOCATED state is only present in SCM and does not exist
+      // on the datanodes; on SCM restart, this pipeline can be ignored.
+      break;
+    case CREATING:
+    case OPEN:
+    case CLOSING:
+      //TODO: process pipeline report and move pipeline to active queue
+      // when all the nodes have reported.
+      pipelineMap.put(pipeline.getId(), pipeline);
+      pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
+      node2PipelineMap.addPipeline(pipeline);
+      break;
+    case CLOSED:
+      // if the pipeline is in closed state, nothing to do.
+      break;
+    default:
+      throw new IOException("invalid pipeline state:" + state);
+    }
+  }
+
   /**
    * Update the Pipeline State to the next state.
    *
@@ -417,24 +414,12 @@ public class PipelineSelector {
    */
   public void updatePipelineState(Pipeline pipeline,
       HddsProtos.LifeCycleEvent event) throws IOException {
-    HddsProtos.LifeCycleState newState;
-    try {
-      newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
-    } catch (InvalidStateTransitionException ex) {
-      String error = String.format("Failed to update pipeline state %s, " +
-              "reason: invalid state transition from state: %s upon " +
-              "event: %s.",
-          pipeline.getId(), pipeline.getLifeCycleState(), event);
-      LOG.error(error);
-      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
-    }
-
-    // This is a post condition after executing getNextState.
-    Preconditions.checkNotNull(newState);
-    Preconditions.checkNotNull(pipeline);
     try {
       switch (event) {
       case CREATE:
+        pipelineMap.put(pipeline.getId(), pipeline);
+        pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
+        node2PipelineMap.addPipeline(pipeline);
         // Acquire lease on pipeline
         Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
         // Register callback to be executed in case of timeout
@@ -446,6 +431,7 @@ public class PipelineSelector {
       case CREATED:
         // Release the lease on pipeline
         pipelineLeaseManager.release(pipeline);
+        addOpenPipeline(pipeline);
         break;
 
       case FINALIZE:
@@ -455,21 +441,30 @@ public class PipelineSelector {
       case CLOSE:
       case TIMEOUT:
         closePipeline(pipeline);
+        pipeline2ContainerMap.remove(pipeline.getId());
+        node2PipelineMap.removePipeline(pipeline);
+        pipelineMap.remove(pipeline.getId());
         break;
       default:
         throw new SCMException("Unsupported pipeline LifeCycleEvent.",
             FAILED_TO_CHANGE_PIPELINE_STATE);
       }
 
-      pipeline.setLifeCycleState(newState);
+      stateManager.updatePipelineState(pipeline, event);
+      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
+              pipeline.getProtobufMessage().toByteArray());
     } catch (LeaseException e) {
       throw new IOException("Lease Exception.", e);
     }
   }
 
-  public void shutdown() {
+  public void shutdown() throws IOException {
     if (pipelineLeaseManager != null) {
       pipelineLeaseManager.shutdown();
     }
+
+    if (pipelineStore != null) {
+      pipelineStore.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
new file mode 100644
index 0000000..6054f16
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.common.statemachine
+    .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_PIPELINE_STATE;
+
+/**
+ * Manages Pipeline states.
+ */
+public class PipelineStateManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineStateManager.class);
+
+  private final StateMachine<HddsProtos.LifeCycleState,
+      HddsProtos.LifeCycleEvent> stateMachine;
+
+  PipelineStateManager() {
+    // Initialize the pipeline state machine.
+    Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();
+    // These are the steady states of a pipeline.
+    finalStates.add(HddsProtos.LifeCycleState.OPEN);
+    finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+
+    this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
+        finalStates);
+    initializeStateMachine();
+  }
+
+  /**
+   * Event and State Transition Mapping.
+   *
+   * State: ALLOCATED ---------------> CREATING
+   * Event:                CREATE
+   *
+   * State: CREATING  ---------------> OPEN
+   * Event:               CREATED
+   *
+   * State: OPEN      ---------------> CLOSING
+   * Event:               FINALIZE
+   *
+   * State: CLOSING   ---------------> CLOSED
+   * Event:                CLOSE
+   *
+   * State: CREATING  ---------------> CLOSED
+   * Event:               TIMEOUT
+   *
+   *
+   * Pipeline State Flow:
+   *
+   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
+   *            (CREATE)     | (CREATED)     (FINALIZE)   |
+   *                         |                            |
+   *                         |                            |
+   *                         |(TIMEOUT)                   |(CLOSE)
+   *                         |                            |
+   *                         +--------> [CLOSED] <--------+
+   */
+  private void initializeStateMachine() {
+    stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
+        HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleEvent.CREATE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleState.OPEN,
+        HddsProtos.LifeCycleEvent.CREATED);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
+        HddsProtos.LifeCycleState.CLOSING,
+        HddsProtos.LifeCycleEvent.FINALIZE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
+        HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.LifeCycleEvent.CLOSE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.LifeCycleEvent.TIMEOUT);
+  }
+
+
+  /**
+   * Update the Pipeline State to the next state.
+   *
+   * @param pipeline - Pipeline
+   * @param event - LifeCycle Event
+   * @throws SCMException  on Failure.
+   */
+  public void updatePipelineState(Pipeline pipeline,
+      HddsProtos.LifeCycleEvent event) throws IOException {
+    HddsProtos.LifeCycleState newState;
+    try {
+      newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
+    } catch (InvalidStateTransitionException ex) {
+      String error = String.format("Failed to update pipeline state %s, " +
+              "reason: invalid state transition from state: %s upon " +
+              "event: %s.",
+          pipeline.getId(), pipeline.getLifeCycleState(), event);
+      LOG.error(error);
+      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
+    }
+
+    // This is a post condition after executing getNextState.
+    Preconditions.checkNotNull(newState);
+    Preconditions.checkNotNull(pipeline);
+    pipeline.setLifeCycleState(newState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index 0342e18..d3cec88 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -39,7 +38,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
-import java.util.Map;
 
 /**
  * Implementation of {@link PipelineManager}.
@@ -59,9 +57,8 @@ public class RatisManagerImpl extends PipelineManager {
    * @param nodeManager
    */
   public RatisManagerImpl(NodeManager nodeManager,
-      ContainerPlacementPolicy placementPolicy, long size, Configuration conf,
-      Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
-    super(map, pipelineMap);
+      ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
+    super();
     this.conf = conf;
     this.nodeManager = nodeManager;
     ratisMembers = new HashSet<>();
@@ -114,7 +111,6 @@ public class RatisManagerImpl extends PipelineManager {
         XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
       client.destroyPipeline();
     }
-    super.closePipeline(pipeline);
     for (DatanodeDetails node : pipeline.getMachines()) {
       // A node should always be the in ratis members list.
       Preconditions.checkArgument(ratisMembers.remove(node));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index 2573b9c..ed2fc2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -37,7 +36,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
-import java.util.Map;
 
 /**
  * Standalone Manager Impl to prove that pluggable interface
@@ -58,9 +56,8 @@ public class StandaloneManagerImpl extends PipelineManager {
    * @param containerSize - Container Size.
    */
   public StandaloneManagerImpl(NodeManager nodeManager,
-      ContainerPlacementPolicy placementPolicy, long containerSize,
-      Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
-    super(map, pipelineMap);
+      ContainerPlacementPolicy placementPolicy, long containerSize) {
+    super();
     this.nodeManager = nodeManager;
     this.placementPolicy = placementPolicy;
     this.containerSize =  containerSize;
@@ -105,7 +102,6 @@ public class StandaloneManagerImpl extends PipelineManager {
    * Close the pipeline.
    */
   public void closePipeline(Pipeline pipeline) throws IOException {
-    super.closePipeline(pipeline);
     for (DatanodeDetails node : pipeline.getMachines()) {
       // A node should always be the in standalone members list.
       Preconditions.checkArgument(standAloneMembers.remove(node));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index d74a32f..a59179b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -75,6 +75,7 @@ public class TestContainerReportHandler implements EventPublisher {
     OzoneConfiguration conf = new OzoneConfiguration();
     Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
     Mapping mapping = Mockito.mock(Mapping.class);
+    PipelineSelector selector = Mockito.mock(PipelineSelector.class);
 
     when(mapping.getContainer(anyLong()))
         .thenAnswer(
@@ -87,7 +88,7 @@ public class TestContainerReportHandler implements EventPublisher {
       );
 
     ContainerStateManager containerStateManager =
-        new ContainerStateManager(conf, mapping);
+        new ContainerStateManager(conf, mapping, selector);
 
     when(mapping.getStateManager()).thenReturn(containerStateManager);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index fe92ee5..b857740 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,7 +42,8 @@ public class TestContainerStateManager {
   public void init() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
     Mapping mapping = Mockito.mock(Mapping.class);
-    containerStateManager = new ContainerStateManager(conf, mapping);
+    PipelineSelector selector =  Mockito.mock(PipelineSelector.class);
+    containerStateManager = new ContainerStateManager(conf, mapping, selector);
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 0b69f5f..5ca9cb7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
 import org.junit.Assert;
@@ -60,7 +61,8 @@ public class TestDeadNodeHandler {
     Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
     ContainerStateManager containerStateManager = new ContainerStateManager(
         new OzoneConfiguration(),
-        Mockito.mock(Mapping.class)
+        Mockito.mock(Mapping.class),
+        Mockito.mock(PipelineSelector.class)
     );
 
     ContainerInfo container1 =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index c6e819b..422a7de 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -54,8 +55,9 @@ public class TestContainerStateManagerIntegration {
   private MiniOzoneCluster cluster;
   private XceiverClientManager xceiverClientManager;
   private StorageContainerManager scm;
-  private Mapping scmContainerMapping;
+  private ContainerMapping scmContainerMapping;
   private ContainerStateManager containerStateManager;
+  private PipelineSelector selector;
   private String containerOwner = "OZONE";
 
 
@@ -66,8 +68,9 @@ public class TestContainerStateManagerIntegration {
     cluster.waitForClusterToBeReady();
     xceiverClientManager = new XceiverClientManager(conf);
     scm = cluster.getStorageContainerManager();
-    scmContainerMapping = scm.getScmContainerManager();
+    scmContainerMapping = (ContainerMapping) scm.getScmContainerManager();
     containerStateManager = scmContainerMapping.getStateManager();
+    selector = scmContainerMapping.getPipelineSelector();
   }
 
   @After
@@ -133,8 +136,7 @@ public class TestContainerStateManagerIntegration {
     // New instance of ContainerStateManager should load all the containers in
     // container store.
     ContainerStateManager stateManager =
-        new ContainerStateManager(conf, scmContainerMapping
-        );
+        new ContainerStateManager(conf, scmContainerMapping, selector);
     int matchCount = stateManager
         .getMatchingContainerIDs(containerOwner,
             xceiverClientManager.getType(), xceiverClientManager.getFactor(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index b8cb9970..aefa6b0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -30,14 +30,13 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.NavigableSet;
 import java.util.Set;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
@@ -60,8 +59,8 @@ public class TestNode2PipelineMap {
    *
    * @throws IOException
    */
-  @BeforeClass
-  public static void init() throws Exception {
+  @Before
+  public void init() throws Exception {
     conf = new OzoneConfiguration();
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
@@ -75,8 +74,8 @@ public class TestNode2PipelineMap {
   /**
    * Shutdown MiniDFSCluster.
    */
-  @AfterClass
-  public static void shutdown() {
+  @After
+  public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -86,19 +85,20 @@ public class TestNode2PipelineMap {
   @Test
   public void testPipelineMap() throws IOException {
 
-    NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
+    Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline(
         ratisContainer.getPipeline().getId());
 
     long cId = ratisContainer.getContainerInfo().getContainerID();
     Assert.assertEquals(1, set.size());
-    Assert.assertEquals(cId, set.first().getId());
+    set.forEach(containerID ->
+            Assert.assertEquals(containerID, ContainerID.valueof(cId)));
 
     List<DatanodeDetails> dns = ratisContainer.getPipeline().getMachines();
     Assert.assertEquals(3, dns.size());
 
     // get pipeline details by dnid
     Set<PipelineID> pipelines = mapping.getPipelineSelector()
-        .getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
+        .getPipelineId(dns.get(0).getUuid());
     Assert.assertEquals(1, pipelines.size());
     pipelines.forEach(p -> Assert.assertEquals(p,
         ratisContainer.getPipeline().getId()));
@@ -114,7 +114,7 @@ public class TestNode2PipelineMap {
         .updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
     mapping
         .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
-    NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
+    Set<ContainerID> set2 = pipelineSelector.getOpenContainerIDsByPipeline(
         ratisContainer.getPipeline().getId());
     Assert.assertEquals(0, set2.size());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 0f8f925..a5828e1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -35,7 +35,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.NavigableSet;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
@@ -88,12 +88,13 @@ public class TestPipelineClose {
 
   @Test
   public void testPipelineCloseWithClosedContainer() throws IOException {
-    NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
+    Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline(
         ratisContainer1.getPipeline().getId());
 
     long cId = ratisContainer1.getContainerInfo().getContainerID();
     Assert.assertEquals(1, set.size());
-    Assert.assertEquals(cId, set.first().getId());
+    set.forEach(containerID ->
+            Assert.assertEquals(containerID, ContainerID.valueof(cId)));
 
     // Now close the container and it should not show up while fetching
     // containers by pipeline
@@ -106,7 +107,7 @@ public class TestPipelineClose {
     mapping
         .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
 
-    NavigableSet<ContainerID> setClosed = stateMap.getOpenContainerIDsByPipeline(
+    Set<ContainerID> setClosed = pipelineSelector.getOpenContainerIDsByPipeline(
         ratisContainer1.getPipeline().getId());
     Assert.assertEquals(0, setClosed.size());
 
@@ -118,15 +119,15 @@ public class TestPipelineClose {
         HddsProtos.LifeCycleState.CLOSED);
     for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
-      Assert.assertEquals(pipelineSelector.getNode2PipelineMap()
-          .getPipelines(dn.getUuid()).size(), 0);
+      Assert.assertEquals(pipelineSelector.getPipelineId(
+          dn.getUuid()).size(), 0);
     }
   }
 
   @Test
   public void testPipelineCloseWithOpenContainer() throws IOException,
       TimeoutException, InterruptedException {
-    NavigableSet<ContainerID> setOpen = stateMap.getOpenContainerIDsByPipeline(
+    Set<ContainerID> setOpen = pipelineSelector.getOpenContainerIDsByPipeline(
         ratisContainer2.getPipeline().getId());
     Assert.assertEquals(1, setOpen.size());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
new file mode 100644
index 0000000..3999d76
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto
+        .HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto
+        .HddsProtos.ReplicationType.RATIS;
+
+/**
+ * Test SCM restart and recovery wrt pipelines.
+ */
+public class TestSCMRestart {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static Pipeline ratisPipeline1;
+  private static Pipeline ratisPipeline2;
+  private static ContainerMapping mapping;
+  private static ContainerMapping newMapping;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws Exception if cluster startup or the SCM restart fails
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(6)
+        .setHbInterval(1000)
+        .setHbProcessorInterval(1000)
+        .build();
+    cluster.waitForClusterToBeReady();
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    mapping = (ContainerMapping)scm.getScmContainerManager();
+    ratisPipeline1 =
+            mapping.allocateContainer(RATIS, THREE, "Owner1").getPipeline();
+    ratisPipeline2 =
+            mapping.allocateContainer(RATIS, ONE, "Owner2").getPipeline();
+    // At this stage, there should be 2 pipelines, each with 1 open
+    // container. Restart the SCM and then verify that the pipelines are in
+    // the correct state.
+    cluster.restartStorageContainerManager();
+    newMapping = (ContainerMapping)(cluster.getStorageContainerManager()
+            .getScmContainerManager());
+  }
+
+  /**
+   * Shut down the MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testPipelineWithScmRestart() {
+    // After the restart, make sure that the pipelines are still present
+    Pipeline ratisPipeline1AfterRestart = newMapping.getPipelineSelector()
+            .getPipeline(ratisPipeline1.getId());
+    Pipeline ratisPipeline2AfterRestart = newMapping.getPipelineSelector()
+            .getPipeline(ratisPipeline2.getId());
+    Assert.assertNotSame(ratisPipeline1AfterRestart, ratisPipeline1);
+    Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2);
+    Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1);
+    Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 3cba839..d13efb4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -138,8 +138,11 @@ public interface MiniOzoneCluster {
    * Restarts StorageContainerManager instance.
    *
    * @throws IOException
+   * @throws TimeoutException
+   * @throws InterruptedException
    */
-  void restartStorageContainerManager() throws IOException;
+  void restartStorageContainerManager() throws InterruptedException,
+      TimeoutException, IOException;
 
   /**
    * Restarts OzoneManager instance.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index c2169a3..b34a7d1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -85,7 +85,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
       LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
 
   private final OzoneConfiguration conf;
-  private final StorageContainerManager scm;
+  private StorageContainerManager scm;
   private final OzoneManager ozoneManager;
   private final List<HddsDatanodeService> hddsDatanodes;
 
@@ -215,9 +215,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
   }
 
   @Override
-  public void restartStorageContainerManager() throws IOException {
+  public void restartStorageContainerManager()
+      throws TimeoutException, InterruptedException, IOException {
     scm.stop();
+    scm.join();
+    scm = StorageContainerManager.createSCM(null, conf);
     scm.start();
+    waitForClusterToBeReady();
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message