hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From na...@apache.org
Subject [hadoop] branch trunk updated: HDDS-1810. SCM command to Activate and Deactivate pipelines. (#1224)
Date Tue, 03 Sep 2019 11:21:45 GMT
This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b9704f  HDDS-1810. SCM command to Activate and Deactivate pipelines. (#1224)
0b9704f is described below

commit 0b9704f6106587d9df06c8b3860a23afbccccd43
Author: Nanda kumar <nanda@apache.org>
AuthorDate: Tue Sep 3 16:50:57 2019 +0530

    HDDS-1810. SCM command to Activate and Deactivate pipelines. (#1224)
---
 .../hdds/scm/client/ContainerOperationClient.java  | 12 +++++
 .../apache/hadoop/hdds/scm/client/ScmClient.java   | 16 ++++++
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  4 +-
 .../protocol/StorageContainerLocationProtocol.java | 16 ++++++
 ...inerLocationProtocolClientSideTranslatorPB.java | 32 ++++++++++++
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |  2 +
 ...inerLocationProtocolServerSideTranslatorPB.java | 34 +++++++++++++
 .../proto/StorageContainerLocationProtocol.proto   | 22 ++++++++
 hadoop-hdds/common/src/main/proto/hdds.proto       |  3 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  | 17 +++++++
 .../hdds/scm/pipeline/PipelineStateManager.java    | 24 +++++++++
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  2 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |  1 +
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 26 ++++++++++
 .../hdds/scm/server/SCMClientProtocolServer.java   | 18 +++++++
 .../org/apache/hadoop/hdds/scm/cli/SCMCLI.java     |  4 ++
 .../cli/pipeline/ActivatePipelineSubcommand.java   | 53 ++++++++++++++++++++
 .../cli/pipeline/DeactivatePipelineSubcommand.java | 53 ++++++++++++++++++++
 .../scm/pipeline/TestPipelineStateManager.java     |  7 +++
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 58 +++++++++++++++++++---
 20 files changed, 395 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index e2856d7..c97354f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -227,6 +227,18 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   @Override
+  public void activatePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    storageContainerLocationClient.activatePipeline(pipelineID);
+  }
+
+  @Override
+  public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    storageContainerLocationClient.deactivatePipeline(pipelineID);
+  }
+
+  @Override
   public void closePipeline(HddsProtos.PipelineID pipelineID)
       throws IOException {
     storageContainerLocationClient.closePipeline(pipelineID);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index c2dd5f9..226ceda 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -181,6 +181,22 @@ public interface ScmClient extends Closeable {
   List<Pipeline> listPipelines() throws IOException;
 
   /**
+   * Activates the pipeline given a pipeline ID.
+   *
+   * @param pipelineID PipelineID to activate.
+   * @throws IOException In case of exception while activating the pipeline
+   */
+  void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
+
+  /**
+   * Deactivates the pipeline given a pipeline ID.
+   *
+   * @param pipelineID PipelineID to deactivate.
+   * @throws IOException In case of exception while deactivating the pipeline
+   */
+  void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
+
+  /**
    * Closes the pipeline given a pipeline ID.
    *
    * @param pipelineID PipelineID to close.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index a84118a..1627569 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -354,7 +354,7 @@ public final class Pipeline {
    * Possible Pipeline states in SCM.
    */
   public enum PipelineState {
-    ALLOCATED, OPEN, CLOSED;
+    ALLOCATED, OPEN, DORMANT, CLOSED;
 
     public static PipelineState fromProtobuf(HddsProtos.PipelineState state)
         throws UnknownPipelineStateException {
@@ -362,6 +362,7 @@ public final class Pipeline {
       switch (state) {
       case PIPELINE_ALLOCATED: return ALLOCATED;
       case PIPELINE_OPEN: return OPEN;
+      case PIPELINE_DORMANT: return DORMANT;
       case PIPELINE_CLOSED: return CLOSED;
       default:
         throw new UnknownPipelineStateException(
@@ -375,6 +376,7 @@ public final class Pipeline {
       switch (state) {
       case ALLOCATED: return HddsProtos.PipelineState.PIPELINE_ALLOCATED;
       case OPEN: return HddsProtos.PipelineState.PIPELINE_OPEN;
+      case DORMANT: return HddsProtos.PipelineState.PIPELINE_DORMANT;
       case CLOSED: return HddsProtos.PipelineState.PIPELINE_CLOSED;
       default:
         throw new UnknownPipelineStateException(
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 565ce47..88db820 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -147,6 +147,22 @@ public interface StorageContainerLocationProtocol extends Closeable {
   List<Pipeline> listPipelines() throws IOException;
 
   /**
+   * Activates a dormant pipeline.
+   *
+   * @param pipelineID ID of the pipeline to activate.
+   * @throws IOException in case of any Exception
+   */
+  void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
+
+  /**
+   * Deactivates an active pipeline.
+   *
+   * @param pipelineID ID of the pipeline to deactivate.
+   * @throws IOException in case of any Exception
+   */
+  void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
+
+  /**
    * Closes a pipeline given the pipelineID.
    *
    * @param pipelineID ID of the pipeline to demolish
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 9e316f7..ab3fcd1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -20,6 +20,8 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
@@ -340,6 +342,36 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
   @Override
+  public void activatePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    try {
+      ActivatePipelineRequestProto request =
+          ActivatePipelineRequestProto.newBuilder()
+              .setTraceID(TracingUtil.exportCurrentSpan())
+              .setPipelineID(pipelineID)
+              .build();
+      rpcProxy.activatePipeline(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    try {
+      DeactivatePipelineRequestProto request =
+          DeactivatePipelineRequestProto.newBuilder()
+              .setTraceID(TracingUtil.exportCurrentSpan())
+              .setPipelineID(pipelineID)
+              .build();
+      rpcProxy.deactivatePipeline(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public void closePipeline(HddsProtos.PipelineID pipelineID)
       throws IOException {
     try {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index b3b4879..d03ad15 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -33,6 +33,8 @@ public enum SCMAction implements AuditAction {
   LIST_CONTAINER,
   LIST_PIPELINE,
   CLOSE_PIPELINE,
+  ACTIVATE_PIPELINE,
+  DEACTIVATE_PIPELINE,
   DELETE_CONTAINER,
   IN_SAFE_MODE,
   FORCE_EXIT_SAFE_MODE,
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 30ef7ea..99c9e8d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -49,6 +49,14 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos.ContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos.ContainerResponseProto;
@@ -258,6 +266,32 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   }
 
   @Override
+  public ActivatePipelineResponseProto activatePipeline(
+      RpcController controller, ActivatePipelineRequestProto request)
+      throws ServiceException {
+    try (Scope ignored = TracingUtil
+        .importAndCreateScope("activatePipeline", request.getTraceID())) {
+      impl.activatePipeline(request.getPipelineID());
+      return ActivatePipelineResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public DeactivatePipelineResponseProto deactivatePipeline(
+      RpcController controller, DeactivatePipelineRequestProto request)
+      throws ServiceException {
+    try (Scope ignored = TracingUtil
+        .importAndCreateScope("deactivatePipeline", request.getTraceID())) {
+      impl.deactivatePipeline(request.getPipelineID());
+      return DeactivatePipelineResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public ClosePipelineResponseProto closePipeline(
       RpcController controller, ClosePipelineRequestProto request)
       throws ServiceException {
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 4e4b50b..0c35876 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -167,6 +167,22 @@ message ListPipelineResponseProto {
   repeated Pipeline pipelines = 1;
 }
 
+message ActivatePipelineRequestProto {
+  required PipelineID pipelineID = 1;
+  optional string traceID = 2;
+}
+
+message ActivatePipelineResponseProto {
+}
+
+message DeactivatePipelineRequestProto {
+  required PipelineID pipelineID = 1;
+  optional string traceID = 2;
+}
+
+message DeactivatePipelineResponseProto {
+}
+
 message ClosePipelineRequestProto {
   required PipelineID pipelineID = 1;
   optional string traceID = 2;
@@ -274,6 +290,12 @@ service StorageContainerLocationProtocolService {
   rpc listPipelines(ListPipelineRequestProto)
       returns (ListPipelineResponseProto);
 
+  rpc activatePipeline(ActivatePipelineRequestProto)
+      returns (ActivatePipelineResponseProto);
+
+  rpc deactivatePipeline(DeactivatePipelineRequestProto)
+      returns (DeactivatePipelineResponseProto);
+
   /**
    * Closes a pipeline.
    */
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 6475f4c..d2bb355 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -62,7 +62,8 @@ message PipelineID {
 enum PipelineState {
     PIPELINE_ALLOCATED = 1;
     PIPELINE_OPEN = 2;
-    PIPELINE_CLOSED = 3;
+    PIPELINE_DORMANT = 3;
+    PIPELINE_CLOSED = 4;
 }
 
 message Pipeline {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 70bd64c..bd8fa2d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -77,4 +77,21 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean
{
   void triggerPipelineCreation();
 
   void incNumBlocksAllocatedMetric(PipelineID id);
+
+  /**
+   * Activates a dormant pipeline.
+   *
+   * @param pipelineID ID of the pipeline to activate.
+   * @throws IOException in case of any Exception
+   */
+  void activatePipeline(PipelineID pipelineID) throws IOException;
+
+  /**
+   * Deactivates an active pipeline.
+   *
+   * @param pipelineID ID of the pipeline to deactivate.
+   * @throws IOException in case of any Exception
+   */
+  void deactivatePipeline(PipelineID pipelineID) throws IOException;
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 6be747b..7615057 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -137,4 +137,28 @@ class PipelineStateManager {
     }
     return pipeline;
   }
+
+  /**
+   * Activates a dormant pipeline.
+   *
+   * @param pipelineID ID of the pipeline to activate.
+   * @throws IOException in case of any Exception
+   */
+  public void activatePipeline(PipelineID pipelineID)
+      throws IOException {
+    pipelineStateMap
+        .updatePipelineState(pipelineID, PipelineState.OPEN);
+  }
+
+  /**
+   * Deactivates an active pipeline.
+   *
+   * @param pipelineID ID of the pipeline to deactivate.
+   * @throws IOException in case of any Exception
+   */
+  public void deactivatePipeline(PipelineID pipelineID)
+      throws IOException {
+    pipelineStateMap
+        .updatePipelineState(pipelineID, PipelineState.DORMANT);
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 6fc27a6..443378c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -374,7 +374,7 @@ class PipelineStateMap {
     if (updatedPipeline.getPipelineState() == PipelineState.OPEN) {
       // for transition to OPEN state add pipeline to query2OpenPipelines
       query2OpenPipelines.get(query).add(updatedPipeline);
-    } else if (updatedPipeline.getPipelineState() == PipelineState.CLOSED) {
+    } else {
       // for transition from OPEN to CLOSED state remove pipeline from
       // query2OpenPipelines
       query2OpenPipelines.get(query).remove(pipeline);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 14fde07..9e22733 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -134,6 +134,7 @@ public class RatisPipelineProvider implements PipelineProvider {
     Set<DatanodeDetails> dnsUsed = new HashSet<>();
     stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
         p -> p.getPipelineState().equals(PipelineState.OPEN) ||
+            p.getPipelineState().equals(PipelineState.DORMANT) ||
             p.getPipelineState().equals(PipelineState.ALLOCATED))
         .forEach(p -> dnsUsed.addAll(p.getNodes()));
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 6660f47..d6457f3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -116,6 +117,7 @@ public class SCMPipelineManager implements PipelineManager {
     return stateManager;
   }
 
+  @VisibleForTesting
   public void setPipelineProvider(ReplicationType replicationType,
                                   PipelineProvider provider) {
     pipelineFactory.setProvider(replicationType, provider);
@@ -350,6 +352,30 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   /**
+   * Activates a dormant pipeline.
+   *
+   * @param pipelineID ID of the pipeline to activate.
+   * @throws IOException in case of any Exception
+   */
+  @Override
+  public void activatePipeline(PipelineID pipelineID)
+      throws IOException {
+    stateManager.activatePipeline(pipelineID);
+  }
+
+  /**
+   * Deactivates an active pipeline.
+   *
+   * @param pipelineID ID of the pipeline to deactivate.
+   * @throws IOException in case of any Exception
+   */
+  @Override
+  public void deactivatePipeline(PipelineID pipelineID)
+      throws IOException {
+    stateManager.deactivatePipeline(pipelineID);
+  }
+
+  /**
    * Moves the pipeline to CLOSED state and sends close container command for
    * all the containers in the pipeline.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index bf75fef..7d9cb3e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -404,6 +404,24 @@ public class SCMClientProtocolServer implements
   }
 
   @Override
+  public void activatePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    AUDIT.logReadSuccess(buildAuditMessageForSuccess(
+        SCMAction.ACTIVATE_PIPELINE, null));
+    scm.getPipelineManager().activatePipeline(
+        PipelineID.getFromProtobuf(pipelineID));
+  }
+
+  @Override
+  public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    AUDIT.logReadSuccess(buildAuditMessageForSuccess(
+        SCMAction.DEACTIVATE_PIPELINE, null));
+    scm.getPipelineManager().deactivatePipeline(
+        PipelineID.getFromProtobuf(pipelineID));
+  }
+
+  @Override
   public void closePipeline(HddsProtos.PipelineID pipelineID)
       throws IOException {
     Map<String, String> auditMap = Maps.newHashMap();
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
index 3e8f3fa..1b95418 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.hdds.scm.cli.container.CreateSubcommand;
 import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand;
 import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand;
 import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand;
+import org.apache.hadoop.hdds.scm.cli.pipeline.ActivatePipelineSubcommand;
 import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand;
+import org.apache.hadoop.hdds.scm.cli.pipeline.DeactivatePipelineSubcommand;
 import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand;
 import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
@@ -84,6 +86,8 @@ import picocli.CommandLine.Option;
         CreateSubcommand.class,
         CloseSubcommand.class,
         ListPipelinesSubcommand.class,
+        ActivatePipelineSubcommand.class,
+        DeactivatePipelineSubcommand.class,
         ClosePipelineSubcommand.class,
         TopologySubcommand.class,
         ReplicationManagerCommands.class
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ActivatePipelineSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ActivatePipelineSubcommand.java
new file mode 100644
index 0000000..d8f7138
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ActivatePipelineSubcommand.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.cli.pipeline;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.SCMCLI;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of activatePipeline command.
+ */
+@CommandLine.Command(
+    name = "activatePipeline",
+    description = "Activates the given Pipeline",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ActivatePipelineSubcommand implements Callable<Void> {
+
+  @CommandLine.ParentCommand
+  private SCMCLI parent;
+
+  @CommandLine.Parameters(description = "ID of the pipeline to activate")
+  private String pipelineId;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.createScmClient()) {
+      scmClient.activatePipeline(
+          HddsProtos.PipelineID.newBuilder().setId(pipelineId).build());
+      return null;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/DeactivatePipelineSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/DeactivatePipelineSubcommand.java
new file mode 100644
index 0000000..67342d0
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/DeactivatePipelineSubcommand.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.cli.pipeline;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.SCMCLI;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of deactivatePipeline command.
+ */
+@CommandLine.Command(
+    name = "deactivatePipeline",
+    description = "Deactivates the given Pipeline",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class DeactivatePipelineSubcommand implements Callable<Void> {
+
+  @CommandLine.ParentCommand
+  private SCMCLI parent;
+
+  @CommandLine.Parameters(description = "ID of the pipeline to deactivate")
+  private String pipelineId;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.createScmClient()) {
+      scmClient.deactivatePipeline(
+          HddsProtos.PipelineID.newBuilder().setId(pipelineId).build());
+      return null;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
index 33dd7df..0bbfb53 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
@@ -246,6 +246,13 @@ public class TestPipelineStateManager {
           stateManager.openPipeline(pipeline.getId());
           pipelines.add(pipeline);
 
+          // 5 pipelines in dormant state for each type and factor
+          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          stateManager.openPipeline(pipeline.getId());
+          stateManager.deactivatePipeline(pipeline.getId());
+          pipelines.add(pipeline);
+
           // 5 pipelines in closed state for each type and factor
           pipeline = createDummyPipeline(type, factor, factor.getNumber());
           stateManager.addPipeline(pipeline);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index e1b8653..eebaa7d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.Pipeline
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
@@ -55,8 +55,8 @@ public class TestSCMPipelineManager {
   private static File testDir;
   private static Configuration conf;
 
-  @BeforeClass
-  public static void setUp() throws Exception {
+  @Before
+  public void setUp() throws Exception {
     conf = new OzoneConfiguration();
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
@@ -68,8 +68,8 @@ public class TestSCMPipelineManager {
     nodeManager = new MockNodeManager(true, 20);
   }
 
-  @AfterClass
-  public static void cleanup() throws IOException {
+  @After
+  public void cleanup() {
     FileUtil.fullyDelete(testDir);
   }
 
@@ -269,4 +269,50 @@ public class TestSCMPipelineManager {
         "NumPipelineCreationFailed", metrics);
     Assert.assertTrue(numPipelineCreateFailed == 0);
   }
+
+  @Test
+  public void testActivateDeactivatePipeline() throws IOException {
+    final SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+    final PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+
+    final Pipeline pipeline = pipelineManager
+        .createPipeline(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+    final PipelineID pid = pipeline.getId();
+
+    pipelineManager.openPipeline(pid);
+    pipelineManager.addContainerToPipeline(pid, ContainerID.valueof(1));
+
+    Assert.assertTrue(pipelineManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE,
+            Pipeline.PipelineState.OPEN).contains(pipeline));
+
+    Assert.assertEquals(Pipeline.PipelineState.OPEN,
+        pipelineManager.getPipeline(pid).getPipelineState());
+
+    pipelineManager.deactivatePipeline(pid);
+    Assert.assertEquals(Pipeline.PipelineState.DORMANT,
+        pipelineManager.getPipeline(pid).getPipelineState());
+
+    Assert.assertFalse(pipelineManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE,
+            Pipeline.PipelineState.OPEN).contains(pipeline));
+
+    pipelineManager.activatePipeline(pid);
+
+    Assert.assertTrue(pipelineManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE,
+            Pipeline.PipelineState.OPEN).contains(pipeline));
+
+    pipelineManager.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message