hadoop-ozone-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject [hadoop-ozone] 01/02: Revert "Revert "HDDS-2034. Async RATIS pipeline creation and destroy through heartbeat commands (#29)""
Date Wed, 20 Nov 2019 09:02:08 GMT
This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2531
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 3e7e21244c9a0bf249c976f20ae4e559acd1013d
Author: Márton Elek <elek@apache.org>
AuthorDate: Wed Nov 20 10:00:11 2019 +0100

    Revert "Revert "HDDS-2034. Async RATIS pipeline creation and destroy through heartbeat commands (#29)""
    
    This reverts commit dcfe5f34d79473def14f25812d77c6c685b92e58.
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  12 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |   5 +
 .../org/apache/hadoop/hdds/utils/Scheduler.java    |   7 +-
 .../common/src/main/resources/ozone-default.xml    |  30 ++-
 .../common/statemachine/DatanodeStateMachine.java  |   6 +
 .../CloseContainerCommandHandler.java              |  11 +-
 .../ClosePipelineCommandHandler.java               | 120 ++++++++++++
 .../commandhandler/CommandHandler.java             |   2 +-
 .../CreatePipelineCommandHandler.java              | 135 +++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |  22 +++
 .../common/transport/server/XceiverServerSpi.java  |  18 ++
 .../transport/server/ratis/XceiverServerRatis.java |  36 ++++
 .../protocol/commands/ClosePipelineCommand.java    |  73 +++++++
 .../protocol/commands/CreatePipelineCommand.java   | 100 ++++++++++
 .../proto/StorageContainerDatanodeProtocol.proto   |  23 +++
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   3 +
 .../hdds/scm/container/ContainerStateManager.java  |   1 +
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  12 +-
 .../scm/pipeline/BackgroundPipelineCreator.java    |   9 +-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  13 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  15 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |   2 +
 .../hdds/scm/pipeline/PipelineReportHandler.java   |  54 +++---
 .../hdds/scm/pipeline/PipelineStateManager.java    |   4 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 137 +++++---------
 .../hdds/scm/pipeline/RatisPipelineUtils.java      | 103 ----------
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  82 ++++++--
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |  10 +
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   5 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  83 ++++----
 .../safemode/OneReplicaPipelineSafeModeRule.java   |  65 +++----
 .../hdds/scm/safemode/SCMSafeModeManager.java      |  16 +-
 .../hadoop/hdds/scm/safemode/SafeModeHandler.java  |   5 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |   1 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  20 ++
 .../hdds/scm/server/StorageContainerManager.java   |   3 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  12 ++
 .../hadoop/hdds/scm/block/TestBlockManager.java    |  44 ++++-
 .../container/TestCloseContainerEventHandler.java  |  11 +-
 .../scm/container/TestSCMContainerManager.java     |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   2 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  11 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   9 +-
 .../scm/pipeline/MockRatisPipelineProvider.java    |  10 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  26 ++-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  44 ++---
 .../TestOneReplicaPipelineSafeModeRule.java        |  36 +---
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  50 ++---
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  35 ++++
 .../TestContainerStateManagerIntegration.java      |   6 +-
 .../metrics/TestSCMContainerManagerMetrics.java    |   3 +
 .../hdds/scm/pipeline/TestPipelineClose.java       |   4 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    | 210 +++++++++++++++++++++
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  93 +++++++--
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   2 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  43 ++---
 .../hadoop/ozone/TestContainerOperations.java      |   2 +-
 .../TestContainerStateMachineIdempotency.java      |   2 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |   6 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |  18 +-
 .../apache/hadoop/ozone/client/rpc/TestBCSID.java  |   3 +
 .../client/rpc/TestContainerStateMachine.java      |   3 +
 .../rpc/TestContainerStateMachineFailures.java     |   9 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../commandhandler/TestBlockDeletion.java          |   6 +-
 .../commandhandler/TestCloseContainerHandler.java  |   2 +
 .../commandhandler/TestDeleteContainerHandler.java |   2 +
 .../hadoop/ozone/dn/scrubber/TestDataScrubber.java |   2 +
 .../org/apache/hadoop/ozone/om/TestKeyPurging.java |   2 +-
 .../apache/hadoop/ozone/om/TestScmSafeMode.java    |   2 +-
 .../hadoop/ozone/scm/TestContainerSmallFile.java   |   2 +-
 .../scm/TestGetCommittedBlockLengthAndPutKey.java  |   2 +-
 .../org/apache/hadoop/ozone/scm/TestSCMMXBean.java |   2 +-
 .../hadoop/ozone/scm/node/TestSCMNodeMetrics.java  |   4 +-
 .../hadoop/fs/ozone/TestOzoneFsRenameDir.java      |   2 +-
 .../hadoop/ozone/fsck/TestContainerMapper.java     |   2 +-
 76 files changed, 1403 insertions(+), 570 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 99972ae..5e161b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,7 +81,12 @@ public final class HddsConfigKeys {
   public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
       "hdds.scm.safemode.pipeline-availability.check";
   public static final boolean
-      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
+      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;
+
+  public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
+      "hdds.scm.safemode.pipeline.creation";
+  public static final boolean
+      HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;
 
   // % of containers which should have at least one reported replica
   // before SCM comes out of safe mode.
@@ -89,13 +94,16 @@ public final class HddsConfigKeys {
       "hdds.scm.safemode.threshold.pct";
   public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;
 
-
   // percentage of healthy pipelines, where all 3 datanodes are reported in the
   // pipeline.
   public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
       "hdds.scm.safemode.healthy.pipelie.pct";
   public static final double
       HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
+  // number of healthy RATIS pipeline(ONE or THREE factor)
+  public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
+      "hdds.scm.safemode.min.pipeline";
+  public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;
 
   public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
       "hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 47ec453..51598a7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -155,6 +155,11 @@ public final class Pipeline {
     return state == PipelineState.OPEN;
   }
 
+  public boolean isAllocationTimeout() {
+    //TODO: define a system property to control the timeout value
+    return false;
+  }
+
   public void setNodesInOrder(List<DatanodeDetails> nodes) {
     nodesInOrder.set(nodes);
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
index 9edc104..f5e55c1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -72,9 +73,9 @@ public class Scheduler {
     }, delay, timeUnit);
   }
 
-  public void scheduleWithFixedDelay(Runnable runnable, long initialDelay,
-      long fixedDelay, TimeUnit timeUnit) {
-    scheduler
+  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable,
+      long initialDelay, long fixedDelay, TimeUnit timeUnit) {
+    return scheduler
         .scheduleWithFixedDelay(runnable, initialDelay, fixedDelay, timeUnit);
   }
 
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2f9ce31..c20a6a4 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -319,15 +319,6 @@
       defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
   <property>
-    <name>hdds.command.status.report.interval</name>
-    <value>60000ms</value>
-    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
-    <description>Time interval of the datanode to send status of command
-      execution. Each datanode periodically the execution status of commands
-      received from SCM to SCM. Unit could be defined with postfix
-      (ns,ms,s,m,h,d)</description>
-  </property>
-  <property>
     <name>hdds.pipeline.report.interval</name>
     <value>60000ms</value>
     <tag>OZONE, PIPELINE, MANAGEMENT</tag>
@@ -1300,7 +1291,7 @@
 
   <property>
     <name>hdds.scm.safemode.pipeline-availability.check</name>
-    <value>false</value>
+    <value>true</value>
     <tag>HDDS,SCM,OPERATION</tag>
     <description>
       Boolean value to enable pipeline availability check during SCM safe mode.
@@ -1386,6 +1377,25 @@
   </property>
 
   <property>
+    <name>hdds.scm.safemode.pipeline.creation</name>
+    <value>true</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Boolean value to enable background pipeline creation in SCM safe mode.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.scm.safemode.min.pipeline</name>
+    <value>1</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Minimum RATIS pipeline number to exit SCM safe mode. Considered only when
+      "hdds.scm.safemode.pipeline.creation" is True.
+    </description>
+  </property>
+
+  <property>
     <name>hdds.lock.max.concurrency</name>
     <value>100</value>
     <tag>HDDS</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5424b6b..e7fda00 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -39,8 +39,12 @@ import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .ClosePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CreatePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteContainerCommandHandler;
@@ -132,6 +136,8 @@ public class DatanodeStateMachine implements Closeable {
         .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
         .addHandler(new DeleteContainerCommandHandler(
             dnConf.getContainerDeleteThreads()))
+        .addHandler(new ClosePipelineCommandHandler())
+        .addHandler(new CreatePipelineCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index ef06c14..3c4e24a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handler for close container command received from SCM.
@@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
 
-  private int invocationCount;
+  private AtomicLong invocationCount = new AtomicLong(0);
   private long totalTime;
 
   /**
@@ -69,7 +70,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.debug("Processing Close Container command.");
-    invocationCount++;
+    invocationCount.incrementAndGet();
     final long startTime = Time.monotonicNow();
     final DatanodeDetails datanodeDetails = context.getParent()
         .getDatanodeDetails();
@@ -162,7 +163,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public int getInvocationCount() {
-    return invocationCount;
+    return (int)invocationCount.get();
   }
 
   /**
@@ -172,8 +173,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
     }
     return 0;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
new file mode 100644
index 0000000..b1c6090
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for close pipeline command received from SCM.
+ */
+public class ClosePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a closePipelineCommand handler.
+   */
+  public ClosePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+    final ClosePipelineCommandProto closeCommand =
+        ((ClosePipelineCommand)command).getProto();
+    final HddsProtos.PipelineID pipelineID = closeCommand.getPipelineID();
+
+    try {
+      XceiverServerSpi server = ozoneContainer.getWriteChannel();
+      server.removeGroup(pipelineID);
+      LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
+          dn.getUuidString());
+    } catch (IOException e) {
+      LOG.error("Can't close pipeline #{}", pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 70ed9ca..3a6566f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -68,7 +68,7 @@ public interface CommandHandler {
   default void updateCommandStatus(StateContext context, SCMCommand command,
       Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
     if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
-      log.debug("{} with Id:{} not found.", command.getType(),
+      log.warn("{} with Id:{} not found.", command.getType(),
           command.getId());
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
new file mode 100644
index 0000000..3a60d7e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for create pipeline command received from SCM.
+ */
+public class CreatePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a createPipelineCommand handler.
+   */
+  public CreatePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent()
+        .getDatanodeDetails();
+    final CreatePipelineCommandProto createCommand =
+        ((CreatePipelineCommand)command).getProto();
+    final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID();
+    Collection<DatanodeDetails> peers =
+        createCommand.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList());
+
+    try {
+      XceiverServerSpi server = ozoneContainer.getWriteChannel();
+      server.addGroup(pipelineID, peers);
+      LOG.info("Create Pipeline {} {} #{} command succeed on datanode {}.",
+          createCommand.getType(), createCommand.getFactor(), pipelineID,
+          dn.getUuidString());
+      // Trigger heartbeat report
+      context.addReport(context.getParent().getContainer().getPipelineReport());
+      context.getParent().triggerHeartbeat();
+    } catch (NotLeaderException e) {
+      LOG.debug("Follower cannot create pipeline #{}.", pipelineID);
+    } catch (IOException e) {
+      LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+          createCommand.getFactor(), pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c50f457..a55d0d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -309,6 +311,26 @@ public class HeartbeatEndpointTask
         }
         this.context.addCommand(deleteContainerCommand);
         break;
+      case createPipelineCommand:
+        CreatePipelineCommand createPipelineCommand =
+            CreatePipelineCommand.getFromProtobuf(
+                commandResponseProto.getCreatePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM create pipeline request {}",
+              createPipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(createPipelineCommand);
+        break;
+      case closePipelineCommand:
+        ClosePipelineCommand closePipelineCommand =
+            ClosePipelineCommand.getFromProtobuf(
+                commandResponseProto.getClosePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM close pipeline request {}",
+              closePipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(closePipelineCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 4e0d343..01f463c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReport;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 /** A server endpoint that acts as the communication layer for Ozone
@@ -60,6 +62,22 @@ public interface XceiverServerSpi {
    */
   boolean isExist(HddsProtos.PipelineID pipelineId);
 
+
+  /**
+   * Join a new pipeline.
+   */
+  default void addGroup(HddsProtos.PipelineID pipelineId,
+      Collection<DatanodeDetails> peers) throws IOException {
+  }
+
+
+  /**
+   * Exit a pipeline.
+   */
+  default void removeGroup(HddsProtos.PipelineID pipelineId)
+      throws IOException {
+  }
+
   /**
    * Get pipeline report for the XceiverServer instance.
    * @return list of report for each pipeline.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 1146394..a76944b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -67,6 +67,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -620,6 +621,41 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     return pipelineIDs;
   }
 
+  @Override
+  public void addGroup(HddsProtos.PipelineID pipelineId,
+      Collection<DatanodeDetails> peers) throws IOException {
+    final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId);
+    final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
+    final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers);
+    GroupManagementRequest request = GroupManagementRequest.newAdd(
+        clientId, server.getId(), nextCallId(), group);
+
+    RaftClientReply reply;
+    try {
+      reply = server.groupManagement(request);
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+    processReply(reply);
+  }
+
+  @Override
+  public void removeGroup(HddsProtos.PipelineID pipelineId)
+      throws IOException {
+    GroupManagementRequest request = GroupManagementRequest.newRemove(
+        clientId, server.getId(), nextCallId(),
+        RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()),
+        true);
+
+    RaftClientReply reply;
+    try {
+      reply = server.groupManagement(request);
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+    processReply(reply);
+  }
+
   void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(groupId, roleInfoProto);
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
new file mode 100644
index 0000000..1f75bc3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+/**
+ * Asks datanode to close a pipeline.
+ */
+public class ClosePipelineCommand
+    extends SCMCommand<ClosePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+
+  public ClosePipelineCommand(final PipelineID pipelineID) {
+    super();
+    this.pipelineID = pipelineID;
+  }
+
+  public ClosePipelineCommand(long cmdId, final PipelineID pipelineID) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  @Override
+  public ClosePipelineCommandProto getProto() {
+    ClosePipelineCommandProto.Builder builder =
+        ClosePipelineCommandProto.newBuilder();
+    builder.setCmdId(getId());
+    builder.setPipelineID(pipelineID.getProtobuf());
+    return builder.build();
+  }
+
+  public static ClosePipelineCommand getFromProtobuf(
+      ClosePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    return new ClosePipelineCommand(createPipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
new file mode 100644
index 0000000..9e22cbc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Asks datanode to create a pipeline.
+ */
+public class CreatePipelineCommand
+    extends SCMCommand<CreatePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+  private final ReplicationFactor factor;
+  private final ReplicationType type;
+  private final List<DatanodeDetails> nodelist;
+
+  public CreatePipelineCommand(final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super();
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+  }
+
+  public CreatePipelineCommand(long cmdId, final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  @Override
+  public CreatePipelineCommandProto getProto() {
+    return CreatePipelineCommandProto.newBuilder()
+        .setCmdId(getId())
+        .setPipelineID(pipelineID.getProtobuf())
+        .setFactor(factor)
+        .setType(type)
+        .addAllDatanode(nodelist.stream()
+            .map(DatanodeDetails::getProtoBufMessage)
+            .collect(Collectors.toList()))
+        .build();
+  }
+
+  public static CreatePipelineCommand getFromProtobuf(
+      CreatePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    return new CreatePipelineCommand(createPipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()),
+        createPipelineProto.getType(), createPipelineProto.getFactor(),
+        createPipelineProto.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 45a1db6..8b272c8 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -283,6 +283,8 @@ message SCMCommandProto {
     closeContainerCommand = 3;
     deleteContainerCommand = 4;
     replicateContainerCommand = 5;
+    createPipelineCommand = 6;
+    closePipelineCommand = 7;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -291,6 +293,8 @@ message SCMCommandProto {
   optional CloseContainerCommandProto closeContainerCommandProto = 4;
   optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
   optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
+  optional CreatePipelineCommandProto createPipelineCommandProto = 7;
+  optional ClosePipelineCommandProto closePipelineCommandProto = 8;
 }
 
 /**
@@ -360,6 +364,25 @@ message ReplicateContainerCommandProto {
 }
 
 /**
+This command asks the datanode to create a pipeline.
+*/
+message CreatePipelineCommandProto {
+  required PipelineID pipelineID = 1;
+  required ReplicationType type = 2;
+  required ReplicationFactor factor = 3;
+  repeated DatanodeDetailsProto datanode = 4;
+  required int64 cmdId = 5;
+}
+
+/**
+This command asks the datanode to close a pipeline.
+*/
+message ClosePipelineCommandProto {
+  required PipelineID pipelineID = 1;
+  required int64 cmdId = 2;
+}
+
+/**
  * Protocol used from a datanode to StorageContainerManager.
  *
  * Please see the request and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 845bdf1..b7a7525 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import javax.management.ObjectName;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -196,6 +197,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // TODO: #CLUTIL Remove creation logic when all replication types and
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
+          // wait until pipeline is ready
+          pipelineManager.waitPipelineReady(pipeline.getId(), 0);
         } catch (IOException e) {
           LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
                   "get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7dde8d7..cefc185 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -253,6 +253,7 @@ public class ContainerStateManager {
       // TODO: #CLUTIL remove creation logic when all replication types and
       // factors are handled by pipeline creator job.
       pipeline = pipelineManager.createPipeline(type, replicationFactor);
+      pipelineManager.waitPipelineReady(pipeline.getId(), 0);
     } catch (IOException e) {
       final List<Pipeline> pipelines = pipelineManager
           .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e..6de05fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -97,15 +98,14 @@ public final class SCMEvents {
           new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
 
   /**
-   * PipelineReport processed by pipeline report handler. This event is
+   * Open pipeline event sent by PipelineReportHandler. This event is
    * received by HealthyPipelineSafeModeRule.
    */
-  public static final TypedEvent<PipelineReportFromDatanode>
-      PROCESSED_PIPELINE_REPORT = new TypedEvent<>(
-          PipelineReportFromDatanode.class, "Processed_Pipeline_Report");
+  public static final TypedEvent<Pipeline>
+      OPEN_PIPELINE = new TypedEvent<>(Pipeline.class, "Open_Pipeline");
 
   /**
-   * PipelineActions are sent by Datanode. This event is received by
+   * PipelineActions are sent by Datanode to close a pipeline. It's received by
    * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
    */
   public static final TypedEvent<PipelineActionsFromDatanode>
@@ -113,7 +113,7 @@ public final class SCMEvents {
       "Pipeline_Actions");
 
   /**
-   * A Command status report will be sent by datanodes. This repoort is received
+   * A Command status report will be sent by datanodes. This report is received
    * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
    */
   public static final TypedEvent<CommandStatusReportFromDatanode>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6873566..4065c2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -41,6 +42,7 @@ class BackgroundPipelineCreator {
   private final AtomicBoolean isPipelineCreatorRunning;
   private final PipelineManager pipelineManager;
   private final Configuration conf;
+  private ScheduledFuture<?> periodicTask;
 
   BackgroundPipelineCreator(PipelineManager pipelineManager,
       Scheduler scheduler, Configuration conf) {
@@ -57,13 +59,16 @@ class BackgroundPipelineCreator {
   /**
    * Schedules a fixed interval job to create pipelines.
    */
-  void startFixedIntervalPipelineCreator() {
+  synchronized void startFixedIntervalPipelineCreator() {
+    if (periodicTask != null) {
+      return;
+    }
     long intervalInMillis = conf
         .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
             ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
             TimeUnit.MILLISECONDS);
     // TODO: #CLUTIL We can start the job asap
-    scheduler.scheduleWithFixedDelay(() -> {
+    periodicTask = scheduler.scheduleWithFixedDelay(() -> {
       if (!shouldSchedulePipelineCreator()) {
         return;
       }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 77e037a..86ad5ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -39,12 +40,13 @@ public final class PipelineFactory {
   private Map<ReplicationType, PipelineProvider> providers;
 
   PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
-      Configuration conf, GrpcTlsConfig tlsConfig) {
+      Configuration conf, EventPublisher eventPublisher) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
         new SimplePipelineProvider(nodeManager));
     providers.put(ReplicationType.RATIS,
-        new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
+        new RatisPipelineProvider(nodeManager, stateManager, conf,
+            eventPublisher));
   }
 
   @VisibleForTesting
@@ -63,6 +65,11 @@ public final class PipelineFactory {
     return providers.get(type).create(factor, nodes);
   }
 
+  public void close(ReplicationType type, Pipeline pipeline)
+      throws IOException {
+    providers.get(type).close(pipeline);
+  }
+
   public void shutdown() {
     providers.values().forEach(provider -> provider.shutdown());
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9ba5f31..779008f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.ratis.grpc.GrpcTlsConfig;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -51,6 +50,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
       ReplicationFactor factor);
 
   List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state);
+
+  List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state);
 
   List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
@@ -95,5 +97,14 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
    */
   void deactivatePipeline(PipelineID pipelineID) throws IOException;
 
-  GrpcTlsConfig getGrpcTlsConfig();
+  /**
+   * Wait a pipeline to be OPEN.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout(millisecond), if 0, use default timeout
+   * @throws IOException in case of any Exception, such as timeout
+   */
+  default void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index a0ce216..c00ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,5 +33,7 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 
+  void close(Pipeline pipeline) throws IOException;
+
   void shutdown();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index b8cb7b4..a7e2bf1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -19,18 +19,23 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import java.io.IOException;
-import java.util.Objects;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +55,8 @@ public class PipelineReportHandler implements
   private final boolean pipelineAvailabilityCheck;
 
   public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
-      PipelineManager pipelineManager,
-      Configuration conf) {
+      PipelineManager pipelineManager, Configuration conf) {
     Preconditions.checkNotNull(pipelineManager);
-    Objects.requireNonNull(scmSafeModeManager);
     this.scmSafeModeManager = scmSafeModeManager;
     this.pipelineManager = pipelineManager;
     this.conf = conf;
@@ -76,48 +79,45 @@ public class PipelineReportHandler implements
     }
     for (PipelineReport report : pipelineReport.getPipelineReportList()) {
       try {
-        processPipelineReport(report, dn);
+        processPipelineReport(report, dn, publisher);
       } catch (IOException e) {
         LOGGER.error("Could not process pipeline report={} from dn={} {}",
             report, dn, e);
       }
     }
-    if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
-      publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          pipelineReportFromDatanode);
-    }
   }
 
-  private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
-      throws IOException {
+  private void processPipelineReport(PipelineReport report, DatanodeDetails dn,
+      EventPublisher publisher) throws IOException {
     PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
     Pipeline pipeline;
     try {
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
-      RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
-          pipelineManager.getGrpcTlsConfig());
+      final ClosePipelineCommand closeCommand =
+          new ClosePipelineCommand(pipelineID);
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(dn.getUuid(), closeCommand);
+      publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
       return;
     }
 
     pipeline.reportDatanode(dn);
-    if (report.getIsLeader()) {
+    // ONE replica pipeline doesn't have leader flag
+    if (report.getIsLeader() ||
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.ONE) {
       pipeline.setLeaderId(dn.getUuid());
     }
-    if ((pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED)
-        && pipeline.isHealthy()) {
-      pipelineManager.openPipeline(pipelineID);
-    }
 
     if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
-
-
-      if (report.getIsLeader()) {
-        // Pipeline reported as the leader
-        pipeline.setLeaderId(dn.getUuid());
+      LOGGER.info("Pipeline {} {} reported by {}", pipeline.getFactor(),
+          pipeline.getId(), dn);
+      if (pipeline.isHealthy()) {
         pipelineManager.openPipeline(pipelineID);
+        if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
+          publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+        }
       }
     }
-    pipeline.reportDatanode(dn);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 2410b54..180d0bf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -129,9 +129,9 @@ class PipelineStateManager {
       throw new IOException("Closed pipeline can not be opened");
     }
     if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
-      pipeline = pipelineStateMap.updatePipelineState(
-          pipelineId, PipelineState.OPEN);
       LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
+      pipeline = pipelineStateMap
+          .updatePipelineState(pipelineId, PipelineState.OPEN);
     }
     return pipeline;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 94443dd..a2ee50a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -24,37 +24,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -69,6 +60,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private final EventPublisher eventPublisher;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -83,15 +75,14 @@ public class RatisPipelineProvider implements PipelineProvider {
 
   private final ForkJoinPool forkJoinPool = new ForkJoinPool(
       parallelismForPool, factory, null, false);
-  private final GrpcTlsConfig tlsConfig;
 
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf,
-      GrpcTlsConfig tlsConfig) {
+      EventPublisher eventPublisher) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
-    this.tlsConfig = tlsConfig;
+    this.eventPublisher = eventPublisher;
   }
 
 
@@ -153,8 +144,27 @@ public class RatisPipelineProvider implements PipelineProvider {
       throw new InsufficientDatanodesException(e);
     }
 
-    Pipeline pipeline = create(factor, dns);
-    initializePipeline(pipeline);
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(PipelineState.ALLOCATED)
+        .setType(ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(dns)
+        .build();
+
+    // Send command to datanodes to create pipeline
+    final CreatePipelineCommand createCommand =
+        new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
+            factor, dns);
+
+    dns.stream().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), createCommand);
+      LOG.info("Send pipeline:{} create command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
+
     return pipeline;
   }
 
@@ -181,69 +191,22 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
   }
 
-  protected void initializePipeline(Pipeline pipeline) throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
-    }
-    callRatisRpc(pipeline.getNodes(),
-        (raftClient, peer) -> {
-          RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
-          if (reply == null || !reply.isSuccess()) {
-            String msg = "Pipeline initialization failed for pipeline:"
-                + pipeline.getId() + " node:" + peer.getId();
-            LOG.error(msg);
-            throw new IOException(msg);
-          }
-        });
-  }
-
-  private void callRatisRpc(List<DatanodeDetails> datanodes,
-      CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
-      throws IOException {
-    if (datanodes.isEmpty()) {
-      return;
-    }
-
-    final String rpcType = conf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
-    final List< IOException > exceptions =
-        Collections.synchronizedList(new ArrayList<>());
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(conf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(conf);
-    try {
-      forkJoinPool.submit(() -> {
-        datanodes.parallelStream().forEach(d -> {
-          final RaftPeer p = RatisHelper.toRaftPeer(d);
-          try (RaftClient client = RatisHelper
-              .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-                  retryPolicy, maxOutstandingRequests, tlsConfig,
-                  requestTimeout)) {
-            rpc.accept(client, p);
-          } catch (IOException ioe) {
-            String errMsg =
-                "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
-            LOG.error(errMsg, ioe);
-            exceptions.add(new IOException(errMsg, ioe));
-          }
-        });
-      }).get();
-    } catch (ExecutionException | RejectedExecutionException ex) {
-      LOG.error(ex.getClass().getName() + " exception occurred during " +
-          "createPipeline", ex);
-      throw new IOException(ex.getClass().getName() + " exception occurred " +
-          "during createPipeline", ex);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupt exception occurred during " +
-          "createPipeline", ex);
-    }
-    if (!exceptions.isEmpty()) {
-      throw MultipleIOException.createIOException(exceptions);
-    }
+  /**
+   * Removes pipeline from SCM. Sends command to destroy pipeline on all
+   * the datanodes.
+   *
+   * @param pipeline        - Pipeline to be destroyed
+   * @throws IOException
+   */
+  public void close(Pipeline pipeline) {
+    final ClosePipelineCommand closeCommand =
+        new ClosePipelineCommand(pipeline.getId());
+    pipeline.getNodes().stream().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), closeCommand);
+      LOG.info("Send pipeline:{} close command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
deleted file mode 100644
index 497e717..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
- */
-public final class RatisPipelineUtils {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RatisPipelineUtils.class);
-
-  private RatisPipelineUtils() {
-  }
-  /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param ozoneConf       - Ozone configuration
-   * @param grpcTlsConfig
-   * @throws IOException
-   */
-  public static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
-      GrpcTlsConfig grpcTlsConfig) {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
-    }
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      try {
-        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
-      } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
-            pipeline.getId(), dn);
-      }
-    }
-  }
-
-  /**
-   * Sends ratis command to destroy pipeline on the given datanode.
-   *
-   * @param dn         - Datanode on which pipeline needs to be destroyed
-   * @param pipelineID - ID of pipeline to be destroyed
-   * @param ozoneConf  - Ozone configuration
-   * @param grpcTlsConfig - grpc tls configuration
-   * @throws IOException
-   */
-  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
-      Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
-    final String rpcType = ozoneConf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
-    final RaftPeer p = RatisHelper.toRaftPeer(dn);
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(ozoneConf);
-    try(RaftClient client = RatisHelper
-        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, maxOutstandingRequests, grpcTlsConfig,
-            requestTimeout)) {
-      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
-          true, p.getId());
-    }
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..00a4429 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -37,7 +38,7 @@ import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.MetadataStore;
 import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
 import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,18 +82,18 @@ public class SCMPipelineManager implements PipelineManager {
   private final NodeManager nodeManager;
   private final SCMPipelineMetrics metrics;
   private final Configuration conf;
+  private long pipelineWaitDefaultTimeout;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
-  private GrpcTlsConfig grpcTlsConfig;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
-      EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
+      EventPublisher eventPublisher)
       throws IOException {
     this.lock = new ReentrantReadWriteLock();
     this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
     this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
-        conf, grpcTlsConfig);
+        conf, eventPublisher);
     // TODO: See if thread priority needs to be set for these threads
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
@@ -113,8 +114,11 @@ public class SCMPipelineManager implements PipelineManager {
     this.metrics = SCMPipelineMetrics.create();
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
+    this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
     initializePipelineState();
-    this.grpcTlsConfig = grpcTlsConfig;
   }
 
   public PipelineStateManager getStateManager() {
@@ -148,8 +152,8 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public synchronized Pipeline createPipeline(
-      ReplicationType type, ReplicationFactor factor) throws IOException {
+  public synchronized Pipeline createPipeline(ReplicationType type,
+      ReplicationFactor factor) throws IOException {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -157,8 +161,11 @@ public class SCMPipelineManager implements PipelineManager {
           pipeline.getProtobufMessage().toByteArray());
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
-      metrics.incNumPipelineCreated();
-      metrics.createPerPipelineMetrics(pipeline);
+      metrics.incNumPipelineAllocated();
+      if (pipeline.isOpen()) {
+        metrics.incNumPipelineCreated();
+        metrics.createPerPipelineMetrics(pipeline);
+      }
       return pipeline;
     } catch (InsufficientDatanodesException idEx) {
       throw idEx;
@@ -225,6 +232,16 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
+  public List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state) {
@@ -293,6 +310,7 @@ public class SCMPipelineManager implements PipelineManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = stateManager.openPipeline(pipelineId);
+      metrics.incNumPipelineCreated();
       metrics.createPerPipelineMetrics(pipeline);
     } finally {
       lock.writeLock().unlock();
@@ -380,6 +398,45 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   /**
+   * Wait a pipeline to be OPEN.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout, millisecond, 0 to use default value
+   * @throws IOException in case of any Exception, such as timeout
+   */
+  @Override
+  public void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+    long st = Time.monotonicNow();
+    if (timeout == 0) {
+      timeout = pipelineWaitDefaultTimeout;
+    }
+
+    boolean ready;
+    Pipeline pipeline;
+    do {
+      try {
+        pipeline = stateManager.getPipeline(pipelineID);
+      } catch (PipelineNotFoundException e) {
+        throw new PipelineNotFoundException(String.format(
+            "Pipeline %s cannot be found", pipelineID));
+      }
+      ready = pipeline.isOpen();
+      if (!ready) {
+        try {
+          Thread.sleep((long)100);
+        } catch (InterruptedException e) {
+        }
+      }
+    } while (!ready && Time.monotonicNow() - st < timeout);
+
+    if (!ready) {
+      throw new IOException(String.format("Pipeline %s is not ready in %d ms",
+          pipelineID, timeout));
+    }
+  }
+
+  /**
    * Moves the pipeline to CLOSED state and sends close container command for
    * all the containers in the pipeline.
    *
@@ -408,7 +465,7 @@ public class SCMPipelineManager implements PipelineManager {
    * @throws IOException
    */
   private void destroyPipeline(Pipeline pipeline) throws IOException {
-    RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
+    pipelineFactory.close(pipeline.getType(), pipeline);
     // remove the pipeline from the pipeline manager
     removePipeline(pipeline.getId());
     triggerPipelineCreation();
@@ -441,11 +498,6 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public GrpcTlsConfig getGrpcTlsConfig() {
-    return grpcTlsConfig;
-  }
-
-  @Override
   public void close() throws IOException {
     if (scheduler != null) {
       scheduler.close();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index b6a1445..40a6f29 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -47,6 +47,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
 
   private MetricsRegistry registry;
 
+  private @Metric MutableCounterLong numPipelineAllocated;
   private @Metric MutableCounterLong numPipelineCreated;
   private @Metric MutableCounterLong numPipelineCreationFailed;
   private @Metric MutableCounterLong numPipelineDestroyed;
@@ -84,6 +85,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
   @SuppressWarnings("SuspiciousMethodCalls")
   public void getMetrics(MetricsCollector collector, boolean all) {
     MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+    numPipelineAllocated.snapshot(recordBuilder, true);
     numPipelineCreated.snapshot(recordBuilder, true);
     numPipelineCreationFailed.snapshot(recordBuilder, true);
     numPipelineDestroyed.snapshot(recordBuilder, true);
@@ -118,6 +120,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
   }
 
   /**
+   * Increments number of pipeline allocation count, including succeeded
+   * and failed.
+   */
+  void incNumPipelineAllocated() {
+    numPipelineAllocated.incr();
+  }
+
+  /**
    * Increments number of successful pipeline creation count.
    */
   void incNumPipelineCreated() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index ab98dfa..00cb7ae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -74,6 +74,11 @@ public class SimplePipelineProvider implements PipelineProvider {
   }
 
   @Override
+  public void close(Pipeline pipeline) throws IOException {
+
+  }
+
+  @Override
   public void shutdown() {
     // Do nothing.
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 2f9a66f..9b19acf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -17,27 +17,19 @@
  */
 package org.apache.hadoop.hdds.scm.safemode;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 /**
  * Class defining Safe mode exit criteria for Pipelines.
@@ -47,43 +39,55 @@ import com.google.common.base.Preconditions;
  * through in a cluster.
  */
 public class HealthyPipelineSafeModeRule
-    extends SafeModeExitRule<PipelineReportFromDatanode>{
+    extends SafeModeExitRule<Pipeline>{
 
   public static final Logger LOG =
       LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class);
-  private final PipelineManager pipelineManager;
   private int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
-  private final Map<PipelineID, Boolean> processedPipelines = new HashMap<>();
   private final double healthyPipelinesPercent;
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
       SCMSafeModeManager manager, Configuration configuration) {
     super(manager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
     healthyPipelinesPercent =
         configuration.getDouble(HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
             HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
 
+    int minHealthyPipelines = 0;
+
+    boolean createPipelineInSafemode = configuration.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+    if (createPipelineInSafemode) {
+      minHealthyPipelines =
+          configuration.getInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE,
+              HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT);
+    }
+
     Preconditions.checkArgument(
         (healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
         HddsConfigKeys.
             HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
             + " value should be >= 0.0 and <= 1.0");
 
-    // As we want to wait for 3 node pipelines
-    int pipelineCount =
+    // We want to wait for RATIS THREE factor write pipelines
+    int pipelineCount = pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
+        Pipeline.PipelineState.OPEN).size() +
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+            HddsProtos.ReplicationFactor.THREE,
+            Pipeline.PipelineState.ALLOCATED).size();
 
     // This value will be zero when pipeline count is 0.
     // On a fresh installed cluster, there will be zero pipelines in the SCM
     // pipeline DB.
-    healthyPipelineThresholdCount =
-        (int) Math.ceil(healthyPipelinesPercent * pipelineCount);
+    healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+        (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
 
     LOG.info(" Total pipeline count is {}, healthy pipeline " +
         "threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
@@ -99,8 +103,8 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
@@ -112,38 +116,18 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
+  protected void process(Pipeline pipeline) {
 
     // When SCM is in safe mode for long time, already registered
-    // datanode can send pipeline report again, then pipeline handler fires
-    // processed report event, we should not consider this pipeline report
-    // from datanode again during threshold calculation.
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-
-    PipelineReportsProto pipelineReport =
-        pipelineReportFromDatanode.getReport();
-
-    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID.getFromProtobuf(
-          report.getPipelineID());
-      Pipeline pipeline;
-      try {
-        pipeline = pipelineManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        continue;
-      }
-
-      if (!processedPipelines.containsKey(pipelineID)) {
-        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-            report.getIsLeader()) {
-          // If the pipeline gets reported with a leader we mark it as healthy
-          currentHealthyPipelineCount++;
-          getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-          processedPipelines.put(pipelineID, Boolean.TRUE);
-        }
-      }
+    // datanode can send pipeline report again, or SCMPipelineManager will
+    // create new pipelines.
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
+      getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+      currentHealthyPipelineCount++;
     }
+
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
           "SCM in safe mode. Healthy pipelines reported count is {}, " +
@@ -154,7 +138,6 @@ public class HealthyPipelineSafeModeRule
 
   @Override
   protected void cleanup() {
-    processedPipelines.clear();
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
index 841d8ff..0783d02 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
@@ -22,17 +22,10 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
-    PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
@@ -40,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This rule covers whether we have at least one datanode is reported for each
@@ -47,14 +41,14 @@ import java.util.Set;
  * replica available for read when we exit safe mode.
  */
 public class OneReplicaPipelineSafeModeRule extends
-    SafeModeExitRule<PipelineReportFromDatanode> {
+    SafeModeExitRule<Pipeline> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
 
   private int thresholdCount;
   private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
-  private final PipelineManager pipelineManager;
+  private Set<PipelineID> oldPipelineIDSet;
   private int currentReportedPipelineCount = 0;
 
 
@@ -62,7 +56,6 @@ public class OneReplicaPipelineSafeModeRule extends
       PipelineManager pipelineManager,
       SCMSafeModeManager safeModeManager, Configuration configuration) {
     super(safeModeManager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
 
     double percent =
         configuration.getDouble(
@@ -75,24 +68,25 @@ public class OneReplicaPipelineSafeModeRule extends
             HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
-    int totalPipelineCount =
-        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+    oldPipelineIDSet = pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE)
+        .stream().map(p -> p.getId()).collect(Collectors.toSet());
+    int totalPipelineCount = oldPipelineIDSet.size();
 
     thresholdCount = (int) Math.ceil(percent * totalPipelineCount);
 
-    LOG.info(" Total pipeline count is {}, pipeline's with atleast one " +
+    LOG.info("Total pipeline count is {}, pipeline's with at least one " +
         "datanode reported threshold count is {}", totalPipelineCount,
         thresholdCount);
 
     getSafeModeMetrics().setNumPipelinesWithAtleastOneReplicaReportedThreshold(
         thresholdCount);
-
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
@@ -104,40 +98,26 @@ public class OneReplicaPipelineSafeModeRule extends
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
-    Pipeline pipeline;
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    PipelineReportsProto pipelineReport =
-        pipelineReportFromDatanode.getReport();
-
-    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID
-          .getFromProtobuf(report.getPipelineID());
-      try {
-        pipeline = pipelineManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        continue;
-      }
-
-      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-          !reportedPipelineIDSet.contains(pipelineID)) {
-        reportedPipelineIDSet.add(pipelineID);
+  protected void process(Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+        !reportedPipelineIDSet.contains(pipeline.getId())) {
+      if (oldPipelineIDSet.contains(pipeline.getId())) {
         getSafeModeMetrics()
             .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
+        currentReportedPipelineCount++;
+        reportedPipelineIDSet.add(pipeline.getId());
       }
     }
 
-    currentReportedPipelineCount = reportedPipelineIDSet.size();
-
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. Pipelines with atleast one datanode reported " +
-              "count is {}, required atleast one datanode reported per " +
+          "SCM in safe mode. Pipelines with at least one datanode reported " +
+              "count is {}, required at least one datanode reported per " +
               "pipeline count is {}",
           currentReportedPipelineCount, thresholdCount);
     }
-
   }
 
   @Override
@@ -154,5 +134,4 @@ public class OneReplicaPipelineSafeModeRule extends
   public int getCurrentReportedPipelineCount() {
     return currentReportedPipelineCount;
   }
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a22d162..1e83bc4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -59,17 +59,17 @@ import org.slf4j.LoggerFactory;
  * number of datanode registered is met or not.
  *
  * 3. HealthyPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
+ * Once the PipelineReportHandler processes the
  * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and check if pipeline is healthy
  * and increments current healthy pipeline count. Then validate it cutoff
  * threshold for healthy pipeline is met or not.
  *
  * 4. OneReplicaPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
+ * Once the PipelineReportHandler processes the
  * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and add the reported pipeline to
  * reported pipeline set. Then validate it cutoff threshold for one replica
  * per pipeline is met or not.
@@ -135,6 +135,13 @@ public class SCMSafeModeManager {
             oneReplicaPipelineSafeModeRule);
       }
       emitSafeModeStatus();
+      boolean createPipelineInSafemode = conf.getBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+      if (createPipelineInSafemode) {
+        pipelineManager.startPipelineCreator();
+      }
     } else {
       this.safeModeMetrics = null;
       exitSafeMode(eventQueue);
@@ -166,6 +173,7 @@ public class SCMSafeModeManager {
 
     if (exitRules.get(ruleName) != null) {
       validatedRules.add(ruleName);
+      LOG.info("{} rule is successfully validated", ruleName);
     } else {
       // This should never happen
       LOG.error("No Such Exit rule {}", ruleName);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
index 44d1c94..2fbe893 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
@@ -128,7 +128,8 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
     List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
     pipelineList.forEach((pipeline) -> {
       try {
-        if (!pipeline.isHealthy()) {
+        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+            pipeline.isAllocationTimeout()) {
           scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
         }
       } catch (IOException ex) {
@@ -141,6 +142,4 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
   public boolean getSafeModeStatus() {
     return isInSafeMode.get();
   }
-
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 9f6077b..3dbb4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -164,6 +164,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
       }
 
       if (heartbeat.getCommandStatusReportsCount() != 0) {
+        LOG.debug("Dispatching Command Status Report.");
         for (CommandStatusReportsProto commandStatusReport : heartbeat
             .getCommandStatusReportsList()) {
           eventPublisher.fireEvent(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 530c0a6..901bc2c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -79,6 +81,12 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .createPipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .closePipelineCommand;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -329,6 +337,18 @@ public class SCMDatanodeProtocolServer implements
           .setReplicateContainerCommandProto(
               ((ReplicateContainerCommand)cmd).getProto())
           .build();
+    case createPipelineCommand:
+      return builder
+          .setCommandType(createPipelineCommand)
+          .setCreatePipelineCommandProto(
+              ((CreatePipelineCommand)cmd).getProto())
+          .build();
+    case closePipelineCommand:
+      return builder
+          .setCommandType(closePipelineCommand)
+          .setClosePipelineCommandProto(
+              ((ClosePipelineCommand)cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Scm command " +
           cmd.getType().toString() + " is not implemented");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 21127f4..16ea094 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -400,8 +400,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       pipelineManager = configurator.getPipelineManager();
     } else {
       pipelineManager =
-          new SCMPipelineManager(conf, scmNodeManager, eventQueue,
-              grpcTlsConfig);
+          new SCMPipelineManager(conf, scmNodeManager, eventQueue);
     }
 
     if (configurator.getContainerManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 1cb8376..baeb6dc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -369,6 +371,16 @@ public final class TestUtils {
     return new PipelineReportFromDatanode(dn, reportBuilder.build());
   }
 
+  public static void openAllRatisPipelines(PipelineManager pipelineManager)
+      throws IOException {
+    // Pipeline is created by background thread
+    List<Pipeline> pipelines =
+        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
+    // Trigger the processed pipeline report event
+    for (Pipeline pipeline : pipelines) {
+      pipelineManager.openPipeline(pipeline.getId());
+    }
+  }
 
   public static PipelineActionsFromDatanode getPipelineActionFromDatanode(
       DatanodeDetails dn, PipelineID... pipelineIDs) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index a012d64..aa190f4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -24,11 +24,14 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
@@ -45,9 +48,13 @@ import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -94,14 +101,18 @@ public class TestBlockManager {
 
 
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().toString());
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 5,
+        TimeUnit.SECONDS);
 
     // Override the default Node Manager in SCM with this Mock Node Manager.
     nodeManager = new MockNodeManager(true, 10);
+    eventQueue = new EventQueue();
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
-            pipelineManager.getStateManager(), conf);
+            pipelineManager.getStateManager(), conf, eventQueue);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     SCMConfigurator configurator = new SCMConfigurator();
@@ -112,12 +123,10 @@ public class TestBlockManager {
     // Initialize these fields so that the tests can pass.
     mapping = (SCMContainerManager) scm.getContainerManager();
     blockManager = (BlockManagerImpl) scm.getScmBlockManager();
-
-    eventQueue = new EventQueue();
-    eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
-        scm.getSafeModeHandler());
     eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
         scm.getSafeModeHandler());
+    DatanodeCommandHandler handler = new DatanodeCommandHandler();
+    eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, handler);
     CloseContainerEventHandler closeContainerHandler =
         new CloseContainerEventHandler(pipelineManager, mapping);
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
@@ -136,6 +145,8 @@ public class TestBlockManager {
     GenericTestUtils.waitFor(() -> {
       return !blockManager.isScmInSafeMode();
     }, 10, 1000 * 5);
+    pipelineManager.createPipeline(type, factor);
+    TestUtils.openAllRatisPipelines(pipelineManager);
     AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
         type, factor, OzoneConsts.OZONE, new ExcludeList());
     Assert.assertNotNull(block);
@@ -153,6 +164,7 @@ public class TestBlockManager {
       }
     } catch (IOException e) {
     }
+    TestUtils.openAllRatisPipelines(pipelineManager);
     ExcludeList excludeList = new ExcludeList();
     excludeList
         .addPipeline(pipelineManager.getPipelines(type, factor).get(0).getId());
@@ -259,6 +271,7 @@ public class TestBlockManager {
 
     pipelineManager.createPipeline(type, factor);
     pipelineManager.createPipeline(type, factor);
+    TestUtils.openAllRatisPipelines(pipelineManager);
 
     AllocatedBlock allocatedBlock = blockManager
         .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, OzoneConsts.OZONE,
@@ -305,6 +318,7 @@ public class TestBlockManager {
              .getNumber(); i++) {
       pipelineManager.createPipeline(type, factor);
     }
+    TestUtils.openAllRatisPipelines(pipelineManager);
 
     // wait till each pipeline has the configured number of containers.
     // After this each pipeline has numContainerPerOwnerInPipeline containers
@@ -359,7 +373,23 @@ public class TestBlockManager {
     Assert.assertNotNull(blockManager
         .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, OzoneConsts.OZONE,
             new ExcludeList()));
-    Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
   }
 
+  private class DatanodeCommandHandler implements
+      EventHandler<CommandForDatanode> {
+
+    @Override
+    public void onMessage(final CommandForDatanode command,
+                          final EventPublisher publisher) {
+      final SCMCommandProto.Type commandType = command.getCommand().getType();
+      if (commandType == SCMCommandProto.Type.createPipelineCommand) {
+        CreatePipelineCommand createCommand =
+            (CreatePipelineCommand) command.getCommand();
+        try {
+          pipelineManager.openPipeline(createCommand.getPipelineID());
+        } catch (IOException e) {
+        }
+      }
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index b022fd9..4f503e4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
@@ -67,8 +68,9 @@ public class TestCloseContainerEventHandler {
     configuration
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
+    eventQueue = new EventQueue();
     pipelineManager =
-        new SCMPipelineManager(configuration, nodeManager, eventQueue, null);
+        new SCMPipelineManager(configuration, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), configuration);
@@ -77,10 +79,13 @@ public class TestCloseContainerEventHandler {
     containerManager = new
         SCMContainerManager(configuration, nodeManager,
         pipelineManager, new EventQueue());
-    eventQueue = new EventQueue();
+    pipelineManager.triggerPipelineCreation();
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(pipelineManager, containerManager));
     eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
+    // Move all pipelines created by background from ALLOCATED to OPEN state
+    Thread.sleep(2000);
+    TestUtils.openAllRatisPipelines(pipelineManager);
   }
 
   @AfterClass
@@ -116,7 +121,6 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithValidContainers() throws IOException {
-
     ContainerInfo container = containerManager
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
@@ -134,7 +138,6 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithRatis() throws IOException {
-
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
         .captureLogs(CloseContainerEventHandler.LOG);
     ContainerInfo container = containerManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 6436af0..fde94d7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -96,7 +96,7 @@ public class TestSCMContainerManager {
     }
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     containerManager = new SCMContainerManager(conf, nodeManager,
         pipelineManager, new EventQueue());
     xceiverClientManager = new XceiverClientManager(conf);
@@ -147,7 +147,7 @@ public class TestSCMContainerManager {
           containerInfo.getPipelineID()).getFirstNode()
           .getUuid());
     }
-    Assert.assertTrue(pipelineList.size() > 5);
+    Assert.assertTrue(pipelineList.size() >= 1);
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 3e4508d..f786060 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -108,7 +108,7 @@ public class TestContainerPlacement {
     final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     PipelineManager pipelineManager =
-        new SCMPipelineManager(config, scmNodeManager, eventQueue, null);
+        new SCMPipelineManager(config, scmNodeManager, eventQueue);
     return new SCMContainerManager(config, scmNodeManager, pipelineManager,
         eventQueue);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index c140119..1676af1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -74,6 +74,7 @@ public class TestDeadNodeHandler {
   private SCMNodeManager nodeManager;
   private ContainerManager containerManager;
   private NodeReportHandler nodeReportHandler;
+  private SCMPipelineManager pipelineManager;
   private DeadNodeHandler deadNodeHandler;
   private EventPublisher publisher;
   private EventQueue eventQueue;
@@ -88,12 +89,12 @@ public class TestDeadNodeHandler {
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
-    SCMPipelineManager manager =
+    pipelineManager =
         (SCMPipelineManager)scm.getPipelineManager();
     PipelineProvider mockRatisProvider =
-        new MockRatisPipelineProvider(nodeManager, manager.getStateManager(),
-            conf);
-    manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     containerManager = scm.getContainerManager();
     deadNodeHandler = new DeadNodeHandler(nodeManager,
@@ -149,6 +150,8 @@ public class TestDeadNodeHandler {
     nodeManager.register(TestUtils.randomDatanodeDetails(),
         TestUtils.createNodeReport(storageOne), null);
 
+    TestUtils.openAllRatisPipelines(pipelineManager);
+
     ContainerInfo container1 =
         TestUtils.allocateContainer(containerManager);
     ContainerInfo container2 =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index db76d66..f4eb797 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -121,6 +121,7 @@ public class TestSCMNodeManager {
         testDir.getAbsolutePath());
     conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -1035,9 +1036,11 @@ public class TestSCMNodeManager {
       eq.processAll(1000L);
       List<SCMCommand> command =
           nodemanager.processHeartbeat(datanodeDetails);
-      Assert.assertEquals(1, command.size());
-      Assert
-          .assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
+      // With dh registered, SCM will send create pipeline command to dn
+      Assert.assertTrue(command.size() >= 1);
+      Assert.assertTrue(command.get(0).getClass().equals(
+          CloseContainerCommand.class) ||
+          command.get(1).getClass().equals(CloseContainerCommand.class));
     } catch (IOException e) {
       e.printStackTrace();
       throw  e;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 342ee5b..25b0adc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 
 import java.io.IOException;
 import java.util.List;
@@ -34,7 +36,13 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   public MockRatisPipelineProvider(NodeManager nodeManager,
                             PipelineStateManager stateManager,
                             Configuration conf) {
-    super(nodeManager, stateManager, conf, null);
+    super(nodeManager, stateManager, conf, new EventQueue());
+  }
+
+  public MockRatisPipelineProvider(NodeManager nodeManager,
+      PipelineStateManager stateManager, Configuration conf,
+      EventPublisher eventPublisher) {
+    super(nodeManager, stateManager, conf, eventPublisher);
   }
 
   protected void initializePipeline(Pipeline pipeline) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 6f0425d..065b08b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -61,11 +61,13 @@ public class TestRatisPipelineProvider {
   private void createPipelineAndAssertions(
           HddsProtos.ReplicationFactor factor) throws IOException {
     Pipeline pipeline = provider.create(factor);
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline);
 
     Pipeline pipeline1 = provider.create(factor);
-    assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
     // New pipeline should not overlap with the previous created pipeline
     assertTrue(
         intersection(pipeline.getNodes(), pipeline1.getNodes())
@@ -77,12 +79,14 @@ public class TestRatisPipelineProvider {
   public void testCreatePipelineWithFactor() throws IOException {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
     Pipeline pipeline = provider.create(factor);
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline);
 
     factor = HddsProtos.ReplicationFactor.ONE;
     Pipeline pipeline1 = provider.create(factor);
-    assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline1);
     // New pipeline should overlap with the previous created pipeline,
     // and one datanode should overlap between the two types.
@@ -113,11 +117,13 @@ public class TestRatisPipelineProvider {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
     Pipeline pipeline =
         provider.create(factor, createListOfNodes(factor.getNumber()));
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.OPEN);
 
     factor = HddsProtos.ReplicationFactor.ONE;
     pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.OPEN);
   }
 
   @Test
@@ -141,7 +147,8 @@ public class TestRatisPipelineProvider {
 
     // only 2 healthy DNs left that are not part of any pipeline
     Pipeline pipeline = provider.create(factor);
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
 
     List<DatanodeDetails> nodes = pipeline.getNodes();
 
@@ -156,8 +163,9 @@ public class TestRatisPipelineProvider {
 
   private static void assertPipelineProperties(
       Pipeline pipeline, HddsProtos.ReplicationFactor expectedFactor,
-      HddsProtos.ReplicationType expectedReplicationType) {
-    assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
+      HddsProtos.ReplicationType expectedReplicationType,
+      Pipeline.PipelineState expectedState) {
+    assertEquals(expectedState, pipeline.getPipelineState());
     assertEquals(expectedReplicationType, pipeline.getType());
     assertEquals(expectedFactor, pipeline.getFactor());
     assertEquals(expectedFactor.getNumber(), pipeline.getNodes().size());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index f6d9b0e..6ea1bfe 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -34,7 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -68,10 +63,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -88,10 +84,8 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
-
   @Test
   public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
 
@@ -113,10 +107,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -162,7 +157,6 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
 
@@ -188,10 +182,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -217,7 +212,7 @@ public class TestHealthyPipelineSafeModeRule {
           scmSafeModeManager.getHealthyPipelineSafeModeRule();
 
 
-      // No datanodes have sent pipelinereport from datanode
+      // No pipeline event have sent to SCMSafemodeManager
       Assert.assertFalse(healthyPipelineSafeModeRule.validate());
 
 
@@ -225,12 +220,12 @@ public class TestHealthyPipelineSafeModeRule {
           GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
               SCMSafeModeManager.class));
 
-      // fire event with pipeline report with ratis type and factor 1
+      // fire event with pipeline create status with ratis type and factor 1
       // pipeline, validate() should return false
       firePipelineEvent(pipeline1, eventQueue);
 
       GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
-          "reported count is 0"),
+          "reported count is 1"),
           1000, 5000);
       Assert.assertFalse(healthyPipelineSafeModeRule.validate());
 
@@ -246,20 +241,7 @@ public class TestHealthyPipelineSafeModeRule {
 
   }
 
-
   private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
-    PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-        .newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf())
-        .setIsLeader(Boolean.TRUE));
-
-    // Here no need to fire event from 3 nodes, as already pipeline is in
-    // open state, but doing it.
-    eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-            pipeline.getNodes().get(0), reportBuilder.build()));
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
-
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 7a09977..0fa5eae 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -32,7 +28,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -63,6 +58,8 @@ public class TestOneReplicaPipelineSafeModeRule {
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
     ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         folder.newFolder().toString());
+    ozoneConfiguration.setBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     List<ContainerInfo> containers = new ArrayList<>();
     containers.addAll(HddsTestUtils.getContainerInfo(1));
@@ -71,7 +68,7 @@ public class TestOneReplicaPipelineSafeModeRule {
     eventQueue = new EventQueue();
     pipelineManager =
         new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
-            eventQueue, null);
+            eventQueue);
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
@@ -123,7 +120,6 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
 
@@ -170,11 +166,8 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineCountThree - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
-
-
   private void createPipelines(int count,
       HddsProtos.ReplicationFactor factor) throws Exception {
     for (int i = 0; i < count; i++) {
@@ -184,27 +177,6 @@ public class TestOneReplicaPipelineSafeModeRule {
   }
 
   private void firePipelineEvent(Pipeline pipeline) {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf())
-        .setIsLeader(Boolean.TRUE));
-
-    if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(1), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(2), reportBuilder.build()));
-    } else {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-    }
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 1e608b3..b5839bc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -73,6 +70,8 @@ public class TestSCMSafeModeManager {
   public static void setUp() {
     queue = new EventQueue();
     config = new OzoneConfiguration();
+    config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        false);
   }
 
   @Test
@@ -177,7 +176,7 @@ public class TestSCMSafeModeManager {
         HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
     conf.setDouble(HddsConfigKeys.
         HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
-
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -199,7 +198,7 @@ public class TestSCMSafeModeManager {
           0.9);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -217,7 +216,7 @@ public class TestSCMSafeModeManager {
           200);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -234,7 +233,7 @@ public class TestSCMSafeModeManager {
       conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForSafeModePercent");
@@ -258,7 +257,7 @@ public class TestSCMSafeModeManager {
 
     MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
     SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
-        mockNodeManager, queue, null);
+        mockNodeManager, queue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
             pipelineManager.getStateManager(), config);
@@ -302,12 +301,12 @@ public class TestSCMSafeModeManager {
     // we shall a get an event when datanode is registered. In that case,
     // validate will return true, and add this to validatedRules.
     if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
-      firePipelineEvent(pipelines.get(0));
+      firePipelineEvent(pipelineManager, pipelines.get(0));
     }
 
     for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
-        oneReplicaThresholdCount); i++) {
-      firePipelineEvent(pipelines.get(i));
+        Math.min(oneReplicaThresholdCount, pipelines.size())); i++) {
+      firePipelineEvent(pipelineManager, pipelines.get(i));
 
       if (i < healthyPipelineThresholdCount) {
         checkHealthy(i + 1);
@@ -352,16 +351,11 @@ public class TestSCMSafeModeManager {
         1000,  5000);
   }
 
-  private void firePipelineEvent(Pipeline pipeline) throws Exception {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf())
-        .setIsLeader(Boolean.TRUE));
-    queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-            reportBuilder.build()));
+  private void firePipelineEvent(SCMPipelineManager pipelineManager,
+      Pipeline pipeline) throws Exception {
+    pipelineManager.openPipeline(pipeline.getId());
+    queue.fireEvent(SCMEvents.OPEN_PIPELINE,
+        pipelineManager.getPipeline(pipeline.getId()));
   }
 
 
@@ -480,7 +474,7 @@ public class TestSCMSafeModeManager {
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, queue, null);
+          nodeManager, queue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -491,11 +485,6 @@ public class TestSCMSafeModeManager {
       Pipeline pipeline = pipelineManager.createPipeline(
           HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
-      PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-          .newBuilder();
-      reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-          .setPipelineID(pipeline.getId().getProtobuf())
-          .setIsLeader(Boolean.TRUE));
 
       scmSafeModeManager = new SCMSafeModeManager(
           config, containers, pipelineManager, queue);
@@ -504,10 +493,9 @@ public class TestSCMSafeModeManager {
           HddsTestUtils.createNodeRegistrationContainerReport(containers));
       assertTrue(scmSafeModeManager.getInSafeMode());
 
-      // Trigger the processed pipeline report event
-      queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-              reportBuilder.build()));
+
+
+      firePipelineEvent(pipelineManager, pipeline);
 
       GenericTestUtils.waitFor(() -> {
         return !scmSafeModeManager.getInSafeMode();
diff --git a/hadoop-ozone/dist/src/main/compose/testlib.sh b/hadoop-ozone/dist/src/main/compose/testlib.sh
index b20dca8..684f2f5 100755
--- a/hadoop-ozone/dist/src/main/compose/testlib.sh
+++ b/hadoop-ozone/dist/src/main/compose/testlib.sh
@@ -80,6 +80,40 @@ wait_for_datanodes(){
    return 1
 }
 
+## @description wait until safemode exit (or 30 seconds)
+## @param the docker-compose file
+wait_for_safemode_exit(){
+  local compose_file=$1
+
+  #Reset the timer
+  SECONDS=0
+
+  #Don't give it up until 30 seconds
+  while [[ $SECONDS -lt 90 ]]; do
+
+     #This line checks the safemode status in scm
+     local command="ozone scmcli safemode status"
+     if [[ "${SECURITY_ENABLED}" == 'true' ]]; then
+         status=`docker-compose -f "${compose_file}" exec -T scm bash -c "kinit -k HTTP/scm@EXAMPLE.COM -t /etc/security/keytabs/HTTP.keytab && $command'"`
+     else
+         status=`docker-compose -f "${compose_file}" exec -T scm bash -c "$command"`
+     fi
+
+     echo $status
+     if [[ "$status" ]]; then
+       if [[ ${status} == "SCM is out of safe mode." ]]; then
+         #Safemode exits. Let's return from the function.
+         echo "Safe mode is off"
+         return
+       fi
+     fi
+
+     sleep 2
+   done
+   echo "WARNING! Safemode is still on. Please check the docker-compose files"
+   return 1
+}
+
 ## @description  Starts a docker-compose based test environment
 ## @param number of datanodes to start and wait for (default: 3)
 start_docker_env(){
@@ -90,6 +124,7 @@ start_docker_env(){
   docker-compose -f "$COMPOSE_FILE" --no-ansi down
   docker-compose -f "$COMPOSE_FILE" --no-ansi up -d --scale datanode="${datanode_count}" \
     && wait_for_datanodes "$COMPOSE_FILE" "${datanode_count}" \
+    && wait_for_safemode_exit "$COMPOSE_FILE" \
     && sleep 10
 
   if [[ $? -gt 0 ]]; then
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index fe612a0..b4f8c37 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -73,7 +73,7 @@ public class TestContainerStateManagerIntegration {
     numContainerPerOwnerInPipeline =
         conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
             ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     cluster.waitTobeOutOfSafeMode();
     xceiverClientManager = new XceiverClientManager(conf);
@@ -165,7 +165,9 @@ public class TestContainerStateManagerIntegration {
       }
     }
 
-    cluster.restartStorageContainerManager(true);
+    // Restart SCM will not trigger container report to satisfy the safe mode
+    // exit rule.
+    cluster.restartStorageContainerManager(false);
 
     List<ContainerInfo> result = cluster.getStorageContainerManager()
         .getContainerManager().listContainer(null, 100);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
index f2c31d1..26c8c01 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
@@ -40,6 +40,8 @@ import java.io.IOException;
 import java.util.HashMap;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.fail;
@@ -56,6 +58,7 @@ public class TestSCMContainerManagerMetrics {
   public void setup() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HDDS_CONTAINER_REPORT_INTERVAL, "3000s");
+    conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();
     scm = cluster.getStorageContainerManager();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..21fa7bd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -81,7 +81,7 @@ public class TestPipelineClose {
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
         TimeUnit.MILLISECONDS);
-    pipelineDestroyTimeoutInMillis = 5000;
+    pipelineDestroyTimeoutInMillis = 1000;
     conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
         pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
     cluster.waitForClusterToBeReady();
@@ -169,7 +169,7 @@ public class TestPipelineClose {
         new PipelineActionHandler(pipelineManager, conf);
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
-    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+    Thread.sleep(5000);
     OzoneContainer ozoneContainer =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
new file mode 100644
index 0000000..8b8c64f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for RatisPipelineProvider.
+ */
+public class TestRatisPipelineProvider {
+
+  private NodeManager nodeManager;
+  private PipelineProvider provider;
+  private PipelineStateManager stateManager;
+
+  @Before
+  public void init() throws Exception {
+    nodeManager = new MockNodeManager(true, 10);
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    stateManager = new PipelineStateManager(conf);
+    provider = new MockRatisPipelineProvider(nodeManager,
+        stateManager, new OzoneConfiguration());
+  }
+
+  private void createPipelineAndAssertions(
+          HddsProtos.ReplicationFactor factor) throws IOException {
+    Pipeline pipeline = provider.create(factor);
+    stateManager.addPipeline(pipeline);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline.getPipelineState());
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+    Pipeline pipeline1 = provider.create(factor);
+    stateManager.addPipeline(pipeline1);
+    // New pipeline should not overlap with the previous created pipeline
+    Assert.assertTrue(
+        CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
+            .isEmpty());
+    Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline1.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline1.getPipelineState());
+    Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelineWithFactor() throws IOException {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline = provider.create(factor);
+    stateManager.addPipeline(pipeline);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline.getPipelineState());
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    Pipeline pipeline1 = provider.create(factor);
+    stateManager.addPipeline(pipeline1);
+    // New pipeline should overlap with the previous created pipeline,
+    // and one datanode should overlap between the two types.
+    Assert.assertEquals(
+        CollectionUtils.intersection(pipeline.getNodes(),
+            pipeline1.getNodes()).size(), 1);
+    Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline1.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline1.getPipelineState());
+    Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelineWithFactorThree() throws IOException {
+    createPipelineAndAssertions(HddsProtos.ReplicationFactor.THREE);
+  }
+
+  @Test
+  public void testCreatePipelineWithFactorOne() throws IOException {
+    createPipelineAndAssertions(HddsProtos.ReplicationFactor.ONE);
+  }
+
+  private List<DatanodeDetails> createListOfNodes(int nodeCount) {
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (int i = 0; i < nodeCount; i++) {
+      nodes.add(TestUtils.randomDatanodeDetails());
+    }
+    return nodes;
+  }
+
+  @Test
+  public void testCreatePipelineWithNodes() {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline =
+        provider.create(factor, createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(
+        pipeline.getPipelineState(), Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelinesDnExclude() throws IOException {
+
+    // We need 9 Healthy DNs in MockNodeManager.
+    NodeManager mockNodeManager = new MockNodeManager(true, 12);
+    PipelineStateManager stateManagerMock =
+        new PipelineStateManager(new OzoneConfiguration());
+    PipelineProvider providerMock = new MockRatisPipelineProvider(
+        mockNodeManager, stateManagerMock, new OzoneConfiguration());
+
+    // Use up first 3 DNs for an open pipeline.
+    List<DatanodeDetails> openPiplineDns = mockNodeManager.getAllNodes()
+        .subList(0, 3);
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+
+    Pipeline openPipeline = Pipeline.newBuilder()
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(openPiplineDns)
+        .setState(Pipeline.PipelineState.OPEN)
+        .setId(PipelineID.randomId())
+        .build();
+
+    stateManagerMock.addPipeline(openPipeline);
+
+    // Use up next 3 DNs also for an open pipeline.
+    List<DatanodeDetails> moreOpenPiplineDns = mockNodeManager.getAllNodes()
+        .subList(3, 6);
+    Pipeline anotherOpenPipeline = Pipeline.newBuilder()
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(moreOpenPiplineDns)
+        .setState(Pipeline.PipelineState.OPEN)
+        .setId(PipelineID.randomId())
+        .build();
+    stateManagerMock.addPipeline(anotherOpenPipeline);
+
+    // Use up next 3 DNs also for a closed pipeline.
+    List<DatanodeDetails> closedPiplineDns = mockNodeManager.getAllNodes()
+        .subList(6, 9);
+    Pipeline anotherClosedPipeline = Pipeline.newBuilder()
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(closedPiplineDns)
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .build();
+    stateManagerMock.addPipeline(anotherClosedPipeline);
+
+    Pipeline pipeline = providerMock.create(factor);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline.getPipelineState());
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+    List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
+
+    // Pipline nodes cannot be from open pipelines.
+    Assert.assertTrue(
+        pipelineNodes.parallelStream().filter(dn ->
+        (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
+        .count() == 0);
+
+    // Since we have only 9 Healthy DNs, at least 1 pipeline node should have
+    // been from the closed pipeline DN list.
+    Assert.assertTrue(pipelineNodes.parallelStream().filter(
+        closedPiplineDns::contains).count() > 0);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 105d2e2..9e2cfa9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -62,6 +62,7 @@ public class TestSCMPipelineManager {
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     boolean folderExisted = testDir.exists() || testDir.mkdirs();
     if (!folderExisted) {
       throw new IOException("Unable to create test directory path");
@@ -77,7 +78,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testPipelineReload() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -94,7 +95,7 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should be able to load the pipelines from the db
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -117,7 +118,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testRemovePipeline() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -135,7 +136,71 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should not be able to load removed pipelines
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+    try {
+      pipelineManager.getPipeline(pipeline.getId());
+      Assert.fail("Pipeline should not have been retrieved");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("not found"));
+    }
+
+    // clean up
+    pipelineManager.close();
+  }
+
+  @Test
+  public void testPipelineReport() throws IOException {
+    EventQueue eventQueue = new EventQueue();
+    SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+
+    SCMSafeModeManager scmSafeModeManager =
+        new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
+            eventQueue);
+
+    // create a pipeline in allocated state with no dns yet reported
+    Pipeline pipeline = pipelineManager
+        .createPipeline(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+
+    Assert
+        .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+    Assert
+        .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+
+    // pipeline is not healthy until all dns report
+    PipelineReportHandler pipelineReportHandler =
+        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+    List<DatanodeDetails> nodes = pipeline.getNodes();
+    Assert.assertFalse(
+        pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+    // get pipeline report from each dn in the pipeline
+    nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+        pipelineReportHandler, false, eventQueue));
+    sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+        pipelineReportHandler, true, eventQueue);
+
+    // pipeline is healthy when all dns report
+    Assert
+        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+    // pipeline should now move to open state
+    Assert
+        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+
+    // close the pipeline
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+
+    // pipeline report for destroyed pipeline should be ignored
+    nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+        pipelineReportHandler, false, eventQueue));
+    sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+        pipelineReportHandler, true, eventQueue);
+
     try {
       pipelineManager.getPipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been retrieved");
@@ -152,7 +217,7 @@ public class TestSCMPipelineManager {
     MockNodeManager nodeManagerMock = new MockNodeManager(true,
         20);
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManagerMock,
             pipelineManager.getStateManager(), conf);
@@ -161,9 +226,9 @@ public class TestSCMPipelineManager {
 
     MetricsRecordBuilder metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    long numPipelineCreated = getLongCounter("NumPipelineCreated",
+    long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
         metrics);
-    Assert.assertTrue(numPipelineCreated == 0);
+    Assert.assertTrue(numPipelineAllocated == 0);
 
     // 3 DNs are unhealthy.
     // Create 5 pipelines (Use up 15 Datanodes)
@@ -176,8 +241,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     long numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -196,8 +261,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -210,7 +275,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testActivateDeactivatePipeline() throws IOException {
     final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     final PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -257,7 +322,7 @@ public class TestSCMPipelineManager {
   public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
     EventQueue eventQueue = new EventQueue();
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -270,7 +335,7 @@ public class TestSCMPipelineManager {
     pipelineManager.close();
     // new pipeline manager loads the pipelines from the db in ALLOCATED state
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 59cef37..0102246 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -262,7 +262,7 @@ public interface MiniOzoneCluster {
     // Use relative smaller number of handlers for testing
     protected int numOfOmHandlers = 20;
     protected int numOfScmHandlers = 20;
-    protected int numOfDatanodes = 1;
+    protected int numOfDatanodes = 3;
     protected boolean  startDataNodes = true;
     protected CertificateClient certClient;
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 2813711..2ad6c12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -67,7 +65,6 @@ import java.nio.file.Paths;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
@@ -98,7 +95,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
   private final List<HddsDatanodeService> hddsDatanodes;
 
   // Timeout for the cluster to be ready
-  private int waitForClusterToBeReadyTimeout = 60000; // 1 min
+  private int waitForClusterToBeReadyTimeout = 120000; // 2 min
   private CertificateClient caClient;
 
   /**
@@ -147,32 +144,17 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       final int healthy = scm.getNodeCount(HEALTHY);
-      boolean isReady = healthy == hddsDatanodes.size();
-      boolean printIsReadyMsg = true;
-      List<Pipeline> pipelines = scm.getPipelineManager().getPipelines();
-      if (!pipelines.isEmpty()) {
-        List<Pipeline> raftPipelines = pipelines.stream().filter(p ->
-            p.getType() == HddsProtos.ReplicationType.RATIS).collect(
-                Collectors.toList());
-        if (!raftPipelines.isEmpty()) {
-          List<Pipeline> notOpenPipelines = raftPipelines.stream().filter(p ->
-              p.getPipelineState() != Pipeline.PipelineState.OPEN &&
-                  p.getPipelineState() != Pipeline.PipelineState.CLOSED)
-              .collect(Collectors.toList());
-          if (notOpenPipelines.size() > 0) {
-            LOG.info("Waiting for {} number of pipelines out of {}, to report "
-                + "a leader.", notOpenPipelines.size(), raftPipelines.size());
-            isReady = false;
-            printIsReadyMsg = false;
-          }
-        }
-      }
-      if (printIsReadyMsg) {
-        LOG.info("{}. Got {} of {} DN Heartbeats.",
-            isReady ? "Cluster is ready" : "Waiting for cluster to be ready",
-            healthy, hddsDatanodes.size());
-      }
-      return isReady;
+      final boolean isNodeReady = healthy == hddsDatanodes.size();
+      final boolean exitSafeMode = !scm.isInSafeMode();
+
+      LOG.info("{}. Got {} of {} DN Heartbeats.",
+          isNodeReady? "Nodes are ready" : "Waiting for nodes to be ready",
+          healthy, hddsDatanodes.size());
+      LOG.info(exitSafeMode? "Cluster exits safe mode" :
+              "Waiting for cluster to exit safe mode",
+          healthy, hddsDatanodes.size());
+
+      return isNodeReady && exitSafeMode;
     }, 1000, waitForClusterToBeReadyTimeout);
   }
 
@@ -660,7 +642,6 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       if (hbInterval.isPresent()) {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             hbInterval.get(), TimeUnit.MILLISECONDS);
-
       } else {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             DEFAULT_HB_INTERVAL_MS,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
index cd975cf..eadb520 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
@@ -49,7 +49,7 @@ public class TestContainerOperations {
     ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
     ozoneConf.setStorageSize(OZONE_SCM_CONTAINER_SIZE, 5, StorageUnit.GB);
-    cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(1).build();
+    cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
     storageClient = new ContainerOperationClient(ozoneConf);
     cluster.waitForClusterToBeReady();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
index 76eee6a..548f9b6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
@@ -61,7 +61,7 @@ public class TestContainerStateMachineIdempotency {
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
     cluster =
-        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     storageContainerLocationClient =
         cluster.getStorageContainerLocationClient();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index efc2736..99b4083 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -282,11 +282,11 @@ public class TestMiniOzoneCluster {
    * Test that a DN can register with SCM even if it was started before the SCM.
    * @throws Exception
    */
-  @Test (timeout = 300_000)
+  @Test (timeout = 60000)
   public void testDNstartAfterSCM() throws Exception {
-    // Start a cluster with 1 DN
+    // Start a cluster with 3 DN
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .build();
     cluster.waitForClusterToBeReady();
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 86dd75a..706f880 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.ozone;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
     .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
@@ -328,10 +332,12 @@ public class TestStorageContainerManager {
         100, TimeUnit.MILLISECONDS);
     conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
         numKeys);
+    conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
         .setHbInterval(1000)
         .setHbProcessorInterval(3000)
+        .setNumDatanodes(1)
         .build();
     cluster.waitForClusterToBeReady();
 
@@ -458,7 +464,7 @@ public class TestStorageContainerManager {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
     //This will set the cluster id in the version file
     MiniOzoneCluster cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     try {
       // This will initialize SCM
@@ -570,6 +576,7 @@ public class TestStorageContainerManager {
         100, TimeUnit.MILLISECONDS);
     conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
         numKeys);
+    conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
         .setHbInterval(1000)
@@ -597,7 +604,7 @@ public class TestStorageContainerManager {
 
       scm.getContainerManager().updateContainerState(selectedContainer
           .containerID(), HddsProtos.LifeCycleEvent.FINALIZE);
-      cluster.restartStorageContainerManager(true);
+      cluster.restartStorageContainerManager(false);
       scm = cluster.getStorageContainerManager();
       EventPublisher publisher = mock(EventPublisher.class);
       ReplicationManager replicationManager = scm.getReplicationManager();
@@ -607,8 +614,7 @@ public class TestStorageContainerManager {
       modifiersField.setAccessible(true);
       modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
       f.set(replicationManager, publisher);
-      scm.getReplicationManager().start();
-      Thread.sleep(2000);
+      Thread.sleep(10000);
 
       UUID dnUuid = cluster.getHddsDatanodes().iterator().next()
           .getDatanodeDetails().getUuid();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
index 623b11d..e6ed498 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
@@ -45,6 +45,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -81,6 +83,7 @@ public class TestBCSID {
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setQuietMode(false);
+    conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
             .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 82c4910..e4b34fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,6 +52,8 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_SCM_SAFEMODE_MIN_PIPELINE;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
@@ -93,6 +95,7 @@ public class TestContainerStateMachine {
     OzoneManager.setTestSecureOmFlag(true);
     conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     //  conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
+    conf.setInt(HDDS_SCM_SAFEMODE_MIN_PIPELINE, 0);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
             .setHbInterval(200)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 0fb15d0..e6302e3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -73,6 +73,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_PIPELINE_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
     ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -118,6 +120,8 @@ public class TestContainerStateMachineFailures {
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
@@ -129,7 +133,7 @@ public class TestContainerStateMachineFailures {
     conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     conf.setQuietMode(false);
     cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
             .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
@@ -576,6 +580,7 @@ public class TestContainerStateMachineFailures {
     Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
     Assert
         .assertTrue(dispatcher.getMissingContainerSet().contains(containerID));
+
     // write a new key
     key = objectStore.getVolume(volumeName).getBucket(bucketName)
         .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
@@ -599,7 +604,7 @@ public class TestContainerStateMachineFailures {
     byte[] blockCommitSequenceIdKey =
         DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
 
-    // modify the bcsid for the container in the ROCKS DB tereby inducing
+    // modify the bcsid for the container in the ROCKS DB thereby inducing
     // corruption
     db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
     db.decrementReference();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 9768943..8310183 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -73,7 +73,7 @@ public class TestContainerReplication {
   @Before
   public void setup() throws Exception {
     conf = newOzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
         .setRandomContainerPort(true).build();
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index c7b7992..ffe5b6f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -112,7 +112,7 @@ public class TestBlockDeletion {
         3, TimeUnit.SECONDS);
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .setHbInterval(200)
         .build();
     cluster.waitForClusterToBeReady();
@@ -143,7 +143,7 @@ public class TestBlockDeletion {
     String keyName = UUID.randomUUID().toString();
 
     OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
-        ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>());
+        ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>());
     for (int i = 0; i < 100; i++) {
       out.write(value.getBytes());
     }
@@ -152,7 +152,7 @@ public class TestBlockDeletion {
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
         .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE)
+        .setFactor(HddsProtos.ReplicationFactor.THREE)
         .setRefreshPipeline(true)
         .build();
     List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index 5c7f2c1..3320f94 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import java.util.HashMap;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -58,6 +59,7 @@ public class TestCloseContainerHandler {
     //setup a cluster (1G free space is enough for a unit test)
     conf = new OzoneConfiguration();
     conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(1).build();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
index 1cbf69e..4df6f69 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -66,6 +67,7 @@ public class TestDeleteContainerHandler {
   public static void setup() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
index 7fb9825..1d04330 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
@@ -85,6 +85,8 @@ public class TestDataScrubber {
     ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+    ozoneConfig.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        false);
     cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
index 732fb34..e1e0cfa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
@@ -67,7 +67,7 @@ public class TestKeyPurging {
     conf.setQuietMode(false);
 
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .setHbInterval(200)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
index 3614a05..1c1f034 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
@@ -341,7 +341,7 @@ public class TestScmSafeMode {
     builder = MiniOzoneCluster.newBuilder(conf)
         .setHbInterval(1000)
         .setHbProcessorInterval(500)
-        .setNumDatanodes(1);
+        .setNumDatanodes(3);
     cluster = builder.build();
     StorageContainerManager scm = cluster.getStorageContainerManager();
     assertFalse(scm.isInSafeMode());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index 48ce4a6..9d187ff 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -61,7 +61,7 @@ public class TestContainerSmallFile {
     ozoneConfig = new OzoneConfiguration();
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
-    cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
+    cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3)
         .build();
     cluster.waitForClusterToBeReady();
     storageContainerLocationClient = cluster
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
index b19020f..2f8c755 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -65,7 +65,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
     cluster =
-        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     storageContainerLocationClient =
         cluster.getStorageContainerLocationClient();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
index e700a0e..59a28f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
@@ -57,7 +57,7 @@ import static org.junit.Assert.fail;
 public class TestSCMMXBean {
 
   public static final Log LOG = LogFactory.getLog(TestSCMMXBean.class);
-  private static int numOfDatanodes = 1;
+  private static int numOfDatanodes = 3;
   private static MiniOzoneCluster cluster;
   private static OzoneConfiguration conf;
   private static StorageContainerManager scm;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
index 65a6357..4aa1eae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.scm.node;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
@@ -49,7 +50,8 @@ public class TestSCMNodeMetrics {
   @Before
   public void setup() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();
   }
 
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
index 1d58465..2b95cbd 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
@@ -52,7 +52,7 @@ public class TestOzoneFsRenameDir {
   public void init() throws Exception {
     conf = new OzoneConfiguration();
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .build();
     cluster.waitForClusterToBeReady();
 
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
index 112674a..b6a8f18 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
@@ -65,7 +65,7 @@ public class TestContainerMapper {
     conf.set(OZONE_OM_DB_DIRS, dbPath);
     conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, "100MB");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .setScmId(SCM_ID)
         .build();
     cluster.waitForClusterToBeReady();


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


Mime
View raw message