From common-commits-return-89508-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Wed Oct 17 10:28:10 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 29ECB18061A for ; Wed, 17 Oct 2018 10:28:07 +0200 (CEST) Received: (qmail 48588 invoked by uid 500); 17 Oct 2018 08:28:07 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 48579 invoked by uid 99); 17 Oct 2018 08:28:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Oct 2018 08:28:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1EA88DFB32; Wed, 17 Oct 2018 08:28:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nanda@apache.org To: common-commits@hadoop.apache.org Message-Id: <68198ec307834fc3860fc188228c7930@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HDDS-656. Add logic for pipeline report and action processing in new pipeline code. Contributed by Lokesh Jain. Date: Wed, 17 Oct 2018 08:28:05 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 533138718 -> 64a43c92c HDDS-656. Add logic for pipeline report and action processing in new pipeline code. Contributed by Lokesh Jain. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/64a43c92 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/64a43c92 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/64a43c92 Branch: refs/heads/trunk Commit: 64a43c92c2133f3b9a317dcc4f0391ad6b604194 Parents: 5331387 Author: Nandakumar Authored: Wed Oct 17 13:56:54 2018 +0530 Committer: Nandakumar Committed: Wed Oct 17 13:57:38 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientRatis.java | 19 ++ .../hadoop/hdds/scm/pipeline/Pipeline.java | 103 ++++++--- hadoop-hdds/common/src/main/proto/hdds.proto | 1 + .../scm/pipeline/PipelineActionHandler.java | 66 ++++++ .../hdds/scm/pipeline/PipelineFactory.java | 7 +- .../hdds/scm/pipeline/PipelineManager.java | 13 +- .../scm/pipeline/PipelineReportHandler.java | 104 +++++++++ .../hdds/scm/pipeline/PipelineStateManager.java | 135 +++--------- .../hdds/scm/pipeline/PipelineStateMap.java | 76 ++++--- .../scm/pipeline/RatisPipelineProvider.java | 26 ++- .../hdds/scm/pipeline/SCMPipelineManager.java | 81 ++++--- .../scm/pipeline/SimplePipelineProvider.java | 6 +- .../org/apache/hadoop/hdds/scm/TestUtils.java | 17 ++ .../scm/pipeline/TestPipelineStateManager.java | 209 ++++++++++++++----- .../scm/pipeline/TestRatisPipelineProvider.java | 18 +- .../scm/pipeline/TestSCMPipelineManager.java | 187 +++++++++++++++++ .../pipeline/TestSimplePipelineProvider.java | 16 +- 17 files changed, 804 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 4efe7ba..45e9d6e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.io.MultipleIOException; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.thirdparty.com.google.protobuf @@ -73,6 +74,24 @@ public final class XceiverClientRatis extends XceiverClientSpi { retryPolicy); } + public static XceiverClientRatis newXceiverClientRatis( + org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, + Configuration ozoneConf) { + final String rpcType = ozoneConf + .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(ozoneConf); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); + Pipeline pipeline1 = + new Pipeline(pipeline.getNodes().get(0).getUuidString(), + HddsProtos.LifeCycleState.OPEN, pipeline.getType(), + pipeline.getFactor(), PipelineID.valueOf(pipeline.getID().getId())); + return new XceiverClientRatis(pipeline1, + SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests, + retryPolicy); + } + private final Pipeline pipeline; private final RpcType rpcType; private final AtomicReference client = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index b58a001..b22a0c6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -23,12 +23,14 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -40,17 +42,17 @@ public final class Pipeline { private final ReplicationType type; private final ReplicationFactor factor; - private LifeCycleState state; - private List nodes; + private PipelineState state; + private Map nodeStatus; private Pipeline(PipelineID id, ReplicationType type, - ReplicationFactor factor, LifeCycleState state, - List nodes) { + ReplicationFactor factor, PipelineState state, + Map nodeStatus) { this.id = id; this.type = type; this.factor = factor; this.state = state; - this.nodes = nodes; + this.nodeStatus = nodeStatus; } /** @@ -85,36 +87,68 @@ public final class Pipeline { * * @return - LifeCycleStates. */ - public LifeCycleState getLifeCycleState() { + PipelineState getPipelineState() { + // TODO: See if we need to expose this. return state; } + public boolean isClosed() { + return state == PipelineState.CLOSED; + } + + public boolean isOpen() { + return state == PipelineState.OPEN; + } + + void reportDatanode(DatanodeDetails dn) throws IOException { + if (nodeStatus.get(dn) == null) { + throw new IOException( + String.format("Datanode=%s not part of pipeline=%s", dn, id)); + } + nodeStatus.put(dn, System.currentTimeMillis()); + } + + boolean isHealthy() { + for (Long reportedTime : nodeStatus.values()) { + if (reportedTime < 0) { + return false; + } + } + return true; + } + /** * Returns the list of nodes which form this pipeline. * * @return List of DatanodeDetails */ public List getNodes() { - return new ArrayList<>(nodes); + return new ArrayList<>(nodeStatus.keySet()); } public HddsProtos.Pipeline getProtobufMessage() { - HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder(); - builder.setId(id.getProtobuf()); - builder.setType(type); - builder.setState(state); - builder.addAllMembers(nodes.stream().map( - DatanodeDetails::getProtoBufMessage).collect(Collectors.toList())); + HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder() + .setId(id.getProtobuf()) + .setType(type) + .setFactor(factor) + .setLeaderID("") + .addAllMembers(nodeStatus.keySet().stream() + .map(DatanodeDetails::getProtoBufMessage) + .collect(Collectors.toList())); return builder.build(); } public static Pipeline fromProtobuf(HddsProtos.Pipeline pipeline) { - return new Pipeline(PipelineID.getFromProtobuf(pipeline.getId()), - pipeline.getType(), pipeline.getFactor(), pipeline.getState(), - pipeline.getMembersList().stream().map(DatanodeDetails::getFromProtoBuf) - .collect(Collectors.toList())); + return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId())) + .setFactor(pipeline.getFactor()) + .setType(pipeline.getType()) + .setState(PipelineState.ALLOCATED) + .setNodes(pipeline.getMembersList().stream() + .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList())) + .build(); } + @Override public boolean equals(Object o) { if (this == o) { @@ -131,7 +165,7 @@ public final class Pipeline { .append(type, that.type) .append(factor, that.factor) .append(state, that.state) - .append(nodes, that.nodes) + .append(nodeStatus, that.nodeStatus) .isEquals(); } @@ -142,7 +176,7 @@ public final class Pipeline { .append(type) .append(factor) .append(state) - .append(nodes) + .append(nodeStatus) .toHashCode(); } @@ -161,17 +195,17 @@ public final class Pipeline { private PipelineID id = null; private ReplicationType type = null; private ReplicationFactor factor = null; - private LifeCycleState state = null; - private List nodes = null; + private PipelineState state = null; + private Map nodeStatus = null; public Builder() {} public Builder(Pipeline pipeline) { - this.id = pipeline.getID(); - this.type = pipeline.getType(); - this.factor = pipeline.getFactor(); - this.state = pipeline.getLifeCycleState(); - this.nodes = pipeline.getNodes(); + this.id = pipeline.id; + this.type = pipeline.type; + this.factor = pipeline.factor; + this.state = pipeline.state; + this.nodeStatus = pipeline.nodeStatus; } public Builder setId(PipelineID id1) { @@ -189,13 +223,14 @@ public final class Pipeline { return this; } - public Builder setState(LifeCycleState state1) { + public Builder setState(PipelineState state1) { this.state = state1; return this; } - public Builder setNodes(List nodes1) { - this.nodes = nodes1; + public Builder setNodes(List nodes) { + this.nodeStatus = new LinkedHashMap<>(); + nodes.forEach(node -> nodeStatus.put(node, -1L)); return this; } @@ -204,8 +239,12 @@ public final class Pipeline { Preconditions.checkNotNull(type); Preconditions.checkNotNull(factor); Preconditions.checkNotNull(state); - Preconditions.checkNotNull(nodes); - return new Pipeline(id, type, factor, state, nodes); + Preconditions.checkNotNull(nodeStatus); + return new Pipeline(id, type, factor, state, nodeStatus); } } + + enum PipelineState { + ALLOCATED, OPEN, CLOSED + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/common/src/main/proto/hdds.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index dedc57b..6525134 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -47,6 +47,7 @@ message PipelineID { message Pipeline { required string leaderID = 1; repeated DatanodeDetailsProto members = 2; + // TODO: remove the state and leaderID from this class optional LifeCycleState state = 3 [default = OPEN]; optional ReplicationType type = 4 [default = STAND_ALONE]; optional ReplicationFactor factor = 5 [default = ONE]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java new file mode 100644 index 0000000..a44ce9d --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineAction; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .PipelineActionsFromDatanode; + +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Handles pipeline actions from datanode. + */ +public class PipelineActionHandler implements + EventHandler { + + public static final Logger LOG = LoggerFactory.getLogger( + PipelineActionHandler.class); + + private final PipelineManager pipelineManager; + + public PipelineActionHandler(PipelineManager pipelineManager) { + this.pipelineManager = pipelineManager; + } + + @Override + public void onMessage(PipelineActionsFromDatanode report, + EventPublisher publisher) { + for (PipelineAction action : report.getReport().getPipelineActionsList()) { + if (action.getAction() == PipelineAction.Action.CLOSE) { + PipelineID pipelineID = null; + try { + pipelineID = PipelineID. + getFromProtobuf(action.getClosePipeline().getPipelineID()); + pipelineManager.finalizePipeline(pipelineID); + } catch (IOException ioe) { + LOG.error("Could not execute pipeline action={} pipeline={} {}", + action, pipelineID, ioe); + } + } else { + LOG.error("unknown pipeline action:{}" + action.getAction()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java index 0265ff2..261c544 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.pipeline; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -35,13 +36,13 @@ public final class PipelineFactory { private Map providers; - PipelineFactory(NodeManager nodeManager, - PipelineStateManager stateManager) { + PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager, + Configuration conf) { providers = new HashMap<>(); providers.put(ReplicationType.STAND_ALONE, new SimplePipelineProvider(nodeManager)); providers.put(ReplicationType.RATIS, - new RatisPipelineProvider(nodeManager, stateManager)); + new RatisPipelineProvider(nodeManager, stateManager, conf)); } public Pipeline create(ReplicationType type, ReplicationFactor factor) http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 2d8cae3..51f9e86 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -41,18 +41,25 @@ public interface PipelineManager extends Closeable { Pipeline getPipeline(PipelineID pipelineID) throws IOException; + List getPipelinesByType(ReplicationType type); + + List getPipelinesByTypeAndFactor(ReplicationType type, + ReplicationFactor factor); + void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException; - void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID) - throws IOException; + void removeContainerFromPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException; Set getContainersInPipeline(PipelineID pipelineID) throws IOException; + int getNumberOfContainers(PipelineID pipelineID) throws IOException; + void finalizePipeline(PipelineID pipelineID) throws IOException; - void closePipeline(PipelineID pipelineId) throws IOException; + void openPipeline(PipelineID pipelineId) throws IOException; void removePipeline(PipelineID pipelineID) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java new file mode 100644 index 0000000..ad11b47 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.XceiverClientRatis; +import org.apache.hadoop.hdds.scm.server + .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Handles Pipeline Reports from datanode. + */ +public class PipelineReportHandler implements + EventHandler { + + private static final Logger LOGGER = LoggerFactory + .getLogger(PipelineReportHandler.class); + private final PipelineManager pipelineManager; + private final Configuration conf; + + public PipelineReportHandler(PipelineManager pipelineManager, + Configuration conf) { + Preconditions.checkNotNull(pipelineManager); + this.pipelineManager = pipelineManager; + this.conf = conf; + } + + @Override + public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, + EventPublisher publisher) { + Preconditions.checkNotNull(pipelineReportFromDatanode); + DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails(); + PipelineReportsProto pipelineReport = + pipelineReportFromDatanode.getReport(); + Preconditions.checkNotNull(dn, "Pipeline Report is " + + "missing DatanodeDetails."); + LOGGER.trace("Processing pipeline report for dn: {}", dn); + for (PipelineReport report : pipelineReport.getPipelineReportList()) { + try { + processPipelineReport(report, dn); + } catch (IOException e) { + LOGGER.error("Could not process pipeline report={} from dn={} {}", + report, dn, e); + } + } + } + + private void processPipelineReport(PipelineReport report, DatanodeDetails dn) + throws IOException { + PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID()); + Pipeline pipeline = pipelineManager.getPipeline(pipelineID); + + if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { + pipeline.reportDatanode(dn); + if (pipeline.isHealthy()) { + // if all the dns have reported, pipeline can be moved to OPEN state + pipelineManager.openPipeline(pipelineID); + } + } else if (pipeline.isClosed()) { + int numContainers = pipelineManager.getNumberOfContainers(pipelineID); + if (numContainers == 0) { + // if all the containers have been closed the pipeline can be destroyed + try (XceiverClientRatis client = + XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { + client.destroyPipeline(); + } + // after successfully destroying the pipeline, the pipeline can be + // removed from the pipeline manager + pipelineManager.removePipeline(pipelineID); + } + } else { + // In OPEN state case just report the datanode + pipeline.reportDatanode(dn); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 9752b5a..8f5f89a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -19,25 +19,16 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; -import org.apache.hadoop.ozone.common.statemachine.StateMachine; -import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE; /** * Manages the state of pipelines in SCM. All write operations like pipeline @@ -52,95 +43,9 @@ class PipelineStateManager { org.apache.hadoop.hdds.scm.pipelines.PipelineStateManager.class); private final PipelineStateMap pipelineStateMap; - private final StateMachine stateMachine; - private final LeaseManager pipelineLeaseManager; PipelineStateManager(Configuration conf) { this.pipelineStateMap = new PipelineStateMap(); - Set finalStates = new HashSet<>(); - long pipelineCreationLeaseTimeout = conf.getTimeDuration( - ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT, - ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - // TODO: Use LeaseManager for creation of pipelines. - // Add pipeline initialization logic. - this.pipelineLeaseManager = new LeaseManager<>("PipelineCreation", - pipelineCreationLeaseTimeout); - this.pipelineLeaseManager.start(); - - finalStates.add(LifeCycleState.CLOSED); - this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED, - finalStates); - initializeStateMachine(); - } - - - /* - * Event and State Transition Mapping. - * - * State: ALLOCATED ---------------> CREATING - * Event: CREATE - * - * State: CREATING ---------------> OPEN - * Event: CREATED - * - * State: OPEN ---------------> CLOSING - * Event: FINALIZE - * - * State: CLOSING ---------------> CLOSED - * Event: CLOSE - * - * State: CREATING ---------------> CLOSED - * Event: TIMEOUT - * - * - * Container State Flow: - * - * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING] - * (CREATE) | (CREATED) (FINALIZE) | - * | | - * | | - * |(TIMEOUT) |(CLOSE) - * | | - * +--------> [CLOSED] <--------+ - */ - - /** - * Add javadoc. - */ - private void initializeStateMachine() { - stateMachine.addTransition(LifeCycleState.ALLOCATED, - LifeCycleState.CREATING, LifeCycleEvent.CREATE); - - stateMachine.addTransition(LifeCycleState.CREATING, - LifeCycleState.OPEN, LifeCycleEvent.CREATED); - - stateMachine.addTransition(LifeCycleState.OPEN, - LifeCycleState.CLOSING, LifeCycleEvent.FINALIZE); - - stateMachine.addTransition(LifeCycleState.CLOSING, - LifeCycleState.CLOSED, LifeCycleEvent.CLOSE); - - stateMachine.addTransition(LifeCycleState.CREATING, - LifeCycleState.CLOSED, LifeCycleEvent.TIMEOUT); - } - - Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleEvent event) - throws IOException { - Pipeline pipeline = null; - try { - pipeline = pipelineStateMap.getPipeline(pipelineID); - LifeCycleState newState = - stateMachine.getNextState(pipeline.getLifeCycleState(), event); - return pipelineStateMap.updatePipelineState(pipeline.getID(), newState); - } catch (InvalidStateTransitionException ex) { - String error = String.format("Failed to update pipeline state %s, " - + "reason: invalid state transition from state: %s upon " - + "event: %s.", pipeline.getID(), pipeline.getLifeCycleState(), - event); - LOG.error(error); - throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE); - } } void addPipeline(Pipeline pipeline) throws IOException { @@ -156,14 +61,23 @@ class PipelineStateManager { return pipelineStateMap.getPipeline(pipelineID); } - List getPipelines(HddsProtos.ReplicationType type) { - return pipelineStateMap.getPipelines(type); + List getPipelinesByType(ReplicationType type) { + return pipelineStateMap.getPipelinesByType(type); + } + + List getPipelinesByTypeAndFactor(ReplicationType type, + ReplicationFactor factor) { + return pipelineStateMap.getPipelinesByTypeAndFactor(type, factor); } Set getContainers(PipelineID pipelineID) throws IOException { return pipelineStateMap.getContainers(pipelineID); } + int getNumberOfContainers(PipelineID pipelineID) throws IOException { + return pipelineStateMap.getNumberOfContainers(pipelineID); + } + void removePipeline(PipelineID pipelineID) throws IOException { pipelineStateMap.removePipeline(pipelineID); } @@ -173,7 +87,24 @@ class PipelineStateManager { pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID); } - void close() { - pipelineLeaseManager.shutdown(); + Pipeline finalizePipeline(PipelineID pipelineId) throws IOException { + Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId); + if (!pipeline.isClosed()) { + pipeline = pipelineStateMap + .updatePipelineState(pipelineId, PipelineState.CLOSED); + } + return pipeline; + } + + Pipeline openPipeline(PipelineID pipelineId) throws IOException { + Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId); + if (pipeline.isClosed()) { + throw new IOException("Closed pipeline can not be opened"); + } + if (pipeline.getPipelineState() == PipelineState.ALLOCATED) { + pipeline = pipelineStateMap + .updatePipelineState(pipelineId, PipelineState.OPEN); + } + return pipeline; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java index e3f2393..110d26b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -18,9 +18,10 @@ package org.apache.hadoop.hdds.scm.pipeline; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,7 @@ class PipelineStateMap { PipelineStateMap() { + // TODO: Use TreeMap for range operations? this.pipelineMap = new HashMap<>(); this.pipeline2container = new HashMap<>(); @@ -86,8 +88,7 @@ class PipelineStateMap { "container Id cannot be null"); Pipeline pipeline = getPipeline(pipelineID); - // TODO: verify the state we need the pipeline to be in - if (!isOpen(pipeline)) { + if (!pipeline.isOpen()) { throw new IOException( String.format("%s is not in open state", pipelineID)); } @@ -115,7 +116,7 @@ class PipelineStateMap { * @param type - ReplicationType * @return List of pipelines which have the specified replication type */ - List getPipelines(ReplicationType type) { + List getPipelinesByType(ReplicationType type) { Preconditions.checkNotNull(type, "Replication type cannot be null"); return pipelineMap.values().stream().filter(p -> p.getType().equals(type)) @@ -123,10 +124,25 @@ class PipelineStateMap { } /** - * Get set of containers corresponding to a pipeline. + * Get open pipeline corresponding to specified replication type and factor. + * + * @param type - ReplicationType + * @param factor - ReplicationFactor + * @return List of open pipelines with specified replication type and factor + */ + List getPipelinesByTypeAndFactor(ReplicationType type, + ReplicationFactor factor) { + return pipelineMap.values().stream() + .filter(pipeline -> pipeline.isOpen() && pipeline.getType() == type + && pipeline.getFactor() == factor) + .collect(Collectors.toList()); + } + + /** + * Get set of containerIDs corresponding to a pipeline. * * @param pipelineID - PipelineID - * @return Set of Containers belonging to the pipeline + * @return Set of containerIDs belonging to the pipeline * @throws IOException if pipeline is not found */ Set getContainers(PipelineID pipelineID) @@ -139,6 +155,21 @@ class PipelineStateMap { } /** + * Get number of containers corresponding to a pipeline. + * + * @param pipelineID - PipelineID + * @return Number of containers belonging to the pipeline + * @throws IOException if pipeline is not found + */ + int getNumberOfContainers(PipelineID pipelineID) throws IOException { + Set containerIDs = pipeline2container.get(pipelineID); + if (containerIDs == null) { + throw new IOException(String.format("%s not found", pipelineID)); + } + return containerIDs.size(); + } + + /** * Remove pipeline from the data structures. * * @param pipelineID - PipelineID of the pipeline to be removed @@ -147,12 +178,18 @@ class PipelineStateMap { void removePipeline(PipelineID pipelineID) throws IOException { Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); - //TODO: Add a flag which suppresses exception if pipeline does not exist? - Set containerIDs = getContainers(pipelineID); + Pipeline pipeline = getPipeline(pipelineID); + if (!pipeline.isClosed()) { + throw new IOException( + String.format("Pipeline with %s is not yet closed", pipelineID)); + } + + Set containerIDs = pipeline2container.get(pipelineID); if (containerIDs.size() != 0) { throw new IOException( String.format("Pipeline with %s is not empty", pipelineID)); } + pipelineMap.remove(pipelineID); pipeline2container.remove(pipelineID); } @@ -172,12 +209,8 @@ class PipelineStateMap { Preconditions.checkNotNull(containerID, "container Id cannot be null"); - Pipeline pipeline = getPipeline(pipelineID); Set containerIDs = pipeline2container.get(pipelineID); containerIDs.remove(containerID); - if (containerIDs.size() == 0 && isClosingOrClosed(pipeline)) { - removePipeline(pipelineID); - } } /** @@ -189,24 +222,13 @@ class PipelineStateMap { * @return Pipeline with the updated state * @throws IOException if pipeline does not exist */ - Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleState state) + Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state) throws IOException { Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null"); - Pipeline pipeline = getPipeline(pipelineID); - pipeline = pipelineMap - .put(pipelineID, Pipeline.newBuilder(pipeline).setState(state).build()); - // TODO: Verify if need to throw exception for non-existent pipeline - return pipeline; - } - - private boolean isClosingOrClosed(Pipeline pipeline) { - LifeCycleState state = pipeline.getLifeCycleState(); - return state == LifeCycleState.CLOSING || state == LifeCycleState.CLOSED; - } - - private boolean isOpen(Pipeline pipeline) { - return pipeline.getLifeCycleState() == LifeCycleState.OPEN; + final Pipeline pipeline = getPipeline(pipelineID); + return pipelineMap.compute(pipelineID, + (id, p) -> Pipeline.newBuilder(pipeline).setState(state).build()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index b3bed33..400ab24 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -23,11 +23,12 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import java.io.IOException; import java.lang.reflect.Constructor; @@ -44,11 +45,13 @@ public class RatisPipelineProvider implements PipelineProvider { private final NodeManager nodeManager; private final PipelineStateManager stateManager; + private final Configuration conf; RatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager) { + PipelineStateManager stateManager, Configuration conf) { this.nodeManager = nodeManager; this.stateManager = stateManager; + this.conf = conf; } /** @@ -90,7 +93,7 @@ public class RatisPipelineProvider implements PipelineProvider { public Pipeline create(ReplicationFactor factor) throws IOException { // Get set of datanodes already used for ratis pipeline Set dnsUsed = new HashSet<>(); - stateManager.getPipelines(ReplicationType.RATIS) + stateManager.getPipelinesByType(ReplicationType.RATIS) .forEach(p -> dnsUsed.addAll(p.getNodes())); // Get list of healthy nodes @@ -107,13 +110,15 @@ public class RatisPipelineProvider implements PipelineProvider { throw new IOException(e); } - return Pipeline.newBuilder() + Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(LifeCycleState.ALLOCATED) + .setState(PipelineState.ALLOCATED) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(dns) .build(); + initializePipeline(pipeline); + return pipeline; } @Override @@ -126,10 +131,19 @@ public class RatisPipelineProvider implements PipelineProvider { } return Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(LifeCycleState.ALLOCATED) + .setState(PipelineState.ALLOCATED) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(nodes) .build(); } + + private void initializePipeline(Pipeline pipeline) + throws IOException { + // TODO: remove old code in XceiverClientRatis#newXceiverClientRatis + try (XceiverClientRatis client = + XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { + client.createPipeline(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 3ee5849..6a9c783 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -22,11 +22,12 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.utils.MetadataStore; import org.apache.hadoop.utils.MetadataStoreBuilder; @@ -63,11 +64,14 @@ public class SCMPipelineManager implements PipelineManager { private final PipelineStateManager stateManager; private final MetadataStore pipelineStore; - public SCMPipelineManager(Configuration conf, NodeManager nodeManager) - throws IOException { + private final EventPublisher eventPublisher; + private final NodeManager nodeManager; + + public SCMPipelineManager(Configuration conf, NodeManager nodeManager, + EventPublisher eventPublisher) throws IOException { this.lock = new ReentrantReadWriteLock(); this.stateManager = new PipelineStateManager(conf); - this.pipelineFactory = new PipelineFactory(nodeManager, stateManager); + this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf); int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, OZONE_SCM_DB_CACHE_SIZE_DEFAULT); File metaDir = getOzoneMetaDirPath(conf); @@ -78,8 +82,10 @@ public class SCMPipelineManager implements PipelineManager { .setDbFile(pipelineDBPath) .setCacheSize(cacheSize * OzoneConsts.MB) .build(); - initializePipelineState(); + + this.eventPublisher = eventPublisher; + this.nodeManager = nodeManager; } private void initializePipelineState() throws IOException { @@ -95,6 +101,8 @@ public class SCMPipelineManager implements PipelineManager { .fromProtobuf(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue())); Preconditions.checkNotNull(pipeline); stateManager.addPipeline(pipeline); + // TODO: add pipeline to node manager + // nodeManager.addPipeline(pipeline); } } @@ -104,16 +112,10 @@ public class SCMPipelineManager implements PipelineManager { lock.writeLock().lock(); try { Pipeline pipeline = pipelineFactory.create(type, factor); + pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(), + pipeline.getProtobufMessage().toByteArray()); stateManager.addPipeline(pipeline); - try { - pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(), - pipeline.getProtobufMessage().toByteArray()); - } catch (IOException ioe) { - // if db operation fails we need to revert the pipeline creation in - // state manager. - stateManager.removePipeline(pipeline.getID()); - throw ioe; - } + // TODO: add pipeline to node manager return pipeline; } finally { lock.writeLock().unlock(); @@ -144,6 +146,27 @@ public class SCMPipelineManager implements PipelineManager { } @Override + public List getPipelinesByType(ReplicationType type) { + lock.readLock().lock(); + try { + return stateManager.getPipelinesByType(type); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public List getPipelinesByTypeAndFactor(ReplicationType type, + ReplicationFactor factor) { + lock.readLock().lock(); + try { + return stateManager.getPipelinesByTypeAndFactor(type, factor); + } finally { + lock.readLock().unlock(); + } + } + + @Override public void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException { lock.writeLock().lock(); @@ -177,27 +200,29 @@ public class SCMPipelineManager implements PipelineManager { } @Override + public int getNumberOfContainers(PipelineID pipelineID) throws IOException { + return stateManager.getNumberOfContainers(pipelineID); + } + + @Override public void finalizePipeline(PipelineID pipelineId) throws IOException { lock.writeLock().lock(); try { - //TODO: close all containers in this pipeline - Pipeline pipeline = - stateManager.updatePipelineState(pipelineId, LifeCycleEvent.FINALIZE); - pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(), - pipeline.getProtobufMessage().toByteArray()); + stateManager.finalizePipeline(pipelineId); + Set containerIDs = stateManager.getContainers(pipelineId); + for (ContainerID containerID : containerIDs) { + eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); + } } finally { lock.writeLock().unlock(); } } @Override - public void closePipeline(PipelineID pipelineId) throws IOException { + public void openPipeline(PipelineID pipelineId) throws IOException { lock.writeLock().lock(); try { - Pipeline pipeline = - stateManager.updatePipelineState(pipelineId, LifeCycleEvent.CLOSE); - pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(), - pipeline.getProtobufMessage().toByteArray()); + stateManager.openPipeline(pipelineId); } finally { lock.writeLock().unlock(); } @@ -209,6 +234,7 @@ public class SCMPipelineManager implements PipelineManager { try { stateManager.removePipeline(pipelineID); pipelineStore.delete(pipelineID.getProtobuf().toByteArray()); + // TODO: remove pipeline from node manager } finally { lock.writeLock().unlock(); } @@ -216,11 +242,8 @@ public class SCMPipelineManager implements PipelineManager { @Override public void close() throws IOException { - lock.writeLock().lock(); - try { - stateManager.close(); - } finally { - lock.writeLock().unlock(); + if (pipelineStore != null) { + pipelineStore.close(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index 56ffcd0..c95fcfb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -22,8 +22,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import java.io.IOException; import java.util.Collections; @@ -54,7 +54,7 @@ public class SimplePipelineProvider implements PipelineProvider { Collections.shuffle(dns); return Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(LifeCycleState.ALLOCATED) + .setState(PipelineState.ALLOCATED) .setType(ReplicationType.STAND_ALONE) .setFactor(factor) .setNodes(dns.subList(0, factor.getNumber())) @@ -71,7 +71,7 @@ public class SimplePipelineProvider implements PipelineProvider { } return Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(LifeCycleState.ALLOCATED) + .setState(PipelineState.ALLOCATED) .setType(ReplicationType.STAND_ALONE) .setFactor(factor) .setNodes(nodes) http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 24a16c7..21f00cd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -18,7 +18,11 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.server + .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.mockito.Mockito; import static org.mockito.Mockito.when; @@ -307,6 +311,19 @@ public final class TestUtils { return PipelineReportsProto.newBuilder().build(); } + public static PipelineReportFromDatanode getRandomPipelineReportFromDatanode( + DatanodeDetails dn, + org.apache.hadoop.hdds.scm.pipeline.PipelineID... pipelineIDs) { + PipelineReportsProto.Builder reportBuilder = + PipelineReportsProto.newBuilder(); + for (org.apache.hadoop.hdds.scm.pipeline.PipelineID pipelineID : + pipelineIDs) { + reportBuilder.addPipelineReport( + PipelineReport.newBuilder().setPipelineID(pipelineID.getProtobuf())); + } + return new PipelineReportFromDatanode(dn, reportBuilder.build()); + } + /** * Creates container report with the given ContainerInfo(s). * http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java index 0d4c461..49fb2bc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -48,15 +48,21 @@ public class TestPipelineStateManager { } private Pipeline createDummyPipeline(int numNodes) { + return createDummyPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, numNodes); + } + + private Pipeline createDummyPipeline(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, int numNodes) { List nodes = new ArrayList<>(); for (int i = 0; i < numNodes; i++) { nodes.add(TestUtils.randomDatanodeDetails()); } return Pipeline.newBuilder() - .setType(HddsProtos.ReplicationType.RATIS) - .setFactor(HddsProtos.ReplicationFactor.ONE) + .setType(type) + .setFactor(factor) .setNodes(nodes) - .setState(HddsProtos.LifeCycleState.ALLOCATED) + .setState(Pipeline.PipelineState.ALLOCATED) .setId(PipelineID.randomId()) .build(); } @@ -89,7 +95,7 @@ public class TestPipelineStateManager { Assert.assertTrue(pipeline == pipeline1); // clean up - stateManager.removePipeline(pipeline1.getID()); + removePipeline(pipeline); } @Test @@ -102,9 +108,63 @@ public class TestPipelineStateManager { pipelines.add(pipeline); stateManager.addPipeline(pipeline); - Set pipelines1 = new HashSet<>(stateManager.getPipelines( + Set pipelines1 = new HashSet<>(stateManager.getPipelinesByType( HddsProtos.ReplicationType.RATIS)); Assert.assertEquals(pipelines, pipelines1); + // clean up + for (Pipeline pipeline1 : pipelines) { + removePipeline(pipeline1); + } + } + + @Test + public void testGetPipelinesByTypeAndFactor() throws IOException { + Set pipelines = new HashSet<>(); + for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType + .values()) { + for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor + .values()) { + for (int i = 0; i < 5; i++) { + // 5 pipelines in allocated state for each type and factor + Pipeline pipeline = + createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + pipelines.add(pipeline); + + // 5 pipelines in allocated state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.openPipeline(pipeline.getID()); + pipelines.add(pipeline); + + // 5 pipelines in allocated state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.finalizePipeline(pipeline.getID()); + pipelines.add(pipeline); + } + } + } + + for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType + .values()) { + for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor + .values()) { + // verify pipelines received + List pipelines1 = + stateManager.getPipelinesByTypeAndFactor(type, factor); + Assert.assertEquals(5, pipelines1.size()); + pipelines1.stream().forEach(p -> { + Assert.assertEquals(p.getType(), type); + Assert.assertEquals(p.getFactor(), factor); + }); + } + } + + //clean up + for (Pipeline pipeline : pipelines) { + removePipeline(pipeline); + } } @Test @@ -115,8 +175,8 @@ public class TestPipelineStateManager { pipeline = stateManager.getPipeline(pipeline.getID()); try { - stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + stateManager.addContainerToPipeline(pipeline.getID(), + ContainerID.valueof(++containerID)); Assert.fail("Container should not have been added"); } catch (IOException e) { // add container possible only in container with open state @@ -124,16 +184,15 @@ public class TestPipelineStateManager { } // move pipeline to open state - updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, - HddsProtos.LifeCycleEvent.CREATED); + stateManager.openPipeline(pipeline.getID()); // add three containers - stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID)); - stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); - stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + stateManager.addContainerToPipeline(pipeline.getID(), + ContainerID.valueof(containerID)); + stateManager.addContainerToPipeline(pipeline.getID(), + ContainerID.valueof(++containerID)); + stateManager.addContainerToPipeline(pipeline.getID(), + ContainerID.valueof(++containerID)); //verify the number of containers returned Set containerIDs = @@ -142,8 +201,8 @@ public class TestPipelineStateManager { removePipeline(pipeline); try { - stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + stateManager.addContainerToPipeline(pipeline.getID(), + ContainerID.valueof(++containerID)); Assert.fail("Container should not have been added"); } catch (IOException e) { // Can not add a container to removed pipeline @@ -155,8 +214,8 @@ public class TestPipelineStateManager { public void testRemovePipeline() throws IOException { Pipeline pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); - updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, - HddsProtos.LifeCycleEvent.CREATED); + // close the pipeline + stateManager.openPipeline(pipeline.getID()); stateManager .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1)); @@ -165,6 +224,17 @@ public class TestPipelineStateManager { Assert.fail("Pipeline should not have been removed"); } catch (IOException e) { // can not remove a pipeline which already has containers + Assert.assertTrue(e.getMessage().contains("not yet closed")); + } + + // close the pipeline + stateManager.finalizePipeline(pipeline.getID()); + + try { + stateManager.removePipeline(pipeline.getID()); + Assert.fail("Pipeline should not have been removed"); + } catch (IOException e) { + // can not remove a pipeline which already has containers Assert.assertTrue(e.getMessage().contains("not empty")); } @@ -178,64 +248,87 @@ public class TestPipelineStateManager { Pipeline pipeline = createDummyPipeline(1); // create an open pipeline in stateMap stateManager.addPipeline(pipeline); - updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, - HddsProtos.LifeCycleEvent.CREATED); + stateManager.openPipeline(pipeline.getID()); - stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID)); - stateManager - .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID)); - // removeContainerFromPipeline in open pipeline does not lead to removal of pipeline - Assert.assertNotNull(stateManager.getPipeline(pipeline.getID())); + stateManager.addContainerToPipeline(pipeline.getID(), + ContainerID.valueof(containerID)); + Assert.assertEquals(1, stateManager.getContainers(pipeline.getID()).size()); + stateManager.removeContainerFromPipeline(pipeline.getID(), + ContainerID.valueof(containerID)); + Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size()); // add two containers in the pipeline - stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); - stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + stateManager.addContainerToPipeline(pipeline.getID(), + ContainerID.valueof(++containerID)); + stateManager.addContainerToPipeline(pipeline.getID(), + ContainerID.valueof(++containerID)); + Assert.assertEquals(2, stateManager.getContainers(pipeline.getID()).size()); // move pipeline to closing state - updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.FINALIZE); + stateManager.finalizePipeline(pipeline.getID()); - stateManager - .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID)); - // removal of second last container in closing or closed pipeline should - // not lead to removal of pipeline - Assert.assertNotNull(stateManager.getPipeline(pipeline.getID())); - stateManager - .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(--containerID)); - // removal of last container in closing or closed pipeline should lead to - // removal of pipeline - try { - stateManager.getPipeline(pipeline.getID()); - Assert.fail("getPipeline should have failed."); - } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains(" not found")); - } + stateManager.removeContainerFromPipeline(pipeline.getID(), + ContainerID.valueof(containerID)); + stateManager.removeContainerFromPipeline(pipeline.getID(), + ContainerID.valueof(--containerID)); + Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size()); + + // clean up + stateManager.removePipeline(pipeline.getID()); } @Test - public void testUpdatePipelineState() throws IOException { + public void testFinalizePipeline() throws IOException { Pipeline pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); - updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, - HddsProtos.LifeCycleEvent.CREATED, HddsProtos.LifeCycleEvent.FINALIZE, - HddsProtos.LifeCycleEvent.CLOSE); + // finalize on ALLOCATED pipeline + stateManager.finalizePipeline(pipeline.getID()); + Assert.assertEquals(Pipeline.PipelineState.CLOSED, + stateManager.getPipeline(pipeline.getID()).getPipelineState()); + // clean up + removePipeline(pipeline); + + pipeline = createDummyPipeline(1); + stateManager.addPipeline(pipeline); + stateManager.openPipeline(pipeline.getID()); + // finalize on OPEN pipeline + stateManager.finalizePipeline(pipeline.getID()); + Assert.assertEquals(Pipeline.PipelineState.CLOSED, + stateManager.getPipeline(pipeline.getID()).getPipelineState()); + // clean up + removePipeline(pipeline); pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); - updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, - HddsProtos.LifeCycleEvent.TIMEOUT); + stateManager.openPipeline(pipeline.getID()); + stateManager.finalizePipeline(pipeline.getID()); + // finalize should work on already closed pipeline + stateManager.finalizePipeline(pipeline.getID()); + Assert.assertEquals(Pipeline.PipelineState.CLOSED, + stateManager.getPipeline(pipeline.getID()).getPipelineState()); + // clean up + removePipeline(pipeline); } - private void updateEvents(PipelineID pipelineID, - HddsProtos.LifeCycleEvent... events) throws IOException { - for (HddsProtos.LifeCycleEvent event : events) { - stateManager.updatePipelineState(pipelineID, event); - } + @Test + public void testOpenPipeline() throws IOException { + Pipeline pipeline = createDummyPipeline(1); + stateManager.addPipeline(pipeline); + // open on ALLOCATED pipeline + stateManager.openPipeline(pipeline.getID()); + Assert.assertEquals(Pipeline.PipelineState.OPEN, + stateManager.getPipeline(pipeline.getID()).getPipelineState()); + + stateManager.openPipeline(pipeline.getID()); + // open should work on already open pipeline + Assert.assertEquals(Pipeline.PipelineState.OPEN, + stateManager.getPipeline(pipeline.getID()).getPipelineState()); + // clean up + removePipeline(pipeline); } private void removePipeline(Pipeline pipeline) throws IOException { + stateManager.finalizePipeline(pipeline.getID()); Set containerIDs = stateManager.getContainers(pipeline.getID()); for (ContainerID containerID : containerIDs) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java index 6cf3e62..184143a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -47,7 +47,7 @@ public class TestRatisPipelineProvider { nodeManager = new MockNodeManager(true, 10); stateManager = new PipelineStateManager(new OzoneConfiguration()); provider = new RatisPipelineProvider(nodeManager, - stateManager); + stateManager, new OzoneConfiguration()); } @Test @@ -57,8 +57,8 @@ public class TestRatisPipelineProvider { stateManager.addPipeline(pipeline); Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); - Assert.assertEquals(pipeline.getLifeCycleState(), - HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; @@ -70,8 +70,8 @@ public class TestRatisPipelineProvider { .isEmpty()); Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline1.getFactor(), factor); - Assert.assertEquals(pipeline1.getLifeCycleState(), - HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline1.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); } @@ -89,16 +89,16 @@ public class TestRatisPipelineProvider { Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber())); Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); - Assert.assertEquals(pipeline.getLifeCycleState(), - HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals( + pipeline.getPipelineState(), Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; pipeline = provider.create(createListOfNodes(factor.getNumber())); Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); - Assert.assertEquals(pipeline.getLifeCycleState(), - HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java new file mode 100644 index 0000000..0f9ad55 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.TestSCMContainerManager; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Test cases to verify PipelineManager. + */ +public class TestSCMPipelineManager { + private static MockNodeManager nodeManager; + private static File testDir; + private static Configuration conf; + + @BeforeClass + public static void setUp() throws Exception { + conf = new OzoneConfiguration(); + testDir = GenericTestUtils + .getTestDir(TestSCMContainerManager.class.getSimpleName()); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + boolean folderExisted = testDir.exists() || testDir.mkdirs(); + if (!folderExisted) { + throw new IOException("Unable to create test directory path"); + } + nodeManager = new MockNodeManager(true, 20); + } + + @AfterClass + public static void cleanup() throws IOException { + FileUtil.fullyDelete(testDir); + } + + @Test + public void testPipelineReload() throws IOException { + PipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, new EventQueue()); + Set pipelines = new HashSet<>(); + for (int i = 0; i < 5; i++) { + Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + pipelines.add(pipeline); + } + pipelineManager.close(); + + // new pipeline manager should be able to load the pipelines from the db + pipelineManager = + new SCMPipelineManager(conf, nodeManager, + new EventQueue()); + List pipelineList = + pipelineManager.getPipelinesByType(HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipelines, new HashSet<>(pipelineList)); + + // clean up + for (Pipeline pipeline : pipelines) { + pipelineManager.finalizePipeline(pipeline.getID()); + pipelineManager.removePipeline(pipeline.getID()); + } + pipelineManager.close(); + } + + @Test + public void testRemovePipeline() throws IOException { + PipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, new EventQueue()); + Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + pipelineManager.openPipeline(pipeline.getID()); + pipelineManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1)); + pipelineManager.finalizePipeline(pipeline.getID()); + pipelineManager + .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(1)); + pipelineManager.removePipeline(pipeline.getID()); + pipelineManager.close(); + + // new pipeline manager should not be able to load removed pipelines + pipelineManager = + new SCMPipelineManager(conf, nodeManager, + new EventQueue()); + try { + pipelineManager.getPipeline(pipeline.getID()); + Assert.fail("Pipeline should not have been retrieved"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("not found")); + } + + // clean up + pipelineManager.close(); + } + + @Test + public void testPipelineReport() throws IOException { + PipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, new EventQueue()); + + // create a pipeline in allocated state with no dns yet reported + Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + Assert + .assertFalse(pipelineManager.getPipeline(pipeline.getID()).isHealthy()); + Assert + .assertFalse(pipelineManager.getPipeline(pipeline.getID()).isOpen()); + + // get pipeline report from each dn in the pipeline + PipelineReportHandler pipelineReportHandler = + new PipelineReportHandler(pipelineManager, conf); + for (DatanodeDetails dn: pipeline.getNodes()) { + PipelineReportFromDatanode pipelineReportFromDatanode = + TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID()); + // pipeline is not healthy until all dns report + Assert.assertFalse( + pipelineManager.getPipeline(pipeline.getID()).isHealthy()); + pipelineReportHandler + .onMessage(pipelineReportFromDatanode, new EventQueue()); + } + + // pipeline is healthy when all dns report + Assert + .assertTrue(pipelineManager.getPipeline(pipeline.getID()).isHealthy()); + // pipeline should now move to open state + Assert + .assertTrue(pipelineManager.getPipeline(pipeline.getID()).isOpen()); + + // close the pipeline + pipelineManager.finalizePipeline(pipeline.getID()); + + for (DatanodeDetails dn: pipeline.getNodes()) { + PipelineReportFromDatanode pipelineReportFromDatanode = + TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID()); + // pipeline report for a closed pipeline should destroy the pipeline + // and remove it from the pipeline manager + pipelineReportHandler + .onMessage(pipelineReportFromDatanode, new EventQueue()); + } + + try { + pipelineManager.getPipeline(pipeline.getID()); + Assert.fail("Pipeline should not have been retrieved"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("not found")); + } + + // clean up + pipelineManager.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java index 0f56cc8..b44dbef 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java @@ -56,8 +56,8 @@ public class TestSimplePipelineProvider { Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.STAND_ALONE); Assert.assertEquals(pipeline.getFactor(), factor); - Assert.assertEquals(pipeline.getLifeCycleState(), - HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; @@ -66,8 +66,8 @@ public class TestSimplePipelineProvider { Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.STAND_ALONE); Assert.assertEquals(pipeline1.getFactor(), factor); - Assert.assertEquals(pipeline1.getLifeCycleState(), - HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline1.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); } @@ -86,8 +86,8 @@ public class TestSimplePipelineProvider { Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.STAND_ALONE); Assert.assertEquals(pipeline.getFactor(), factor); - Assert.assertEquals(pipeline.getLifeCycleState(), - HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; @@ -95,8 +95,8 @@ public class TestSimplePipelineProvider { Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.STAND_ALONE); Assert.assertEquals(pipeline.getFactor(), factor); - Assert.assertEquals(pipeline.getLifeCycleState(), - HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org