hadoop-common-commits mailing list archives

From: na...@apache.org
Subject: hadoop git commit: HDDS-695. Introduce new SCM Commands to list and close Pipelines. Contributed by Nanda kumar.
Date: Tue, 23 Oct 2018 13:13:23 GMT
Repository: hadoop
Updated Branches:
  refs/heads/ozone-0.3 0674f11fc -> 346afb0a5


HDDS-695. Introduce new SCM Commands to list and close Pipelines.
Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/346afb0a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/346afb0a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/346afb0a

Branch: refs/heads/ozone-0.3
Commit: 346afb0a5ce681642e1ba0a27b19944afd00df54
Parents: 0674f11
Author: Nanda kumar <nanda@apache.org>
Authored: Tue Oct 23 18:41:30 2018 +0530
Committer: Nanda kumar <nanda@apache.org>
Committed: Tue Oct 23 18:41:30 2018 +0530

----------------------------------------------------------------------
 .../scm/client/ContainerOperationClient.java    |  11 ++
 .../hadoop/hdds/scm/client/ScmClient.java       |  18 ++
 .../scm/container/common/helpers/Pipeline.java  |  14 +-
 .../StorageContainerLocationProtocol.java       |  17 ++
 ...rLocationProtocolClientSideTranslatorPB.java |  33 ++++
 ...rLocationProtocolServerSideTranslatorPB.java |  37 ++++
 .../StorageContainerLocationProtocol.proto      |  25 +++
 .../hdds/scm/pipelines/PipelineSelector.java    |  39 +++-
 .../scm/server/SCMClientProtocolServer.java     |  14 ++
 .../scm/pipelines/TestPipelineSelector.java     | 186 +++++++++++++++++++
 .../org/apache/hadoop/hdds/scm/cli/SCMCLI.java  |   6 +-
 .../cli/pipeline/ClosePipelineSubcommand.java   |  53 ++++++
 .../cli/pipeline/ListPipelinesSubcommand.java   |  48 +++++
 .../hdds/scm/cli/pipeline/package-info.java     |  22 +++
 14 files changed, 513 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index c2bfb42..1911e42 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -255,6 +255,17 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   @Override
+  public List<Pipeline> listPipelines() throws IOException {
+    return storageContainerLocationClient.listPipelines();
+  }
+
+  @Override
+  public void closePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    storageContainerLocationClient.closePipeline(pipelineID);
+  }
+
+  @Override
   public void close() {
     try {
       xceiverClientManager.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 184c547..2bd1119 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -171,4 +171,22 @@ public interface ScmClient extends Closeable {
   Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
       throws IOException;
+
+  /**
+   * Returns the list of active Pipelines.
+   *
+   * @return list of Pipelines
+   *
+   * @throws IOException in case of any exception
+   */
+  List<Pipeline> listPipelines() throws IOException;
+
+  /**
+   * Closes the pipeline with the given pipeline ID.
+   *
+   * @param pipelineID PipelineID to close.
+   *
+   * @throws IOException in case of any exception while closing the pipeline
+   */
+  void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
 }
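
For context, here is a minimal usage sketch (not part of this commit) of the two new ScmClient methods. It assumes an already-constructed ScmClient implementation such as ContainerOperationClient; the class and method names in the sketch itself are hypothetical:

  import java.io.IOException;
  import java.util.List;
  import org.apache.hadoop.hdds.scm.client.ScmClient;
  import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

  public final class PipelineAdminSketch {
    /** Lists every active pipeline known to SCM and closes each one. */
    static void closeAllPipelines(ScmClient client) throws IOException {
      List<Pipeline> pipelines = client.listPipelines();
      for (Pipeline pipeline : pipelines) {
        // closePipeline takes the protobuf PipelineID; the Pipeline helper
        // exposes it via getId().getProtobuf(), as the test code below does.
        client.closePipeline(pipeline.getId().getProtobuf());
      }
    }
  }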

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index c36ca1f..49c00a8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -284,18 +284,20 @@ public class Pipeline {
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
         .append("[");
-    getDatanodes().keySet().forEach(
-        node -> b.append(node.endsWith(getLeaderID()) ? "*" + id : id));
-    b.append(" id:").append(id);
+    b.append(" Id: ").append(id.getId());
+    b.append(", Nodes: ");
+    getDatanodes().values().forEach(b::append);
+
     if (getType() != null) {
-      b.append(" type:").append(getType().toString());
+      b.append(", Type:").append(getType().toString());
     }
     if (getFactor() != null) {
-      b.append(" factor:").append(getFactor().toString());
+      b.append(", Factor:").append(getFactor().toString());
     }
     if (getLifeCycleState() != null) {
-      b.append(" State:").append(getLifeCycleState().toString());
+      b.append(", State:").append(getLifeCycleState().toString());
     }
+    b.append("]");
     return b.toString();
   }
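
With the reworked toString() above, a pipeline now renders roughly as
Pipeline[ Id: <uuid>, Nodes: <datanode details>, Type:RATIS, Factor:THREE, State:OPEN]
(values illustrative), instead of repeating the id once per datanode and never closing the bracket as the old code did.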
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index e38077f..87a9245 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -127,6 +127,23 @@ public interface StorageContainerLocationProtocol {
       throws IOException;
 
   /**
+   * Returns the list of active Pipelines.
+   *
+   * @return list of Pipelines
+   *
+   * @throws IOException in case of any exception
+   */
+  List<Pipeline> listPipelines() throws IOException;
+
+  /**
+   * Closes a pipeline given the pipelineID.
+   *
+   * @param pipelineID ID of the pipeline to close
+   * @throws IOException in case of any exception while closing the pipeline
+   */
+  void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
+
+  /**
    * Returns information about SCM.
    *
    * @return {@link ScmInfo}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 16819e9..bbb12b8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -20,6 +20,9 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
@@ -64,6 +67,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * This class is the client-side translator to translate the requests made on
@@ -305,6 +309,35 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
   @Override
+  public List<Pipeline> listPipelines() throws IOException {
+    try {
+      ListPipelineRequestProto request = ListPipelineRequestProto
+          .newBuilder().build();
+      ListPipelineResponseProto response = rpcProxy.listPipelines(
+          NULL_RPC_CONTROLLER, request);
+      return response.getPipelinesList().stream()
+          .map(Pipeline::getFromProtoBuf)
+          .collect(Collectors.toList());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void closePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    try {
+      ClosePipelineRequestProto request =
+          ClosePipelineRequestProto.newBuilder()
+          .setPipelineID(pipelineID)
+          .build();
+      rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public ScmInfo getScmInfo() throws IOException {
     HddsProtos.GetScmInfoRequestProto request =
         HddsProtos.GetScmInfoRequestProto.getDefaultInstance();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index d2723f0..7376ebc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -44,6 +45,14 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos.ContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ClosePipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos.GetContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos.GetContainerResponseProto;
@@ -212,6 +221,34 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   }
 
   @Override
+  public ListPipelineResponseProto listPipelines(
+      RpcController controller, ListPipelineRequestProto request)
+      throws ServiceException {
+    try {
+      ListPipelineResponseProto.Builder builder = ListPipelineResponseProto
+          .newBuilder();
+      List<Pipeline> pipelines = impl.listPipelines();
+      pipelines.stream().map(Pipeline::getProtobufMessage)
+          .forEach(builder::addPipelines);
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public ClosePipelineResponseProto closePipeline(
+      RpcController controller, ClosePipelineRequestProto request)
+      throws ServiceException {
+    try {
+      impl.closePipeline(request.getPipelineID());
+      return ClosePipelineResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public HddsProtos.GetScmInfoRespsonseProto getScmInfo(
       RpcController controller, HddsProtos.GetScmInfoRequestProto req)
       throws ServiceException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 49d1975..9396ccd 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -149,6 +149,19 @@ message  PipelineResponseProto {
   optional string errorMessage = 3;
 }
 
+message ListPipelineRequestProto {
+}
+
+message ListPipelineResponseProto {
+  repeated Pipeline pipelines = 1;
+}
+
+message ClosePipelineRequestProto {
+  required PipelineID pipelineID = 1;
+}
+
+message ClosePipelineResponseProto {
+}
 
 message InChillModeRequestProto {
 }
@@ -219,6 +232,18 @@ service StorageContainerLocationProtocolService {
       returns (PipelineResponseProto);
 
   /**
+   * Returns the list of Pipelines managed by SCM.
+   */
+  rpc listPipelines(ListPipelineRequestProto)
+      returns (ListPipelineResponseProto);
+
+  /**
+   * Closes a pipeline.
+   */
+  rpc closePipeline(ClosePipelineRequestProto)
+      returns (ClosePipelineResponseProto);
+
+  /**
   *  Returns information about SCM.
   */
   rpc getScmInfo(GetScmInfoRequestProto)
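
Note that ListPipelineRequestProto and ClosePipelineResponseProto are deliberately empty messages: protobuf allows fields to be added to them later without breaking wire compatibility, which is the usual reason for defining them rather than reusing a shared empty type.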

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index c8d22ff..7a54047 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -54,6 +54,8 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
@@ -167,9 +169,15 @@ public class PipelineSelector {
 
   public void removeContainerFromPipeline(PipelineID pipelineID,
                                           long containerID) throws IOException {
-    pipeline2ContainerMap.get(pipelineID)
-            .remove(ContainerID.valueof(containerID));
-    closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
+    if (pipeline2ContainerMap.containsKey(pipelineID)) {
+      pipeline2ContainerMap.get(pipelineID)
+          .remove(ContainerID.valueof(containerID));
+      closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
+    } else {
+      LOG.warn("Cannot remove container #{} from pipeline." +
+              " Pipeline #{} not found.",
+          containerID, pipelineID.getId());
+    }
   }
 
   /**
@@ -341,6 +349,31 @@ public class PipelineSelector {
     manager.closePipeline(pipeline);
   }
 
+  public List<Pipeline> listPipelines() {
+    return Collections.unmodifiableList(new ArrayList<>(pipelineMap.values()));
+  }
+
+  /**
+   * Closes the given pipeline.
+   */
+  public void closePipeline(PipelineID pipelineID) throws IOException {
+    final Pipeline pipeline = pipelineMap.get(pipelineID);
+    if (pipeline == null) {
+      // pipeline not found;
+      LOG.warn("Cannot close the pipeline. {} not found!", pipelineID);
+      return;
+    }
+    LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
+    finalizePipeline(pipeline);
+    if (pipeline.getLifeCycleState() != LifeCycleState.CLOSED) {
+      pipelineManagerMap.get(pipeline.getType()).closePipeline(pipeline);
+      pipeline2ContainerMap.remove(pipeline.getId());
+      nodeManager.removePipeline(pipeline);
+      pipelineMap.remove(pipeline.getId());
+    }
+    pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
+  }
+
   /**
    * Add to a given pipeline.
    */
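
A note on the close sequence above: finalizePipeline(pipeline) first drives the pipeline's state machine toward CLOSED; only when the pipeline was not already CLOSED are the per-type manager close, the pipeline2ContainerMap and pipelineMap entries, and the node associations cleaned up, while the persisted pipelineStore entry is deleted unconditionally at the end.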

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 3523499..3d228fa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -58,6 +59,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.UUID;
 
 import static org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos
@@ -293,6 +295,18 @@ public class SCMClientProtocolServer implements
   }
 
   @Override
+  public List<Pipeline> listPipelines() {
+    return scm.getContainerManager().getPipelineSelector().listPipelines();
+  }
+
+  @Override
+  public void closePipeline(HddsProtos.PipelineID pipelineID)
+      throws IOException {
+    PipelineID id = PipelineID.valueOf(UUID.fromString(pipelineID.getId()));
+    scm.getContainerManager().getPipelineSelector().closePipeline(id);
+  }
+
+  @Override
   public ScmInfo getScmInfo() throws IOException {
     ScmInfo.Builder builder =
         new ScmInfo.Builder()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipelines/TestPipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipelines/TestPipelineSelector.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipelines/TestPipelineSelector.java
new file mode 100644
index 0000000..aa9b382
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipelines/TestPipelineSelector.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Tests the functionality of PipelineSelector.
+ */
+public class TestPipelineSelector {
+
+  @Test
+  public void testListPipelinesWithNoPipeline() throws IOException {
+    String storageDir = GenericTestUtils.getTempPath(
+        TestPipelineSelector.class.getName() + UUID.randomUUID());
+    try {
+      Configuration conf = new OzoneConfiguration();
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
+      PipelineSelector selector = new PipelineSelector(
+          Mockito.mock(NodeManager.class), conf,
+          Mockito.mock(EventPublisher.class),
+          ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+      Assert.assertTrue(selector.listPipelines().isEmpty());
+    } finally {
+      FileUtil.fullyDelete(new File(storageDir));
+    }
+  }
+
+  @Test
+  public void testListPipelines() throws IOException {
+    String storageDir = GenericTestUtils.getTempPath(
+        TestPipelineSelector.class.getName() + UUID.randomUUID());
+    try {
+      Configuration conf = new OzoneConfiguration();
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
+      PipelineSelector selector = new PipelineSelector(
+          Mockito.mock(NodeManager.class), conf,
+          Mockito.mock(EventPublisher.class),
+          ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+      getRandomPipeline(selector);
+      getRandomPipeline(selector);
+      getRandomPipeline(selector);
+      getRandomPipeline(selector);
+      getRandomPipeline(selector);
+      Assert.assertEquals(5, selector.listPipelines().size());
+    } finally {
+      FileUtil.fullyDelete(new File(storageDir));
+    }
+  }
+
+  @Test
+  public void testCloseEmptyPipeline() throws IOException {
+    String storageDir = GenericTestUtils.getTempPath(
+        TestPipelineSelector.class.getName() + UUID.randomUUID());
+    try {
+      Configuration conf = new OzoneConfiguration();
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
+      PipelineSelector selector = new PipelineSelector(
+          Mockito.mock(NodeManager.class), conf,
+          Mockito.mock(EventPublisher.class),
+          ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+
+      // Create and add pipeline to selector.
+      Pipeline pipelineOne = getRandomPipeline(selector);
+      Pipeline pipelineTwo = getRandomPipeline(selector);
+
+      Assert.assertNotNull(selector.getPipeline(pipelineOne.getId()));
+      Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId()));
+
+      selector.closePipeline(pipelineOne.getId());
+
+      Assert.assertNull(selector.getPipeline(pipelineOne.getId()));
+      Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId()));
+    } finally {
+      FileUtil.fullyDelete(new File(storageDir));
+    }
+  }
+
+  @Test
+  public void testClosePipelineWithContainer() throws IOException {
+    String storageDir = GenericTestUtils.getTempPath(
+        TestPipelineSelector.class.getName() + UUID.randomUUID());
+    try {
+      Configuration conf = new OzoneConfiguration();
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
+      PipelineSelector selector = new PipelineSelector(
+          Mockito.mock(NodeManager.class), conf,
+          Mockito.mock(EventPublisher.class),
+          ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+
+      // Create and add pipeline to selector.
+      Pipeline pipelineOne = getRandomPipeline(selector);
+      Pipeline pipelineTwo = getRandomPipeline(selector);
+
+      selector.addContainerToPipeline(pipelineOne.getId(), 1L);
+      selector.addContainerToPipeline(pipelineOne.getId(), 2L);
+      selector.addContainerToPipeline(pipelineOne.getId(), 3L);
+      selector.addContainerToPipeline(pipelineOne.getId(), 4L);
+      selector.addContainerToPipeline(pipelineOne.getId(), 5L);
+
+      Assert.assertNotNull(selector.getPipeline(pipelineOne.getId()));
+      Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId()));
+
+      Assert.assertEquals(5,
+          selector.getOpenContainerIDsByPipeline(pipelineOne.getId()).size());
+
+      selector.closePipeline(pipelineOne.getId());
+
+      Assert.assertNull(selector.getPipeline(pipelineOne.getId()));
+      Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId()));
+    } finally {
+      FileUtil.fullyDelete(new File(storageDir));
+    }
+  }
+
+  /**
+   * Creates a random pipeline and registers with PipelineSelector.
+   *
+   * @param selector PipelineSelector
+   * @return Pipeline
+   * @throws IOException
+   */
+  private Pipeline getRandomPipeline(PipelineSelector selector)
+      throws IOException {
+    DatanodeDetails ddOne = TestUtils.randomDatanodeDetails();
+    DatanodeDetails ddTwo = TestUtils.randomDatanodeDetails();
+    DatanodeDetails ddThree = TestUtils.randomDatanodeDetails();
+    Pipeline pipeline = new Pipeline(ddOne.getUuidString(),
+        LifeCycleState.ALLOCATED, ReplicationType.RATIS,
+        ReplicationFactor.THREE, PipelineID.randomId());
+    pipeline.addMember(ddOne);
+    pipeline.addMember(ddTwo);
+    pipeline.addMember(ddThree);
+    selector.updatePipelineState(pipeline, LifeCycleEvent.CREATE);
+    selector.updatePipelineState(pipeline, LifeCycleEvent.CREATED);
+    PipelineReport reportOne = PipelineReport.newBuilder()
+        .setPipelineID(pipeline.getId().getProtobuf()).build();
+    PipelineReportsProto reportsOne = PipelineReportsProto.newBuilder()
+        .addPipelineReport(reportOne).build();
+    pipeline.getDatanodes().values().forEach(
+        dd -> selector.processPipelineReport(dd, reportsOne));
+    return pipeline;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
index 59cd0ba..24eab9e 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.cli.container.CreateSubcommand;
 import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand;
 import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand;
 import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand;
+import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand;
+import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand;
 import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@@ -77,7 +79,9 @@ import picocli.CommandLine.Option;
         InfoSubcommand.class,
         DeleteSubcommand.class,
         CreateSubcommand.class,
-        CloseSubcommand.class
+        CloseSubcommand.class,
+        ListPipelinesSubcommand.class,
+        ClosePipelineSubcommand.class
     },
     mixinStandardHelpOptions = true)
 public class SCMCLI extends GenericCli {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java
new file mode 100644
index 0000000..d99823b
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.cli.pipeline;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.SCMCLI;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of closePipeline command.
+ */
+@CommandLine.Command(
+    name = "closePipeline",
+    description = "Close pipeline",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ClosePipelineSubcommand implements Callable<Void> {
+
+  @CommandLine.ParentCommand
+  private SCMCLI parent;
+
+  @CommandLine.Parameters(description = "ID of the pipeline to close")
+  private String pipelineId;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.createScmClient()) {
+      scmClient.closePipeline(
+          HddsProtos.PipelineID.newBuilder().setId(pipelineId).build());
+      return null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java
new file mode 100644
index 0000000..0f8cf28
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.cli.pipeline;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.cli.SCMCLI;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of listPipelines command.
+ */
+@CommandLine.Command(
+    name = "listPipelines",
+    description = "List all active pipelines",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ListPipelinesSubcommand implements Callable<Void> {
+
+  @CommandLine.ParentCommand
+  private SCMCLI parent;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.createScmClient()) {
+      scmClient.listPipelines().forEach(System.out::println);
+      return null;
+    }
+  }
+}
\ No newline at end of file
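
Taken together with the SCMCLI registration above, the new subcommands would be invoked along these lines (the "ozone scmcli" entry point is an assumption about how SCMCLI is launched, and the pipeline UUID is illustrative):

  ozone scmcli listPipelines
  ozone scmcli closePipeline 48b1c687-1a1f-4b7a-9f3e-1f0d9db6d9c5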

http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java
new file mode 100644
index 0000000..64924d1
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains all of the pipeline-related SCM commands.
+ */
+package org.apache.hadoop.hdds.scm.cli.pipeline;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

