hadoop-common-commits mailing list archives

From na...@apache.org
Subject [hadoop] branch ozone-0.4 updated: HDDS-1128. Create stateful manager class for the pipeline creation scheduling.
Date Wed, 13 Mar 2019 11:17:47 GMT
This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/ozone-0.4 by this push:
     new b5b3f4d  HDDS-1128. Create stateful manager class for the pipeline creation scheduling.
b5b3f4d is described below

commit b5b3f4d8230fd3cdcc6d2629e8f2450564f5a512
Author: Lokesh Jain <ljain@apache.org>
AuthorDate: Wed Mar 13 16:26:06 2019 +0530

    HDDS-1128. Create stateful manager class for the pipeline creation scheduling.
    
    Signed-off-by: Nanda kumar <nanda@apache.org>
    (cherry picked from commit 0d62753da9650c2d3470ff4572734241ecaa6a71)
---
 .../java/org/apache/hadoop/utils/Scheduler.java    |  17 +--
 .../hdds/scm/chillmode/SCMChillModeManager.java    |   4 +-
 .../hadoop/hdds/scm/node/NewNodeHandler.java       |   4 +-
 .../scm/node/NonHealthyToHealthyNodeHandler.java   |   3 +-
 .../hadoop/hdds/scm/node/StaleNodeHandler.java     |   4 +-
 .../scm/pipeline/BackgroundPipelineCreator.java    | 110 +++++++++++++++++
 .../hdds/scm/pipeline/PipelineActionHandler.java   |   4 +-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |   6 -
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   9 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |   1 -
 .../hdds/scm/pipeline/PipelineReportHandler.java   |   9 --
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |  17 ---
 .../hdds/scm/pipeline/RatisPipelineUtils.java      | 123 +------------------
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 136 +++++++++++++++++----
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   5 -
 .../hdds/scm/server/SCMClientProtocolServer.java   |   4 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |   4 +-
 .../scm/chillmode/TestSCMChillModeManager.java     |  14 +--
 .../hdds/scm/pipeline/TestNode2PipelineMap.java    |   4 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |  74 +++--------
 .../hdds/scm/pipeline/TestRatisPipelineUtils.java  |  14 ++-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  13 +-
 .../ozone/freon/TestFreonWithPipelineDestroy.java  |   4 +-
 23 files changed, 290 insertions(+), 293 deletions(-)
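
In short, the patch moves pipeline-creation scheduling out of the static RatisPipelineUtils helpers and into a stateful BackgroundPipelineCreator owned by SCMPipelineManager. Paraphrasing the hunks below, call sites change roughly as follows (a sketch of the shape of the change, not compilable in isolation):

    // Before: static utility, caller supplies manager and configuration.
    RatisPipelineUtils.triggerPipelineCreation(pipelineManager, conf, 0);
    RatisPipelineUtils
        .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, true);

    // After: the manager owns the scheduler, its state and the config.
    pipelineManager.triggerPipelineCreation();
    pipelineManager.finalizeAndDestroyPipeline(pipeline, true);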

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java
index 1171dbf..71eaf33 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java
@@ -87,14 +87,17 @@ public class Scheduler {
    * yet executed are also cancelled. For the executing tasks the scheduler
    * waits 60 seconds for completion.
    */
-  public void close() {
+  public synchronized void close() {
     isClosed = true;
-    scheduler.shutdownNow();
-    try {
-      scheduler.awaitTermination(60, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOG.info(threadName + " interrupted while waiting for task completion {}",
-          e);
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+      try {
+        scheduler.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOG.info(
+            threadName + " interrupted while waiting for task completion {}",
+            e);
+      }
     }
     scheduler = null;
   }
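
For reference, a minimal standalone sketch of the idempotent close pattern this hunk introduces (simplified names, not the actual Scheduler class): synchronized plus the null check turn a second close() call into a no-op instead of a NullPointerException.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class IdempotentCloser {
      private ScheduledExecutorService executor =
          Executors.newSingleThreadScheduledExecutor();

      // synchronized: concurrent closers serialize; null check: second
      // and later calls fall through without touching the executor.
      public synchronized void close() {
        if (executor != null) {
          executor.shutdownNow();
          try {
            executor.awaitTermination(60, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt flag
          }
        }
        executor = null;
      }
    }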
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
index 8ced6d5..79191e1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.slf4j.Logger;
@@ -145,8 +144,7 @@ public class SCMChillModeManager {
     emitChillModeStatus();
     // TODO: #CLUTIL if we reenter chill mode the fixed interval pipeline
     // creation job needs to stop
-    RatisPipelineUtils
-        .scheduleFixedIntervalPipelineCreator(pipelineManager, config);
+    pipelineManager.startPipelineCreator();
   }
 
   public boolean getInChillMode() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index 38b3fe0..1dc924b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.node;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -41,7 +40,6 @@ public class NewNodeHandler implements EventHandler<DatanodeDetails> {
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    RatisPipelineUtils
-        .triggerPipelineCreation(pipelineManager, conf, 0);
+    pipelineManager.triggerPipelineCreation();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
index 2c9b685..5976c17 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -43,6 +42,6 @@ public class NonHealthyToHealthyNodeHandler
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    RatisPipelineUtils.triggerPipelineCreation(pipelineManager, conf, 0);
+    pipelineManager.triggerPipelineCreation();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index 93630f0..26e8f5f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.slf4j.Logger;
@@ -61,8 +60,7 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
     for (PipelineID pipelineID : pipelineIds) {
       try {
         Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
-        RatisPipelineUtils
-            .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, true);
+        pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
       } catch (IOException e) {
         LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
             datanodeDetails);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
new file mode 100644
index 0000000..26e11b8
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.utils.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implements api for running background pipeline creation jobs.
+ */
+class BackgroundPipelineCreator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BackgroundPipelineCreator.class);
+
+  private final Scheduler scheduler;
+  private final AtomicBoolean isPipelineCreatorRunning;
+  private final PipelineManager pipelineManager;
+  private final Configuration conf;
+
+  BackgroundPipelineCreator(PipelineManager pipelineManager,
+      Scheduler scheduler, Configuration conf) {
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+    this.scheduler = scheduler;
+    isPipelineCreatorRunning = new AtomicBoolean(false);
+  }
+
+  private boolean shouldSchedulePipelineCreator() {
+    return isPipelineCreatorRunning.compareAndSet(false, true);
+  }
+
+  /**
+   * Schedules a fixed interval job to create pipelines.
+   */
+  void startFixedIntervalPipelineCreator() {
+    long intervalInMillis = conf
+        .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
+            ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    // TODO: #CLUTIL We can start the job asap
+    scheduler.scheduleWithFixedDelay(() -> {
+      if (!shouldSchedulePipelineCreator()) {
+        return;
+      }
+      createPipelines();
+    }, 0, intervalInMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Triggers pipeline creation via background thread.
+   */
+  void triggerPipelineCreation() {
+    // TODO: #CLUTIL introduce a better mechanism to not have more than one
+    // job of a particular type running, probably via ratis.
+    if (!shouldSchedulePipelineCreator()) {
+      return;
+    }
+    scheduler.schedule(this::createPipelines, 0, TimeUnit.MILLISECONDS);
+  }
+
+  private void createPipelines() {
+    // TODO: #CLUTIL Different replication factor may need to be supported
+    HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
+        conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+            OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
+
+    for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+        .values()) {
+      while (true) {
+        try {
+          if (scheduler.isClosed()) {
+            break;
+          }
+          pipelineManager.createPipeline(type, factor);
+        } catch (IOException ioe) {
+          break;
+        } catch (Throwable t) {
+          LOG.error("Error while creating pipelines {}", t);
+          break;
+        }
+      }
+    }
+    isPipelineCreatorRunning.set(false);
+  }
+}
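
The AtomicBoolean above is the state that previously lived in a static field of RatisPipelineUtils. A minimal sketch of the same single-runner guard in isolation (hypothetical class name, simplified from the patch):

    import java.util.concurrent.atomic.AtomicBoolean;

    class SingleRunnerGuard {
      private final AtomicBoolean running = new AtomicBoolean(false);

      // compareAndSet(false, true) admits exactly one caller; the flag is
      // reset when the job finishes, as createPipelines() does above, so
      // the next trigger can schedule a fresh run.
      void trigger(Runnable job) {
        if (!running.compareAndSet(false, true)) {
          return; // a run is already scheduled or in flight
        }
        try {
          job.run();
        } finally {
          running.set(false);
        }
      }
    }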
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 94f757b..da704d2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -59,9 +59,7 @@ public class PipelineActionHandler
           Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
           LOG.info("Received pipeline action {} for {} from datanode [}",
               action.getAction(), pipeline, report.getDatanodeDetails());
-          RatisPipelineUtils
-              .finalizeAndDestroyPipeline(pipelineManager, pipeline, ozoneConf,
-                  true);
+          pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
         } catch (IOException ioe) {
           LOG.error("Could not execute pipeline action={} pipeline={} {}",
               action, pipelineID, ioe);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 9a846ad..8934976 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -61,10 +61,4 @@ public final class PipelineFactory {
       List<DatanodeDetails> nodes) {
     return providers.get(type).create(factor, nodes);
   }
-
-  public void close() {
-    for (PipelineProvider p : providers.values()) {
-      p.close();
-    }
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 11ba2c3..2793647 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -67,9 +67,12 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
 
   int getNumberOfContainers(PipelineID pipelineID) throws IOException;
 
-  void finalizePipeline(PipelineID pipelineID) throws IOException;
-
   void openPipeline(PipelineID pipelineId) throws IOException;
 
-  void removePipeline(PipelineID pipelineID) throws IOException;
+  void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
+      throws IOException;
+
+  void startPipelineCreator();
+
+  void triggerPipelineCreation();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 610e78a..bb16533 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,5 +33,4 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 
-  void close();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 9c914b0..330ad8b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -108,15 +108,6 @@ public class PipelineReportHandler implements
         // if all the dns have reported, pipeline can be moved to OPEN state
         pipelineManager.openPipeline(pipelineID);
       }
-    } else if (pipeline.isClosed()) {
-      int numContainers = pipelineManager.getNumberOfContainers(pipelineID);
-      if (numContainers == 0) {
-        // since all the containers have been closed the pipeline can be
-        // destroyed
-        LOGGER.info("Destroying pipeline {} as all containers are closed",
-            pipeline);
-        RatisPipelineUtils.destroyPipeline(pipelineManager, pipeline, conf);
-      }
     } else {
       // In OPEN state case just report the datanode
       pipeline.reportDatanode(dn);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index b73f63d..6952200 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.utils.Scheduler;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -38,8 +37,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
 /**
  * Implements Api for creating ratis pipelines.
  */
@@ -48,20 +45,12 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
-  private static Scheduler scheduler;
 
-  //TODO static Scheduler should be removed!!!! HDDS-1128
-  @SuppressFBWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
-    scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
-  }
-
-  static Scheduler getScheduler() {
-    return scheduler;
   }
 
   /**
@@ -98,7 +87,6 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
   }
 
-
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
     // Get set of datanodes already used for ratis pipeline
@@ -146,9 +134,4 @@ public class RatisPipelineProvider implements PipelineProvider {
   protected void initializePipeline(Pipeline pipeline) throws IOException {
     RatisPipelineUtils.createPipeline(pipeline, conf);
   }
-
-  @Override
-  public void close() {
-    scheduler.close();
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 89dfc0e..3b36add 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.grpc.GrpcTlsConfig;
@@ -42,17 +40,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Utility class for Ratis pipelines. Contains methods to create and destroy
  * ratis pipelines.
  */
-public final class RatisPipelineUtils {
-
-  private static AtomicBoolean isPipelineCreatorRunning =
-      new AtomicBoolean(false);
+final class RatisPipelineUtils {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RatisPipelineUtils.class);
@@ -87,13 +80,11 @@ public final class RatisPipelineUtils {
    * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
    * the datanodes.
    *
-   * @param pipelineManager - SCM pipeline manager
    * @param pipeline        - Pipeline to be destroyed
    * @param ozoneConf       - Ozone configuration
    * @throws IOException
    */
-  public static void destroyPipeline(PipelineManager pipelineManager,
-      Pipeline pipeline, Configuration ozoneConf) throws IOException {
+  static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf) {
     final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
     LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
     for (DatanodeDetails dn : pipeline.getNodes()) {
@@ -104,42 +95,6 @@ public final class RatisPipelineUtils {
             pipeline.getId(), dn);
       }
     }
-    // remove the pipeline from the pipeline manager
-    pipelineManager.removePipeline(pipeline.getId());
-    triggerPipelineCreation(pipelineManager, ozoneConf, 0);
-  }
-
-  /**
-   * Finalizes pipeline in the SCM. Removes pipeline and sends ratis command to
-   * destroy pipeline on the datanodes immediately or after timeout based on the
-   * value of onTimeout parameter.
-   *
-   * @param pipelineManager - SCM pipeline manager
-   * @param pipeline        - Pipeline to be destroyed
-   * @param ozoneConf       - Ozone Configuration
-   * @param onTimeout       - if true pipeline is removed and destroyed on
-   *                        datanodes after timeout
-   * @throws IOException
-   */
-  public static void finalizeAndDestroyPipeline(PipelineManager pipelineManager,
-      Pipeline pipeline, Configuration ozoneConf, boolean onTimeout)
-      throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    LOG.info("destroying pipeline:{} with {}", pipeline.getId(), group);
-    pipelineManager.finalizePipeline(pipeline.getId());
-    if (onTimeout) {
-      long pipelineDestroyTimeoutInMillis = ozoneConf
-          .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
-              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
-              TimeUnit.MILLISECONDS);
-      RatisPipelineProvider.getScheduler()
-          .schedule(() -> destroyPipeline(pipelineManager, pipeline, ozoneConf),
-              pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG, String
-                  .format("Destroy pipeline failed for pipeline:%s with %s",
-                      pipeline.getId(), group));
-    } else {
-      destroyPipeline(pipelineManager, pipeline, ozoneConf);
-    }
   }
 
   /**
@@ -194,80 +149,14 @@ public final class RatisPipelineUtils {
               retryPolicy, maxOutstandingRequests, tlsConfig)) {
         rpc.accept(client, p);
       } catch (IOException ioe) {
-        exceptions.add(
-            new IOException("Failed invoke Ratis rpc " + rpc + " for " +
-                d.getUuid(), ioe));
+        String errMsg =
+            "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
+        LOG.error(errMsg, ioe);
+        exceptions.add(new IOException(errMsg, ioe));
       }
     });
     if (!exceptions.isEmpty()) {
       throw MultipleIOException.createIOException(exceptions);
     }
   }
-
-  /**
-   * Schedules a fixed interval job to create pipelines.
-   *
-   * @param pipelineManager - Pipeline manager
-   * @param conf            - Configuration
-   */
-  public static void scheduleFixedIntervalPipelineCreator(
-      PipelineManager pipelineManager, Configuration conf) {
-    long intervalInMillis = conf
-        .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
-            ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    // TODO: #CLUTIL We can start the job asap
-    RatisPipelineProvider.getScheduler().scheduleWithFixedDelay(() -> {
-      if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
-        return;
-      }
-      createPipelines(pipelineManager, conf);
-    }, intervalInMillis, intervalInMillis, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Triggers pipeline creation after the specified time.
-   *
-   * @param pipelineManager - Pipeline manager
-   * @param conf            - Configuration
-   * @param afterMillis     - Time after which pipeline creation needs to be
-   *                        triggered
-   */
-  public static void triggerPipelineCreation(PipelineManager pipelineManager,
-      Configuration conf, long afterMillis) {
-    // TODO: #CLUTIL introduce a better mechanism to not have more than one
-    // job of a particular type running, probably via ratis.
-    if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
-      return;
-    }
-    RatisPipelineProvider.getScheduler()
-        .schedule(() -> createPipelines(pipelineManager, conf), afterMillis,
-            TimeUnit.MILLISECONDS);
-  }
-
-  private static void createPipelines(PipelineManager pipelineManager,
-      Configuration conf) {
-    // TODO: #CLUTIL Different replication factor may need to be supported
-    HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
-        conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
-            OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
-
-    for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
-        .values()) {
-      while (true) {
-        try {
-          if (RatisPipelineProvider.getScheduler().isClosed()) {
-            break;
-          }
-          pipelineManager.createPipeline(type, factor);
-        } catch (IOException ioe) {
-          break;
-        } catch (Throwable t) {
-          LOG.error("Error while creating pipelines {}", t);
-          break;
-        }
-      }
-    }
-    isPipelineCreatorRunning.set(false);
-  }
 }
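
The callRatisRpc hunk above now logs each per-datanode failure as it occurs rather than only surfacing it in the combined exception. The aggregation pattern in isolation, with hypothetical names in place of the Ratis client calls:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.MultipleIOException;

    class RpcFanOut {
      // Every node is attempted, each failure is recorded, and one
      // combined IOException is thrown after the loop completes.
      static void callOnEachNode(List<String> nodes) throws IOException {
        List<IOException> exceptions = new ArrayList<>();
        for (String node : nodes) {
          try {
            contactNode(node); // hypothetical per-node rpc call
          } catch (IOException ioe) {
            exceptions.add(
                new IOException("Failed rpc for " + node, ioe));
          }
        }
        if (!exceptions.isEmpty()) {
          throw MultipleIOException.createIOException(exceptions);
        }
      }

      static void contactNode(String node) throws IOException {
        // placeholder for the real Ratis client call
      }
    }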
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 1bb0099..f274829 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.apache.hadoop.utils.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +48,7 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -68,19 +71,27 @@ public class SCMPipelineManager implements PipelineManager {
   private final ReadWriteLock lock;
   private final PipelineFactory pipelineFactory;
   private final PipelineStateManager stateManager;
-  private final MetadataStore pipelineStore;
+  private final BackgroundPipelineCreator backgroundPipelineCreator;
+  private Scheduler scheduler;
+  private MetadataStore pipelineStore;
 
   private final EventPublisher eventPublisher;
   private final NodeManager nodeManager;
   private final SCMPipelineMetrics metrics;
+  private final Configuration conf;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
       EventPublisher eventPublisher) throws IOException {
     this.lock = new ReentrantReadWriteLock();
+    this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
     this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf);
+    // TODO: See if thread priority needs to be set for these threads
+    scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
+    this.backgroundPipelineCreator =
+        new BackgroundPipelineCreator(this, scheduler, conf);
     int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     final File metaDir = ServerUtils.getScmDbDir(conf);
@@ -268,35 +279,116 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public void finalizePipeline(PipelineID pipelineId) throws IOException {
+  public void openPipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      stateManager.finalizePipeline(pipelineId);
-      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
-      for (ContainerID containerID : containerIDs) {
-        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
-      }
+      stateManager.openPipeline(pipelineId);
     } finally {
       lock.writeLock().unlock();
     }
   }
 
+  /**
+   * Finalizes pipeline in the SCM. Removes pipeline and makes rpc call to
+   * destroy pipeline on the datanodes immediately or after timeout based on the
+   * value of onTimeout parameter.
+   *
+   * @param pipeline        - Pipeline to be destroyed
+   * @param onTimeout       - if true pipeline is removed and destroyed on
+   *                        datanodes after timeout
+   * @throws IOException
+   */
   @Override
-  public void openPipeline(PipelineID pipelineId) throws IOException {
+  public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
+      throws IOException {
+    LOG.info("destroying pipeline:{}", pipeline);
+    finalizePipeline(pipeline.getId());
+    if (onTimeout) {
+      long pipelineDestroyTimeoutInMillis =
+          conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
+              TimeUnit.MILLISECONDS);
+      scheduler.schedule(() -> destroyPipeline(pipeline),
+          pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
+          String.format("Destroy pipeline failed for pipeline:%s", pipeline));
+    } else {
+      destroyPipeline(pipeline);
+    }
+  }
+
+  @Override
+  public Map<String, Integer> getPipelineInfo() {
+    final Map<String, Integer> pipelineInfo = new HashMap<>();
+    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
+      pipelineInfo.put(state.toString(), 0);
+    }
+    stateManager.getPipelines().forEach(pipeline ->
+        pipelineInfo.computeIfPresent(
+            pipeline.getPipelineState().toString(), (k, v) -> v + 1));
+    return pipelineInfo;
+  }
+
+  /**
+   * Schedules a fixed interval job to create pipelines.
+   */
+  @Override
+  public void startPipelineCreator() {
+    backgroundPipelineCreator.startFixedIntervalPipelineCreator();
+  }
+
+  /**
+   * Triggers pipeline creation after the specified time.
+   */
+  @Override
+  public void triggerPipelineCreation() {
+    backgroundPipelineCreator.triggerPipelineCreation();
+  }
+
+  /**
+   * Moves the pipeline to CLOSED state and sends close container command for
+   * all the containers in the pipeline.
+   *
+   * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
+   * @throws IOException
+   */
+  private void finalizePipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      stateManager.openPipeline(pipelineId);
+      stateManager.finalizePipeline(pipelineId);
+      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+      for (ContainerID containerID : containerIDs) {
+        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+      }
     } finally {
       lock.writeLock().unlock();
     }
   }
 
-  @Override
-  public void removePipeline(PipelineID pipelineID) throws IOException {
+  /**
+   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
+   * the datanodes for ratis pipelines.
+   *
+   * @param pipeline        - Pipeline to be destroyed
+   * @throws IOException
+   */
+  private void destroyPipeline(Pipeline pipeline) throws IOException {
+    RatisPipelineUtils.destroyPipeline(pipeline, conf);
+    // remove the pipeline from the pipeline manager
+    removePipeline(pipeline.getId());
+    triggerPipelineCreation();
+  }
+
+  /**
+   * Removes the pipeline from the db and pipeline state map.
+   *
+   * @param pipelineId - ID of the pipeline to be removed
+   * @throws IOException
+   */
+  private void removePipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
-      Pipeline pipeline = stateManager.removePipeline(pipelineID);
+      pipelineStore.delete(pipelineId.getProtobuf().toByteArray());
+      Pipeline pipeline = stateManager.removePipeline(pipelineId);
       nodeManager.removePipeline(pipeline);
       metrics.incNumPipelineDestroyed();
     } catch (IOException ex) {
@@ -308,25 +400,15 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public Map<String, Integer> getPipelineInfo() {
-    final Map<String, Integer> pipelineInfo = new HashMap<>();
-    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
-      pipelineInfo.put(state.toString(), 0);
-    }
-    stateManager.getPipelines().forEach(pipeline ->
-        pipelineInfo.computeIfPresent(
-            pipeline.getPipelineState().toString(), (k, v) -> v + 1));
-    return pipelineInfo;
-  }
-
-  @Override
   public void close() throws IOException {
-    if (pipelineFactory != null) {
-      pipelineFactory.close();
+    if (scheduler != null) {
+      scheduler.close();
+      scheduler = null;
     }
 
     if (pipelineStore != null) {
       pipelineStore.close();
+      pipelineStore = null;
     }
     if(pmInfoBean != null) {
       MBeans.unregister(this.pmInfoBean);
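
The onTimeout branch of finalizeAndDestroyPipeline above defers destruction through the shared Scheduler. A rough equivalent in plain java.util.concurrent terms (the patch's Scheduler variant additionally takes a logger and a failure message):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class DeferredDestroy {
      private final ScheduledExecutorService executor =
          Executors.newSingleThreadScheduledExecutor();

      // Run destroyTask now, or after the configured destroy timeout.
      void destroyNowOrLater(Runnable destroyTask, boolean onTimeout,
          long timeoutMillis) {
        if (onTimeout) {
          executor.schedule(destroyTask, timeoutMillis,
              TimeUnit.MILLISECONDS);
        } else {
          destroyTask.run();
        }
      }
    }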
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index b92f17e..3e42df3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -72,9 +72,4 @@ public class SimplePipelineProvider implements PipelineProvider {
         .setNodes(nodes)
         .build();
   }
-
-  @Override
-  public void close() {
-    // Nothing to do in here.
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 2b1022b..1024fa3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -413,8 +412,7 @@ public class SCMClientProtocolServer implements
     PipelineManager pipelineManager = scm.getPipelineManager();
     Pipeline pipeline =
         pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
-    RatisPipelineUtils
-        .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     AUDIT.logWriteSuccess(
         buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, null)
     );
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 856e06b..3b9404d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -271,8 +270,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
         .waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5);
 
     for (Pipeline pipeline : pipelineManager.getPipelines()) {
-      RatisPipelineUtils
-          .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
     Assert.assertEquals(0, pipelineManager.getPipelines(type, factor).size());
     Assert.assertNotNull(blockManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
index 55dca16..01b78f2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
@@ -35,19 +35,16 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.*;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.mockito.Mockito;
 
 /** Test class for SCMChillModeManager.
  */
@@ -128,12 +125,13 @@ public class TestSCMChillModeManager {
   }
 
   @Test
-  @Ignore("TODO:HDDS-1140")
   public void testDisableChillMode() {
     OzoneConfiguration conf = new OzoneConfiguration(config);
     conf.setBoolean(HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED, false);
-    scmChillModeManager = new SCMChillModeManager(
-        conf, containers, null, queue);
+    PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    Mockito.doNothing().when(pipelineManager).startPipelineCreator();
+    scmChillModeManager =
+        new SCMChillModeManager(conf, containers, pipelineManager, queue);
     assertFalse(scmChillModeManager.getInChillMode());
   }
 
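
A note on the mock above: void methods on a Mockito mock are already no-ops, so the doNothing() stub mainly documents intent. The pattern in isolation (hypothetical wrapper class; assumes PipelineManager is on the classpath):

    import static org.mockito.Mockito.doNothing;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    class MockSketch {
      void stubAndVerify() {
        PipelineManager pm = mock(PipelineManager.class);
        doNothing().when(pm).startPipelineCreator(); // explicit no-op stub
        pm.startPipelineCreator();
        verify(pm).startPipelineCreator(); // assert it was invoked
      }
    }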
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index db70a7a..c7470a3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -112,8 +112,8 @@ public class TestNode2PipelineMap {
         ratisContainer.getPipeline().getId());
     Assert.assertEquals(0, set2.size());
 
-    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
-    pipelineManager.removePipeline(ratisContainer.getPipeline().getId());
+    pipelineManager
+        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
     pipelines = scm.getScmNodeManager()
         .getPipelines(dns.get(0));
     Assert
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index e855d2c..eb4dba5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -24,24 +24,23 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -121,12 +120,8 @@ public class TestPipelineClose {
         .getContainersInPipeline(ratisContainer.getPipeline().getId());
     Assert.assertEquals(0, setClosed.size());
 
-    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
-    Pipeline pipeline1 = pipelineManager
-        .getPipeline(ratisContainer.getPipeline().getId());
-    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        pipeline1.getPipelineState());
-    pipelineManager.removePipeline(pipeline1.getId());
+    pipelineManager
+        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
     for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
       Assert.assertFalse(scm.getScmNodeManager().getPipelines(dn)
@@ -135,21 +130,23 @@ public class TestPipelineClose {
   }
 
   @Test
-  public void testPipelineCloseWithOpenContainer() throws IOException,
-      TimeoutException, InterruptedException {
+  public void testPipelineCloseWithOpenContainer()
+      throws IOException, TimeoutException, InterruptedException {
     Set<ContainerID> setOpen = pipelineManager.getContainersInPipeline(
         ratisContainer.getPipeline().getId());
     Assert.assertEquals(1, setOpen.size());
 
-    ContainerID cId2 = ratisContainer.getContainerInfo().containerID();
-    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
-    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        pipelineManager.getPipeline(
-            ratisContainer.getPipeline().getId()).getPipelineState());
-    Pipeline pipeline2 = pipelineManager
-        .getPipeline(ratisContainer.getPipeline().getId());
-    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        pipeline2.getPipelineState());
+    pipelineManager
+        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return containerManager
+            .getContainer(ratisContainer.getContainerInfo().containerID())
+            .getState() == HddsProtos.LifeCycleState.CLOSING;
+      } catch (ContainerNotFoundException e) {
+        return false;
+      }
+    }, 100, 10000);
   }
 
   @Test
@@ -183,39 +180,4 @@ public class TestPipelineClose {
     } catch (PipelineNotFoundException e) {
     }
   }
-
-  @Test
-  public void testPipelineCloseWithPipelineReport() throws IOException {
-    Pipeline pipeline = ratisContainer.getPipeline();
-    pipelineManager.finalizePipeline(pipeline.getId());
-    // remove pipeline from SCM
-    pipelineManager.removePipeline(pipeline.getId());
-
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      PipelineReportFromDatanode pipelineReport =
-          TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
-      EventQueue eventQueue = new EventQueue();
-      SCMChillModeManager scmChillModeManager =
-          new SCMChillModeManager(new OzoneConfiguration(),
-              new ArrayList<>(), pipelineManager, eventQueue);
-      PipelineReportHandler pipelineReportHandler =
-          new PipelineReportHandler(scmChillModeManager, pipelineManager, conf);
-      // on receiving pipeline report for the pipeline, pipeline report handler
-      // should destroy the pipeline for the dn
-      pipelineReportHandler.onMessage(pipelineReport, eventQueue);
-    }
-
-    OzoneContainer ozoneContainer =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer();
-    List<PipelineReport> pipelineReports =
-        ozoneContainer.getPipelineReport().getPipelineReportList();
-    for (PipelineReport pipelineReport : pipelineReports) {
-      // pipeline should not be reported by any dn
-      Assert.assertNotEquals(
-          PipelineID.getFromProtobuf(pipelineReport.getPipelineID()),
-          ratisContainer.getPipeline().getId());
-    }
-  }
-
-}
\ No newline at end of file
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
index 2180834..b653e7a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -46,6 +47,8 @@ public class TestRatisPipelineUtils {
   private static PipelineManager pipelineManager;
 
   public void init(int numDatanodes) throws Exception {
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        GenericTestUtils.getRandomizedTempPath());
     cluster = MiniOzoneCluster.newBuilder(conf)
             .setNumDatanodes(numDatanodes)
             .setHbInterval(1000)
@@ -71,8 +74,7 @@ public class TestRatisPipelineUtils {
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
     for (Pipeline pipeline : pipelines) {
-      RatisPipelineUtils
-          .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
     // make sure two pipelines are created
     waitForPipelines(2);
@@ -108,7 +110,13 @@ public class TestRatisPipelineUtils {
     for (HddsDatanodeService dn : dns) {
       cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
     }
+
+    // destroy the existing pipelines
+    for (Pipeline pipeline : pipelines) {
+      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+    }
     // make sure pipelines is created after node start
+    pipelineManager.triggerPipelineCreation();
     waitForPipelines(1);
   }
 
@@ -117,6 +125,6 @@ public class TestRatisPipelineUtils {
     GenericTestUtils.waitFor(() -> pipelineManager
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
-        .size() == numPipelines, 100, 20000);
+        .size() == numPipelines, 100, 40000);
   }
 }
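
GenericTestUtils.waitFor, used throughout these tests, polls a condition until it holds or a deadline passes. A minimal stand-in (not the real implementation):

    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    class PollUtil {
      // Re-check every checkEveryMillis until the condition holds or
      // waitForMillis elapses, then fail with a TimeoutException.
      static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
          int waitForMillis)
          throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + waitForMillis;
        while (!check.get()) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException("condition not met in time");
          }
          Thread.sleep(checkEveryMillis);
        }
      }
    }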
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 4d8e3af..990d73a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -104,8 +104,7 @@ public class TestSCMPipelineManager {
 
     // clean up
     for (Pipeline pipeline : pipelines) {
-      pipelineManager.finalizePipeline(pipeline.getId());
-      pipelineManager.removePipeline(pipeline.getId());
+      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
     pipelineManager.close();
   }
@@ -126,10 +125,7 @@ public class TestSCMPipelineManager {
     pipelineManager.openPipeline(pipeline.getId());
     pipelineManager
         .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1));
-    pipelineManager.finalizePipeline(pipeline.getId());
-    pipelineManager
-        .removeContainerFromPipeline(pipeline.getId(), ContainerID.valueof(1));
-    pipelineManager.removePipeline(pipeline.getId());
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     pipelineManager.close();
 
     // new pipeline manager should not be able to load removed pipelines
@@ -192,13 +188,12 @@ public class TestSCMPipelineManager {
         .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
     // close the pipeline
-    pipelineManager.finalizePipeline(pipeline.getId());
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
 
     for (DatanodeDetails dn: pipeline.getNodes()) {
       PipelineReportFromDatanode pipelineReportFromDatanode =
           TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
-      // pipeline report for a closed pipeline should destroy the pipeline
-      // and remove it from the pipeline manager
+      // pipeline report for destroyed pipeline should be ignored
       pipelineReportHandler
           .onMessage(pipelineReportFromDatanode, new EventQueue());
     }
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 280cf90..13ecab6 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.junit.AfterClass;
@@ -103,7 +102,6 @@ public class TestFreonWithPipelineDestroy {
     PipelineManager pipelineManager =
         cluster.getStorageContainerManager().getPipelineManager();
     Pipeline pipeline = pipelineManager.getPipeline(id);
-    RatisPipelineUtils
-        .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

