hadoop-ozone-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sammic...@apache.org
Subject [hadoop-ozone] 05/12: HDDS-1572 Implement a Pipeline scrubber to clean up non-OPEN pipeline. (#237)
Date Mon, 23 Dec 2019 08:08:48 GMT
This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 5525b98214adc3d861b047e4c7986e91c6b09dca
Author: Li Cheng <bloodhell2004@gmail.com>
AuthorDate: Wed Nov 27 20:18:50 2019 +0800

    HDDS-1572 Implement a Pipeline scrubber to clean up non-OPEN pipeline. (#237)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  7 ++++
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  | 31 +++++++++++++++-
 hadoop-hdds/common/src/main/proto/hdds.proto       |  1 +
 .../common/src/main/resources/ozone-default.xml    | 12 ++++++
 .../scm/pipeline/BackgroundPipelineCreator.java    |  8 +++-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  3 ++
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 26 +++++++++++++
 .../scm/pipeline/MockRatisPipelineProvider.java    | 28 ++++++++++++++
 .../scm/pipeline/TestRatisPipelineProvider.java    | 10 ++++-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 43 +++++++++++++++++++++-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  2 +-
 11 files changed, 166 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 7e8c435..845d86f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -338,6 +338,13 @@ public final class ScmConfigKeys {
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
       "ozone.scm.keyvalue.container.deletion-choosing.policy";
 
+  // Max timeout for a pipeline to stay in the ALLOCATED state before being scrubbed.
+  public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT =
+      "ozone.scm.pipeline.allocated.timeout";
+
+  public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT =
+      "5m";
+
   public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT =
       "ozone.scm.container.creation.lease.timeout";
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 594fcf7..295156d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -56,6 +56,8 @@ public final class Pipeline {
   private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
   // Current reported Leader for the pipeline
   private UUID leaderId;
+  // Timestamp for pipeline upon creation
+  private Long creationTimestamp;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -70,6 +72,7 @@ public final class Pipeline {
     this.factor = factor;
     this.state = state;
     this.nodeStatus = nodeStatus;
+    this.creationTimestamp = System.currentTimeMillis();
   }
 
   /**
@@ -109,6 +112,24 @@ public final class Pipeline {
   }
 
   /**
+   * Return the creation time of pipeline.
+   *
+   * @return Creation Timestamp
+   */
+  public Long getCreationTimestamp() {
+    return creationTimestamp;
+  }
+
+  /**
+   * Set the creation timestamp. Only for protobuf now.
+   *
+   * @param creationTimestamp
+   */
+  void setCreationTimestamp(Long creationTimestamp) {
+    this.creationTimestamp = creationTimestamp;
+  }
+
+  /**
    * Return the pipeline leader's UUID.
    *
    * @return DatanodeDetails.UUID.
@@ -221,6 +242,7 @@ public final class Pipeline {
         .setFactor(factor)
         .setState(PipelineState.getProtobuf(state))
         .setLeaderID(leaderId != null ? leaderId.toString() : "")
+        .setCreationTimeStamp(creationTimestamp)
         .addAllMembers(nodeStatus.keySet().stream()
             .map(DatanodeDetails::getProtoBufMessage)
             .collect(Collectors.toList()));
@@ -299,7 +321,8 @@ public final class Pipeline {
     b.append(", Factor:").append(getFactor());
     b.append(", State:").append(getPipelineState());
     b.append(", leaderId:").append(getLeaderId());
-    b.append(" ]");
+    b.append(", CreationTimestamp").append(getCreationTimestamp());
+    b.append("]");
     return b.toString();
   }
 
@@ -323,6 +346,7 @@ public final class Pipeline {
     private List<Integer> nodeOrder = null;
     private List<DatanodeDetails> nodesInOrder = null;
     private UUID leaderId = null;
+    private Long creationTimestamp = null;
 
     public Builder() {}
 
@@ -334,6 +358,7 @@ public final class Pipeline {
       this.nodeStatus = pipeline.nodeStatus;
       this.nodesInOrder = pipeline.nodesInOrder.get();
       this.leaderId = pipeline.getLeaderId();
+      this.creationTimestamp = pipeline.getCreationTimestamp();
     }
 
     public Builder setId(PipelineID id1) {
@@ -380,6 +405,10 @@ public final class Pipeline {
       Preconditions.checkNotNull(nodeStatus);
       Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
       pipeline.setLeaderId(leaderId);
+      // overwrite with original creationTimestamp
+      if (creationTimestamp != null) {
+        pipeline.setCreationTimestamp(creationTimestamp);
+      }
 
       if (nodeOrder != null && !nodeOrder.isEmpty()) {
         // This branch is for build from ProtoBuf
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 39a01dc..b313604 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -75,6 +75,7 @@ message Pipeline {
     required PipelineID id = 5;
     optional string leaderID = 6;
     repeated uint32 memberOrders = 7;
+    optional uint64 creationTimeStamp = 8;
 }
 
 message KeyValue {
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 3093292..653d530 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -856,6 +856,18 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.pipeline.allocated.timeout</name>
+    <value>5m</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>
+      Timeout for every pipeline to stay in the ALLOCATED stage. When a pipeline is created,
+      it should reach the OPEN stage once a pipeline report is successfully received by SCM.
+      If a pipeline stays at ALLOCATED for too long, it should be scrubbed so that a new
+      pipeline can be created. This timeout is how long a pipeline can stay at the ALLOCATED
+      stage until it gets scrubbed.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.container.size</name>
     <value>5GB</value>
     <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index b663f2a..8e4ec6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -110,12 +110,18 @@ class BackgroundPipelineCreator {
         // Skip this iteration for creating pipeline
         continue;
       }
+
+      try {
+        pipelineManager.scrubPipeline(type, factor);
+      } catch (IOException e) {
+        LOG.error("Error while scrubbing pipelines {}", e);
+      }
+
       while (true) {
         try {
           if (scheduler.isClosed()) {
             break;
           }
-
           pipelineManager.createPipeline(type, factor);
         } catch (IOException ioe) {
           break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 779008f..44432d9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -75,6 +75,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean
{
   void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
       throws IOException;
 
+  void scrubPipeline(ReplicationType type, ReplicationFactor factor)
+      throws IOException;
+
   void startPipelineCreator();
 
   void triggerPipelineCreation();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 1b2c476..4346dea 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -54,6 +54,7 @@ import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
 
@@ -341,6 +342,31 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
+  public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
+      throws IOException{
+    if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
+      // Only scrub pipelines for RATIS THREE pipelines
+      return;
+    }
+    Long currentTime = System.currentTimeMillis();
+    Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor,
+        Pipeline.PipelineState.ALLOCATED).stream()
+        .filter(p -> (currentTime - p.getCreationTimestamp()
+            >= pipelineScrubTimeoutInMills))
+        .collect(Collectors.toList());
+    for (Pipeline p : needToSrubPipelines) {
+      LOG.info("srubbing pipeline: id: " + p.getId().toString() +
+          " since it stays at ALLOCATED stage for " +
+          (currentTime - p.getCreationTimestamp())/60000 + " mins.");
+      finalizeAndDestroyPipeline(p, false);
+    }
+  }
+
+  @Override
   public Map<String, Integer> getPipelineInfo() {
     final Map<String, Integer> pipelineInfo = new HashMap<>();
     for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 25b0adc..7513cad 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -33,6 +33,15 @@ import java.util.List;
  */
 public class MockRatisPipelineProvider extends RatisPipelineProvider {
 
+  private boolean autoOpenPipeline;
+
+  public MockRatisPipelineProvider(NodeManager nodeManager,
+                                   PipelineStateManager stateManager,
+                                   Configuration conf, boolean autoOpen) {
+    super(nodeManager, stateManager, conf, null);
+    autoOpenPipeline = autoOpen;
+  }
+
   public MockRatisPipelineProvider(NodeManager nodeManager,
                             PipelineStateManager stateManager,
                             Configuration conf) {
@@ -43,6 +52,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
       PipelineStateManager stateManager, Configuration conf,
       EventPublisher eventPublisher) {
     super(nodeManager, stateManager, conf, eventPublisher);
+    autoOpenPipeline = true;
   }
 
   protected void initializePipeline(Pipeline pipeline) throws IOException {
@@ -50,6 +60,24 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   }
 
   @Override
+  public Pipeline create(HddsProtos.ReplicationFactor factor)
+      throws IOException {
+    if (autoOpenPipeline) {
+      return super.create(factor);
+    } else {
+      Pipeline initialPipeline = super.create(factor);
+      return Pipeline.newBuilder()
+          .setId(initialPipeline.getId())
+          // overwrite pipeline state to maintain ALLOCATED
+          .setState(Pipeline.PipelineState.ALLOCATED)
+          .setType(initialPipeline.getType())
+          .setFactor(factor)
+          .setNodes(initialPipeline.getNodes())
+          .build();
+    }
+  }
+
+  @Override
   public void shutdown() {
     // Do nothing.
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index f5e3f84..66991e4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.junit.Assume;
@@ -53,9 +54,11 @@ public class TestRatisPipelineProvider {
   @Before
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
     stateManager = new PipelineStateManager();
     provider = new MockRatisPipelineProvider(nodeManager,
-        stateManager, new OzoneConfiguration());
+        stateManager, conf);
   }
 
   private void createPipelineAndAssertions(
@@ -64,6 +67,7 @@ public class TestRatisPipelineProvider {
     assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
 
     Pipeline pipeline1 = provider.create(factor);
     assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
@@ -149,6 +153,9 @@ public class TestRatisPipelineProvider {
     Pipeline pipeline = provider.create(factor);
     assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
+    nodeManager.addPipeline(pipeline);
+    stateManager.addPipeline(pipeline);
+
 
     List<DatanodeDetails> nodes = pipeline.getNodes();
 
@@ -184,5 +191,6 @@ public class TestRatisPipelineProvider {
         .build();
 
     stateManager.addPipeline(openPipeline);
+    nodeManager.addPipeline(openPipeline);
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 08f5185..2df851d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
@@ -28,6 +29,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
@@ -270,7 +272,7 @@ public class TestSCMPipelineManager {
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
     Assert.assertTrue(numPipelineCreateFailed == 1);
-    
+
     // clean up
     pipelineManager.close();
   }
@@ -374,6 +376,45 @@ public class TestSCMPipelineManager {
     pipelineManager.close();
   }
 
+  @Test
+  public void testScrubPipeline() throws IOException {
+    // No timeout for pipeline scrubber.
+    conf.setTimeDuration(
+        OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
+        TimeUnit.MILLISECONDS);
+
+    final SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+    final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
+        nodeManager, pipelineManager.getStateManager(), conf, false);
+
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        ratisProvider);
+
+    Pipeline pipeline = pipelineManager
+        .createPipeline(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+    // At this point, pipeline is not at OPEN stage.
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.ALLOCATED);
+
+    // pipeline should be seen in pipelineManager as ALLOCATED.
+    Assert.assertTrue(pipelineManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE,
+            Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+    pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE);
+
+    // pipeline should be scrubbed.
+    Assert.assertFalse(pipelineManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE,
+            Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+
+    pipelineManager.close();
+  }
+
   private void sendPipelineReport(DatanodeDetails dn,
       Pipeline pipeline, PipelineReportHandler pipelineReportHandler,
       boolean isLeader, EventQueue eventQueue) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index ce34ba6..418e86d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -142,7 +142,7 @@ public class TestRatisPipelineProvider {
     List<DatanodeDetails> allHealthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
     int totalHealthyNodesCount = allHealthyNodes.size();
-    
+
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
 
     List<DatanodeDetails> closePipelineDns = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


Mime
View raw message