From common-commits-return-89939-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Fri Oct 26 14:51:14 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D6CBE180718 for ; Fri, 26 Oct 2018 14:51:11 +0200 (CEST) Received: (qmail 2803 invoked by uid 500); 26 Oct 2018 12:51:10 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 2778 invoked by uid 99); 26 Oct 2018 12:51:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Oct 2018 12:51:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8CEF7E0A32; Fri, 26 Oct 2018 12:51:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nanda@apache.org To: common-commits@hadoop.apache.org Date: Fri, 26 Oct 2018 12:51:11 -0000 Message-Id: <26419b774a6e4275a0ca96846b1d9ebe@git.apache.org> In-Reply-To: <95b9badd41c945f48cf8b6adfbb0bf62@git.apache.org> References: <95b9badd41c945f48cf8b6adfbb0bf62@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] hadoop git commit: HDDS-694. Plugin new Pipeline management code in SCM. Contributed by Lokesh Jain. http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index 48939f1..9df9dff 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -19,24 +19,44 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Set; /** * Handles Stale node event. */ public class StaleNodeHandler implements EventHandler { + private static final Logger LOG = + LoggerFactory.getLogger(StaleNodeHandler.class); - private final PipelineSelector pipelineSelector; + private final NodeManager nodeManager; + private final PipelineManager pipelineManager; - public StaleNodeHandler(PipelineSelector pipelineSelector) { - this.pipelineSelector = pipelineSelector; + public StaleNodeHandler(NodeManager nodeManager, + PipelineManager pipelineManager) { + this.nodeManager = nodeManager; + this.pipelineManager = pipelineManager; } @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - pipelineSelector.handleStaleNode(datanodeDetails); + Set pipelineIds = + nodeManager.getPipelineByDnID(datanodeDetails.getUuid()); + for (PipelineID pipelineID : pipelineIds) { + try { + pipelineManager.finalizePipeline(pipelineID); + } catch (IOException e) { + LOG.info("Could not finalize pipeline={} for dn={}", pipelineID, + datanodeDetails); + } + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java index 87f2222..bf19261 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hdds.scm.node.states; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import java.util.HashSet; import java.util.Set; @@ -55,7 +55,7 @@ public class Node2PipelineMap extends Node2ObjectsMap { * @param pipeline Pipeline to be added */ public synchronized void addPipeline(Pipeline pipeline) { - for (DatanodeDetails details : pipeline.getDatanodes().values()) { + for (DatanodeDetails details : pipeline.getNodes()) { UUID dnId = details.getUuid(); dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>()) .add(pipeline.getId()); @@ -63,7 +63,7 @@ public class Node2PipelineMap extends Node2ObjectsMap { } public synchronized void removePipeline(Pipeline pipeline) { - for (DatanodeDetails details : pipeline.getDatanodes().values()) { + for (DatanodeDetails details : pipeline.getNodes()) { UUID dnId = details.getUuid(); dn2ObjectMap.computeIfPresent(dnId, (k, v) -> { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java index 261c544..c06a3bd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -50,8 +50,8 @@ public final class PipelineFactory { return providers.get(type).create(factor); } - public Pipeline create(ReplicationType type, List nodes) - throws IOException { - return providers.get(type).create(nodes); + public Pipeline create(ReplicationType type, ReplicationFactor factor, + List nodes) { + return providers.get(type).create(factor, nodes); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 51f9e86..04ec535 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -36,14 +36,14 @@ public interface PipelineManager extends Closeable { Pipeline createPipeline(ReplicationType type, ReplicationFactor factor) throws IOException; - Pipeline createPipeline(ReplicationType type, List nodes) - throws IOException; + Pipeline createPipeline(ReplicationType type, ReplicationFactor factor, + List nodes); - Pipeline getPipeline(PipelineID pipelineID) throws IOException; + Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException; - List getPipelinesByType(ReplicationType type); + List getPipelines(ReplicationType type); - List getPipelinesByTypeAndFactor(ReplicationType type, + List getPipelines(ReplicationType type, ReplicationFactor factor); void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java index 2fc2e0e..84b6375 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java @@ -31,5 +31,5 @@ public interface PipelineProvider { Pipeline create(ReplicationFactor factor) throws IOException; - Pipeline create(List nodes) throws IOException; + Pipeline create(ReplicationFactor factor, List nodes); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index ad11b47..6c31a12 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -76,7 +76,13 @@ public class PipelineReportHandler implements private void processPipelineReport(PipelineReport report, DatanodeDetails dn) throws IOException { PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID()); - Pipeline pipeline = pipelineManager.getPipeline(pipelineID); + Pipeline pipeline = null; + try { + pipeline = pipelineManager.getPipeline(pipelineID); + } catch (PipelineNotFoundException e) { + //TODO: introduce per datanode command for pipeline destroy + return; + } if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { pipeline.reportDatanode(dn); @@ -87,14 +93,14 @@ public class PipelineReportHandler implements } else if (pipeline.isClosed()) { int numContainers = pipelineManager.getNumberOfContainers(pipelineID); if (numContainers == 0) { - // if all the containers have been closed the pipeline can be destroyed + // remove the pipeline from the pipeline manager + pipelineManager.removePipeline(pipelineID); + // since all the containers have been closed the pipeline can be + // destroyed try (XceiverClientRatis client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { client.destroyPipeline(); } - // after successfully destroying the pipeline, the pipeline can be - // removed from the pipeline manager - pipelineManager.removePipeline(pipelineID); } } else { // In OPEN state case just report the datanode http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 8f5f89a..67f74d3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; @@ -39,9 +37,6 @@ import java.util.Set; */ class PipelineStateManager { - private static final Logger LOG = LoggerFactory.getLogger( - org.apache.hadoop.hdds.scm.pipelines.PipelineStateManager.class); - private final PipelineStateMap pipelineStateMap; PipelineStateManager(Configuration conf) { @@ -57,17 +52,20 @@ class PipelineStateManager { pipelineStateMap.addContainerToPipeline(pipelineId, containerID); } - Pipeline getPipeline(PipelineID pipelineID) throws IOException { + Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException { return pipelineStateMap.getPipeline(pipelineID); } - List getPipelinesByType(ReplicationType type) { - return pipelineStateMap.getPipelinesByType(type); + List getPipelines(ReplicationType type) { + return pipelineStateMap.getPipelines(type); + } + + List getPipelines(ReplicationType type, ReplicationFactor factor) { + return pipelineStateMap.getPipelines(type, factor); } - List getPipelinesByTypeAndFactor(ReplicationType type, - ReplicationFactor factor) { - return pipelineStateMap.getPipelinesByTypeAndFactor(type, factor); + List getPipelines(ReplicationType type, PipelineState... states) { + return pipelineStateMap.getPipelines(type, states); } Set getContainers(PipelineID pipelineID) throws IOException { @@ -78,8 +76,8 @@ class PipelineStateManager { return pipelineStateMap.getNumberOfContainers(pipelineID); } - void removePipeline(PipelineID pipelineID) throws IOException { - pipelineStateMap.removePipeline(pipelineID); + Pipeline removePipeline(PipelineID pipelineID) throws IOException { + return pipelineStateMap.removePipeline(pipelineID); } void removeContainerFromPipeline(PipelineID pipelineID, @@ -87,7 +85,8 @@ class PipelineStateManager { pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID); } - Pipeline finalizePipeline(PipelineID pipelineId) throws IOException { + Pipeline finalizePipeline(PipelineID pipelineId) + throws PipelineNotFoundException { Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId); if (!pipeline.isClosed()) { pipeline = pipelineStateMap http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java index 110d26b..7b69491 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -31,8 +31,7 @@ import java.util.stream.Collectors; /** * Holds the data structures which maintain the information about pipeline and - * its state. All the read write operations in this class are protected by a - * lock. + * its state. * Invariant: If a pipeline exists in PipelineStateMap, both pipelineMap and * pipeline2container would have a non-null mapping for it. */ @@ -65,12 +64,12 @@ class PipelineStateMap { String.format("Nodes size=%d, replication factor=%d do not match ", pipeline.getNodes().size(), pipeline.getFactor().getNumber())); - if (pipelineMap.putIfAbsent(pipeline.getID(), pipeline) != null) { - LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getID()); + if (pipelineMap.putIfAbsent(pipeline.getId(), pipeline) != null) { + LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getId()); throw new IOException(String - .format("Duplicate pipeline ID %s detected.", pipeline.getID())); + .format("Duplicate pipeline ID %s detected.", pipeline.getId())); } - pipeline2container.put(pipeline.getID(), new TreeSet<>()); + pipeline2container.put(pipeline.getId(), new TreeSet<>()); } /** @@ -85,12 +84,13 @@ class PipelineStateMap { Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); Preconditions.checkNotNull(containerID, - "container Id cannot be null"); + "Container Id cannot be null"); Pipeline pipeline = getPipeline(pipelineID); - if (!pipeline.isOpen()) { - throw new IOException( - String.format("%s is not in open state", pipelineID)); + if (pipeline.isClosed()) { + throw new IOException(String + .format("Cannot add container to pipeline=%s in closed state", + pipelineID)); } pipeline2container.get(pipelineID).add(containerID); } @@ -102,10 +102,14 @@ class PipelineStateMap { * @return Pipeline * @throws IOException if pipeline is not found */ - Pipeline getPipeline(PipelineID pipelineID) throws IOException { + Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException { + Preconditions.checkNotNull(pipelineID, + "Pipeline Id cannot be null"); + Pipeline pipeline = pipelineMap.get(pipelineID); if (pipeline == null) { - throw new IOException(String.format("%s not found", pipelineID)); + throw new PipelineNotFoundException( + String.format("%s not found", pipelineID)); } return pipeline; } @@ -116,29 +120,52 @@ class PipelineStateMap { * @param type - ReplicationType * @return List of pipelines which have the specified replication type */ - List getPipelinesByType(ReplicationType type) { + List getPipelines(ReplicationType type) { Preconditions.checkNotNull(type, "Replication type cannot be null"); - return pipelineMap.values().stream().filter(p -> p.getType().equals(type)) + return pipelineMap.values().stream() + .filter(p -> p.getType().equals(type)) .collect(Collectors.toList()); } /** - * Get open pipeline corresponding to specified replication type and factor. + * Get pipeline corresponding to specified replication type and factor. * * @param type - ReplicationType * @param factor - ReplicationFactor - * @return List of open pipelines with specified replication type and factor + * @return List of pipelines with specified replication type and factor */ - List getPipelinesByTypeAndFactor(ReplicationType type, - ReplicationFactor factor) { + List getPipelines(ReplicationType type, ReplicationFactor factor) { + Preconditions.checkNotNull(type, "Replication type cannot be null"); + Preconditions.checkNotNull(factor, "Replication factor cannot be null"); + return pipelineMap.values().stream() - .filter(pipeline -> pipeline.isOpen() && pipeline.getType() == type + .filter(pipeline -> pipeline.getType() == type && pipeline.getFactor() == factor) .collect(Collectors.toList()); } /** + * Get list of pipeline corresponding to specified replication type and + * pipeline states. + * + * @param type - ReplicationType + * @param states - Array of required PipelineState + * @return List of pipelines with specified replication type and states + */ + List getPipelines(ReplicationType type, PipelineState... states) { + Preconditions.checkNotNull(type, "Replication type cannot be null"); + Preconditions.checkNotNull(states, "Pipeline state cannot be null"); + + Set pipelineStates = new HashSet<>(); + pipelineStates.addAll(Arrays.asList(states)); + return pipelineMap.values().stream().filter( + pipeline -> pipeline.getType() == type && pipelineStates + .contains(pipeline.getPipelineState())) + .collect(Collectors.toList()); + } + + /** * Get set of containerIDs corresponding to a pipeline. * * @param pipelineID - PipelineID @@ -146,10 +173,14 @@ class PipelineStateMap { * @throws IOException if pipeline is not found */ Set getContainers(PipelineID pipelineID) - throws IOException { + throws PipelineNotFoundException { + Preconditions.checkNotNull(pipelineID, + "Pipeline Id cannot be null"); + Set containerIDs = pipeline2container.get(pipelineID); if (containerIDs == null) { - throw new IOException(String.format("%s not found", pipelineID)); + throw new PipelineNotFoundException( + String.format("%s not found", pipelineID)); } return new HashSet<>(containerIDs); } @@ -161,10 +192,15 @@ class PipelineStateMap { * @return Number of containers belonging to the pipeline * @throws IOException if pipeline is not found */ - int getNumberOfContainers(PipelineID pipelineID) throws IOException { + int getNumberOfContainers(PipelineID pipelineID) + throws PipelineNotFoundException { + Preconditions.checkNotNull(pipelineID, + "Pipeline Id cannot be null"); + Set containerIDs = pipeline2container.get(pipelineID); if (containerIDs == null) { - throw new IOException(String.format("%s not found", pipelineID)); + throw new PipelineNotFoundException( + String.format("%s not found", pipelineID)); } return containerIDs.size(); } @@ -175,7 +211,7 @@ class PipelineStateMap { * @param pipelineID - PipelineID of the pipeline to be removed * @throws IOException if the pipeline is not empty or does not exist */ - void removePipeline(PipelineID pipelineID) throws IOException { + Pipeline removePipeline(PipelineID pipelineID) throws IOException { Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); Pipeline pipeline = getPipeline(pipelineID); @@ -192,6 +228,7 @@ class PipelineStateMap { pipelineMap.remove(pipelineID); pipeline2container.remove(pipelineID); + return pipeline; } /** @@ -210,6 +247,10 @@ class PipelineStateMap { "container Id cannot be null"); Set containerIDs = pipeline2container.get(pipelineID); + if (containerIDs == null) { + throw new PipelineNotFoundException( + String.format("%s not found", pipelineID)); + } containerIDs.remove(containerID); } @@ -223,7 +264,7 @@ class PipelineStateMap { * @throws IOException if pipeline does not exist */ Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state) - throws IOException { + throws PipelineNotFoundException { Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 400ab24..590cd27 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -93,7 +93,7 @@ public class RatisPipelineProvider implements PipelineProvider { public Pipeline create(ReplicationFactor factor) throws IOException { // Get set of datanodes already used for ratis pipeline Set dnsUsed = new HashSet<>(); - stateManager.getPipelinesByType(ReplicationType.RATIS) + stateManager.getPipelines(ReplicationType.RATIS) .forEach(p -> dnsUsed.addAll(p.getNodes())); // Get list of healthy nodes @@ -112,7 +112,7 @@ public class RatisPipelineProvider implements PipelineProvider { Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(PipelineState.ALLOCATED) + .setState(PipelineState.OPEN) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(dns) @@ -122,16 +122,11 @@ public class RatisPipelineProvider implements PipelineProvider { } @Override - public Pipeline create(List nodes) throws IOException { - ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size()); - if (factor == null) { - throw new IOException(String - .format("Nodes size=%d does not match any replication factor", - nodes.size())); - } + public Pipeline create(ReplicationFactor factor, + List nodes) { return Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(PipelineState.ALLOCATED) + .setState(PipelineState.OPEN) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(nodes) http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 6a9c783..a853693 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -78,14 +78,14 @@ public class SCMPipelineManager implements PipelineManager { File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB); this.pipelineStore = MetadataStoreBuilder.newBuilder() + .setCreateIfMissing(true) .setConf(conf) .setDbFile(pipelineDBPath) .setCacheSize(cacheSize * OzoneConsts.MB) .build(); - initializePipelineState(); - this.eventPublisher = eventPublisher; this.nodeManager = nodeManager; + initializePipelineState(); } private void initializePipelineState() throws IOException { @@ -97,12 +97,11 @@ public class SCMPipelineManager implements PipelineManager { pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null); for (Map.Entry entry : pipelines) { - Pipeline pipeline = Pipeline - .fromProtobuf(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue())); + Pipeline pipeline = Pipeline.getFromProtobuf( + HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue())); Preconditions.checkNotNull(pipeline); stateManager.addPipeline(pipeline); - // TODO: add pipeline to node manager - // nodeManager.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); } } @@ -112,10 +111,10 @@ public class SCMPipelineManager implements PipelineManager { lock.writeLock().lock(); try { Pipeline pipeline = pipelineFactory.create(type, factor); - pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(), + pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(), pipeline.getProtobufMessage().toByteArray()); stateManager.addPipeline(pipeline); - // TODO: add pipeline to node manager + nodeManager.addPipeline(pipeline); return pipeline; } finally { lock.writeLock().unlock(); @@ -123,20 +122,20 @@ public class SCMPipelineManager implements PipelineManager { } @Override - public Pipeline createPipeline(ReplicationType type, - List nodes) - throws IOException { + public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor, + List nodes) { // This will mostly be used to create dummy pipeline for SimplePipelines. lock.writeLock().lock(); try { - return pipelineFactory.create(type, nodes); + return pipelineFactory.create(type, factor, nodes); } finally { lock.writeLock().unlock(); } } @Override - public Pipeline getPipeline(PipelineID pipelineID) throws IOException { + public Pipeline getPipeline(PipelineID pipelineID) + throws PipelineNotFoundException { lock.readLock().lock(); try { return stateManager.getPipeline(pipelineID); @@ -146,21 +145,21 @@ public class SCMPipelineManager implements PipelineManager { } @Override - public List getPipelinesByType(ReplicationType type) { + public List getPipelines(ReplicationType type) { lock.readLock().lock(); try { - return stateManager.getPipelinesByType(type); + return stateManager.getPipelines(type); } finally { lock.readLock().unlock(); } } @Override - public List getPipelinesByTypeAndFactor(ReplicationType type, + public List getPipelines(ReplicationType type, ReplicationFactor factor) { lock.readLock().lock(); try { - return stateManager.getPipelinesByTypeAndFactor(type, factor); + return stateManager.getPipelines(type, factor); } finally { lock.readLock().unlock(); } @@ -232,9 +231,9 @@ public class SCMPipelineManager implements PipelineManager { public void removePipeline(PipelineID pipelineID) throws IOException { lock.writeLock().lock(); try { - stateManager.removePipeline(pipelineID); pipelineStore.delete(pipelineID.getProtobuf().toByteArray()); - // TODO: remove pipeline from node manager + Pipeline pipeline = stateManager.removePipeline(pipelineID); + nodeManager.removePipeline(pipeline); } finally { lock.writeLock().unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index c95fcfb..3e42df3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -54,7 +54,7 @@ public class SimplePipelineProvider implements PipelineProvider { Collections.shuffle(dns); return Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(PipelineState.ALLOCATED) + .setState(PipelineState.OPEN) .setType(ReplicationType.STAND_ALONE) .setFactor(factor) .setNodes(dns.subList(0, factor.getNumber())) @@ -62,16 +62,11 @@ public class SimplePipelineProvider implements PipelineProvider { } @Override - public Pipeline create(List nodes) throws IOException { - ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size()); - if (factor == null) { - throw new IOException(String - .format("Nodes size=%d does not match any replication factor", - nodes.size())); - } + public Pipeline create(ReplicationFactor factor, + List nodes) { return Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(PipelineState.ALLOCATED) + .setState(PipelineState.OPEN) .setType(ReplicationType.STAND_ALONE) .setFactor(factor) .setNodes(nodes) http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java deleted file mode 100644 index 1053149..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.pipelines; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineAction; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .PipelineActionsFromDatanode; - -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles pipeline actions from datanode. - */ -public class PipelineActionEventHandler implements - EventHandler { - - public static final Logger LOG = LoggerFactory.getLogger( - PipelineActionEventHandler.class); - - public PipelineActionEventHandler() { - - } - - @Override - public void onMessage(PipelineActionsFromDatanode report, - EventPublisher publisher) { - for (PipelineAction action : report.getReport().getPipelineActionsList()) { - switch (action.getAction()) { - case CLOSE: - PipelineID pipelineID = PipelineID. - getFromProtobuf(action.getClosePipeline().getPipelineID()); - LOG.info("Closing pipeline " + pipelineID + " for reason:" + action - .getClosePipeline().getDetailedReason()); - publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID); - break; - default: - LOG.error("unknown pipeline action:{}" + action.getAction()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java deleted file mode 100644 index e49678f..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.pipelines; - -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles pipeline close event. - */ -public class PipelineCloseHandler implements EventHandler { - private static final Logger LOG = LoggerFactory - .getLogger(PipelineCloseHandler.class); - - private final PipelineSelector pipelineSelector; - public PipelineCloseHandler(PipelineSelector pipelineSelector) { - this.pipelineSelector = pipelineSelector; - } - - @Override - public void onMessage(PipelineID pipelineID, EventPublisher publisher) { - Pipeline pipeline = pipelineSelector.getPipeline(pipelineID); - try { - if (pipeline != null) { - pipelineSelector.finalizePipeline(pipeline); - } else { - LOG.debug("pipeline:{} not found", pipelineID); - } - } catch (Exception e) { - LOG.info("failed to close pipeline:{}", pipelineID, e); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java deleted file mode 100644 index ca2e878..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.pipelines; - -import java.util.ArrayList; -import java.util.LinkedList; - -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Manage Ozone pipelines. - */ -public abstract class PipelineManager { - private static final Logger LOG = - LoggerFactory.getLogger(PipelineManager.class); - protected final ArrayList activePipelines; - - public PipelineManager() { - activePipelines = new ArrayList<>(); - for (ReplicationFactor factor : ReplicationFactor.values()) { - activePipelines.add(factor.ordinal(), new ActivePipelines()); - } - } - - /** - * List of active pipelines. - */ - public static class ActivePipelines { - private final List activePipelines; - private final AtomicInteger pipelineIndex; - - ActivePipelines() { - activePipelines = new LinkedList<>(); - pipelineIndex = new AtomicInteger(0); - } - - void addPipeline(PipelineID pipelineID) { - if (!activePipelines.contains(pipelineID)) { - activePipelines.add(pipelineID); - } - } - - public void removePipeline(PipelineID pipelineID) { - activePipelines.remove(pipelineID); - } - - /** - * Find a Pipeline that is operational. - * - * @return - Pipeline or null - */ - PipelineID findOpenPipeline() { - if (activePipelines.size() == 0) { - LOG.error("No Operational pipelines found. Returning null."); - return null; - } - return activePipelines.get(getNextIndex()); - } - - /** - * gets the next index of the Pipeline to get. - * - * @return index in the link list to get. - */ - private int getNextIndex() { - return pipelineIndex.incrementAndGet() % activePipelines.size(); - } - } - - /** - * This function is called by the Container Manager while allocating a new - * container. The client specifies what kind of replication pipeline is - * needed and based on the replication type in the request appropriate - * Interface is invoked. - * - * @param replicationFactor - Replication Factor - * @return a Pipeline. - */ - public synchronized final PipelineID getPipeline( - ReplicationFactor replicationFactor, ReplicationType replicationType) { - PipelineID id = - activePipelines.get(replicationFactor.ordinal()).findOpenPipeline(); - if (id != null) { - LOG.debug("re-used pipeline:{} for container with " + - "replicationType:{} replicationFactor:{}", - id, replicationType, replicationFactor); - } - if (id == null) { - LOG.error("Get pipeline call failed. We are not able to find" + - " operational pipeline."); - return null; - } else { - return id; - } - } - - void addOpenPipeline(Pipeline pipeline) { - activePipelines.get(pipeline.getFactor().ordinal()) - .addPipeline(pipeline.getId()); - } - - public abstract Pipeline allocatePipeline( - ReplicationFactor replicationFactor); - - /** - * Initialize the pipeline. - * TODO: move the initialization to Ozone Client later - */ - public abstract void initializePipeline(Pipeline pipeline) throws IOException; - - public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) { - if (pipeline.addMember(dn) - &&(pipeline.getDatanodes().size() == pipeline.getFactor().getNumber()) - && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN) { - addOpenPipeline(pipeline); - } - } - - /** - * Creates a pipeline with a specified replication factor and type. - * @param replicationFactor - Replication Factor. - * @param replicationType - Replication Type. - */ - public Pipeline createPipeline(ReplicationFactor replicationFactor, - ReplicationType replicationType) throws IOException { - Pipeline pipeline = allocatePipeline(replicationFactor); - if (pipeline != null) { - LOG.debug("created new pipeline:{} for container with " - + "replicationType:{} replicationFactor:{}", - pipeline.getId(), replicationType, replicationFactor); - } - return pipeline; - } - - /** - * Remove the pipeline from active allocation. - * @param pipeline pipeline to be finalized - */ - public abstract boolean finalizePipeline(Pipeline pipeline); - - /** - * - * @param pipeline - */ - public abstract void closePipeline(Pipeline pipeline) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java deleted file mode 100644 index 933792b..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.pipelines; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.scm.server - .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles Node Reports from datanode. - */ -public class PipelineReportHandler implements - EventHandler { - - private static final Logger LOGGER = LoggerFactory - .getLogger(PipelineReportHandler.class); - private final PipelineSelector pipelineSelector; - - public PipelineReportHandler(PipelineSelector pipelineSelector) { - Preconditions.checkNotNull(pipelineSelector); - this.pipelineSelector = pipelineSelector; - } - - @Override - public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, - EventPublisher publisher) { - Preconditions.checkNotNull(pipelineReportFromDatanode); - DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails(); - PipelineReportsProto pipelineReport = - pipelineReportFromDatanode.getReport(); - Preconditions.checkNotNull(dn, "Pipeline Report is " - + "missing DatanodeDetails."); - LOGGER.trace("Processing pipeline report for dn: {}", dn); - pipelineSelector.processPipelineReport(dn, pipelineReport); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java deleted file mode 100644 index c8d22ff..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ /dev/null @@ -1,481 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.pipelines; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.container.placement.algorithms - .ContainerPlacementPolicy; -import org.apache.hadoop.hdds.scm.container.placement.algorithms - .SCMContainerPlacementRandom; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl; -import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.lease.Lease; -import org.apache.hadoop.ozone.lease.LeaseException; -import org.apache.hadoop.ozone.lease.LeaseManager; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataStoreBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.HashSet; -import java.util.List; -import java.util.HashMap; -import java.util.Set; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_CHANGE_PIPELINE_STATE; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_ACTIVE_PIPELINE; -import static org.apache.hadoop.hdds.server - .ServerUtils.getOzoneMetaDirPath; -import static org.apache.hadoop.ozone - .OzoneConsts.SCM_PIPELINE_DB; - -/** - * Sends the request to the right pipeline manager. - */ -public class PipelineSelector { - private static final Logger LOG = - LoggerFactory.getLogger(PipelineSelector.class); - private final ContainerPlacementPolicy placementPolicy; - private final Map pipelineManagerMap; - private final Configuration conf; - private final EventPublisher eventPublisher; - private final long containerSize; - private final MetadataStore pipelineStore; - private final PipelineStateManager stateManager; - private final NodeManager nodeManager; - private final Map> pipeline2ContainerMap; - private final Map pipelineMap; - private final LeaseManager pipelineLeaseManager; - - /** - * Constructs a pipeline Selector. - * - * @param nodeManager - node manager - * @param conf - Ozone Config - */ - public PipelineSelector(NodeManager nodeManager, Configuration conf, - EventPublisher eventPublisher, int cacheSizeMB) throws IOException { - this.conf = conf; - this.eventPublisher = eventPublisher; - this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf); - this.containerSize = (long)this.conf.getStorageSize( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, - StorageUnit.BYTES); - pipelineMap = new ConcurrentHashMap<>(); - pipelineManagerMap = new HashMap<>(); - - pipelineManagerMap.put(ReplicationType.STAND_ALONE, - new StandaloneManagerImpl(nodeManager, placementPolicy, - containerSize)); - pipelineManagerMap.put(ReplicationType.RATIS, - new RatisManagerImpl(nodeManager, placementPolicy, - containerSize, conf)); - long pipelineCreationLeaseTimeout = conf.getTimeDuration( - ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT, - ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - pipelineLeaseManager = new LeaseManager<>("PipelineCreation", - pipelineCreationLeaseTimeout); - pipelineLeaseManager.start(); - - stateManager = new PipelineStateManager(); - this.nodeManager = nodeManager; - pipeline2ContainerMap = new HashMap<>(); - - // Write the container name to pipeline mapping. - File metaDir = getOzoneMetaDirPath(conf); - File containerDBPath = new File(metaDir, SCM_PIPELINE_DB); - pipelineStore = MetadataStoreBuilder.newBuilder() - .setConf(conf) - .setDbFile(containerDBPath) - .setCacheSize(cacheSizeMB * OzoneConsts.MB) - .build(); - - reloadExistingPipelines(); - } - - private void reloadExistingPipelines() throws IOException { - if (pipelineStore.isEmpty()) { - // Nothing to do just return - return; - } - - List> range = - pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null); - - // Transform the values into the pipelines. - // TODO: filter by pipeline state - for (Map.Entry entry : range) { - Pipeline pipeline = Pipeline.getFromProtoBuf( - HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue())); - Preconditions.checkNotNull(pipeline); - addExistingPipeline(pipeline); - } - } - - @VisibleForTesting - public Set getOpenContainerIDsByPipeline(PipelineID pipelineID) { - return pipeline2ContainerMap.get(pipelineID); - } - - public void addContainerToPipeline(PipelineID pipelineID, long containerID) { - pipeline2ContainerMap.get(pipelineID) - .add(ContainerID.valueof(containerID)); - } - - public void removeContainerFromPipeline(PipelineID pipelineID, - long containerID) throws IOException { - pipeline2ContainerMap.get(pipelineID) - .remove(ContainerID.valueof(containerID)); - closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID)); - } - - /** - * Translates a list of nodes, ordered such that the first is the leader, into - * a corresponding {@link Pipeline} object. - * - * @param nodes - list of datanodes on which we will allocate the container. - * The first of the list will be the leader node. - * @return pipeline corresponding to nodes - */ - public static Pipeline newPipelineFromNodes( - List nodes, ReplicationType replicationType, - ReplicationFactor replicationFactor, PipelineID id) { - Preconditions.checkNotNull(nodes); - Preconditions.checkArgument(nodes.size() > 0); - String leaderId = nodes.get(0).getUuidString(); - // A new pipeline always starts in allocated state - Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED, - replicationType, replicationFactor, id); - for (DatanodeDetails node : nodes) { - pipeline.addMember(node); - } - return pipeline; - } - - /** - * Create pluggable container placement policy implementation instance. - * - * @param nodeManager - SCM node manager. - * @param conf - configuration. - * @return SCM container placement policy implementation instance. - */ - @SuppressWarnings("unchecked") - private static ContainerPlacementPolicy createContainerPlacementPolicy( - final NodeManager nodeManager, final Configuration conf) { - Class implClass = - (Class) conf.getClass( - ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, - SCMContainerPlacementRandom.class); - - try { - Constructor ctor = - implClass.getDeclaredConstructor(NodeManager.class, - Configuration.class); - return ctor.newInstance(nodeManager, conf); - } catch (RuntimeException e) { - throw e; - } catch (InvocationTargetException e) { - throw new RuntimeException(implClass.getName() - + " could not be constructed.", e.getCause()); - } catch (Exception e) { - LOG.error("Unhandled exception occurred, Placement policy will not be " + - "functional."); - throw new IllegalArgumentException("Unable to load " + - "ContainerPlacementPolicy", e); - } - } - - /** - * This function is called by the Container Manager while allocating a new - * container. The client specifies what kind of replication pipeline is needed - * and based on the replication type in the request appropriate Interface is - * invoked. - */ - - public Pipeline getReplicationPipeline(ReplicationType replicationType, - HddsProtos.ReplicationFactor replicationFactor) - throws IOException { - PipelineManager manager = pipelineManagerMap.get(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Getting replication pipeline forReplicationType {} :" + - " ReplicationFactor {}", replicationType.toString(), - replicationFactor.toString()); - - /** - * In the Ozone world, we have a very simple policy. - * - * 1. Try to create a pipeline if there are enough free nodes. - * - * 2. This allows all nodes to part of a pipeline quickly. - * - * 3. if there are not enough free nodes, return already allocated pipeline - * in a round-robin fashion. - * - * TODO: Might have to come up with a better algorithm than this. - * Create a new placement policy that returns pipelines in round robin - * fashion. - */ - Pipeline pipeline = - manager.createPipeline(replicationFactor, replicationType); - if (pipeline == null) { - // try to return a pipeline from already allocated pipelines - PipelineID pipelineId = - manager.getPipeline(replicationFactor, replicationType); - if (pipelineId == null) { - throw new SCMException(FAILED_TO_FIND_ACTIVE_PIPELINE); - } - pipeline = pipelineMap.get(pipelineId); - Preconditions.checkArgument(pipeline.getLifeCycleState() == - LifeCycleState.OPEN); - } else { - pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(), - pipeline.getProtobufMessage().toByteArray()); - // if a new pipeline is created, initialize its state machine - updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE); - - //TODO: move the initialization of pipeline to Ozone Client - manager.initializePipeline(pipeline); - updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED); - } - return pipeline; - } - - /** - * This function to return pipeline for given pipeline id. - */ - public Pipeline getPipeline(PipelineID pipelineID) { - return pipelineMap.get(pipelineID); - } - - /** - * Finalize a given pipeline. - */ - public void finalizePipeline(Pipeline pipeline) throws IOException { - PipelineManager manager = pipelineManagerMap.get(pipeline.getType()); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING || - pipeline.getLifeCycleState() == LifeCycleState.CLOSED) { - LOG.debug("pipeline:{} already in closing state, skipping", - pipeline.getId()); - // already in closing/closed state - return; - } - - // Remove the pipeline from active allocation - if (manager.finalizePipeline(pipeline)) { - LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId()); - updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE); - closePipelineIfNoOpenContainers(pipeline); - } - } - - /** - * Close a given pipeline. - */ - private void closePipelineIfNoOpenContainers(Pipeline pipeline) - throws IOException { - if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) { - return; - } - HashSet containerIDS = - pipeline2ContainerMap.get(pipeline.getId()); - if (containerIDS.size() == 0) { - updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE); - LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId()); - } - } - - /** - * Close a given pipeline. - */ - private void closePipeline(Pipeline pipeline) throws IOException { - PipelineManager manager = pipelineManagerMap.get(pipeline.getType()); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId()); - HashSet containers = - pipeline2ContainerMap.get(pipeline.getId()); - Preconditions.checkArgument(containers.size() == 0); - manager.closePipeline(pipeline); - } - - /** - * Add to a given pipeline. - */ - private void addOpenPipeline(Pipeline pipeline) { - PipelineManager manager = pipelineManagerMap.get(pipeline.getType()); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId()); - manager.addOpenPipeline(pipeline); - } - - private void closeContainersByPipeline(Pipeline pipeline) { - HashSet containers = - pipeline2ContainerMap.get(pipeline.getId()); - for (ContainerID id : containers) { - eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id); - } - } - - private void addExistingPipeline(Pipeline pipeline) throws IOException { - LifeCycleState state = pipeline.getLifeCycleState(); - switch (state) { - case ALLOCATED: - // a pipeline in allocated state is only present in SCM and does not exist - // on datanode, on SCM restart, this pipeline can be ignored. - break; - case CREATING: - case OPEN: - case CLOSING: - //TODO: process pipeline report and move pipeline to active queue - // when all the nodes have reported. - pipelineMap.put(pipeline.getId(), pipeline); - pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); - nodeManager.addPipeline(pipeline); - // reset the datanodes in the pipeline - // they will be reset on - pipeline.resetPipeline(); - break; - case CLOSED: - // if the pipeline is in closed state, nothing to do. - break; - default: - throw new IOException("invalid pipeline state:" + state); - } - } - - public void handleStaleNode(DatanodeDetails dn) { - Set pipelineIDs = nodeManager.getPipelineByDnID(dn.getUuid()); - for (PipelineID id : pipelineIDs) { - LOG.info("closing pipeline {}.", id); - eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id); - } - } - - void processPipelineReport(DatanodeDetails dn, - PipelineReportsProto pipelineReport) { - Set reportedPipelines = new HashSet<>(); - pipelineReport.getPipelineReportList(). - forEach(p -> - reportedPipelines.add( - processPipelineReport(p.getPipelineID(), dn))); - - //TODO: handle missing pipelines and new pipelines later - } - - private PipelineID processPipelineReport( - HddsProtos.PipelineID id, DatanodeDetails dn) { - PipelineID pipelineID = PipelineID.getFromProtobuf(id); - Pipeline pipeline = pipelineMap.get(pipelineID); - if (pipeline != null) { - pipelineManagerMap.get(pipeline.getType()) - .processPipelineReport(pipeline, dn); - } - return pipelineID; - } - - /** - * Update the Pipeline State to the next state. - * - * @param pipeline - Pipeline - * @param event - LifeCycle Event - * @throws SCMException on Failure. - */ - public void updatePipelineState(Pipeline pipeline, - HddsProtos.LifeCycleEvent event) throws IOException { - try { - switch (event) { - case CREATE: - pipelineMap.put(pipeline.getId(), pipeline); - pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); - nodeManager.addPipeline(pipeline); - // Acquire lease on pipeline - Lease pipelineLease = pipelineLeaseManager.acquire(pipeline); - // Register callback to be executed in case of timeout - pipelineLease.registerCallBack(() -> { - updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT); - return null; - }); - break; - case CREATED: - // Release the lease on pipeline - pipelineLeaseManager.release(pipeline); - addOpenPipeline(pipeline); - break; - - case FINALIZE: - closeContainersByPipeline(pipeline); - break; - - case CLOSE: - case TIMEOUT: - closePipeline(pipeline); - pipeline2ContainerMap.remove(pipeline.getId()); - nodeManager.removePipeline(pipeline); - pipelineMap.remove(pipeline.getId()); - break; - default: - throw new SCMException("Unsupported pipeline LifeCycleEvent.", - FAILED_TO_CHANGE_PIPELINE_STATE); - } - - stateManager.updatePipelineState(pipeline, event); - pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(), - pipeline.getProtobufMessage().toByteArray()); - } catch (LeaseException e) { - throw new IOException("Lease Exception.", e); - } - } - - public void shutdown() throws IOException { - if (pipelineLeaseManager != null) { - pipelineLeaseManager.shutdown(); - } - - if (pipelineStore != null) { - pipelineStore.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java deleted file mode 100644 index 6054f16..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.pipelines; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.ozone.common.statemachine - .InvalidStateTransitionException; -import org.apache.hadoop.ozone.common.statemachine.StateMachine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; - -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_CHANGE_PIPELINE_STATE; - -/** - * Manages Pipeline states. - */ -public class PipelineStateManager { - private static final Logger LOG = - LoggerFactory.getLogger(PipelineStateManager.class); - - private final StateMachine stateMachine; - - PipelineStateManager() { - // Initialize the container state machine. - Set finalStates = new HashSet<>(); - // These are the steady states of a container. - finalStates.add(HddsProtos.LifeCycleState.OPEN); - finalStates.add(HddsProtos.LifeCycleState.CLOSED); - - this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED, - finalStates); - initializeStateMachine(); - } - - /** - * Event and State Transition Mapping. - * - * State: ALLOCATED ---------------> CREATING - * Event: CREATE - * - * State: CREATING ---------------> OPEN - * Event: CREATED - * - * State: OPEN ---------------> CLOSING - * Event: FINALIZE - * - * State: CLOSING ---------------> CLOSED - * Event: CLOSE - * - * State: CREATING ---------------> CLOSED - * Event: TIMEOUT - * - * - * Container State Flow: - * - * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING] - * (CREATE) | (CREATED) (FINALIZE) | - * | | - * | | - * |(TIMEOUT) |(CLOSE) - * | | - * +--------> [CLOSED] <--------+ - */ - private void initializeStateMachine() { - stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED, - HddsProtos.LifeCycleState.CREATING, - HddsProtos.LifeCycleEvent.CREATE); - - stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING, - HddsProtos.LifeCycleState.OPEN, - HddsProtos.LifeCycleEvent.CREATED); - - stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN, - HddsProtos.LifeCycleState.CLOSING, - HddsProtos.LifeCycleEvent.FINALIZE); - - stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING, - HddsProtos.LifeCycleState.CLOSED, - HddsProtos.LifeCycleEvent.CLOSE); - - stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING, - HddsProtos.LifeCycleState.CLOSED, - HddsProtos.LifeCycleEvent.TIMEOUT); - } - - - /** - * Update the Pipeline State to the next state. - * - * @param pipeline - Pipeline - * @param event - LifeCycle Event - * @throws SCMException on Failure. - */ - public void updatePipelineState(Pipeline pipeline, - HddsProtos.LifeCycleEvent event) throws IOException { - HddsProtos.LifeCycleState newState; - try { - newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event); - } catch (InvalidStateTransitionException ex) { - String error = String.format("Failed to update pipeline state %s, " + - "reason: invalid state transition from state: %s upon " + - "event: %s.", - pipeline.getId(), pipeline.getLifeCycleState(), event); - LOG.error(error); - throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE); - } - - // This is a post condition after executing getNextState. - Preconditions.checkNotNull(newState); - Preconditions.checkNotNull(pipeline); - pipeline.setLifeCycleState(newState); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java deleted file mode 100644 index ea24c58..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.pipelines; -/** - Ozone supports the notion of different kind of pipelines. - That means that we can have a replication pipeline build on - Ratis, Standalone or some other protocol. All Pipeline managers - the entities in charge of pipelines reside in the package. - - Here is the high level Arch. - - 1. A pipeline selector class is instantiated in the Container manager class. - - 2. A client when creating a container -- will specify what kind of - replication type it wants to use. We support 2 types now, Ratis and StandAlone. - - 3. Based on the replication type, the pipeline selector class asks the - corresponding pipeline manager for a pipeline. - - 4. We have supported the ability for clients to specify a set of nodes in - the pipeline or rely in the pipeline manager to select the datanodes if they - are not specified. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java deleted file mode 100644 index 905a5b5..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.pipelines.ratis; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.XceiverClientRatis; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.container.placement.algorithms - .ContainerPlacementPolicy; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -/** - * Implementation of {@link PipelineManager}. - * - * TODO : Introduce a state machine. - */ -public class RatisManagerImpl extends PipelineManager { - private static final Logger LOG = - LoggerFactory.getLogger(RatisManagerImpl.class); - private final Configuration conf; - private final NodeManager nodeManager; - private final Set ratisMembers; - - /** - * Constructs a Ratis Pipeline Manager. - * - * @param nodeManager - */ - public RatisManagerImpl(NodeManager nodeManager, - ContainerPlacementPolicy placementPolicy, long size, Configuration conf) { - super(); - this.conf = conf; - this.nodeManager = nodeManager; - ratisMembers = new HashSet<>(); - } - - /** - * Allocates a new ratis Pipeline from the free nodes. - * - * @param factor - One or Three - * @return Pipeline. - */ - public Pipeline allocatePipeline(ReplicationFactor factor) { - List newNodesList = new LinkedList<>(); - List datanodes = nodeManager.getNodes(NodeState.HEALTHY); - //TODO: Add Raft State to the Nodes, so we can query and skip nodes from - // data from datanode instead of maintaining a set. - for (DatanodeDetails datanode : datanodes) { - Preconditions.checkNotNull(datanode); - if (!ratisMembers.contains(datanode)) { - newNodesList.add(datanode); - if (newNodesList.size() == factor.getNumber()) { - // once a datanode has been added to a pipeline, exclude it from - // further allocations - ratisMembers.addAll(newNodesList); - PipelineID pipelineID = PipelineID.randomId(); - LOG.info("Allocating a new ratis pipeline of size: {} id: {}", - factor.getNumber(), pipelineID); - return PipelineSelector.newPipelineFromNodes(newNodesList, - ReplicationType.RATIS, factor, pipelineID); - } - } - } - return null; - } - - public void initializePipeline(Pipeline pipeline) throws IOException { - //TODO:move the initialization from SCM to client - try (XceiverClientRatis client = - XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { - client.createPipeline(); - } - } - - public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) { - super.processPipelineReport(pipeline, dn); - ratisMembers.add(dn); - } - - public synchronized boolean finalizePipeline(Pipeline pipeline) { - activePipelines.get(pipeline.getFactor().ordinal()) - .removePipeline(pipeline.getId()); - return true; - } - - /** - * Close the pipeline. - */ - public void closePipeline(Pipeline pipeline) throws IOException { - try (XceiverClientRatis client = - XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { - client.destroyPipeline(); - } - for (DatanodeDetails node : pipeline.getMachines()) { - // A node should always be the in ratis members list. - Preconditions.checkArgument(ratisMembers.remove(node)); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java deleted file mode 100644 index 2970fb3..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.pipelines.ratis; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java deleted file mode 100644 index 045afb6..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.pipelines.standalone; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.container.placement.algorithms - .ContainerPlacementPolicy; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -/** - * Standalone Manager Impl to prove that pluggable interface - * works with current tests. - */ -public class StandaloneManagerImpl extends PipelineManager { - private static final Logger LOG = - LoggerFactory.getLogger(StandaloneManagerImpl.class); - private final NodeManager nodeManager; - private final ContainerPlacementPolicy placementPolicy; - private final long containerSize; - private final Set standAloneMembers; - - /** - * Constructor for Standalone Node Manager Impl. - * @param nodeManager - Node Manager. - * @param placementPolicy - Placement Policy - * @param containerSize - Container Size. - */ - public StandaloneManagerImpl(NodeManager nodeManager, - ContainerPlacementPolicy placementPolicy, long containerSize) { - super(); - this.nodeManager = nodeManager; - this.placementPolicy = placementPolicy; - this.containerSize = containerSize; - this.standAloneMembers = new HashSet<>(); - } - - - /** - * Allocates a new standalone Pipeline from the free nodes. - * - * @param factor - One - * @return Pipeline. - */ - public Pipeline allocatePipeline(ReplicationFactor factor) { - List newNodesList = new LinkedList<>(); - List datanodes = nodeManager.getNodes(NodeState.HEALTHY); - for (DatanodeDetails datanode : datanodes) { - Preconditions.checkNotNull(datanode); - if (!standAloneMembers.contains(datanode)) { - newNodesList.add(datanode); - if (newNodesList.size() == factor.getNumber()) { - // once a datanode has been added to a pipeline, exclude it from - // further allocations - standAloneMembers.addAll(newNodesList); - // Standalone pipeline use node id as pipeline - PipelineID pipelineID = - PipelineID.valueOf(newNodesList.get(0).getUuid()); - LOG.info("Allocating a new standalone pipeline of size: {} id: {}", - factor.getNumber(), pipelineID); - return PipelineSelector.newPipelineFromNodes(newNodesList, - ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineID); - } - } - } - return null; - } - - public void initializePipeline(Pipeline pipeline) { - // Nothing to be done for standalone pipeline - } - - public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) { - super.processPipelineReport(pipeline, dn); - standAloneMembers.add(dn); - } - - public synchronized boolean finalizePipeline(Pipeline pipeline) { - activePipelines.get(pipeline.getFactor().ordinal()) - .removePipeline(pipeline.getId()); - return false; - } - - /** - * Close the pipeline. - */ - public void closePipeline(Pipeline pipeline) throws IOException { - for (DatanodeDetails node : pipeline.getMachines()) { - // A node should always be the in standalone members list. - Preconditions.checkArgument(standAloneMembers.remove(node)); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java deleted file mode 100644 index b2c3ca40..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.pipelines.standalone; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 89a6c81..e92200a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org