hadoop-ozone-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sammic...@apache.org
Subject [hadoop-ozone] 04/06: HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.
Date Tue, 03 Dec 2019 08:33:15 GMT
This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 260018feb4e427bccd0a357537f9ca5c578eb16b
Author: Li Cheng <bloodhell2004@gmail.com>
AuthorDate: Tue Oct 29 12:46:00 2019 +0800

    HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.
    
    This closes #28
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  10 +-
 .../common/src/main/resources/ozone-default.xml    |  15 ++-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   5 +
 .../ContainerPlacementPolicyFactory.java           |   8 +-
 .../hdds/scm/node/states/Node2PipelineMap.java     |   2 +-
 .../scm/pipeline/BackgroundPipelineCreator.java    |   1 +
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  89 +++++++++----
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |   5 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 137 ++++++++++++---------
 .../hdds/scm/pipeline/RatisPipelineUtils.java      | 103 ++++++++++++++++
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  13 +-
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |   8 ++
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  13 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |   3 +
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |  15 ++-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  17 +--
 .../hdds/scm/pipeline/TestPipelineClose.java       |   1 +
 .../TestRatisPipelineCreateAndDestroy.java         |  24 +++-
 .../scm/pipeline/TestRatisPipelineProvider.java    | 107 +++++++---------
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |   5 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |   3 +
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |  12 ++
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   6 +-
 .../ozone/client/rpc/Test2WayCommitInRatis.java    |   1 +
 .../ozone/client/rpc/TestBlockOutputStream.java    |   1 +
 .../rpc/TestBlockOutputStreamWithFailures.java     |   7 +-
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java |   1 +
 .../rpc/TestContainerReplicationEndToEnd.java      |   5 +-
 .../client/rpc/TestContainerStateMachine.java      |   5 +-
 .../client/rpc/TestDeleteWithSlowFollower.java     |  12 +-
 .../client/rpc/TestFailureHandlingByClient.java    |   4 +-
 .../client/rpc/TestHybridPipelineOnDatanode.java   |   3 +-
 .../ozone/client/rpc/TestKeyInputStream.java       |   1 +
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |   8 +-
 .../rpc/TestOzoneClientRetriesOnException.java     |   1 +
 .../client/rpc/TestOzoneRpcClientAbstract.java     |   1 +
 .../ozone/client/rpc/TestWatchForCommit.java       |   3 +
 .../TestCloseContainerByPipeline.java              |   5 +
 .../TestSCMContainerPlacementPolicyMetrics.java    |   1 +
 .../hadoop/ozone/scm/node/TestQueryNode.java       |   3 +
 .../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java  |   3 +
 .../hadoop/ozone/freon/TestDataValidate.java       |   2 +-
 .../ozone/freon/TestFreonWithPipelineDestroy.java  |   1 +
 43 files changed, 469 insertions(+), 201 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 7810362..96a3f5d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -319,7 +319,15 @@ public final class ScmConfigKeys {
   // the max number of pipelines can a single datanode be engaged in.
   public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
           "ozone.scm.datanode.max.pipeline.engagement";
-  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
+  // Setting to zero by default means this limit doesn't take effect.
+  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
+
+  // Upper limit for how many pipelines can be created.
+  // Only for test purpose now.
+  public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
+      "ozone.scm.pipeline.number.limit";
+  // Setting to zero by default means this limit doesn't take effect.
+  public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;
 
   public static final String
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 95692b0..300955f 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -832,10 +832,19 @@
     </description>
   </property>
   <property>
-    <name>ozone.scm.datanode.max.pipeline.engagement</name>
-    <value>5</value>
+  <name>ozone.scm.datanode.max.pipeline.engagement</name>
+  <value>0</value>
+  <tag>OZONE, SCM, PIPELINE</tag>
+  <description>Max number of pipelines per datanode can be engaged in.
+  </description>
+  </property>
+  <property>
+    <name>ozone.scm.pipeline.number.limit</name>
+    <value>0</value>
     <tag>OZONE, SCM, PIPELINE</tag>
-    <description>Max number of pipelines per datanode can be engaged in.
+    <description>Upper limit for how many pipelines can be OPEN in SCM.
+      0 as default means there is no limit. Otherwise, the number is the limit
+      of max amount of pipelines which are OPEN.
     </description>
   </property>
   <property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index b7a7525..cdc3878 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -197,8 +197,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // TODO: #CLUTIL Remove creation logic when all replication types and
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
+
           // wait until pipeline is ready
           pipelineManager.waitPipelineReady(pipeline.getId(), 0);
+        } catch (SCMException se) {
+          LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
+              "Datanodes may be used up.", type, factor, se);
+          break;
         } catch (IOException e) {
           LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
                   "get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index adaeb87..74431f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -43,10 +43,10 @@ public final class ContainerPlacementPolicyFactory {
   }
 
 
-  public static PlacementPolicy getPolicy(Configuration conf,
-    final NodeManager nodeManager, NetworkTopology clusterMap,
-    final boolean fallback, SCMContainerPlacementMetrics metrics)
-    throws SCMException{
+  public static PlacementPolicy getPolicy(
+      Configuration conf, final NodeManager nodeManager,
+      NetworkTopology clusterMap, final boolean fallback,
+      SCMContainerPlacementMetrics metrics) throws SCMException{
     final Class<? extends PlacementPolicy> placementClass = conf
         .getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
             OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 714188d..18809ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -80,7 +80,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
       dn2ObjectMap.computeIfPresent(dnId,
           (k, v) -> {
             v.remove(pipeline.getId());
-            return v;
+            return v.isEmpty() ? null : v;
           });
     }
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 13dbc97..a4b3f85 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -115,6 +115,7 @@ class BackgroundPipelineCreator {
           if (scheduler.isClosed()) {
             break;
           }
+
           pipelineManager.createPipeline(type, factor);
         } catch (IOException ioe) {
           break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 1983ed6..23eb574 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -52,6 +53,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   static final Logger LOG =
       LoggerFactory.getLogger(PipelinePlacementPolicy.class);
   private final NodeManager nodeManager;
+  private final PipelineStateManager stateManager;
   private final Configuration conf;
   private final int heavyNodeCriteria;
 
@@ -59,15 +61,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
    * Constructs a pipeline placement with considering network topology,
    * load balancing and rack awareness.
    *
-   * @param nodeManager Node Manager
+   * @param nodeManager NodeManager
+   * @param stateManager PipelineStateManager
    * @param conf        Configuration
    */
-  public PipelinePlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
+  public PipelinePlacementPolicy(final NodeManager nodeManager,
+      final PipelineStateManager stateManager, final Configuration conf) {
     super(nodeManager, conf);
     this.nodeManager = nodeManager;
     this.conf = conf;
-    heavyNodeCriteria = conf.getInt(
+    this.stateManager = stateManager;
+    this.heavyNodeCriteria = conf.getInt(
         ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
         ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
   }
@@ -76,11 +80,46 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
    * Returns true if this node meets the criteria.
    *
    * @param datanodeDetails DatanodeDetails
+   * @param nodesRequired nodes required count
    * @return true if we have enough space.
    */
   @VisibleForTesting
-  boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
-    return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+  boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
+    if (heavyNodeCriteria == 0) {
+      // no limit applied.
+      return true;
+    }
+    // Datanodes from pipeline in some states can also be considered available
+    // for pipeline allocation. Thus the number of these pipeline shall be
+    // deducted from total heaviness calculation.
+    int pipelineNumDeductable = 0;
+    Set<PipelineID> pipelines = nodeManager.getPipelines(datanodeDetails);
+    for (PipelineID pid : pipelines) {
+      Pipeline pipeline;
+      try {
+        pipeline = stateManager.getPipeline(pid);
+      } catch (PipelineNotFoundException e) {
+        LOG.error("Pipeline not found in pipeline state manager during" +
+            " pipeline creation. PipelineID: " + pid +
+            " exception: " + e.getMessage());
+        continue;
+      }
+      if (pipeline != null &&
+          pipeline.getFactor().getNumber() == nodesRequired &&
+          pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+          pipeline.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+        pipelineNumDeductable++;
+      }
+    }
+    boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+        - pipelineNumDeductable) < heavyNodeCriteria;
+    if (!meet) {
+      LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
+          "datanodeļ¼š " + datanodeDetails.getUuid().toString() + " Heaviness: " +
+          nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
+          heavyNodeCriteria);
+    }
+    return meet;
   }
 
   /**
@@ -102,18 +141,19 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     if (excludedNodes != null) {
       healthyNodes.removeAll(excludedNodes);
     }
+    int initialHealthyNodesCount = healthyNodes.size();
     String msg;
-    if (healthyNodes.size() == 0) {
+    if (initialHealthyNodesCount == 0) {
       msg = "No healthy node found to allocate pipeline.";
       LOG.error(msg);
       throw new SCMException(msg, SCMException.ResultCodes
           .FAILED_TO_FIND_HEALTHY_NODES);
     }
 
-    if (healthyNodes.size() < nodesRequired) {
+    if (initialHealthyNodesCount < nodesRequired) {
       msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
               + " datanodes required. Found %d",
-          nodesRequired, healthyNodes.size());
+          nodesRequired, initialHealthyNodesCount);
       LOG.error(msg);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -121,14 +161,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
 
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
-    List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
-        meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+    List<DatanodeDetails> healthyList = healthyNodes.stream()
+        .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+        .collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
       msg = String.format("Unable to find enough nodes that meet " +
               "the criteria that cannot engage in more than %d pipelines." +
-              " Nodes required: %d Found: %d",
-          heavyNodeCriteria, nodesRequired, healthyList.size());
+              " Nodes required: %d Found: %d, healthy nodes count in " +
+              "NodeManager: %d.",
+          heavyNodeCriteria, nodesRequired, healthyList.size(),
+          initialHealthyNodesCount);
       LOG.error(msg);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -154,13 +197,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     // and make sure excludedNodes are excluded from list.
     List<DatanodeDetails> healthyNodes =
         filterViableNodes(excludedNodes, nodesRequired);
-    
-    // Randomly picks nodes when all nodes are equal.
+
+    // Randomly picks nodes when all nodes are equal or factor is ONE.
     // This happens when network topology is absent or
     // all nodes are on the same rack.
     if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
-      LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
-          "Required nodes: {}", nodesRequired);
       return super.getResultSet(nodesRequired, healthyNodes);
     } else {
       // Since topology and rack awareness are available, picks nodes
@@ -188,8 +229,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     // First choose an anchor nodes randomly
     DatanodeDetails anchor = chooseNode(healthyNodes);
     if (anchor == null) {
-      LOG.error("Unable to find the first healthy nodes that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
+      LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+              "that meet the criteria. Required nodes: {}, Found nodes: {}",
           nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -204,8 +245,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
         healthyNodes, exclude,
         nodeManager.getClusterNetworkTopologyMap(), anchor);
     if (nodeOnDifferentRack == null) {
-      LOG.error("Unable to find nodes on different racks that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
+      LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
+              " that meet the criteria. Required nodes: {}, Found nodes: {}",
           nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -228,9 +269,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     }
 
     if (results.size() < nodesRequired) {
-      LOG.error("Unable to find the required number of healthy nodes that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
-          nodesRequired, results.size());
+      LOG.error("Pipeline Placement: Unable to find the required number of " +
+              "healthy nodes that  meet the criteria. Required nodes: {}, " +
+              "Found nodes: {}", nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378c..8e0f32d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
   PipelineStateMap() {
 
     // TODO: Use TreeMap for range operations?
-    pipelineMap = new HashMap<>();
-    pipeline2container = new HashMap<>();
+    pipelineMap = new ConcurrentHashMap<>();
+    pipeline2container = new ConcurrentHashMap<>();
     query2OpenPipelines = new HashMap<>();
     initializeQueryMap();
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index a38c2f7..ceba620 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,14 +20,12 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -38,8 +36,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -60,6 +56,9 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final PipelineStateManager stateManager;
   private final Configuration conf;
   private final EventPublisher eventPublisher;
+  private final PipelinePlacementPolicy placementPolicy;
+  private int pipelineNumberLimit;
+  private int maxPipelinePerDatanode;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -82,65 +81,93 @@ public class RatisPipelineProvider implements PipelineProvider {
     this.stateManager = stateManager;
     this.conf = conf;
     this.eventPublisher = eventPublisher;
+    this.placementPolicy =
+        new PipelinePlacementPolicy(nodeManager, stateManager, conf);
+    this.pipelineNumberLimit = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);
+    this.maxPipelinePerDatanode = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
   }
 
+  private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
+      throws SCMException {
+    Set<DatanodeDetails> dnsUsed = new HashSet<>();
+    stateManager.getPipelines(ReplicationType.RATIS, factor)
+        .stream().filter(
+          p -> p.getPipelineState().equals(PipelineState.OPEN) ||
+              p.getPipelineState().equals(PipelineState.DORMANT) ||
+              p.getPipelineState().equals(PipelineState.ALLOCATED))
+        .forEach(p -> dnsUsed.addAll(p.getNodes()));
 
-  /**
-   * Create pluggable container placement policy implementation instance.
-   *
-   * @param nodeManager - SCM node manager.
-   * @param conf - configuration.
-   * @return SCM container placement policy implementation instance.
-   */
-  @SuppressWarnings("unchecked")
-  // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
-  private static PlacementPolicy createContainerPlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
-    Class<? extends PlacementPolicy> implClass =
-        (Class<? extends PlacementPolicy>) conf.getClass(
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-            SCMContainerPlacementRandom.class);
+    // Get list of healthy nodes
+    List<DatanodeDetails> dns = nodeManager
+        .getNodes(HddsProtos.NodeState.HEALTHY)
+        .parallelStream()
+        .filter(dn -> !dnsUsed.contains(dn))
+        .limit(factor.getNumber())
+        .collect(Collectors.toList());
+    if (dns.size() < factor.getNumber()) {
+      String e = String
+          .format("Cannot create pipeline of factor %d using %d nodes." +
+                  " Used %d nodes. Healthy nodes %d", factor.getNumber(),
+              dns.size(), dnsUsed.size(),
+              nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+      throw new SCMException(e,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return dns;
+  }
 
-    try {
-      Constructor<? extends PlacementPolicy> ctor =
-          implClass.getDeclaredConstructor(NodeManager.class,
-              Configuration.class);
-      return ctor.newInstance(nodeManager, conf);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(implClass.getName()
-          + " could not be constructed.", e.getCause());
-    } catch (Exception e) {
-//      LOG.error("Unhandled exception occurred, Placement policy will not " +
-//          "be functional.");
-      throw new IllegalArgumentException("Unable to load " +
-          "PlacementPolicy", e);
+  private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
+    if (factor != ReplicationFactor.THREE) {
+      // Only put limits for Factor THREE pipelines.
+      return false;
+    }
+    // Per datanode limit
+    if (maxPipelinePerDatanode > 0) {
+      return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
+          stateManager.getPipelines(ReplicationType.RATIS, factor,
+              Pipeline.PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
+          nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) /
+          factor.getNumber();
+    }
+
+    // Global limit
+    if (pipelineNumberLimit > 0) {
+      return (stateManager.getPipelines(ReplicationType.RATIS,
+          ReplicationFactor.THREE).size() - stateManager.getPipelines(
+          ReplicationType.RATIS, ReplicationFactor.THREE,
+          Pipeline.PipelineState.CLOSED).size()) >
+          (pipelineNumberLimit - stateManager.getPipelines(
+              ReplicationType.RATIS, ReplicationFactor.ONE).size());
     }
+
+    return false;
   }
 
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
-    // Get set of datanodes already used for ratis pipeline
-    Set<DatanodeDetails> dnsUsed = new HashSet<>();
-    stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
-        p -> p.getPipelineState().equals(PipelineState.OPEN) ||
-            p.getPipelineState().equals(PipelineState.DORMANT) ||
-            p.getPipelineState().equals(PipelineState.ALLOCATED))
-        .forEach(p -> dnsUsed.addAll(p.getNodes()));
+    if (exceedPipelineNumberLimit(factor)) {
+      throw new SCMException("Ratis pipeline number meets the limit: " +
+          pipelineNumberLimit + " factor : " +
+          factor.getNumber(),
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
 
-    // Get list of healthy nodes
-    List<DatanodeDetails> dns =
-        nodeManager.getNodes(NodeState.HEALTHY)
-            .parallelStream()
-            .filter(dn -> !dnsUsed.contains(dn))
-            .limit(factor.getNumber())
-            .collect(Collectors.toList());
-    if (dns.size() < factor.getNumber()) {
-      String e = String
-          .format("Cannot create pipeline of factor %d using %d nodes.",
-              factor.getNumber(), dns.size());
-      throw new InsufficientDatanodesException(e);
+    List<DatanodeDetails> dns;
+
+    switch(factor) {
+    case ONE:
+      dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+      break;
+    case THREE:
+      dns = placementPolicy.chooseDatanodes(null,
+          null, factor.getNumber(), 0);
+      break;
+    default:
+      throw new IllegalStateException("Unknown factor: " + factor.name());
     }
 
     Pipeline pipeline = Pipeline.newBuilder()
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
new file mode 100644
index 0000000..04393a1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for Ratis pipelines. Contains methods to create and destroy
+ * ratis pipelines.
+ */
+public final class RatisPipelineUtils {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RatisPipelineUtils.class);
+
+  private RatisPipelineUtils() {
+  }
+  /**
+   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
+   * the datanodes.
+   *
+   * @param pipeline        - Pipeline to be destroyed
+   * @param ozoneConf       - Ozone configuration
+   * @param grpcTlsConfig
+   * @throws IOException
+   */
+  public static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
+      GrpcTlsConfig grpcTlsConfig) {
+    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
+    }
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      try {
+        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
+      } catch (IOException e) {
+        LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
+            pipeline.getId(), dn, e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Sends ratis command to destroy pipeline on the given datanode.
+   *
+   * @param dn         - Datanode on which pipeline needs to be destroyed
+   * @param pipelineID - ID of pipeline to be destroyed
+   * @param ozoneConf  - Ozone configuration
+   * @param grpcTlsConfig - grpc tls configuration
+   * @throws IOException
+   */
+  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
+      Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
+    final String rpcType = ozoneConf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+    final RaftPeer p = RatisHelper.toRaftPeer(dn);
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
+    try(RaftClient client = RatisHelper
+        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
+            retryPolicy, maxOutstandingRequests, grpcTlsConfig,
+            requestTimeout)) {
+      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
+          true, p.getId());
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 00a4429..f44962e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -55,10 +55,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.hadoop.hdds.scm
-    .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm
-    .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
 
 /**
@@ -98,8 +94,8 @@ public class SCMPipelineManager implements PipelineManager {
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
         new BackgroundPipelineCreator(this, scheduler, conf);
-    int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
+        ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     final File metaDir = ServerUtils.getScmDbDir(conf);
     final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
     this.pipelineStore =
@@ -167,10 +163,9 @@ public class SCMPipelineManager implements PipelineManager {
         metrics.createPerPipelineMetrics(pipeline);
       }
       return pipeline;
-    } catch (InsufficientDatanodesException idEx) {
-      throw idEx;
     } catch (IOException ex) {
       metrics.incNumPipelineCreationFailed();
+      LOG.error("Pipeline creation failed.", ex);
       throw ex;
     } finally {
       lock.writeLock().unlock();
@@ -179,7 +174,7 @@ public class SCMPipelineManager implements PipelineManager {
 
   @Override
   public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
-                                 List<DatanodeDetails> nodes) {
+      List<DatanodeDetails> nodes) {
     // This will mostly be used to create dummy pipeline for SimplePipelines.
     // We don't update the metrics for SimplePipelines.
     lock.writeLock().lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index 40a6f29..8c348ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -135,6 +135,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
   }
 
   /**
+   * Get the number of pipeline created.
+   * @return number of pipeline
+   */
+  long getNumPipelineCreated() {
+    return numPipelineCreated.value();
+  }
+
+  /**
    * Increments number of failed pipeline creation count.
    */
   void incNumPipelineCreationFailed() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index aa5d73e..757023c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hdds.scm.safemode;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -46,6 +49,8 @@ public class HealthyPipelineSafeModeRule
   private int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
   private final double healthyPipelinesPercent;
+  private final Set<PipelineID> processedPipelineIDs =
+      new HashSet<>();
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
@@ -117,8 +122,11 @@ public class HealthyPipelineSafeModeRule
     Preconditions.checkNotNull(pipeline);
     if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
         pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
-      getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-      currentHealthyPipelineCount++;
+      if (!processedPipelineIDs.contains(pipeline.getId())) {
+        getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+        currentHealthyPipelineCount++;
+        processedPipelineIDs.add(pipeline.getId());
+      }
     }
 
     if (scmInSafeMode()) {
@@ -131,6 +139,7 @@ public class HealthyPipelineSafeModeRule
 
   @Override
   protected void cleanup() {
+    processedPipelineIDs.clear();
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 1676af1..f66dc29 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -65,6 +65,8 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test DeadNodeHandler.
  */
@@ -86,6 +88,7 @@ public class TestDeadNodeHandler {
     storageDir = GenericTestUtils.getTempPath(
         TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 2e0d0b1..1e34039 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -34,11 +34,14 @@ import org.junit.Test;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test for PipelinePlacementPolicy.
  */
 public class TestPipelinePlacementPolicy {
   private MockNodeManager nodeManager;
+  private OzoneConfiguration conf;
   private PipelinePlacementPolicy placementPolicy;
   private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
 
@@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy {
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true,
         PIPELINE_PLACEMENT_MAX_NODES_COUNT);
-    placementPolicy =
-        new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+    conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
+    placementPolicy = new PipelinePlacementPolicy(
+        nodeManager, new PipelineStateManager(conf), conf);
   }
 
   @Test
@@ -123,7 +128,7 @@ public class TestPipelinePlacementPolicy {
   public void testHeavyNodeShouldBeExcluded() throws SCMException{
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
-    int nodesRequired = healthyNodes.size()/2;
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
     // only minority of healthy NODES are heavily engaged in pipelines.
     int minorityHeavy = healthyNodes.size()/2 - 1;
     List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
@@ -179,7 +184,9 @@ public class TestPipelinePlacementPolicy {
     }
 
     int considerHeavyCount =
-        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+        conf.getInt(
+            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
 
     Node2PipelineMap mockMap = new Node2PipelineMap();
     for (DatanodeDetails node : nodes) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 81723e1..08f5185 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
@@ -34,12 +35,13 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -59,6 +61,7 @@ public class TestSCMPipelineManager {
   @Before
   public void setUp() throws Exception {
     conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -253,10 +256,10 @@ public class TestSCMPipelineManager {
       pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
       Assert.fail();
-    } catch (InsufficientDatanodesException idEx) {
-      Assert.assertEquals(
-          "Cannot create pipeline of factor 3 using 1 nodes.",
-          idEx.getMessage());
+    } catch (SCMException ioe) {
+      // pipeline creation failed this time.
+      Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+          ioe.getResult());
     }
 
     metrics = getMetrics(
@@ -266,7 +269,7 @@ public class TestSCMPipelineManager {
 
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
-    Assert.assertTrue(numPipelineCreateFailed == 0);
+    Assert.assertTrue(numPipelineCreateFailed == 1);
     
     // clean up
     pipelineManager.close();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 21fa7bd..aba9cae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -170,6 +170,7 @@ public class TestPipelineClose {
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
     Thread.sleep(5000);
+
     OzoneContainer ozoneContainer =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 2f852f2..0874f8b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -35,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
@@ -49,9 +51,12 @@ public class TestRatisPipelineCreateAndDestroy {
   public void init(int numDatanodes) throws Exception {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         GenericTestUtils.getRandomizedTempPath());
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
     cluster = MiniOzoneCluster.newBuilder(conf)
             .setNumDatanodes(numDatanodes)
-            .setHbInterval(1000)
+            .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3)
+            .setHbInterval(2000)
             .setHbProcessorInterval(1000)
             .build();
     cluster.waitForClusterToBeReady();
@@ -132,7 +137,9 @@ public class TestRatisPipelineCreateAndDestroy {
     } catch (IOException ioe) {
       // As now all datanodes are shutdown, they move to stale state, there
       // will be no sufficient datanodes to create the pipeline.
-      Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
+      Assert.assertTrue(ioe instanceof SCMException);
+      Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+          ((SCMException) ioe).getResult());
     }
 
     // make sure pipelines is destroyed
@@ -145,9 +152,14 @@ public class TestRatisPipelineCreateAndDestroy {
     for (Pipeline pipeline : pipelines) {
       pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
-    // make sure pipelines is created after node start
-    pipelineManager.triggerPipelineCreation();
-    waitForPipelines(1);
+
+    if (cluster.getStorageContainerManager()
+        .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) >=
+        HddsProtos.ReplicationFactor.THREE.getNumber()) {
+      // make sure pipelines is created after node start
+      pipelineManager.triggerPipelineCreation();
+      waitForPipelines(1);
+    }
   }
 
   private void waitForPipelines(int numPipelines)
@@ -155,6 +167,6 @@ public class TestRatisPipelineCreateAndDestroy {
     GenericTestUtils.waitFor(() -> pipelineManager
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
-        .size() == numPipelines, 100, 40000);
+        .size() >= numPipelines, 100, 40000);
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 8b8c64f..8cf9acb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -34,6 +33,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test for RatisPipelineProvider.
  */
@@ -48,15 +50,17 @@ public class TestRatisPipelineProvider {
     nodeManager = new MockNodeManager(true, 10);
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
     stateManager = new PipelineStateManager(conf);
     provider = new MockRatisPipelineProvider(nodeManager,
-        stateManager, new OzoneConfiguration());
+        stateManager, conf);
   }
 
   private void createPipelineAndAssertions(
           HddsProtos.ReplicationFactor factor) throws IOException {
     Pipeline pipeline = provider.create(factor);
     stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
@@ -64,10 +68,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
-    // New pipeline should not overlap with the previous created pipeline
-    Assert.assertTrue(
-        CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
-            .isEmpty());
+    nodeManager.addPipeline(pipeline1);
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
@@ -80,6 +81,7 @@ public class TestRatisPipelineProvider {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
     Pipeline pipeline = provider.create(factor);
     stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
@@ -89,11 +91,7 @@ public class TestRatisPipelineProvider {
     factor = HddsProtos.ReplicationFactor.ONE;
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
-    // New pipeline should overlap with the previous created pipeline,
-    // and one datanode should overlap between the two types.
-    Assert.assertEquals(
-        CollectionUtils.intersection(pipeline.getNodes(),
-            pipeline1.getNodes()).size(), 1);
+    nodeManager.addPipeline(pipeline1);
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
@@ -141,54 +139,39 @@ public class TestRatisPipelineProvider {
 
   @Test
   public void testCreatePipelinesDnExclude() throws IOException {
+    List<DatanodeDetails> allHealthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    int totalHealthyNodesCount = allHealthyNodes.size();
 
-    // We need 9 Healthy DNs in MockNodeManager.
-    NodeManager mockNodeManager = new MockNodeManager(true, 12);
-    PipelineStateManager stateManagerMock =
-        new PipelineStateManager(new OzoneConfiguration());
-    PipelineProvider providerMock = new MockRatisPipelineProvider(
-        mockNodeManager, stateManagerMock, new OzoneConfiguration());
-
-    // Use up first 3 DNs for an open pipeline.
-    List<DatanodeDetails> openPiplineDns = mockNodeManager.getAllNodes()
-        .subList(0, 3);
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
 
-    Pipeline openPipeline = Pipeline.newBuilder()
-        .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(factor)
-        .setNodes(openPiplineDns)
-        .setState(Pipeline.PipelineState.OPEN)
-        .setId(PipelineID.randomId())
-        .build();
-
-    stateManagerMock.addPipeline(openPipeline);
-
-    // Use up next 3 DNs also for an open pipeline.
-    List<DatanodeDetails> moreOpenPiplineDns = mockNodeManager.getAllNodes()
-        .subList(3, 6);
-    Pipeline anotherOpenPipeline = Pipeline.newBuilder()
-        .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(factor)
-        .setNodes(moreOpenPiplineDns)
-        .setState(Pipeline.PipelineState.OPEN)
-        .setId(PipelineID.randomId())
-        .build();
-    stateManagerMock.addPipeline(anotherOpenPipeline);
-
-    // Use up next 3 DNs also for a closed pipeline.
-    List<DatanodeDetails> closedPiplineDns = mockNodeManager.getAllNodes()
-        .subList(6, 9);
-    Pipeline anotherClosedPipeline = Pipeline.newBuilder()
-        .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(factor)
-        .setNodes(closedPiplineDns)
-        .setState(Pipeline.PipelineState.CLOSED)
-        .setId(PipelineID.randomId())
-        .build();
-    stateManagerMock.addPipeline(anotherClosedPipeline);
-
-    Pipeline pipeline = providerMock.create(factor);
+    List<DatanodeDetails> closePipelineDns = new ArrayList<>();
+    for (int i = 0; i < totalHealthyNodesCount/3; i++) {
+      List<DatanodeDetails> pipelineDns = allHealthyNodes
+          .subList(3 * i, 3 * (i + 1));
+
+      Pipeline.PipelineState state;
+      if (i % 2 == 0) {
+        state = Pipeline.PipelineState.OPEN;
+      } else {
+        state = Pipeline.PipelineState.CLOSED;
+        closePipelineDns.addAll(pipelineDns);
+      }
+
+      Pipeline openPipeline = Pipeline.newBuilder()
+          .setType(HddsProtos.ReplicationType.RATIS)
+          .setFactor(factor)
+          .setNodes(pipelineDns)
+          .setState(state)
+          .setId(PipelineID.randomId())
+          .build();
+
+
+      stateManager.addPipeline(openPipeline);
+      nodeManager.addPipeline(openPipeline);
+    }
+
+    Pipeline pipeline = provider.create(factor);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
@@ -196,15 +179,9 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
 
-    // Pipline nodes cannot be from open pipelines.
-    Assert.assertTrue(
-        pipelineNodes.parallelStream().filter(dn ->
-        (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
-        .count() == 0);
-
-    // Since we have only 9 Healthy DNs, at least 1 pipeline node should have
-    // been from the closed pipeline DN list.
+    // Since we have only 10 DNs, at least 1 pipeline node should have been
+    // from the closed pipeline DN list.
     Assert.assertTrue(pipelineNodes.parallelStream().filter(
-        closedPiplineDns::contains).count() > 0);
+        closePipelineDns::contains).count() > 0);
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 459a67a..baeee6a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -57,8 +57,11 @@ public class TestSCMRestart {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    int numOfNodes = 4;
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(4)
+        .setNumDatanodes(numOfNodes)
+        // allow only one FACTOR THREE pipeline.
+        .setTotalPipelineNumLimit(numOfNodes + 1)
         .setHbInterval(1000)
         .setHbProcessorInterval(1000)
         .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 7cfd555..09c633d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.junit.Assert.fail;
 
 /**
@@ -62,6 +63,8 @@ public class TestSCMSafeModeWithPipelineRules {
         true);
     conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s");
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 50);
+
     clusterBuilder = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numDatanodes)
         .setHbInterval(1000)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 0102246..1070888 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -238,6 +238,7 @@ public interface MiniOzoneCluster {
     protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
     protected static final int DEFAULT_HB_PROCESSOR_INTERVAL_MS = 100;
     protected static final int ACTIVE_OMS_NOT_SET = -1;
+    protected static final int DEFAULT_PIPELIME_LIMIT = 3;
 
     protected final OzoneConfiguration conf;
     protected String path;
@@ -265,6 +266,7 @@ public interface MiniOzoneCluster {
     protected int numOfDatanodes = 3;
     protected boolean  startDataNodes = true;
     protected CertificateClient certClient;
+    protected int pipelineNumLimit = DEFAULT_PIPELIME_LIMIT;
 
     protected Builder(OzoneConfiguration conf) {
       this.conf = conf;
@@ -352,6 +354,16 @@ public interface MiniOzoneCluster {
     }
 
     /**
+     * Sets the total number of pipelines to create.
+     * @param val number of pipelines
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setTotalPipelineNumLimit(int val) {
+      pipelineNumLimit = val;
+      return this;
+    }
+
+    /**
      * Sets the number of HeartBeat Interval of Datanodes, the value should be
      * in MilliSeconds.
      *
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 2ad6c12..bc937aa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -513,6 +513,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
           streamBufferMaxSize.get(), streamBufferSizeUnit.get());
       conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
           streamBufferSizeUnit.get());
+      // MiniOzoneCluster should have global pipeline upper limit.
+      conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+          pipelineNumLimit == DEFAULT_PIPELIME_LIMIT ?
+              2 * numOfDatanodes : pipelineNumLimit);
       configureTrace();
     }
 
@@ -524,7 +528,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
      * Creates a new StorageContainerManager instance.
      *
      * @return {@link StorageContainerManager}
-     *
+     *Wa
      * @throws IOException
      */
     StorageContainerManager createSCM()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index fd2cea3..1eef382 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -81,6 +81,7 @@ public class Test2WayCommitInRatis {
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 2b41012..2584499 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -85,6 +85,7 @@ public class TestBlockOutputStream {
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 3cbe06d..a4c0ef4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -90,9 +91,11 @@ public class TestBlockOutputStreamWithFailures {
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
+
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
-        .setBlockSize(blockSize).setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
+        .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
+        .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
         .setStreamBufferMaxSize(maxFlushSize)
         .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index 344c51e..d76155b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -96,6 +96,7 @@ public class TestCommitWatcher {
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index bc27926..1f2a6b5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -56,6 +56,7 @@ import java.util.function.Predicate;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 
 /**
  * Tests delete key operation with a slow follower in the datanode
@@ -99,10 +100,12 @@ public class TestContainerReplicationEndToEnd {
         1000, TimeUnit.SECONDS);
     conf.setLong("hdds.scm.replication.thread.interval",
         containerReportInterval);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4)
+            .setTotalPipelineNumLimit(6).setHbInterval(200)
             .build();
     cluster.waitForClusterToBeReady();
     cluster.getStorageContainerManager().getReplicationManager().start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 2e862bf..ebf9b65 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,8 +52,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
 
 /**
  * Tests the containerStateMachine failure handling.
@@ -82,7 +81,7 @@ public class TestContainerStateMachine {
     baseDir.mkdirs();
 
     conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
-  //  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    //  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 30c2624..644469e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -80,6 +80,7 @@ public class TestDeleteWithSlowFollower {
   private static String bucketName;
   private static String path;
   private static XceiverClientManager xceiverClientManager;
+  private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -111,10 +112,13 @@ public class TestDeleteWithSlowFollower {
         1000, TimeUnit.SECONDS);
     conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
         1, TimeUnit.SECONDS);
-
     conf.setQuietMode(false);
-    cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+    int numOfDatanodes = 3;
+    cluster = MiniOzoneCluster.newBuilder(conf)
+            .setNumDatanodes(numOfDatanodes)
+            .setTotalPipelineNumLimit(
+                numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
+            .setHbInterval(100)
             .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
@@ -176,7 +180,7 @@ public class TestDeleteWithSlowFollower {
         cluster.getStorageContainerManager().getPipelineManager()
             .getPipelines(HddsProtos.ReplicationType.RATIS,
                 HddsProtos.ReplicationFactor.THREE);
-    Assert.assertTrue(pipelineList.size() == 1);
+    Assert.assertTrue(pipelineList.size() >= FACTOR_THREE_PIPELINE_COUNT);
     Pipeline pipeline = pipelineList.get(0);
     for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
       if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index a7f5960..d86ae51 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -98,6 +99,7 @@ public class TestFailureHandlingByClient {
         1, TimeUnit.SECONDS);
     conf.setBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -106,7 +108,7 @@ public class TestFailureHandlingByClient {
         Collections.singleton(HddsUtils.getHostName(conf))).get(0),
         "/rack1");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(10).build();
+        .setNumDatanodes(10).setTotalPipelineNumLimit(15).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
index 47a716e..75af061 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
@@ -67,7 +67,8 @@ public class TestHybridPipelineOnDatanode {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
+        .setTotalPipelineNumLimit(5).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index d834350..505b46c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -83,6 +83,7 @@ public class TestKeyInputStream {
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
+        .setTotalPipelineNumLimit(5)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index 5717f58..b60fe3f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -48,8 +48,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
 
 /**
  * Tests MultiBlock Writes with Dn failures by Ozone Client.
@@ -88,10 +87,13 @@ public class TestMultiBlockWritesWithDnFailures {
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         1, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(datanodes).build();
+        .setNumDatanodes(datanodes)
+        .setTotalPipelineNumLimit(0)
+        .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 6758f4f..7f12edd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -92,6 +92,7 @@ public class TestOzoneClientRetriesOnException {
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 2163773..fd2800f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -165,6 +165,7 @@ public abstract class TestOzoneRpcClientAbstract {
   static void startCluster(OzoneConfiguration conf) throws Exception {
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
+        .setTotalPipelineNumLimit(10)
         .setScmId(scmId)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 5808655..511315f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -57,6 +57,7 @@ import java.util.concurrent.TimeoutException;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 
 /**
  * This class verifies the watchForCommit Handling by xceiverClient.
@@ -93,10 +94,12 @@ public class TestWatchForCommit {
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
         1, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
 
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index b676e1c..3cafb7d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -52,6 +52,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test container closing.
  */
@@ -73,8 +75,11 @@ public class TestCloseContainerByPipeline {
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(10)
+        .setTotalPipelineNumLimit(15)
         .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 536d807..e677544 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -82,6 +82,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
         "/rack1");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(4)
+        .setTotalPipelineNumLimit(10)
         .build();
     cluster.waitForClusterToBeReady();
     metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
index ecc2b3e..a882dcd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.scm.node;
 
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -76,9 +77,11 @@ public class TestQueryNode {
     conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numOfDatanodes)
+        .setTotalPipelineNumLimit(numOfDatanodes + numOfDatanodes/2)
         .build();
     cluster.waitForClusterToBeReady();
     scmClient = new ContainerOperationClient(conf);
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index fcabc67..b3bbe3b 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -97,6 +97,7 @@ public class TestOzoneFsHAURLs {
     conf.setTimeDuration(
         OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     OMStorage omStore = new OMStorage(conf);
     omStore.setClusterId(clusterId);
@@ -106,6 +107,8 @@ public class TestOzoneFsHAURLs {
 
     // Start the cluster
     cluster = MiniOzoneCluster.newHABuilder(conf)
+        .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setClusterId(clusterId)
         .setScmId(scmId)
         .setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdcb822..3e1c826 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -42,7 +42,7 @@ public abstract class TestDataValidate {
   static void startCluster(OzoneConfiguration conf) throws Exception {
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(5).build();
+        .setNumDatanodes(5).setTotalPipelineNumLimit(8).build();
     cluster.waitForClusterToBeReady();
   }
 
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 13ecab6..bd30d4e 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -53,6 +53,7 @@ public class TestFreonWithPipelineDestroy {
       .setHbProcessorInterval(1000)
       .setHbInterval(1000)
       .setNumDatanodes(3)
+      .setTotalPipelineNumLimit(8)
       .build();
     cluster.waitForClusterToBeReady();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


Mime
View raw message