hadoop-common-commits mailing list archives

From bra...@apache.org
Subject [37/50] [abbrv] hadoop git commit: HDDS-694. Plugin new Pipeline management code in SCM. Contributed by Lokesh Jain.
Date Tue, 30 Oct 2018 06:03:07 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index 48939f1..9df9dff 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -19,24 +19,44 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
 
 /**
  * Handles Stale node event.
  */
 public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StaleNodeHandler.class);
 
-  private final PipelineSelector pipelineSelector;
+  private final NodeManager nodeManager;
+  private final PipelineManager pipelineManager;
 
-  public StaleNodeHandler(PipelineSelector pipelineSelector) {
-    this.pipelineSelector = pipelineSelector;
+  public StaleNodeHandler(NodeManager nodeManager,
+      PipelineManager pipelineManager) {
+    this.nodeManager = nodeManager;
+    this.pipelineManager = pipelineManager;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    pipelineSelector.handleStaleNode(datanodeDetails);
+    Set<PipelineID> pipelineIds =
+        nodeManager.getPipelineByDnID(datanodeDetails.getUuid());
+    for (PipelineID pipelineID : pipelineIds) {
+      try {
+        pipelineManager.finalizePipeline(pipelineID);
+      } catch (IOException e) {
+        LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
+            datanodeDetails);
+      }
+    }
   }
 }
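
Reviewer note: the reworked handler is driven off the SCM event queue and reacts to a stale node by finalizing every pipeline that node participates in, replacing the old PipelineSelector.handleStaleNode() path. A minimal wiring sketch, assuming the EventQueue and SCMEvents.STALE_NODE definitions from this codebase and pre-built nodeManager/pipelineManager instances:

    // Sketch only; everything except StaleNodeHandler's constructor is
    // assumed from the surrounding SCM code.
    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.STALE_NODE,
        new StaleNodeHandler(nodeManager, pipelineManager));
    // Firing the event finalizes all pipelines hosted on the stale node.
    eventQueue.fireEvent(SCMEvents.STALE_NODE, staleDatanodeDetails);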

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 87f2222..bf19261 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -19,8 +19,8 @@
 package org.apache.hadoop.hdds.scm.node.states;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -55,7 +55,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
    * @param pipeline Pipeline to be added
    */
   public synchronized void addPipeline(Pipeline pipeline) {
-    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
+    for (DatanodeDetails details : pipeline.getNodes()) {
       UUID dnId = details.getUuid();
       dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>())
           .add(pipeline.getId());
@@ -63,7 +63,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
   }
 
   public synchronized void removePipeline(Pipeline pipeline) {
-    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
+    for (DatanodeDetails details : pipeline.getNodes()) {
       UUID dnId = details.getUuid();
       dn2ObjectMap.computeIfPresent(dnId,
           (k, v) -> {
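
Reviewer note: Node2PipelineMap is the reverse index behind the NodeManager#getPipelineByDnID lookup used by StaleNodeHandler above. A usage sketch, with the getPipelines(UUID) accessor assumed from the Node2ObjectsMap base class:

    // Sketch; getPipelines(UUID) is an assumption, not shown in this hunk.
    Node2PipelineMap node2PipelineMap = new Node2PipelineMap();
    node2PipelineMap.addPipeline(pipeline);    // index under each node of the pipeline
    Set<PipelineID> hosted =
        node2PipelineMap.getPipelines(dnUuid); // pipelines hosted on that datanode
    node2PipelineMap.removePipeline(pipeline); // drop the reverse mappings again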

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 261c544..c06a3bd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -50,8 +50,8 @@ public final class PipelineFactory {
     return providers.get(type).create(factor);
   }
 
-  public Pipeline create(ReplicationType type, List<DatanodeDetails> nodes)
-      throws IOException {
-    return providers.get(type).create(nodes);
+  public Pipeline create(ReplicationType type, ReplicationFactor factor,
+      List<DatanodeDetails> nodes) {
+    return providers.get(type).create(factor, nodes);
   }
 }
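
Reviewer note: the factory now takes the replication factor explicitly instead of deriving it from nodes.size(), which also lets IOException drop out of the signature. A call sketch against the new signature, with three placeholder datanodes:

    // dn1..dn3 are hypothetical DatanodeDetails instances.
    List<DatanodeDetails> nodes = Arrays.asList(dn1, dn2, dn3);
    Pipeline pipeline = pipelineFactory.create(
        ReplicationType.RATIS, ReplicationFactor.THREE, nodes);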

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 51f9e86..04ec535 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -36,14 +36,14 @@ public interface PipelineManager extends Closeable {
   Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
       throws IOException;
 
-  Pipeline createPipeline(ReplicationType type, List<DatanodeDetails> nodes)
-      throws IOException;
+  Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
+      List<DatanodeDetails> nodes);
 
-  Pipeline getPipeline(PipelineID pipelineID) throws IOException;
+  Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
 
-  List<Pipeline> getPipelinesByType(ReplicationType type);
+  List<Pipeline> getPipelines(ReplicationType type);
 
-  List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
+  List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor);
 
   void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
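
Reviewer note: on the interface, the getPipelinesByType/getPipelinesByTypeAndFactor pair collapses into overloads of a single getPipelines name, and getPipeline narrows its throws clause to PipelineNotFoundException. The renamed lookups read as:

    List<Pipeline> allRatis =
        pipelineManager.getPipelines(ReplicationType.RATIS);
    List<Pipeline> ratisThree = pipelineManager.getPipelines(
        ReplicationType.RATIS, ReplicationFactor.THREE);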

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 2fc2e0e..84b6375 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -31,5 +31,5 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor) throws IOException;
 
-  Pipeline create(List<DatanodeDetails> nodes) throws IOException;
+  Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index ad11b47..6c31a12 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -76,7 +76,13 @@ public class PipelineReportHandler implements
   private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
       throws IOException {
     PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
-    Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
+    Pipeline pipeline = null;
+    try {
+      pipeline = pipelineManager.getPipeline(pipelineID);
+    } catch (PipelineNotFoundException e) {
+      //TODO: introduce per datanode command for pipeline destroy
+      return;
+    }
 
     if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
       pipeline.reportDatanode(dn);
@@ -87,14 +93,14 @@ public class PipelineReportHandler implements
     } else if (pipeline.isClosed()) {
       int numContainers = pipelineManager.getNumberOfContainers(pipelineID);
       if (numContainers == 0) {
-        // if all the containers have been closed the pipeline can be destroyed
+        // remove the pipeline from the pipeline manager
+        pipelineManager.removePipeline(pipelineID);
+        // since all the containers have been closed the pipeline can be
+        // destroyed
         try (XceiverClientRatis client =
             XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
           client.destroyPipeline();
         }
-        // after successfully destroying the pipeline, the pipeline can be
-        // removed from the pipeline manager
-        pipelineManager.removePipeline(pipelineID);
       }
     } else {
       // In OPEN state case just report the datanode
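
Reviewer note: two behavioral points in this hunk. First, a report for an unknown pipeline is now tolerated: getPipeline throws the narrower PipelineNotFoundException and the handler simply returns, pending a per-datanode destroy command. Second, for a closed, empty pipeline the removal from the pipeline manager now happens before the Ratis destroy call, so the pipeline is dropped from SCM even if the subsequent destroy fails. Callers of getPipeline elsewhere can follow the same pattern, roughly:

    try {
      Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
      // ... act on the pipeline ...
    } catch (PipelineNotFoundException e) {
      // pipeline unknown to SCM, e.g. a stale report from a datanode
    }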

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 8f5f89a..67f74d3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
@@ -39,9 +37,6 @@ import java.util.Set;
  */
 class PipelineStateManager {
 
-  private static final Logger LOG = LoggerFactory.getLogger(
-      org.apache.hadoop.hdds.scm.pipelines.PipelineStateManager.class);
-
   private final PipelineStateMap pipelineStateMap;
 
   PipelineStateManager(Configuration conf) {
@@ -57,17 +52,20 @@ class PipelineStateManager {
     pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
   }
 
-  Pipeline getPipeline(PipelineID pipelineID) throws IOException {
+  Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException {
     return pipelineStateMap.getPipeline(pipelineID);
   }
 
-  List<Pipeline> getPipelinesByType(ReplicationType type) {
-    return pipelineStateMap.getPipelinesByType(type);
+  List<Pipeline> getPipelines(ReplicationType type) {
+    return pipelineStateMap.getPipelines(type);
+  }
+
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor) {
+    return pipelineStateMap.getPipelines(type, factor);
   }
 
-  List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
-      ReplicationFactor factor) {
-    return pipelineStateMap.getPipelinesByTypeAndFactor(type, factor);
+  List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
+    return pipelineStateMap.getPipelines(type, states);
   }
 
   Set<ContainerID> getContainers(PipelineID pipelineID) throws IOException {
@@ -78,8 +76,8 @@ class PipelineStateManager {
     return pipelineStateMap.getNumberOfContainers(pipelineID);
   }
 
-  void removePipeline(PipelineID pipelineID) throws IOException {
-    pipelineStateMap.removePipeline(pipelineID);
+  Pipeline removePipeline(PipelineID pipelineID) throws IOException {
+    return pipelineStateMap.removePipeline(pipelineID);
   }
 
   void removeContainerFromPipeline(PipelineID pipelineID,
@@ -87,7 +85,8 @@ class PipelineStateManager {
     pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
   }
 
-  Pipeline finalizePipeline(PipelineID pipelineId) throws IOException {
+  Pipeline finalizePipeline(PipelineID pipelineId)
+      throws PipelineNotFoundException {
     Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
     if (!pipeline.isClosed()) {
       pipeline = pipelineStateMap
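
Reviewer note: besides the renames, PipelineStateManager gains a varargs overload that filters by pipeline state. A sketch of the new lookup (the class is package-private, so this is callable from the same package and its tests):

    List<Pipeline> openRatis = stateManager.getPipelines(
        ReplicationType.RATIS, PipelineState.OPEN);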

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 110d26b..7b69491 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -31,8 +31,7 @@ import java.util.stream.Collectors;
 
 /**
  * Holds the data structures which maintain the information about pipeline and
- * its state. All the read write operations in this class are protected by a
- * lock.
+ * its state.
  * Invariant: If a pipeline exists in PipelineStateMap, both pipelineMap and
  * pipeline2container would have a non-null mapping for it.
  */
@@ -65,12 +64,12 @@ class PipelineStateMap {
         String.format("Nodes size=%d, replication factor=%d do not match ",
                 pipeline.getNodes().size(), pipeline.getFactor().getNumber()));
 
-    if (pipelineMap.putIfAbsent(pipeline.getID(), pipeline) != null) {
-      LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getID());
+    if (pipelineMap.putIfAbsent(pipeline.getId(), pipeline) != null) {
+      LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getId());
       throw new IOException(String
-          .format("Duplicate pipeline ID %s detected.", pipeline.getID()));
+          .format("Duplicate pipeline ID %s detected.", pipeline.getId()));
     }
-    pipeline2container.put(pipeline.getID(), new TreeSet<>());
+    pipeline2container.put(pipeline.getId(), new TreeSet<>());
   }
 
   /**
@@ -85,12 +84,13 @@ class PipelineStateMap {
     Preconditions.checkNotNull(pipelineID,
         "Pipeline Id cannot be null");
     Preconditions.checkNotNull(containerID,
-        "container Id cannot be null");
+        "Container Id cannot be null");
 
     Pipeline pipeline = getPipeline(pipelineID);
-    if (!pipeline.isOpen()) {
-      throw new IOException(
-          String.format("%s is not in open state", pipelineID));
+    if (pipeline.isClosed()) {
+      throw new IOException(String
+          .format("Cannot add container to pipeline=%s in closed state",
+              pipelineID));
     }
     pipeline2container.get(pipelineID).add(containerID);
   }
@@ -102,10 +102,14 @@ class PipelineStateMap {
    * @return Pipeline
    * @throws IOException if pipeline is not found
    */
-  Pipeline getPipeline(PipelineID pipelineID) throws IOException {
+  Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException {
+    Preconditions.checkNotNull(pipelineID,
+        "Pipeline Id cannot be null");
+
     Pipeline pipeline = pipelineMap.get(pipelineID);
     if (pipeline == null) {
-      throw new IOException(String.format("%s not found", pipelineID));
+      throw new PipelineNotFoundException(
+          String.format("%s not found", pipelineID));
     }
     return pipeline;
   }
@@ -116,29 +120,52 @@ class PipelineStateMap {
    * @param type - ReplicationType
    * @return List of pipelines which have the specified replication type
    */
-  List<Pipeline> getPipelinesByType(ReplicationType type) {
+  List<Pipeline> getPipelines(ReplicationType type) {
     Preconditions.checkNotNull(type, "Replication type cannot be null");
 
-    return pipelineMap.values().stream().filter(p -> p.getType().equals(type))
+    return pipelineMap.values().stream()
+        .filter(p -> p.getType().equals(type))
         .collect(Collectors.toList());
   }
 
   /**
-   * Get open pipeline corresponding to specified replication type and factor.
+   * Get pipeline corresponding to specified replication type and factor.
    *
    * @param type - ReplicationType
    * @param factor - ReplicationFactor
-   * @return List of open pipelines with specified replication type and factor
+   * @return List of pipelines with specified replication type and factor
    */
-  List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
-      ReplicationFactor factor) {
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor) {
+    Preconditions.checkNotNull(type, "Replication type cannot be null");
+    Preconditions.checkNotNull(factor, "Replication factor cannot be null");
+
     return pipelineMap.values().stream()
-        .filter(pipeline -> pipeline.isOpen() && pipeline.getType() == type
+        .filter(pipeline -> pipeline.getType() == type
             && pipeline.getFactor() == factor)
         .collect(Collectors.toList());
   }
 
   /**
+   * Get list of pipeline corresponding to specified replication type and
+   * pipeline states.
+   *
+   * @param type - ReplicationType
+   * @param states - Array of required PipelineState
+   * @return List of pipelines with specified replication type and states
+   */
+  List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
+    Preconditions.checkNotNull(type, "Replication type cannot be null");
+    Preconditions.checkNotNull(states, "Pipeline state cannot be null");
+
+    Set<PipelineState> pipelineStates = new HashSet<>();
+    pipelineStates.addAll(Arrays.asList(states));
+    return pipelineMap.values().stream().filter(
+        pipeline -> pipeline.getType() == type && pipelineStates
+            .contains(pipeline.getPipelineState()))
+        .collect(Collectors.toList());
+  }
+
+  /**
    * Get set of containerIDs corresponding to a pipeline.
    *
    * @param pipelineID - PipelineID
@@ -146,10 +173,14 @@ class PipelineStateMap {
    * @throws IOException if pipeline is not found
    */
   Set<ContainerID> getContainers(PipelineID pipelineID)
-      throws IOException {
+      throws PipelineNotFoundException {
+    Preconditions.checkNotNull(pipelineID,
+        "Pipeline Id cannot be null");
+
     Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
     if (containerIDs == null) {
-      throw new IOException(String.format("%s not found", pipelineID));
+      throw new PipelineNotFoundException(
+          String.format("%s not found", pipelineID));
     }
     return new HashSet<>(containerIDs);
   }
@@ -161,10 +192,15 @@ class PipelineStateMap {
    * @return Number of containers belonging to the pipeline
    * @throws IOException if pipeline is not found
    */
-  int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+  int getNumberOfContainers(PipelineID pipelineID)
+      throws PipelineNotFoundException {
+    Preconditions.checkNotNull(pipelineID,
+        "Pipeline Id cannot be null");
+
     Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
     if (containerIDs == null) {
-      throw new IOException(String.format("%s not found", pipelineID));
+      throw new PipelineNotFoundException(
+          String.format("%s not found", pipelineID));
     }
     return containerIDs.size();
   }
@@ -175,7 +211,7 @@ class PipelineStateMap {
    * @param pipelineID - PipelineID of the pipeline to be removed
    * @throws IOException if the pipeline is not empty or does not exist
    */
-  void removePipeline(PipelineID pipelineID) throws IOException {
+  Pipeline removePipeline(PipelineID pipelineID) throws IOException {
     Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
 
     Pipeline pipeline = getPipeline(pipelineID);
@@ -192,6 +228,7 @@ class PipelineStateMap {
 
     pipelineMap.remove(pipelineID);
     pipeline2container.remove(pipelineID);
+    return pipeline;
   }
 
   /**
@@ -210,6 +247,10 @@ class PipelineStateMap {
         "container Id cannot be null");
 
     Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
+    if (containerIDs == null) {
+      throw new PipelineNotFoundException(
+          String.format("%s not found", pipelineID));
+    }
     containerIDs.remove(containerID);
   }
 
@@ -223,7 +264,7 @@ class PipelineStateMap {
    * @throws IOException if pipeline does not exist
    */
   Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state)
-      throws IOException {
+      throws PipelineNotFoundException {
     Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
     Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null");
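
Reviewer note: the map-level changes mirror the manager: Preconditions null-checks on arguments, PipelineNotFoundException instead of a bare IOException for missing pipelines, and removePipeline now returns the removed Pipeline so the caller can clean up dependent state in the same step. The new removal pattern, as SCMPipelineManager uses it further down:

    Pipeline removed = pipelineStateMap.removePipeline(pipelineID);
    // the returned object lets the caller unregister it elsewhere, e.g.
    // nodeManager.removePipeline(removed);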
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 400ab24..590cd27 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -93,7 +93,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   public Pipeline create(ReplicationFactor factor) throws IOException {
     // Get set of datanodes already used for ratis pipeline
     Set<DatanodeDetails> dnsUsed = new HashSet<>();
-    stateManager.getPipelinesByType(ReplicationType.RATIS)
+    stateManager.getPipelines(ReplicationType.RATIS)
         .forEach(p -> dnsUsed.addAll(p.getNodes()));
 
     // Get list of healthy nodes
@@ -112,7 +112,7 @@ public class RatisPipelineProvider implements PipelineProvider {
 
     Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.ALLOCATED)
+        .setState(PipelineState.OPEN)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(dns)
@@ -122,16 +122,11 @@ public class RatisPipelineProvider implements PipelineProvider {
   }
 
   @Override
-  public Pipeline create(List<DatanodeDetails> nodes) throws IOException {
-    ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size());
-    if (factor == null) {
-      throw new IOException(String
-          .format("Nodes size=%d does not match any replication factor",
-              nodes.size()));
-    }
+  public Pipeline create(ReplicationFactor factor,
+      List<DatanodeDetails> nodes) {
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.ALLOCATED)
+        .setState(PipelineState.OPEN)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(nodes)
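
Reviewer note: pipelines built by the providers now start in OPEN rather than ALLOCATED state, so they are immediately usable, and the node-count-to-factor inference (with its IOException) moves out of the providers entirely. A call sketch against the new signature, with the provider instance and datanodes assumed:

    Pipeline p = ratisPipelineProvider.create(
        ReplicationFactor.THREE, Arrays.asList(dn1, dn2, dn3));
    // p.getPipelineState() == Pipeline.PipelineState.OPEN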

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 6a9c783..a853693 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -78,14 +78,14 @@ public class SCMPipelineManager implements PipelineManager {
     File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
     this.pipelineStore =
         MetadataStoreBuilder.newBuilder()
+            .setCreateIfMissing(true)
             .setConf(conf)
             .setDbFile(pipelineDBPath)
             .setCacheSize(cacheSize * OzoneConsts.MB)
             .build();
-    initializePipelineState();
-
     this.eventPublisher = eventPublisher;
     this.nodeManager = nodeManager;
+    initializePipelineState();
   }
 
   private void initializePipelineState() throws IOException {
@@ -97,12 +97,11 @@ public class SCMPipelineManager implements PipelineManager {
         pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
 
     for (Map.Entry<byte[], byte[]> entry : pipelines) {
-      Pipeline pipeline = Pipeline
-          .fromProtobuf(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
+      Pipeline pipeline = Pipeline.getFromProtobuf(
+          HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
       Preconditions.checkNotNull(pipeline);
       stateManager.addPipeline(pipeline);
-      // TODO: add pipeline to node manager
-      // nodeManager.addPipeline(pipeline);
+      nodeManager.addPipeline(pipeline);
     }
   }
 
@@ -112,10 +111,10 @@ public class SCMPipelineManager implements PipelineManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline =  pipelineFactory.create(type, factor);
-      pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
+      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
           pipeline.getProtobufMessage().toByteArray());
       stateManager.addPipeline(pipeline);
-      // TODO: add pipeline to node manager
+      nodeManager.addPipeline(pipeline);
       return pipeline;
     } finally {
       lock.writeLock().unlock();
@@ -123,20 +122,20 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public Pipeline createPipeline(ReplicationType type,
-                                 List<DatanodeDetails> nodes)
-      throws IOException {
+  public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
+                                 List<DatanodeDetails> nodes) {
     // This will mostly be used to create dummy pipeline for SimplePipelines.
     lock.writeLock().lock();
     try {
-      return pipelineFactory.create(type, nodes);
+      return pipelineFactory.create(type, factor, nodes);
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   @Override
-  public Pipeline getPipeline(PipelineID pipelineID) throws IOException {
+  public Pipeline getPipeline(PipelineID pipelineID)
+      throws PipelineNotFoundException {
     lock.readLock().lock();
     try {
       return stateManager.getPipeline(pipelineID);
@@ -146,21 +145,21 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public List<Pipeline> getPipelinesByType(ReplicationType type) {
+  public List<Pipeline> getPipelines(ReplicationType type) {
     lock.readLock().lock();
     try {
-      return stateManager.getPipelinesByType(type);
+      return stateManager.getPipelines(type);
     } finally {
       lock.readLock().unlock();
     }
   }
 
   @Override
-  public List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
+  public List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor) {
     lock.readLock().lock();
     try {
-      return stateManager.getPipelinesByTypeAndFactor(type, factor);
+      return stateManager.getPipelines(type, factor);
     } finally {
       lock.readLock().unlock();
     }
@@ -232,9 +231,9 @@ public class SCMPipelineManager implements PipelineManager {
   public void removePipeline(PipelineID pipelineID) throws IOException {
     lock.writeLock().lock();
     try {
-      stateManager.removePipeline(pipelineID);
       pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
-      // TODO: remove pipeline from node manager
+      Pipeline pipeline = stateManager.removePipeline(pipelineID);
+      nodeManager.removePipeline(pipeline);
     } finally {
       lock.writeLock().unlock();
     }
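
Reviewer note: three wiring changes stand out in SCMPipelineManager: the pipeline DB is created on demand via setCreateIfMissing(true); initializePipelineState now runs only after nodeManager is assigned, because reloaded pipelines are registered with the NodeManager; and createPipeline/removePipeline keep the node-to-pipeline mapping in sync. The store construction in isolation, as a sketch taken from the hunk above:

    MetadataStore pipelineStore = MetadataStoreBuilder.newBuilder()
        .setCreateIfMissing(true)              // new: create the DB if absent
        .setConf(conf)
        .setDbFile(new File(metaDir, SCM_PIPELINE_DB))
        .setCacheSize(cacheSize * OzoneConsts.MB)
        .build();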

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index c95fcfb..3e42df3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -54,7 +54,7 @@ public class SimplePipelineProvider implements PipelineProvider {
     Collections.shuffle(dns);
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.ALLOCATED)
+        .setState(PipelineState.OPEN)
         .setType(ReplicationType.STAND_ALONE)
         .setFactor(factor)
         .setNodes(dns.subList(0, factor.getNumber()))
@@ -62,16 +62,11 @@ public class SimplePipelineProvider implements PipelineProvider {
   }
 
   @Override
-  public Pipeline create(List<DatanodeDetails> nodes) throws IOException {
-    ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size());
-    if (factor == null) {
-      throw new IOException(String
-          .format("Nodes size=%d does not match any replication factor",
-              nodes.size()));
-    }
+  public Pipeline create(ReplicationFactor factor,
+      List<DatanodeDetails> nodes) {
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.ALLOCATED)
+        .setState(PipelineState.OPEN)
         .setType(ReplicationType.STAND_ALONE)
         .setFactor(factor)
         .setNodes(nodes)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
deleted file mode 100644
index 1053149..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineAction;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .PipelineActionsFromDatanode;
-
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles pipeline actions from datanode.
- */
-public class PipelineActionEventHandler implements
-    EventHandler<PipelineActionsFromDatanode> {
-
-  public static final Logger LOG = LoggerFactory.getLogger(
-      PipelineActionEventHandler.class);
-
-  public PipelineActionEventHandler() {
-
-  }
-
-  @Override
-  public void onMessage(PipelineActionsFromDatanode report,
-      EventPublisher publisher) {
-    for (PipelineAction action : report.getReport().getPipelineActionsList()) {
-      switch (action.getAction()) {
-      case CLOSE:
-        PipelineID pipelineID = PipelineID.
-            getFromProtobuf(action.getClosePipeline().getPipelineID());
-        LOG.info("Closing pipeline " + pipelineID + " for reason:" + action
-            .getClosePipeline().getDetailedReason());
-        publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID);
-        break;
-      default:
-        LOG.error("unknown pipeline action:{}" + action.getAction());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
deleted file mode 100644
index e49678f..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles pipeline close event.
- */
-public class PipelineCloseHandler implements EventHandler<PipelineID> {
-  private static final Logger LOG = LoggerFactory
-          .getLogger(PipelineCloseHandler.class);
-
-  private final PipelineSelector pipelineSelector;
-  public PipelineCloseHandler(PipelineSelector pipelineSelector) {
-    this.pipelineSelector = pipelineSelector;
-  }
-
-  @Override
-  public void onMessage(PipelineID pipelineID, EventPublisher publisher) {
-    Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
-    try {
-      if (pipeline != null) {
-        pipelineSelector.finalizePipeline(pipeline);
-      } else {
-        LOG.debug("pipeline:{} not found", pipelineID);
-      }
-    } catch (Exception e) {
-      LOG.info("failed to close pipeline:{}", pipelineID, e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
deleted file mode 100644
index ca2e878..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Manage Ozone pipelines.
- */
-public abstract class PipelineManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PipelineManager.class);
-  protected final ArrayList<ActivePipelines> activePipelines;
-
-  public PipelineManager() {
-    activePipelines = new ArrayList<>();
-    for (ReplicationFactor factor : ReplicationFactor.values()) {
-      activePipelines.add(factor.ordinal(), new ActivePipelines());
-    }
-  }
-
-  /**
-   * List of active pipelines.
-   */
-  public static class ActivePipelines {
-    private final List<PipelineID> activePipelines;
-    private final AtomicInteger pipelineIndex;
-
-    ActivePipelines() {
-      activePipelines = new LinkedList<>();
-      pipelineIndex = new AtomicInteger(0);
-    }
-
-    void addPipeline(PipelineID pipelineID) {
-      if (!activePipelines.contains(pipelineID)) {
-        activePipelines.add(pipelineID);
-      }
-    }
-
-    public void removePipeline(PipelineID pipelineID) {
-      activePipelines.remove(pipelineID);
-    }
-
-    /**
-     * Find a Pipeline that is operational.
-     *
-     * @return - Pipeline or null
-     */
-    PipelineID findOpenPipeline() {
-      if (activePipelines.size() == 0) {
-        LOG.error("No Operational pipelines found. Returning null.");
-        return null;
-      }
-      return activePipelines.get(getNextIndex());
-    }
-
-    /**
-     * gets the next index of the Pipeline to get.
-     *
-     * @return index in the link list to get.
-     */
-    private int getNextIndex() {
-      return pipelineIndex.incrementAndGet() % activePipelines.size();
-    }
-  }
-
-  /**
-   * This function is called by the Container Manager while allocating a new
-   * container. The client specifies what kind of replication pipeline is
-   * needed and based on the replication type in the request appropriate
-   * Interface is invoked.
-   *
-   * @param replicationFactor - Replication Factor
-   * @return a Pipeline.
-   */
-  public synchronized final PipelineID getPipeline(
-      ReplicationFactor replicationFactor, ReplicationType replicationType) {
-    PipelineID id =
-        activePipelines.get(replicationFactor.ordinal()).findOpenPipeline();
-    if (id != null) {
-      LOG.debug("re-used pipeline:{} for container with " +
-              "replicationType:{} replicationFactor:{}",
-          id, replicationType, replicationFactor);
-    }
-    if (id == null) {
-      LOG.error("Get pipeline call failed. We are not able to find" +
-              " operational pipeline.");
-      return null;
-    } else {
-      return id;
-    }
-  }
-
-  void addOpenPipeline(Pipeline pipeline) {
-    activePipelines.get(pipeline.getFactor().ordinal())
-            .addPipeline(pipeline.getId());
-  }
-
-  public abstract Pipeline allocatePipeline(
-      ReplicationFactor replicationFactor);
-
-  /**
-   * Initialize the pipeline.
-   * TODO: move the initialization to Ozone Client later
-   */
-  public abstract void initializePipeline(Pipeline pipeline) throws IOException;
-
-  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
-    if (pipeline.addMember(dn)
-        &&(pipeline.getDatanodes().size() == pipeline.getFactor().getNumber())
-        && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN) {
-      addOpenPipeline(pipeline);
-    }
-  }
-
-  /**
-   * Creates a pipeline with a specified replication factor and type.
-   * @param replicationFactor - Replication Factor.
-   * @param replicationType - Replication Type.
-   */
-  public Pipeline createPipeline(ReplicationFactor replicationFactor,
-      ReplicationType replicationType) throws IOException {
-    Pipeline pipeline = allocatePipeline(replicationFactor);
-    if (pipeline != null) {
-      LOG.debug("created new pipeline:{} for container with "
-              + "replicationType:{} replicationFactor:{}",
-          pipeline.getId(), replicationType, replicationFactor);
-    }
-    return pipeline;
-  }
-
-  /**
-   * Remove the pipeline from active allocation.
-   * @param pipeline pipeline to be finalized
-   */
-  public abstract boolean finalizePipeline(Pipeline pipeline);
-
-  /**
-   *
-   * @param pipeline
-   */
-  public abstract void closePipeline(Pipeline pipeline) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
deleted file mode 100644
index 933792b..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.server
-        .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles Node Reports from datanode.
- */
-public class PipelineReportHandler implements
-        EventHandler<PipelineReportFromDatanode> {
-
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(PipelineReportHandler.class);
-  private final PipelineSelector pipelineSelector;
-
-  public PipelineReportHandler(PipelineSelector pipelineSelector) {
-    Preconditions.checkNotNull(pipelineSelector);
-    this.pipelineSelector = pipelineSelector;
-  }
-
-  @Override
-  public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
-      EventPublisher publisher) {
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
-    PipelineReportsProto pipelineReport =
-            pipelineReportFromDatanode.getReport();
-    Preconditions.checkNotNull(dn, "Pipeline Report is "
-        + "missing DatanodeDetails.");
-    LOGGER.trace("Processing pipeline report for dn: {}", dn);
-    pipelineSelector.processPipelineReport(dn, pipelineReport);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
deleted file mode 100644
index c8d22ff..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .SCMContainerPlacementRandom;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
-import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.lease.Lease;
-import org.apache.hadoop.ozone.lease.LeaseException;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_PIPELINE_STATE;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_ACTIVE_PIPELINE;
-import static org.apache.hadoop.hdds.server
-        .ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone
-        .OzoneConsts.SCM_PIPELINE_DB;
-
-/**
- * Sends the request to the right pipeline manager.
- */
-public class PipelineSelector {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PipelineSelector.class);
-  private final ContainerPlacementPolicy placementPolicy;
-  private final Map<ReplicationType, PipelineManager> pipelineManagerMap;
-  private final Configuration conf;
-  private final EventPublisher eventPublisher;
-  private final long containerSize;
-  private final MetadataStore pipelineStore;
-  private final PipelineStateManager stateManager;
-  private final NodeManager nodeManager;
-  private final Map<PipelineID, HashSet<ContainerID>> pipeline2ContainerMap;
-  private final Map<PipelineID, Pipeline> pipelineMap;
-  private final LeaseManager<Pipeline> pipelineLeaseManager;
-
-  /**
-   * Constructs a pipeline Selector.
-   *
-   * @param nodeManager - node manager
-   * @param conf - Ozone Config
-   */
-  public PipelineSelector(NodeManager nodeManager, Configuration conf,
-      EventPublisher eventPublisher, int cacheSizeMB) throws IOException {
-    this.conf = conf;
-    this.eventPublisher = eventPublisher;
-    this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
-    this.containerSize = (long)this.conf.getStorageSize(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
-        StorageUnit.BYTES);
-    pipelineMap = new ConcurrentHashMap<>();
-    pipelineManagerMap = new HashMap<>();
-
-    pipelineManagerMap.put(ReplicationType.STAND_ALONE,
-            new StandaloneManagerImpl(nodeManager, placementPolicy,
-            containerSize));
-    pipelineManagerMap.put(ReplicationType.RATIS,
-            new RatisManagerImpl(nodeManager, placementPolicy,
-                    containerSize, conf));
-    long pipelineCreationLeaseTimeout = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
-        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
-        pipelineCreationLeaseTimeout);
-    pipelineLeaseManager.start();
-
-    stateManager = new PipelineStateManager();
-    this.nodeManager = nodeManager;
-    pipeline2ContainerMap = new HashMap<>();
-
-    // Write the container name to pipeline mapping.
-    File metaDir = getOzoneMetaDirPath(conf);
-    File containerDBPath = new File(metaDir, SCM_PIPELINE_DB);
-    pipelineStore = MetadataStoreBuilder.newBuilder()
-            .setConf(conf)
-            .setDbFile(containerDBPath)
-            .setCacheSize(cacheSizeMB * OzoneConsts.MB)
-            .build();
-
-    reloadExistingPipelines();
-  }
-
-  private void reloadExistingPipelines() throws IOException {
-    if (pipelineStore.isEmpty()) {
-      // Nothing to do just return
-      return;
-    }
-
-    List<Map.Entry<byte[], byte[]>> range =
-            pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
-
-    // Transform the values into the pipelines.
-    // TODO: filter by pipeline state
-    for (Map.Entry<byte[], byte[]> entry : range) {
-      Pipeline pipeline = Pipeline.getFromProtoBuf(
-                HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
-      Preconditions.checkNotNull(pipeline);
-      addExistingPipeline(pipeline);
-    }
-  }
-
-  @VisibleForTesting
-  public Set<ContainerID> getOpenContainerIDsByPipeline(PipelineID pipelineID) {
-    return pipeline2ContainerMap.get(pipelineID);
-  }
-
-  public void addContainerToPipeline(PipelineID pipelineID, long containerID) {
-    pipeline2ContainerMap.get(pipelineID)
-            .add(ContainerID.valueof(containerID));
-  }
-
-  public void removeContainerFromPipeline(PipelineID pipelineID,
-                                          long containerID) throws IOException {
-    pipeline2ContainerMap.get(pipelineID)
-            .remove(ContainerID.valueof(containerID));
-    closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
-  }
-
-  /**
-   * Translates a list of nodes, ordered such that the first is the leader, into
-   * a corresponding {@link Pipeline} object.
-   *
-   * @param nodes - list of datanodes on which we will allocate the container.
-   * The first of the list will be the leader node.
-   * @return pipeline corresponding to nodes
-   */
-  public static Pipeline newPipelineFromNodes(
-      List<DatanodeDetails> nodes, ReplicationType replicationType,
-      ReplicationFactor replicationFactor, PipelineID id) {
-    Preconditions.checkNotNull(nodes);
-    Preconditions.checkArgument(nodes.size() > 0);
-    String leaderId = nodes.get(0).getUuidString();
-    // A new pipeline always starts in allocated state
-    Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
-        replicationType, replicationFactor, id);
-    for (DatanodeDetails node : nodes) {
-      pipeline.addMember(node);
-    }
-    return pipeline;
-  }
-
-  /**
-   * Create pluggable container placement policy implementation instance.
-   *
-   * @param nodeManager - SCM node manager.
-   * @param conf - configuration.
-   * @return SCM container placement policy implementation instance.
-   */
-  @SuppressWarnings("unchecked")
-  private static ContainerPlacementPolicy createContainerPlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
-    Class<? extends ContainerPlacementPolicy> implClass =
-        (Class<? extends ContainerPlacementPolicy>) conf.getClass(
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-            SCMContainerPlacementRandom.class);
-
-    try {
-      Constructor<? extends ContainerPlacementPolicy> ctor =
-          implClass.getDeclaredConstructor(NodeManager.class,
-              Configuration.class);
-      return ctor.newInstance(nodeManager, conf);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(implClass.getName()
-          + " could not be constructed.", e.getCause());
-    } catch (Exception e) {
-      LOG.error("Unhandled exception occurred; placement policy will not be " +
-          "functional.");
-      throw new IllegalArgumentException("Unable to load " +
-          "ContainerPlacementPolicy", e);
-    }
-  }
-
-  /**
-   * This function is called by the Container Manager while allocating a new
-   * container. The client specifies what kind of replication pipeline is
-   * needed, and based on the replication type in the request, the
-   * appropriate pipeline manager is invoked.
-   */
-
-  public Pipeline getReplicationPipeline(ReplicationType replicationType,
-      HddsProtos.ReplicationFactor replicationFactor)
-      throws IOException {
-    PipelineManager manager = pipelineManagerMap.get(replicationType);
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Getting replication pipeline for ReplicationType {} :" +
-            " ReplicationFactor {}", replicationType.toString(),
-        replicationFactor.toString());
-
-    /**
-     * In the Ozone world, we have a very simple policy.
-     *
-     * 1. Try to create a pipeline if there are enough free nodes.
-     *
-     * 2. This allows all nodes to be part of a pipeline quickly.
-     *
-     * 3. If there are not enough free nodes, return an already-allocated
-     * pipeline in a round-robin fashion.
-     *
-     * TODO: Might have to come up with a better algorithm than this.
-     * Create a new placement policy that returns pipelines in round robin
-     * fashion.
-     */
-    Pipeline pipeline =
-        manager.createPipeline(replicationFactor, replicationType);
-    if (pipeline == null) {
-      // try to return a pipeline from already allocated pipelines
-      PipelineID pipelineId =
-              manager.getPipeline(replicationFactor, replicationType);
-      if (pipelineId == null) {
-        throw new SCMException(FAILED_TO_FIND_ACTIVE_PIPELINE);
-      }
-      pipeline = pipelineMap.get(pipelineId);
-      Preconditions.checkArgument(pipeline.getLifeCycleState() ==
-              LifeCycleState.OPEN);
-    } else {
-      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
-              pipeline.getProtobufMessage().toByteArray());
-      // if a new pipeline is created, initialize its state machine
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
-
-      //TODO: move the initialization of pipeline to Ozone Client
-      manager.initializePipeline(pipeline);
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
-    }
-    return pipeline;
-  }
-
-  /**
-   * Returns the pipeline for the given pipeline id.
-   */
-  public Pipeline getPipeline(PipelineID pipelineID) {
-    return pipelineMap.get(pipelineID);
-  }
-
-  /**
-   * Finalize a given pipeline.
-   */
-  public void finalizePipeline(Pipeline pipeline) throws IOException {
-    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING ||
-        pipeline.getLifeCycleState() == LifeCycleState.CLOSED) {
-      LOG.debug("Pipeline {} already in closing/closed state, skipping",
-          pipeline.getId());
-      // already in closing/closed state
-      return;
-    }
-
-    // Remove the pipeline from active allocation
-    if (manager.finalizePipeline(pipeline)) {
-      LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
-      closePipelineIfNoOpenContainers(pipeline);
-    }
-  }
-
-  /**
-   * Close a given pipeline if it no longer has any open containers.
-   */
-  private void closePipelineIfNoOpenContainers(Pipeline pipeline)
-      throws IOException {
-    if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
-      return;
-    }
-    HashSet<ContainerID> containerIDS =
-            pipeline2ContainerMap.get(pipeline.getId());
-    if (containerIDS.size() == 0) {
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
-      LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
-    }
-  }
-
-  /**
-   * Close a given pipeline.
-   */
-  private void closePipeline(Pipeline pipeline) throws IOException {
-    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
-    HashSet<ContainerID> containers =
-            pipeline2ContainerMap.get(pipeline.getId());
-    Preconditions.checkArgument(containers.size() == 0);
-    manager.closePipeline(pipeline);
-  }
-
-  /**
-   * Add a given pipeline to the set of open pipelines.
-   */
-  private void addOpenPipeline(Pipeline pipeline) {
-    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId());
-    manager.addOpenPipeline(pipeline);
-  }
-
-  private void closeContainersByPipeline(Pipeline pipeline) {
-    HashSet<ContainerID> containers =
-            pipeline2ContainerMap.get(pipeline.getId());
-    for (ContainerID id : containers) {
-      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
-    }
-  }
-
-  private void addExistingPipeline(Pipeline pipeline) throws IOException {
-    LifeCycleState state = pipeline.getLifeCycleState();
-    switch (state) {
-    case ALLOCATED:
-      // A pipeline in allocated state is only present in SCM and does not
-      // exist on a datanode; on SCM restart, this pipeline can be ignored.
-      break;
-    case CREATING:
-    case OPEN:
-    case CLOSING:
-      //TODO: process pipeline report and move pipeline to active queue
-      // when all the nodes have reported.
-      pipelineMap.put(pipeline.getId(), pipeline);
-      pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
-      nodeManager.addPipeline(pipeline);
-      // reset the datanodes in the pipeline; they will be re-added as
-      // pipeline reports are processed
-      pipeline.resetPipeline();
-      break;
-    case CLOSED:
-      // if the pipeline is in closed state, nothing to do.
-      break;
-    default:
-      throw new IOException("Invalid pipeline state: " + state);
-    }
-  }
-
-  public void handleStaleNode(DatanodeDetails dn) {
-    Set<PipelineID> pipelineIDs = nodeManager.getPipelineByDnID(dn.getUuid());
-    for (PipelineID id : pipelineIDs) {
-      LOG.info("Closing pipeline {}.", id);
-      eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
-    }
-  }
-
-  void processPipelineReport(DatanodeDetails dn,
-                                    PipelineReportsProto pipelineReport) {
-    Set<PipelineID> reportedPipelines = new HashSet<>();
-    pipelineReport.getPipelineReportList()
-        .forEach(p ->
-            reportedPipelines.add(
-                processPipelineReport(p.getPipelineID(), dn)));
-
-    //TODO: handle missing pipelines and new pipelines later
-  }
-
-  private PipelineID processPipelineReport(
-          HddsProtos.PipelineID id, DatanodeDetails dn) {
-    PipelineID pipelineID = PipelineID.getFromProtobuf(id);
-    Pipeline pipeline = pipelineMap.get(pipelineID);
-    if (pipeline != null) {
-      pipelineManagerMap.get(pipeline.getType())
-              .processPipelineReport(pipeline, dn);
-    }
-    return pipelineID;
-  }
-
-  /**
-   * Update the Pipeline State to the next state.
-   *
-   * @param pipeline - Pipeline
-   * @param event - LifeCycle Event
-   * @throws SCMException  on Failure.
-   */
-  public void updatePipelineState(Pipeline pipeline,
-      HddsProtos.LifeCycleEvent event) throws IOException {
-    try {
-      switch (event) {
-      case CREATE:
-        pipelineMap.put(pipeline.getId(), pipeline);
-        pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
-        nodeManager.addPipeline(pipeline);
-        // Acquire lease on pipeline
-        Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
-        // Register callback to be executed in case of timeout
-        pipelineLease.registerCallBack(() -> {
-          updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
-          return null;
-        });
-        break;
-      case CREATED:
-        // Release the lease on pipeline
-        pipelineLeaseManager.release(pipeline);
-        addOpenPipeline(pipeline);
-        break;
-
-      case FINALIZE:
-        closeContainersByPipeline(pipeline);
-        break;
-
-      case CLOSE:
-      case TIMEOUT:
-        closePipeline(pipeline);
-        pipeline2ContainerMap.remove(pipeline.getId());
-        nodeManager.removePipeline(pipeline);
-        pipelineMap.remove(pipeline.getId());
-        break;
-      default:
-        throw new SCMException("Unsupported pipeline LifeCycleEvent.",
-            FAILED_TO_CHANGE_PIPELINE_STATE);
-      }
-
-      stateManager.updatePipelineState(pipeline, event);
-      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
-              pipeline.getProtobufMessage().toByteArray());
-    } catch (LeaseException e) {
-      throw new IOException("Lease Exception.", e);
-    }
-  }
-
-  public void shutdown() throws IOException {
-    if (pipelineLeaseManager != null) {
-      pipelineLeaseManager.shutdown();
-    }
-
-    if (pipelineStore != null) {
-      pipelineStore.close();
-    }
-  }
-}

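For reference, the create-or-reuse policy in getReplicationPipeline() above reduces to the following minimal Java sketch. PipelinePool and its methods are illustrative stand-ins, not SCM classes: creation is attempted first, and if placement fails the pool hands out an already-open pipeline in round-robin order.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

final class PipelinePool {
  // Already-open pipeline ids, rotated for round-robin reuse.
  private final Deque<String> openPipelines = new ArrayDeque<>();

  // Returns a new pipeline id if enough free nodes exist, otherwise null.
  String tryCreate() {
    return null; // placement failed in this toy example
  }

  // Round-robin: take from the head, put back at the tail.
  String reuse() throws IOException {
    String id = openPipelines.poll();
    if (id == null) {
      throw new IOException("FAILED_TO_FIND_ACTIVE_PIPELINE");
    }
    openPipelines.offer(id);
    return id;
  }

  // Mirrors the policy above: try to create, else fall back to reuse.
  String getReplicationPipeline() throws IOException {
    String id = tryCreate();
    return id != null ? id : reuse();
  }
}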
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
deleted file mode 100644
index 6054f16..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.common.statemachine
-    .InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_PIPELINE_STATE;
-
-/**
- * Manages Pipeline states.
- */
-public class PipelineStateManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PipelineStateManager.class);
-
-  private final StateMachine<HddsProtos.LifeCycleState,
-      HddsProtos.LifeCycleEvent> stateMachine;
-
-  PipelineStateManager() {
-    // Initialize the pipeline state machine.
-    Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();
-    // These are the steady states of a pipeline.
-    finalStates.add(HddsProtos.LifeCycleState.OPEN);
-    finalStates.add(HddsProtos.LifeCycleState.CLOSED);
-
-    this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
-        finalStates);
-    initializeStateMachine();
-  }
-
-  /**
-   * Event and State Transition Mapping.
-   *
-   * State: ALLOCATED ---------------> CREATING
-   * Event:                CREATE
-   *
-   * State: CREATING  ---------------> OPEN
-   * Event:               CREATED
-   *
-   * State: OPEN      ---------------> CLOSING
-   * Event:               FINALIZE
-   *
-   * State: CLOSING   ---------------> CLOSED
-   * Event:                CLOSE
-   *
-   * State: CREATING  ---------------> CLOSED
-   * Event:               TIMEOUT
-   *
-   *
-   * Pipeline State Flow:
-   *
-   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
-   *            (CREATE)     | (CREATED)     (FINALIZE)   |
-   *                         |                            |
-   *                         |                            |
-   *                         |(TIMEOUT)                   |(CLOSE)
-   *                         |                            |
-   *                         +--------> [CLOSED] <--------+
-   */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
-        HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleEvent.CREATE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleState.OPEN,
-        HddsProtos.LifeCycleEvent.CREATED);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
-        HddsProtos.LifeCycleState.CLOSING,
-        HddsProtos.LifeCycleEvent.FINALIZE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
-        HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.LifeCycleEvent.TIMEOUT);
-  }
-
-
-  /**
-   * Update the Pipeline State to the next state.
-   *
-   * @param pipeline - Pipeline
-   * @param event - LifeCycle Event
-   * @throws SCMException  on Failure.
-   */
-  public void updatePipelineState(Pipeline pipeline,
-      HddsProtos.LifeCycleEvent event) throws IOException {
-    HddsProtos.LifeCycleState newState;
-    try {
-      newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
-    } catch (InvalidStateTransitionException ex) {
-      String error = String.format("Failed to update state of pipeline %s, " +
-              "reason: invalid state transition from state: %s upon " +
-              "event: %s.",
-          pipeline.getId(), pipeline.getLifeCycleState(), event);
-      LOG.error(error);
-      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
-    }
-
-    // This is a post condition after executing getNextState.
-    Preconditions.checkNotNull(newState);
-    Preconditions.checkNotNull(pipeline);
-    pipeline.setLifeCycleState(newState);
-  }
-}

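The transition table encoded above can also be reproduced with a plain map, which makes the legal moves easy to see at a glance. This is a self-contained sketch, not the project's StateMachine class:

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

final class PipelineLifecycle {
  enum State { ALLOCATED, CREATING, OPEN, CLOSING, CLOSED }
  enum Event { CREATE, CREATED, FINALIZE, CLOSE, TIMEOUT }

  private static final Map<State, Map<Event, State>> TRANSITIONS =
      new EnumMap<>(State.class);
  static {
    add(State.ALLOCATED, Event.CREATE,   State.CREATING);
    add(State.CREATING,  Event.CREATED,  State.OPEN);
    add(State.OPEN,      Event.FINALIZE, State.CLOSING);
    add(State.CLOSING,   Event.CLOSE,    State.CLOSED);
    add(State.CREATING,  Event.TIMEOUT,  State.CLOSED);
  }

  private static void add(State from, Event event, State to) {
    TRANSITIONS.computeIfAbsent(from, s -> new EnumMap<>(Event.class))
        .put(event, to);
  }

  // Returns the next state or throws on an illegal transition, matching
  // the behavior of updatePipelineState above.
  static State next(State current, Event event) {
    State next = TRANSITIONS
        .getOrDefault(current, Collections.<Event, State>emptyMap())
        .get(event);
    if (next == null) {
      throw new IllegalStateException(
          "Invalid transition from " + current + " on " + event);
    }
    return next;
  }
}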
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
deleted file mode 100644
index ea24c58..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines;
-/**
- Ozone supports the notion of different kinds of pipelines.
- That means that we can have a replication pipeline built on
- Ratis, Standalone, or some other protocol. All pipeline managers,
- the entities in charge of pipelines, reside in this package.
-
- Here is the high-level architecture.
-
- 1. A pipeline selector class is instantiated in the Container manager class.
-
- 2. A client when creating a container -- will specify what kind of
- replication type it wants to use. We support 2 types now, Ratis and StandAlone.
-
- 3. Based on the replication type, the pipeline selector class asks the
- corresponding pipeline manager for a pipeline.
-
- 4. Clients can either specify the set of nodes in the pipeline or rely
- on the pipeline manager to select the datanodes if they are not
- specified.
- */
\ No newline at end of file

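The dispatch described in steps 1-3 above amounts to a registry keyed by replication type. Here is a minimal sketch, with ReplicationType and Manager as simplified stand-ins for the real HddsProtos and PipelineManager types:

import java.util.EnumMap;
import java.util.Map;

final class ManagerRegistry {
  enum ReplicationType { RATIS, STAND_ALONE }

  interface Manager { String allocate(int factor); }

  // One manager per replication type, as the selector keeps them.
  private final Map<ReplicationType, Manager> managers =
      new EnumMap<>(ReplicationType.class);

  void register(ReplicationType type, Manager manager) {
    managers.put(type, manager);
  }

  // Route the request to the manager for the requested type.
  String allocate(ReplicationType type, int factor) {
    Manager m = managers.get(type);
    if (m == null) {
      throw new IllegalArgumentException("No manager for " + type);
    }
    return m.allocate(factor);
  }
}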
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
deleted file mode 100644
index 905a5b5..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines.ratis;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Implementation of {@link PipelineManager}.
- *
- * TODO : Introduce a state machine.
- */
-public class RatisManagerImpl extends PipelineManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RatisManagerImpl.class);
-  private final Configuration conf;
-  private final NodeManager nodeManager;
-  private final Set<DatanodeDetails> ratisMembers;
-
-  /**
-   * Constructs a Ratis Pipeline Manager.
-   *
-   * @param nodeManager - Node Manager.
-   */
-  public RatisManagerImpl(NodeManager nodeManager,
-      ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
-    super();
-    this.conf = conf;
-    this.nodeManager = nodeManager;
-    ratisMembers = new HashSet<>();
-  }
-
-  /**
-   * Allocates a new ratis Pipeline from the free nodes.
-   *
-   * @param factor - One or Three
-   * @return Pipeline.
-   */
-  public Pipeline allocatePipeline(ReplicationFactor factor) {
-    List<DatanodeDetails> newNodesList = new LinkedList<>();
-    List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-    //TODO: Add Raft state to the nodes, so we can query and skip nodes based
-    // on data from the datanode instead of maintaining a set.
-    for (DatanodeDetails datanode : datanodes) {
-      Preconditions.checkNotNull(datanode);
-      if (!ratisMembers.contains(datanode)) {
-        newNodesList.add(datanode);
-        if (newNodesList.size() == factor.getNumber()) {
-          // once a datanode has been added to a pipeline, exclude it from
-          // further allocations
-          ratisMembers.addAll(newNodesList);
-          PipelineID pipelineID = PipelineID.randomId();
-          LOG.info("Allocating a new ratis pipeline of size: {} id: {}",
-                  factor.getNumber(), pipelineID);
-          return PipelineSelector.newPipelineFromNodes(newNodesList,
-              ReplicationType.RATIS, factor, pipelineID);
-        }
-      }
-    }
-    return null;
-  }
-
-  public void initializePipeline(Pipeline pipeline) throws IOException {
-    //TODO:move the initialization from SCM to client
-    try (XceiverClientRatis client =
-        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-      client.createPipeline();
-    }
-  }
-
-  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
-    super.processPipelineReport(pipeline, dn);
-    ratisMembers.add(dn);
-  }
-
-  public synchronized boolean finalizePipeline(Pipeline pipeline) {
-    activePipelines.get(pipeline.getFactor().ordinal())
-            .removePipeline(pipeline.getId());
-    return true;
-  }
-
-  /**
-   * Close the pipeline.
-   */
-  public void closePipeline(Pipeline pipeline) throws IOException {
-    try (XceiverClientRatis client =
-        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-      client.destroyPipeline();
-    }
-    for (DatanodeDetails node : pipeline.getMachines()) {
-      // A node should always be in the ratis members list.
-      Preconditions.checkArgument(ratisMembers.remove(node));
-    }
-  }
-}

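The allocation loop in allocatePipeline() above generalizes to a small exclusion-based picker: scan healthy nodes, skip ones already serving a pipeline, and stop once enough nodes are found. A sketch, assuming the caller tracks in-use nodes in a set exactly as ratisMembers does:

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class FreeNodeAllocator {
  // Node is a generic stand-in for DatanodeDetails.
  static <Node> List<Node> pickFreeNodes(
      List<Node> healthy, Set<Node> inUse, int factor) {
    List<Node> picked = new ArrayList<>();
    for (Node node : healthy) {
      if (!inUse.contains(node)) {
        picked.add(node);
        if (picked.size() == factor) {
          // Exclude chosen nodes from further allocations.
          inUse.addAll(picked);
          return picked;
        }
      }
    }
    return null; // not enough free nodes; caller falls back to reuse
  }
}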
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
deleted file mode 100644
index 2970fb3..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines.ratis;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
deleted file mode 100644
index 045afb6..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines.standalone;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Standalone manager implementation to prove that the pluggable
- * interface works with the current tests.
- */
-public class StandaloneManagerImpl extends PipelineManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StandaloneManagerImpl.class);
-  private final NodeManager nodeManager;
-  private final ContainerPlacementPolicy placementPolicy;
-  private final long containerSize;
-  private final Set<DatanodeDetails> standAloneMembers;
-
-  /**
-   * Constructor for Standalone Node Manager Impl.
-   * @param nodeManager - Node Manager.
-   * @param placementPolicy - Placement Policy
-   * @param containerSize - Container Size.
-   */
-  public StandaloneManagerImpl(NodeManager nodeManager,
-      ContainerPlacementPolicy placementPolicy, long containerSize) {
-    super();
-    this.nodeManager = nodeManager;
-    this.placementPolicy = placementPolicy;
-    this.containerSize =  containerSize;
-    this.standAloneMembers = new HashSet<>();
-  }
-
-
-  /**
-   * Allocates a new standalone Pipeline from the free nodes.
-   *
-   * @param factor - One
-   * @return Pipeline.
-   */
-  public Pipeline allocatePipeline(ReplicationFactor factor) {
-    List<DatanodeDetails> newNodesList = new LinkedList<>();
-    List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-    for (DatanodeDetails datanode : datanodes) {
-      Preconditions.checkNotNull(datanode);
-      if (!standAloneMembers.contains(datanode)) {
-        newNodesList.add(datanode);
-        if (newNodesList.size() == factor.getNumber()) {
-          // once a datanode has been added to a pipeline, exclude it from
-          // further allocations
-          standAloneMembers.addAll(newNodesList);
-          // Standalone pipelines use the first node's id as the pipeline id.
-          PipelineID pipelineID =
-                  PipelineID.valueOf(newNodesList.get(0).getUuid());
-          LOG.info("Allocating a new standalone pipeline of size: {} id: {}",
-              factor.getNumber(), pipelineID);
-          return PipelineSelector.newPipelineFromNodes(newNodesList,
-              ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineID);
-        }
-      }
-    }
-    return null;
-  }
-
-  public void initializePipeline(Pipeline pipeline) {
-    // Nothing to be done for standalone pipeline
-  }
-
-  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
-    super.processPipelineReport(pipeline, dn);
-    standAloneMembers.add(dn);
-  }
-
-  public synchronized boolean finalizePipeline(Pipeline pipeline) {
-    activePipelines.get(pipeline.getFactor().ordinal())
-            .removePipeline(pipeline.getId());
-    return false;
-  }
-
-  /**
-   * Close the pipeline.
-   */
-  public void closePipeline(Pipeline pipeline) throws IOException {
-    for (DatanodeDetails node : pipeline.getMachines()) {
-      // A node should always be in the standalone members list.
-      Preconditions.checkArgument(standAloneMembers.remove(node));
-    }
-  }
-}

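Note the contract of finalizePipeline() in the two managers above: the selector only fires the FINALIZE lifecycle event when the type-specific manager returns true (as the Ratis manager does); the standalone manager returns false, so the state machine is not advanced. A minimal sketch of that caller-side check, with TypedManager as a hypothetical stand-in:

interface TypedManager {
  // Returns true if the caller should advance the lifecycle state machine.
  boolean finalizePipeline(String pipelineId);
}

final class PipelineFinalizer {
  static void finalizePipeline(TypedManager manager, String pipelineId) {
    // Only fire FINALIZE when the manager removed the pipeline from
    // active allocation, mirroring PipelineSelector.finalizePipeline().
    if (manager.finalizePipeline(pipelineId)) {
      System.out.println("FINALIZE fired for " + pipelineId);
    }
  }
}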
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
deleted file mode 100644
index b2c3ca40..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines.standalone;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 89a6c81..e92200a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;

