From common-commits-return-88834-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Wed Oct 3 12:31:47 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id E8B0E18077A for ; Wed, 3 Oct 2018 12:31:44 +0200 (CEST) Received: (qmail 37039 invoked by uid 500); 3 Oct 2018 10:31:43 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 36663 invoked by uid 99); 3 Oct 2018 10:31:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Oct 2018 10:31:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7B30CE0AC6; Wed, 3 Oct 2018 10:31:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nanda@apache.org To: common-commits@hadoop.apache.org Date: Wed, 03 Oct 2018 10:31:45 -0000 Message-Id: <55a05d2e370244309ecd204a1b8ef692@git.apache.org> In-Reply-To: <0f8b40e066cb4b5e94fa7ce996a1eb78@git.apache.org> References: <0f8b40e066cb4b5e94fa7ce996a1eb78@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] hadoop git commit: HDDS-567. Rename Mapping to ContainerManager in SCM. Contributed by Nanda kumar. HDDS-567. Rename Mapping to ContainerManager in SCM. Contributed by Nanda kumar. 
(cherry picked from commit 095c269620e01ce46832ea25e696c0ab71613ea3) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5d328662 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5d328662 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5d328662 Branch: refs/heads/ozone-0.2 Commit: 5d328662a04da2e21c7765936eed8db00da85a91 Parents: f4009ae Author: Nanda kumar Authored: Wed Oct 3 16:00:39 2018 +0530 Committer: Nanda kumar Committed: Wed Oct 3 16:01:16 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/block/BlockManagerImpl.java | 6 +- .../block/DatanodeDeletedBlockTransactions.java | 10 +- .../hdds/scm/block/DeletedBlockLogImpl.java | 8 +- .../hdds/scm/block/SCMBlockDeletingService.java | 15 +- .../container/CloseContainerEventHandler.java | 4 +- .../scm/container/CloseContainerWatcher.java | 4 +- .../hdds/scm/container/ContainerManager.java | 142 ++++ .../hdds/scm/container/ContainerMapping.java | 699 ------------------- .../scm/container/ContainerReportHandler.java | 12 +- .../scm/container/ContainerStateManager.java | 8 +- .../hadoop/hdds/scm/container/Mapping.java | 141 ---- .../hdds/scm/container/SCMContainerManager.java | 699 +++++++++++++++++++ .../scm/server/SCMClientProtocolServer.java | 22 +- .../scm/server/SCMDatanodeProtocolServer.java | 2 +- .../scm/server/StorageContainerManager.java | 36 +- .../hadoop/hdds/scm/block/TestBlockManager.java | 6 +- .../hdds/scm/block/TestDeletedBlockLog.java | 8 +- .../TestCloseContainerEventHandler.java | 4 +- .../scm/container/TestContainerMapping.java | 380 ---------- .../container/TestContainerReportHandler.java | 10 +- .../container/TestContainerStateManager.java | 2 +- .../scm/container/TestSCMContainerManager.java | 378 ++++++++++ .../hdds/scm/node/TestContainerPlacement.java | 9 +- .../hdds/scm/node/TestDeadNodeHandler.java | 4 +- .../container/TestCloseContainerWatcher.java | 12 
+- .../TestContainerStateManagerIntegration.java | 52 +- .../hdds/scm/pipeline/TestNode2PipelineMap.java | 21 +- .../hdds/scm/pipeline/TestNodeFailure.java | 18 +- .../hdds/scm/pipeline/TestPipelineClose.java | 28 +- .../hdds/scm/pipeline/TestSCMRestart.java | 33 +- .../org/apache/hadoop/ozone/OzoneTestUtils.java | 6 +- .../ozone/client/rest/TestOzoneRestClient.java | 2 +- .../rpc/TestCloseContainerHandlingByClient.java | 6 +- .../ozone/client/rpc/TestOzoneRpcClient.java | 2 +- .../commandhandler/TestBlockDeletion.java | 3 +- .../TestCloseContainerByPipeline.java | 6 +- .../TestCloseContainerHandler.java | 2 +- .../hadoop/ozone/om/TestScmChillMode.java | 12 +- .../hadoop/ozone/scm/TestContainerSQLCli.java | 12 +- 39 files changed, 1415 insertions(+), 1409 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 4777016..30740c7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmUtils; -import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; @@ -70,7 +70,7 @@ public class BlockManagerImpl implements EventHandler, // by itself and does not rely on the Block service offered by SCM. private final NodeManager nodeManager; - private final Mapping containerManager; + private final ContainerManager containerManager; private final long containerSize; @@ -92,7 +92,7 @@ public class BlockManagerImpl implements EventHandler, * @throws IOException */ public BlockManagerImpl(final Configuration conf, - final NodeManager nodeManager, final Mapping containerManager, + final NodeManager nodeManager, final ContainerManager containerManager, EventPublisher eventPublisher) throws IOException { this.nodeManager = nodeManager; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java index 8702a42..c86f9cd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java @@ -17,7 +17,7 @@ package org.apache.hadoop.hdds.scm.block; import com.google.common.collect.ArrayListMultimap; -import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; @@ -42,15 +42,15 @@ public class DatanodeDeletedBlockTransactions { private int maximumAllowedTXNum; // Current counter of 
inserted TX. private int currentTXNum; - private Mapping mappingService; + private ContainerManager containerManager; // A list of TXs mapped to a certain datanode ID. private final ArrayListMultimap transactions; - DatanodeDeletedBlockTransactions(Mapping mappingService, + DatanodeDeletedBlockTransactions(ContainerManager containerManager, int maximumAllowedTXNum, int nodeNum) { this.transactions = ArrayListMultimap.create(); - this.mappingService = mappingService; + this.containerManager = containerManager; this.maximumAllowedTXNum = maximumAllowedTXNum; this.nodeNum = nodeNum; } @@ -60,7 +60,7 @@ public class DatanodeDeletedBlockTransactions { Pipeline pipeline = null; try { ContainerWithPipeline containerWithPipeline = - mappingService.getContainerWithPipeline(tx.getContainerID()); + containerManager.getContainerWithPipeline(tx.getContainerID()); if (containerWithPipeline.getContainerInfo().isContainerOpen() || containerWithPipeline.getPipeline().isEmpty()) { return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index 68435d1..5d3afd5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto .DeleteBlockTransactionResult; import org.apache.hadoop.hdds.scm.command .CommandStatusReportHandler.DeleteBlockStatus; -import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.ContainerManager; import 
org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -92,15 +92,15 @@ public class DeletedBlockLogImpl private final int maxRetry; private final MetadataStore deletedStore; - private final Mapping containerManager; + private final ContainerManager containerManager; private final Lock lock; // The latest id of deleted blocks in the db. private long lastTxID; // Maps txId to set of DNs which are successful in committing the transaction private Map> transactionToDNsCommitMap; - public DeletedBlockLogImpl(Configuration conf, Mapping containerManager) - throws IOException { + public DeletedBlockLogImpl(Configuration conf, + ContainerManager containerManager) throws IOException { maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index b85d77f..8e07fa2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.block; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import 
org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -63,7 +63,7 @@ public class SCMBlockDeletingService extends BackgroundService { // ThreadPoolSize=2, 1 for scheduler and the other for the scanner. private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2; private final DeletedBlockLog deletedBlockLog; - private final Mapping mappingService; + private final ContainerManager containerManager; private final NodeManager nodeManager; private final EventPublisher eventPublisher; @@ -81,12 +81,13 @@ public class SCMBlockDeletingService extends BackgroundService { private int blockDeleteLimitSize; public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog, - Mapping mapper, NodeManager nodeManager, EventPublisher eventPublisher, - long interval, long serviceTimeout, Configuration conf) { + ContainerManager containerManager, NodeManager nodeManager, + EventPublisher eventPublisher, long interval, long serviceTimeout, + Configuration conf) { super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout); this.deletedBlockLog = deletedBlockLog; - this.mappingService = mapper; + this.containerManager = containerManager; this.nodeManager = nodeManager; this.eventPublisher = eventPublisher; @@ -139,7 +140,7 @@ public class SCMBlockDeletingService extends BackgroundService { List datanodes = nodeManager.getNodes(NodeState.HEALTHY); Map transactionMap = null; if (datanodes != null) { - transactions = new DatanodeDeletedBlockTransactions(mappingService, + transactions = new DatanodeDeletedBlockTransactions(containerManager, blockDeleteLimitSize, datanodes.size()); try { transactionMap = deletedBlockLog.getTransactions(transactions); @@ -174,7 +175,7 @@ public class SCMBlockDeletingService extends BackgroundService { transactions.getTransactionIDList(dnId))); } } - mappingService.updateDeleteTransactionId(transactionMap); + 
containerManager.updateDeleteTransactionId(transactionMap); } if (dnTxCount > 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 7baecc4..8be7803 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -46,9 +46,9 @@ public class CloseContainerEventHandler implements EventHandler { LoggerFactory.getLogger(CloseContainerEventHandler.class); - private final Mapping containerManager; + private final ContainerManager containerManager; - public CloseContainerEventHandler(Mapping containerManager) { + public CloseContainerEventHandler(ContainerManager containerManager) { this.containerManager = containerManager; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java index 8e277b9..7b94bd2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java @@ -44,11 +44,11 @@ public class CloseContainerWatcher extends public static final 
Logger LOG = LoggerFactory.getLogger(CloseContainerWatcher.class); - private final Mapping containerManager; + private final ContainerManager containerManager; public CloseContainerWatcher(Event startEvent, Event completionEvent, - LeaseManager leaseManager, Mapping containerManager) { + LeaseManager leaseManager, ContainerManager containerManager) { super(startEvent, completionEvent, leaseManager); this.containerManager = containerManager; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java new file mode 100644 index 0000000..e586f3e --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * ContainerManager class contains the mapping from a name to a pipeline + * mapping. This is used by SCM when allocating new locations and when + * looking up a key. + */ +public interface ContainerManager extends Closeable { + /** + * Returns the ContainerInfo from the container ID. + * + * @param containerID - ID of container. + * @return - ContainerInfo such as creation state and the pipeline. + * @throws IOException + */ + ContainerInfo getContainer(long containerID) throws IOException; + + /** + * Returns the ContainerInfo from the container ID. + * + * @param containerID - ID of container. + * @return - ContainerWithPipeline such as creation state and the pipeline. 
+ * @throws IOException + */ + ContainerWithPipeline getContainerWithPipeline(long containerID) + throws IOException; + + /** + * Returns containers under certain conditions. + * Search container IDs from start ID(exclusive), + * The max size of the searching range cannot exceed the + * value of count. + * + * @param startContainerID start containerID, >=0, + * start searching at the head if 0. + * @param count count must be >= 0 + * Usually the count will be replace with a very big + * value instead of being unlimited in case the db is very big. + * + * @return a list of container. + * @throws IOException + */ + List listContainer(long startContainerID, int count) + throws IOException; + + /** + * Allocates a new container for a given keyName and replication factor. + * + * @param replicationFactor - replication factor of the container. + * @param owner + * @return - ContainerWithPipeline. + * @throws IOException + */ + ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor replicationFactor, String owner) + throws IOException; + + /** + * Deletes a container from SCM. + * + * @param containerID - Container ID + * @throws IOException + */ + void deleteContainer(long containerID) throws IOException; + + /** + * Update container state. + * @param containerID - Container ID + * @param event - container life cycle event + * @return - new container state + * @throws IOException + */ + HddsProtos.LifeCycleState updateContainerState(long containerID, + HddsProtos.LifeCycleEvent event) throws IOException; + + /** + * Returns the container State Manager. + * @return ContainerStateManager + */ + ContainerStateManager getStateManager(); + + /** + * Process container report from Datanode. 
+ * + * @param reports Container report + */ + void processContainerReports(DatanodeDetails datanodeDetails, + ContainerReportsProto reports, boolean isRegisterCall) + throws IOException; + + /** + * Update deleteTransactionId according to deleteTransactionMap. + * + * @param deleteTransactionMap Maps the containerId to latest delete + * transaction id for the container. + * @throws IOException + */ + void updateDeleteTransactionId(Map deleteTransactionMap) + throws IOException; + + /** + * Returns the ContainerWithPipeline. + * @return NodeManager + */ + ContainerWithPipeline getMatchingContainerWithPipeline(long size, + String owner, ReplicationType type, ReplicationFactor factor, + LifeCycleState state) throws IOException; + + PipelineSelector getPipelineSelector(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java deleted file mode 100644 index 71e17e9..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ /dev/null @@ -1,699 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- *

http://www.apache.org/licenses/LICENSE-2.0 - *

- *

Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.primitives.Longs; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo; -import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import 
org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.lease.Lease; -import org.apache.hadoop.ozone.lease.LeaseException; -import org.apache.hadoop.ozone.lease.LeaseManager; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataStoreBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_SIZE_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_SIZE; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_CHANGE_CONTAINER_STATE; -import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; -import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; - -/** - * Mapping class contains the mapping from a name to a pipeline mapping. This - * is used by SCM when - * allocating new locations and when looking up a key. 
- */ -public class ContainerMapping implements Mapping { - private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping - .class); - - private final NodeManager nodeManager; - private final long cacheSize; - private final Lock lock; - private final Charset encoding = Charset.forName("UTF-8"); - private final MetadataStore containerStore; - private final PipelineSelector pipelineSelector; - private final ContainerStateManager containerStateManager; - private final LeaseManager containerLeaseManager; - private final EventPublisher eventPublisher; - private final long size; - - /** - * Constructs a mapping class that creates mapping between container names - * and pipelines. - * - * @param nodeManager - NodeManager so that we can get the nodes that are - * healthy to place new - * containers. - * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache - * its nodes. This is - * passed to LevelDB and this memory is allocated in Native code space. - * CacheSize is specified - * in MB. - * @throws IOException on Failure. - */ - @SuppressWarnings("unchecked") - public ContainerMapping( - final Configuration conf, final NodeManager nodeManager, final int - cacheSizeMB, EventPublisher eventPublisher) throws IOException { - this.nodeManager = nodeManager; - this.cacheSize = cacheSizeMB; - - File metaDir = getOzoneMetaDirPath(conf); - - // Write the container name to pipeline mapping. 
- File containerDBPath = new File(metaDir, SCM_CONTAINER_DB); - containerStore = - MetadataStoreBuilder.newBuilder() - .setConf(conf) - .setDbFile(containerDBPath) - .setCacheSize(this.cacheSize * OzoneConsts.MB) - .build(); - - this.lock = new ReentrantLock(); - - size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE, - OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - - this.pipelineSelector = new PipelineSelector(nodeManager, - conf, eventPublisher, cacheSizeMB); - - this.containerStateManager = - new ContainerStateManager(conf, this, pipelineSelector); - LOG.trace("Container State Manager created."); - - this.eventPublisher = eventPublisher; - - long containerCreationLeaseTimeout = conf.getTimeDuration( - ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, - ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - containerLeaseManager = new LeaseManager<>("ContainerCreation", - containerCreationLeaseTimeout); - containerLeaseManager.start(); - } - - /** - * {@inheritDoc} - */ - @Override - public ContainerInfo getContainer(final long containerID) throws - IOException { - ContainerInfo containerInfo; - lock.lock(); - try { - byte[] containerBytes = containerStore.get( - Longs.toByteArray(containerID)); - if (containerBytes == null) { - throw new SCMException( - "Specified key does not exist. key : " + containerID, - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - - HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes); - containerInfo = ContainerInfo.fromProtobuf(temp); - return containerInfo; - } finally { - lock.unlock(); - } - } - - /** - * Returns the ContainerInfo and pipeline from the containerID. If container - * has no available replicas in datanodes it returns pipeline with no - * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for - * an empty pipeline. - * - * @param containerID - ID of container. 
- * @return - ContainerWithPipeline such as creation state and the pipeline. - * @throws IOException - */ - @Override - public ContainerWithPipeline getContainerWithPipeline(long containerID) - throws IOException { - ContainerInfo contInfo; - lock.lock(); - try { - byte[] containerBytes = containerStore.get( - Longs.toByteArray(containerID)); - if (containerBytes == null) { - throw new SCMException( - "Specified key does not exist. key : " + containerID, - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes); - contInfo = ContainerInfo.fromProtobuf(temp); - - Pipeline pipeline; - String leaderId = ""; - if (contInfo.isContainerOpen()) { - // If pipeline with given pipeline Id already exist return it - pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); - } else { - // For close containers create pipeline from datanodes with replicas - Set dnWithReplicas = containerStateManager - .getContainerReplicas(contInfo.containerID()); - if (!dnWithReplicas.isEmpty()) { - leaderId = dnWithReplicas.iterator().next().getUuidString(); - } - pipeline = new Pipeline(leaderId, contInfo.getState(), - ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(), - PipelineID.randomId()); - dnWithReplicas.forEach(pipeline::addMember); - } - return new ContainerWithPipeline(contInfo, pipeline); - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List listContainer(long startContainerID, - int count) throws IOException { - List containerList = new ArrayList<>(); - lock.lock(); - try { - if (containerStore.isEmpty()) { - throw new IOException("No container exists in current db"); - } - byte[] startKey = startContainerID <= 0 ? null : - Longs.toByteArray(startContainerID); - List> range = - containerStore.getSequentialRangeKVs(startKey, count, null); - - // Transform the values into the pipelines. 
- // TODO: filter by container state - for (Map.Entry entry : range) { - ContainerInfo containerInfo = - ContainerInfo.fromProtobuf( - HddsProtos.SCMContainerInfo.PARSER.parseFrom( - entry.getValue())); - Preconditions.checkNotNull(containerInfo); - containerList.add(containerInfo); - } - } finally { - lock.unlock(); - } - return containerList; - } - - /** - * Allocates a new container. - * - * @param replicationFactor - replication factor of the container. - * @param owner - The string name of the Service that owns this container. - * @return - Pipeline that makes up this container. - * @throws IOException - Exception - */ - @Override - public ContainerWithPipeline allocateContainer( - ReplicationType type, - ReplicationFactor replicationFactor, - String owner) - throws IOException { - - ContainerInfo containerInfo; - ContainerWithPipeline containerWithPipeline; - - lock.lock(); - try { - containerWithPipeline = containerStateManager.allocateContainer( - pipelineSelector, type, replicationFactor, owner); - containerInfo = containerWithPipeline.getContainerInfo(); - - byte[] containerIDBytes = Longs.toByteArray( - containerInfo.getContainerID()); - containerStore.put(containerIDBytes, containerInfo.getProtobuf() - .toByteArray()); - } finally { - lock.unlock(); - } - return containerWithPipeline; - } - - /** - * Deletes a container from SCM. - * - * @param containerID - Container ID - * @throws IOException if container doesn't exist or container store failed - * to delete the - * specified key. 
- */ - @Override - public void deleteContainer(long containerID) throws IOException { - lock.lock(); - try { - byte[] dbKey = Longs.toByteArray(containerID); - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes == null) { - throw new SCMException( - "Failed to delete container " + containerID + ", reason : " + - "container doesn't exist.", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - containerStore.delete(dbKey); - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} Used by client to update container state on SCM. - */ - @Override - public HddsProtos.LifeCycleState updateContainerState( - long containerID, HddsProtos.LifeCycleEvent event) throws - IOException { - ContainerInfo containerInfo; - lock.lock(); - try { - byte[] dbKey = Longs.toByteArray(containerID); - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes == null) { - throw new SCMException( - "Failed to update container state" - + containerID - + ", reason : container doesn't exist.", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - containerInfo = - ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes)); - - Preconditions.checkNotNull(containerInfo); - switch (event) { - case CREATE: - // Acquire lease on container - Lease containerLease = - containerLeaseManager.acquire(containerInfo); - // Register callback to be executed in case of timeout - containerLease.registerCallBack(() -> { - updateContainerState(containerID, - HddsProtos.LifeCycleEvent.TIMEOUT); - return null; - }); - break; - case CREATED: - // Release the lease on container - containerLeaseManager.release(containerInfo); - break; - case FINALIZE: - // TODO: we don't need a lease manager here for closing as the - // container report will include the container state after HDFS-13008 - // If a client failed to update the container close state, DN container - // report from 3 DNs will be used to close the container eventually. 
- break; - case CLOSE: - break; - case UPDATE: - break; - case DELETE: - break; - case TIMEOUT: - break; - case CLEANUP: - break; - default: - throw new SCMException("Unsupported container LifeCycleEvent.", - FAILED_TO_CHANGE_CONTAINER_STATE); - } - // If the below updateContainerState call fails, we should revert the - // changes made in switch case. - // Like releasing the lease in case of BEGIN_CREATE. - ContainerInfo updatedContainer = containerStateManager - .updateContainerState(containerInfo, event); - if (!updatedContainer.isContainerOpen()) { - pipelineSelector.removeContainerFromPipeline( - containerInfo.getPipelineID(), containerID); - } - containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); - return updatedContainer.getState(); - } catch (LeaseException e) { - throw new IOException("Lease Exception.", e); - } finally { - lock.unlock(); - } - } - - /** - * Update deleteTransactionId according to deleteTransactionMap. - * - * @param deleteTransactionMap Maps the containerId to latest delete - * transaction id for the container. 
- * @throws IOException - */ - public void updateDeleteTransactionId(Map deleteTransactionMap) - throws IOException { - if (deleteTransactionMap == null) { - return; - } - - lock.lock(); - try { - BatchOperation batch = new BatchOperation(); - for (Map.Entry entry : deleteTransactionMap.entrySet()) { - long containerID = entry.getKey(); - byte[] dbKey = Longs.toByteArray(containerID); - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes == null) { - throw new SCMException( - "Failed to increment number of deleted blocks for container " - + containerID + ", reason : " + "container doesn't exist.", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - ContainerInfo containerInfo = ContainerInfo.fromProtobuf( - HddsProtos.SCMContainerInfo.parseFrom(containerBytes)); - containerInfo.updateDeleteTransactionId(entry.getValue()); - batch.put(dbKey, containerInfo.getProtobuf().toByteArray()); - } - containerStore.writeBatch(batch); - containerStateManager - .updateDeleteTransactionId(deleteTransactionMap); - } finally { - lock.unlock(); - } - } - - /** - * Returns the container State Manager. - * - * @return ContainerStateManager - */ - @Override - public ContainerStateManager getStateManager() { - return containerStateManager; - } - - /** - * Return a container matching the attributes specified. - * - * @param sizeRequired - Space needed in the Container. - * @param owner - Owner of the container - A specific nameservice. - * @param type - Replication Type {StandAlone, Ratis} - * @param factor - Replication Factor {ONE, THREE} - * @param state - State of the Container-- {Open, Allocated etc.} - * @return ContainerInfo, null if there is no match found. 
- */ - public ContainerWithPipeline getMatchingContainerWithPipeline( - final long sizeRequired, String owner, ReplicationType type, - ReplicationFactor factor, LifeCycleState state) throws IOException { - ContainerInfo containerInfo = getStateManager() - .getMatchingContainer(sizeRequired, owner, type, factor, state); - if (containerInfo == null) { - return null; - } - Pipeline pipeline = pipelineSelector - .getPipeline(containerInfo.getPipelineID()); - return new ContainerWithPipeline(containerInfo, pipeline); - } - - /** - * Process container report from Datanode. - *

- * Processing follows a very simple logic for time being. - *

- * 1. Datanodes report the current State -- denoted by the datanodeState - *

- * 2. We are the older SCM state from the Database -- denoted by - * the knownState. - *

- * 3. We copy the usage etc. from currentState to newState and log that - * newState to the DB. This allows us SCM to bootup again and read the - * state of the world from the DB, and then reconcile the state from - * container reports, when they arrive. - * - * @param reports Container report - */ - @Override - public void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports, boolean isRegisterCall) - throws IOException { - List - containerInfos = reports.getReportsList(); - PendingDeleteStatusList pendingDeleteStatusList = - new PendingDeleteStatusList(datanodeDetails); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo : - containerInfos) { - // Update replica info during registration process. - if (isRegisterCall) { - try { - getStateManager().addContainerReplica(ContainerID. - valueof(contInfo.getContainerID()), datanodeDetails); - } catch (Exception ex) { - // Continue to next one after logging the error. - LOG.error("Error while adding replica for containerId {}.", - contInfo.getContainerID(), ex); - } - } - byte[] dbKey = Longs.toByteArray(contInfo.getContainerID()); - lock.lock(); - try { - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes != null) { - HddsProtos.SCMContainerInfo knownState = - HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); - - if (knownState.getState() == LifeCycleState.CLOSING - && contInfo.getState() == LifeCycleState.CLOSED) { - - updateContainerState(contInfo.getContainerID(), - LifeCycleEvent.CLOSE); - - //reread the container - knownState = - HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerStore.get(dbKey)); - } - - HddsProtos.SCMContainerInfo newState = - reconcileState(contInfo, knownState, datanodeDetails); - - if (knownState.getDeleteTransactionId() > contInfo - .getDeleteTransactionId()) { - pendingDeleteStatusList - .addPendingDeleteStatus(contInfo.getDeleteTransactionId(), - knownState.getDeleteTransactionId(), - 
knownState.getContainerID()); - } - - // FIX ME: This can be optimized, we write twice to memory, where a - // single write would work well. - // - // We need to write this to DB again since the closed only write - // the updated State. - containerStore.put(dbKey, newState.toByteArray()); - - } else { - // Container not found in our container db. - LOG.error("Error while processing container report from datanode :" + - " {}, for container: {}, reason: container doesn't exist in" + - "container database.", datanodeDetails, - contInfo.getContainerID()); - } - } finally { - lock.unlock(); - } - } - if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { - eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS, - pendingDeleteStatusList); - } - - } - - /** - * Reconciles the state from Datanode with the state in SCM. - * - * @param datanodeState - State from the Datanode. - * @param knownState - State inside SCM. - * @param dnDetails - * @return new SCM State for this container. - */ - private HddsProtos.SCMContainerInfo reconcileState( - StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState, - SCMContainerInfo knownState, DatanodeDetails dnDetails) { - HddsProtos.SCMContainerInfo.Builder builder = - HddsProtos.SCMContainerInfo.newBuilder(); - builder.setContainerID(knownState.getContainerID()) - .setPipelineID(knownState.getPipelineID()) - .setReplicationType(knownState.getReplicationType()) - .setReplicationFactor(knownState.getReplicationFactor()); - - // TODO: If current state doesn't have this DN in list of DataNodes with - // replica then add it in list of replicas. - - // If used size is greater than allocated size, we will be updating - // allocated size with used size. This update is done as a fallback - // mechanism in case SCM crashes without properly updating allocated - // size. Correct allocated value will be updated by - // ContainerStateManager during SCM shutdown. 
- long usedSize = datanodeState.getUsed(); - long allocated = knownState.getAllocatedBytes() > usedSize ? - knownState.getAllocatedBytes() : usedSize; - builder.setAllocatedBytes(allocated) - .setUsedBytes(usedSize) - .setNumberOfKeys(datanodeState.getKeyCount()) - .setState(knownState.getState()) - .setStateEnterTime(knownState.getStateEnterTime()) - .setContainerID(knownState.getContainerID()) - .setDeleteTransactionId(knownState.getDeleteTransactionId()); - if (knownState.getOwner() != null) { - builder.setOwner(knownState.getOwner()); - } - return builder.build(); - } - - - /** - * In Container is in closed state, if it is in closed, Deleting or Deleted - * State. - * - * @param info - ContainerInfo. - * @return true if is in open state, false otherwise - */ - private boolean shouldClose(ContainerInfo info) { - return info.getState() == HddsProtos.LifeCycleState.OPEN; - } - - private boolean isClosed(ContainerInfo info) { - return info.getState() == HddsProtos.LifeCycleState.CLOSED; - } - - /** - * Closes this stream and releases any system resources associated with it. - * If the stream is - * already closed then invoking this method has no effect. - *

- *

As noted in {@link AutoCloseable#close()}, cases where the close may - * fail require careful - * attention. It is strongly advised to relinquish the underlying resources - * and to internally - * mark the {@code Closeable} as closed, prior to throwing the - * {@code IOException}. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - if (containerLeaseManager != null) { - containerLeaseManager.shutdown(); - } - if (containerStateManager != null) { - flushContainerInfo(); - containerStateManager.close(); - } - if (containerStore != null) { - containerStore.close(); - } - - if (pipelineSelector != null) { - pipelineSelector.shutdown(); - } - } - - /** - * Since allocatedBytes of a container is only in memory, stored in - * containerStateManager, when closing ContainerMapping, we need to update - * this in the container store. - * - * @throws IOException on failure. - */ - @VisibleForTesting - public void flushContainerInfo() throws IOException { - List containers = containerStateManager.getAllContainers(); - List failedContainers = new ArrayList<>(); - for (ContainerInfo info : containers) { - // even if some container updated failed, others can still proceed - try { - byte[] dbKey = Longs.toByteArray(info.getContainerID()); - byte[] containerBytes = containerStore.get(dbKey); - // TODO : looks like when a container is deleted, the container is - // removed from containerStore but not containerStateManager, so it can - // return info of a deleted container. 
may revisit this in the future, - // for now, just skip a not-found container - if (containerBytes != null) { - containerStore.put(dbKey, info.getProtobuf().toByteArray()); - } else { - LOG.debug("Container state manager has container {} but not found " + - "in container store, a deleted container?", - info.getContainerID()); - } - } catch (IOException ioe) { - failedContainers.add(info.getContainerID()); - } - } - if (!failedContainers.isEmpty()) { - throw new IOException("Error in flushing container info from container " + - "state manager: " + failedContainers); - } - } - - @VisibleForTesting - public MetadataStore getContainerStore() { - return containerStore; - } - - public PipelineSelector getPipelineSelector() { - return pipelineSelector; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 71935f0..0f824a0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -50,21 +50,21 @@ public class ContainerReportHandler implements private final NodeManager nodeManager; - private final Mapping containerMapping; + private final ContainerManager containerManager; private ContainerStateManager containerStateManager; private ReplicationActivityStatus replicationStatus; - public ContainerReportHandler(Mapping containerMapping, + public ContainerReportHandler(ContainerManager containerManager, NodeManager nodeManager, ReplicationActivityStatus replicationActivityStatus) { - 
Preconditions.checkNotNull(containerMapping); + Preconditions.checkNotNull(containerManager); Preconditions.checkNotNull(nodeManager); Preconditions.checkNotNull(replicationActivityStatus); - this.containerStateManager = containerMapping.getStateManager(); + this.containerStateManager = containerManager.getStateManager(); this.nodeManager = nodeManager; - this.containerMapping = containerMapping; + this.containerManager = containerManager; this.replicationStatus = replicationActivityStatus; } @@ -80,7 +80,7 @@ public class ContainerReportHandler implements try { //update state in container db and trigger close container events - containerMapping + containerManager .processContainerReports(datanodeOrigin, containerReport, false); Set containerIds = containerReport.getReportsList().stream() http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 930c098..b8e4e89 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -136,7 +136,7 @@ public class ContainerStateManager implements Closeable { */ @SuppressWarnings("unchecked") public ContainerStateManager(Configuration configuration, - Mapping containerMapping, PipelineSelector pipelineSelector) { + ContainerManager containerManager, PipelineSelector pipelineSelector) { // Initialize the container state machine. 
Set finalStates = new HashSet(); @@ -158,15 +158,15 @@ public class ContainerStateManager implements Closeable { lastUsedMap = new ConcurrentHashMap<>(); containerCount = new AtomicLong(0); containers = new ContainerStateMap(); - loadExistingContainers(containerMapping, pipelineSelector); + loadExistingContainers(containerManager, pipelineSelector); } - private void loadExistingContainers(Mapping containerMapping, + private void loadExistingContainers(ContainerManager containerManager, PipelineSelector pipelineSelector) { List containerList; try { - containerList = containerMapping.listContainer(0, Integer.MAX_VALUE); + containerList = containerManager.listContainer(0, Integer.MAX_VALUE); // if there are no container to load, let us return. if (containerList == null || containerList.size() == 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java deleted file mode 100644 index 5ed80cb..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Mapping class contains the mapping from a name to a pipeline mapping. This is - * used by SCM when allocating new locations and when looking up a key. - */ -public interface Mapping extends Closeable { - /** - * Returns the ContainerInfo from the container ID. - * - * @param containerID - ID of container. - * @return - ContainerInfo such as creation state and the pipeline. - * @throws IOException - */ - ContainerInfo getContainer(long containerID) throws IOException; - - /** - * Returns the ContainerInfo from the container ID. - * - * @param containerID - ID of container. - * @return - ContainerWithPipeline such as creation state and the pipeline. - * @throws IOException - */ - ContainerWithPipeline getContainerWithPipeline(long containerID) - throws IOException; - - /** - * Returns containers under certain conditions. 
- * Search container IDs from start ID(exclusive), - * The max size of the searching range cannot exceed the - * value of count. - * - * @param startContainerID start containerID, >=0, - * start searching at the head if 0. - * @param count count must be >= 0 - * Usually the count will be replace with a very big - * value instead of being unlimited in case the db is very big. - * - * @return a list of container. - * @throws IOException - */ - List listContainer(long startContainerID, int count) - throws IOException; - - /** - * Allocates a new container for a given keyName and replication factor. - * - * @param replicationFactor - replication factor of the container. - * @param owner - * @return - ContainerWithPipeline. - * @throws IOException - */ - ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor replicationFactor, String owner) - throws IOException; - - /** - * Deletes a container from SCM. - * - * @param containerID - Container ID - * @throws IOException - */ - void deleteContainer(long containerID) throws IOException; - - /** - * Update container state. - * @param containerID - Container ID - * @param event - container life cycle event - * @return - new container state - * @throws IOException - */ - HddsProtos.LifeCycleState updateContainerState(long containerID, - HddsProtos.LifeCycleEvent event) throws IOException; - - /** - * Returns the container State Manager. - * @return ContainerStateManager - */ - ContainerStateManager getStateManager(); - - /** - * Process container report from Datanode. - * - * @param reports Container report - */ - void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports, boolean isRegisterCall) - throws IOException; - - /** - * Update deleteTransactionId according to deleteTransactionMap. - * - * @param deleteTransactionMap Maps the containerId to latest delete - * transaction id for the container. 
- * @throws IOException - */ - void updateDeleteTransactionId(Map deleteTransactionMap) - throws IOException; - - /** - * Returns the ContainerWithPipeline. - * @return NodeManager - */ - ContainerWithPipeline getMatchingContainerWithPipeline(long size, - String owner, ReplicationType type, ReplicationFactor factor, - LifeCycleState state) throws IOException; - - PipelineSelector getPipelineSelector(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java new file mode 100644 index 0000000..df26e36 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ *

http://www.apache.org/licenses/LICENSE-2.0 + *

+ *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo; +import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import 
org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.lease.Lease; +import org.apache.hadoop.ozone.lease.LeaseException; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE; +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_CHANGE_CONTAINER_STATE; +import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; +import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; + +/** + * ContainerManager class contains the mapping from a name to a pipeline + * mapping. This is used by SCM when allocating new locations and when + * looking up a key. 
+ */ +public class SCMContainerManager implements ContainerManager { + private static final Logger LOG = LoggerFactory.getLogger(SCMContainerManager + .class); + + private final NodeManager nodeManager; + private final long cacheSize; + private final Lock lock; + private final Charset encoding = Charset.forName("UTF-8"); + private final MetadataStore containerStore; + private final PipelineSelector pipelineSelector; + private final ContainerStateManager containerStateManager; + private final LeaseManager containerLeaseManager; + private final EventPublisher eventPublisher; + private final long size; + + /** + * Constructs a mapping class that creates mapping between container names + * and pipelines. + * + * @param nodeManager - NodeManager so that we can get the nodes that are + * healthy to place new + * containers. + * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache + * its nodes. This is + * passed to LevelDB and this memory is allocated in Native code space. + * CacheSize is specified + * in MB. + * @throws IOException on Failure. + */ + @SuppressWarnings("unchecked") + public SCMContainerManager( + final Configuration conf, final NodeManager nodeManager, final int + cacheSizeMB, EventPublisher eventPublisher) throws IOException { + this.nodeManager = nodeManager; + this.cacheSize = cacheSizeMB; + + File metaDir = getOzoneMetaDirPath(conf); + + // Write the container name to pipeline mapping. 
+ File containerDBPath = new File(metaDir, SCM_CONTAINER_DB); + containerStore = + MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(containerDBPath) + .setCacheSize(this.cacheSize * OzoneConsts.MB) + .build(); + + this.lock = new ReentrantLock(); + + size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE, + OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + + this.pipelineSelector = new PipelineSelector(nodeManager, + conf, eventPublisher, cacheSizeMB); + + this.containerStateManager = + new ContainerStateManager(conf, this, pipelineSelector); + LOG.trace("Container State Manager created."); + + this.eventPublisher = eventPublisher; + + long containerCreationLeaseTimeout = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, + ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + containerLeaseManager = new LeaseManager<>("ContainerCreation", + containerCreationLeaseTimeout); + containerLeaseManager.start(); + } + + /** + * {@inheritDoc} + */ + @Override + public ContainerInfo getContainer(final long containerID) throws + IOException { + ContainerInfo containerInfo; + lock.lock(); + try { + byte[] containerBytes = containerStore.get( + Longs.toByteArray(containerID)); + if (containerBytes == null) { + throw new SCMException( + "Specified key does not exist. key : " + containerID, + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + + HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER + .parseFrom(containerBytes); + containerInfo = ContainerInfo.fromProtobuf(temp); + return containerInfo; + } finally { + lock.unlock(); + } + } + + /** + * Returns the ContainerInfo and pipeline from the containerID. If container + * has no available replicas in datanodes it returns pipeline with no + * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for + * an empty pipeline. + * + * @param containerID - ID of container. 
+ * @return - ContainerWithPipeline such as creation state and the pipeline. + * @throws IOException + */ + @Override + public ContainerWithPipeline getContainerWithPipeline(long containerID) + throws IOException { + ContainerInfo contInfo; + lock.lock(); + try { + byte[] containerBytes = containerStore.get( + Longs.toByteArray(containerID)); + if (containerBytes == null) { + throw new SCMException( + "Specified key does not exist. key : " + containerID, + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER + .parseFrom(containerBytes); + contInfo = ContainerInfo.fromProtobuf(temp); + + Pipeline pipeline; + String leaderId = ""; + if (contInfo.isContainerOpen()) { + // If pipeline with given pipeline Id already exist return it + pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); + } else { + // For close containers create pipeline from datanodes with replicas + Set dnWithReplicas = containerStateManager + .getContainerReplicas(contInfo.containerID()); + if (!dnWithReplicas.isEmpty()) { + leaderId = dnWithReplicas.iterator().next().getUuidString(); + } + pipeline = new Pipeline(leaderId, contInfo.getState(), + ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(), + PipelineID.randomId()); + dnWithReplicas.forEach(pipeline::addMember); + } + return new ContainerWithPipeline(contInfo, pipeline); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List listContainer(long startContainerID, + int count) throws IOException { + List containerList = new ArrayList<>(); + lock.lock(); + try { + if (containerStore.isEmpty()) { + throw new IOException("No container exists in current db"); + } + byte[] startKey = startContainerID <= 0 ? null : + Longs.toByteArray(startContainerID); + List> range = + containerStore.getSequentialRangeKVs(startKey, count, null); + + // Transform the values into the pipelines. 
+ // TODO: filter by container state + for (Map.Entry entry : range) { + ContainerInfo containerInfo = + ContainerInfo.fromProtobuf( + HddsProtos.SCMContainerInfo.PARSER.parseFrom( + entry.getValue())); + Preconditions.checkNotNull(containerInfo); + containerList.add(containerInfo); + } + } finally { + lock.unlock(); + } + return containerList; + } + + /** + * Allocates a new container. + * + * @param replicationFactor - replication factor of the container. + * @param owner - The string name of the Service that owns this container. + * @return - Pipeline that makes up this container. + * @throws IOException - Exception + */ + @Override + public ContainerWithPipeline allocateContainer( + ReplicationType type, + ReplicationFactor replicationFactor, + String owner) + throws IOException { + + ContainerInfo containerInfo; + ContainerWithPipeline containerWithPipeline; + + lock.lock(); + try { + containerWithPipeline = containerStateManager.allocateContainer( + pipelineSelector, type, replicationFactor, owner); + containerInfo = containerWithPipeline.getContainerInfo(); + + byte[] containerIDBytes = Longs.toByteArray( + containerInfo.getContainerID()); + containerStore.put(containerIDBytes, containerInfo.getProtobuf() + .toByteArray()); + } finally { + lock.unlock(); + } + return containerWithPipeline; + } + + /** + * Deletes a container from SCM. + * + * @param containerID - Container ID + * @throws IOException if container doesn't exist or container store failed + * to delete the + * specified key. 
+ */ + @Override + public void deleteContainer(long containerID) throws IOException { + lock.lock(); + try { + byte[] dbKey = Longs.toByteArray(containerID); + byte[] containerBytes = containerStore.get(dbKey); + if (containerBytes == null) { + throw new SCMException( + "Failed to delete container " + containerID + ", reason : " + + "container doesn't exist.", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + containerStore.delete(dbKey); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} Used by client to update container state on SCM. + */ + @Override + public HddsProtos.LifeCycleState updateContainerState( + long containerID, HddsProtos.LifeCycleEvent event) throws + IOException { + ContainerInfo containerInfo; + lock.lock(); + try { + byte[] dbKey = Longs.toByteArray(containerID); + byte[] containerBytes = containerStore.get(dbKey); + if (containerBytes == null) { + throw new SCMException( + "Failed to update container state" + + containerID + + ", reason : container doesn't exist.", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + containerInfo = + ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER + .parseFrom(containerBytes)); + + Preconditions.checkNotNull(containerInfo); + switch (event) { + case CREATE: + // Acquire lease on container + Lease containerLease = + containerLeaseManager.acquire(containerInfo); + // Register callback to be executed in case of timeout + containerLease.registerCallBack(() -> { + updateContainerState(containerID, + HddsProtos.LifeCycleEvent.TIMEOUT); + return null; + }); + break; + case CREATED: + // Release the lease on container + containerLeaseManager.release(containerInfo); + break; + case FINALIZE: + // TODO: we don't need a lease manager here for closing as the + // container report will include the container state after HDFS-13008 + // If a client failed to update the container close state, DN container + // report from 3 DNs will be used to close the container eventually. 
+ break; + case CLOSE: + break; + case UPDATE: + break; + case DELETE: + break; + case TIMEOUT: + break; + case CLEANUP: + break; + default: + throw new SCMException("Unsupported container LifeCycleEvent.", + FAILED_TO_CHANGE_CONTAINER_STATE); + } + // If the below updateContainerState call fails, we should revert the + // changes made in switch case. + // Like releasing the lease in case of BEGIN_CREATE. + ContainerInfo updatedContainer = containerStateManager + .updateContainerState(containerInfo, event); + if (!updatedContainer.isContainerOpen()) { + pipelineSelector.removeContainerFromPipeline( + containerInfo.getPipelineID(), containerID); + } + containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); + return updatedContainer.getState(); + } catch (LeaseException e) { + throw new IOException("Lease Exception.", e); + } finally { + lock.unlock(); + } + } + + /** + * Update deleteTransactionId according to deleteTransactionMap. + * + * @param deleteTransactionMap Maps the containerId to latest delete + * transaction id for the container. 
+ * @throws IOException + */ + public void updateDeleteTransactionId(Map deleteTransactionMap) + throws IOException { + if (deleteTransactionMap == null) { + return; + } + + lock.lock(); + try { + BatchOperation batch = new BatchOperation(); + for (Map.Entry entry : deleteTransactionMap.entrySet()) { + long containerID = entry.getKey(); + byte[] dbKey = Longs.toByteArray(containerID); + byte[] containerBytes = containerStore.get(dbKey); + if (containerBytes == null) { + throw new SCMException( + "Failed to increment number of deleted blocks for container " + + containerID + ", reason : " + "container doesn't exist.", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + ContainerInfo containerInfo = ContainerInfo.fromProtobuf( + HddsProtos.SCMContainerInfo.parseFrom(containerBytes)); + containerInfo.updateDeleteTransactionId(entry.getValue()); + batch.put(dbKey, containerInfo.getProtobuf().toByteArray()); + } + containerStore.writeBatch(batch); + containerStateManager + .updateDeleteTransactionId(deleteTransactionMap); + } finally { + lock.unlock(); + } + } + + /** + * Returns the container State Manager. + * + * @return ContainerStateManager + */ + @Override + public ContainerStateManager getStateManager() { + return containerStateManager; + } + + /** + * Return a container matching the attributes specified. + * + * @param sizeRequired - Space needed in the Container. + * @param owner - Owner of the container - A specific nameservice. + * @param type - Replication Type {StandAlone, Ratis} + * @param factor - Replication Factor {ONE, THREE} + * @param state - State of the Container-- {Open, Allocated etc.} + * @return ContainerInfo, null if there is no match found. 
+ */ + public ContainerWithPipeline getMatchingContainerWithPipeline( + final long sizeRequired, String owner, ReplicationType type, + ReplicationFactor factor, LifeCycleState state) throws IOException { + ContainerInfo containerInfo = getStateManager() + .getMatchingContainer(sizeRequired, owner, type, factor, state); + if (containerInfo == null) { + return null; + } + Pipeline pipeline = pipelineSelector + .getPipeline(containerInfo.getPipelineID()); + return new ContainerWithPipeline(containerInfo, pipeline); + } + + /** + * Process container report from Datanode. + *

+ * Processing follows a very simple logic for the time being. + * <p>

+ * 1. Datanodes report the current State -- denoted by the datanodeState + * <p>

+ * 2. We read the older SCM state from the Database -- denoted by + * the knownState. + * <p>

+ * 3. We copy the usage etc. from currentState to newState and log that + * newState to the DB. This allows us SCM to bootup again and read the + * state of the world from the DB, and then reconcile the state from + * container reports, when they arrive. + * + * @param reports Container report + */ + @Override + public void processContainerReports(DatanodeDetails datanodeDetails, + ContainerReportsProto reports, boolean isRegisterCall) + throws IOException { + List + containerInfos = reports.getReportsList(); + PendingDeleteStatusList pendingDeleteStatusList = + new PendingDeleteStatusList(datanodeDetails); + for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo : + containerInfos) { + // Update replica info during registration process. + if (isRegisterCall) { + try { + getStateManager().addContainerReplica(ContainerID. + valueof(contInfo.getContainerID()), datanodeDetails); + } catch (Exception ex) { + // Continue to next one after logging the error. + LOG.error("Error while adding replica for containerId {}.", + contInfo.getContainerID(), ex); + } + } + byte[] dbKey = Longs.toByteArray(contInfo.getContainerID()); + lock.lock(); + try { + byte[] containerBytes = containerStore.get(dbKey); + if (containerBytes != null) { + HddsProtos.SCMContainerInfo knownState = + HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); + + if (knownState.getState() == LifeCycleState.CLOSING + && contInfo.getState() == LifeCycleState.CLOSED) { + + updateContainerState(contInfo.getContainerID(), + LifeCycleEvent.CLOSE); + + //reread the container + knownState = + HddsProtos.SCMContainerInfo.PARSER + .parseFrom(containerStore.get(dbKey)); + } + + HddsProtos.SCMContainerInfo newState = + reconcileState(contInfo, knownState, datanodeDetails); + + if (knownState.getDeleteTransactionId() > contInfo + .getDeleteTransactionId()) { + pendingDeleteStatusList + .addPendingDeleteStatus(contInfo.getDeleteTransactionId(), + knownState.getDeleteTransactionId(), + 
knownState.getContainerID()); + } + + // FIX ME: This can be optimized, we write twice to memory, where a + // single write would work well. + // + // We need to write this to DB again since the closed only write + // the updated State. + containerStore.put(dbKey, newState.toByteArray()); + + } else { + // Container not found in our container db. + LOG.error("Error while processing container report from datanode :" + + " {}, for container: {}, reason: container doesn't exist in" + + "container database.", datanodeDetails, + contInfo.getContainerID()); + } + } finally { + lock.unlock(); + } + } + if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { + eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS, + pendingDeleteStatusList); + } + + } + + /** + * Reconciles the state from Datanode with the state in SCM. + * + * @param datanodeState - State from the Datanode. + * @param knownState - State inside SCM. + * @param dnDetails + * @return new SCM State for this container. + */ + private HddsProtos.SCMContainerInfo reconcileState( + StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState, + SCMContainerInfo knownState, DatanodeDetails dnDetails) { + HddsProtos.SCMContainerInfo.Builder builder = + HddsProtos.SCMContainerInfo.newBuilder(); + builder.setContainerID(knownState.getContainerID()) + .setPipelineID(knownState.getPipelineID()) + .setReplicationType(knownState.getReplicationType()) + .setReplicationFactor(knownState.getReplicationFactor()); + + // TODO: If current state doesn't have this DN in list of DataNodes with + // replica then add it in list of replicas. + + // If used size is greater than allocated size, we will be updating + // allocated size with used size. This update is done as a fallback + // mechanism in case SCM crashes without properly updating allocated + // size. Correct allocated value will be updated by + // ContainerStateManager during SCM shutdown. 
+ long usedSize = datanodeState.getUsed(); + long allocated = knownState.getAllocatedBytes() > usedSize ? + knownState.getAllocatedBytes() : usedSize; + builder.setAllocatedBytes(allocated) + .setUsedBytes(usedSize) + .setNumberOfKeys(datanodeState.getKeyCount()) + .setState(knownState.getState()) + .setStateEnterTime(knownState.getStateEnterTime()) + .setContainerID(knownState.getContainerID()) + .setDeleteTransactionId(knownState.getDeleteTransactionId()); + if (knownState.getOwner() != null) { + builder.setOwner(knownState.getOwner()); + } + return builder.build(); + } + + + /** + * In Container is in closed state, if it is in closed, Deleting or Deleted + * State. + * + * @param info - ContainerInfo. + * @return true if is in open state, false otherwise + */ + private boolean shouldClose(ContainerInfo info) { + return info.getState() == HddsProtos.LifeCycleState.OPEN; + } + + private boolean isClosed(ContainerInfo info) { + return info.getState() == HddsProtos.LifeCycleState.CLOSED; + } + + /** + * Closes this stream and releases any system resources associated with it. + * If the stream is + * already closed then invoking this method has no effect. + *

+ *

As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful + * attention. It is strongly advised to relinquish the underlying resources + * and to internally + * mark the {@code Closeable} as closed, prior to throwing the + * {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + if (containerLeaseManager != null) { + containerLeaseManager.shutdown(); + } + if (containerStateManager != null) { + flushContainerInfo(); + containerStateManager.close(); + } + if (containerStore != null) { + containerStore.close(); + } + + if (pipelineSelector != null) { + pipelineSelector.shutdown(); + } + } + + /** + * Since allocatedBytes of a container is only in memory, stored in + * containerStateManager, when closing SCMContainerManager, we need to update + * this in the container store. + * + * @throws IOException on failure. + */ + @VisibleForTesting + public void flushContainerInfo() throws IOException { + List containers = containerStateManager.getAllContainers(); + List failedContainers = new ArrayList<>(); + for (ContainerInfo info : containers) { + // even if some container updated failed, others can still proceed + try { + byte[] dbKey = Longs.toByteArray(info.getContainerID()); + byte[] containerBytes = containerStore.get(dbKey); + // TODO : looks like when a container is deleted, the container is + // removed from containerStore but not containerStateManager, so it can + // return info of a deleted container. 
may revisit this in the future, + // for now, just skip a not-found container + if (containerBytes != null) { + containerStore.put(dbKey, info.getProtobuf().toByteArray()); + } else { + LOG.debug("Container state manager has container {} but not found " + + "in container store, a deleted container?", + info.getContainerID()); + } + } catch (IOException ioe) { + failedContainers.add(info.getContainerID()); + } + } + if (!failedContainers.isEmpty()) { + throw new IOException("Error in flushing container info from container " + + "state manager: " + failedContainers); + } + } + + @VisibleForTesting + public MetadataStore getContainerStore() { + return containerStore; + } + + public PipelineSelector getPipelineSelector() { + return pipelineSelector; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org