From common-commits-return-86253-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Tue Jul 31 11:25:59 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id EBB4B180718 for ; Tue, 31 Jul 2018 11:25:57 +0200 (CEST) Received: (qmail 77436 invoked by uid 500); 31 Jul 2018 09:25:45 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 76249 invoked by uid 99); 31 Jul 2018 09:25:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Jul 2018 09:25:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3B2D6E10CD; Tue, 31 Jul 2018 09:25:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ehiggs@apache.org To: common-commits@hadoop.apache.org Date: Tue, 31 Jul 2018 09:26:21 -0000 Message-Id: <412034df476c47769f3473e823552d1f@git.apache.org> In-Reply-To: <4e533beaec11404caf0635c07bf3761a@git.apache.org> References: <4e533beaec11404caf0635c07bf3761a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [39/50] hadoop git commit: HDDS-273. DeleteLog entries should be purged only after corresponding DNs commit the transaction. Contributed by Lokesh Jain. HDDS-273. DeleteLog entries should be purged only after corresponding DNs commit the transaction. Contributed by Lokesh Jain. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/feb795b5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/feb795b5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/feb795b5 Branch: refs/heads/HDFS-12090 Commit: feb795b58d2a3c20bdbddea1638a83f6637d3fc9 Parents: 6b038f8 Author: Mukul Kumar Singh Authored: Sun Jul 29 01:02:24 2018 +0530 Committer: Mukul Kumar Singh Committed: Sun Jul 29 01:02:24 2018 +0530 ---------------------------------------------------------------------- .../DeleteBlocksCommandHandler.java | 12 +- .../StorageContainerDatanodeProtocol.proto | 4 +- .../hadoop/hdds/scm/block/BlockManagerImpl.java | 2 +- .../block/DatanodeDeletedBlockTransactions.java | 47 ++-- .../hadoop/hdds/scm/block/DeletedBlockLog.java | 23 +- .../hdds/scm/block/DeletedBlockLogImpl.java | 123 ++++++---- .../scm/server/SCMDatanodeProtocolServer.java | 19 +- .../hdds/scm/block/TestDeletedBlockLog.java | 232 ++++++++++--------- 8 files changed, 256 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 9640f93..b0d4cbc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -113,8 +113,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler { DeleteBlockTransactionResult.Builder txResultBuilder = DeleteBlockTransactionResult.newBuilder(); txResultBuilder.setTxID(entry.getTxID()); + long containerId = entry.getContainerID(); try { - long containerId = entry.getContainerID(); Container cont = containerSet.getContainer(containerId); if (cont == null) { throw new StorageContainerException("Unable to find the container " @@ -126,7 +126,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler { KeyValueContainerData containerData = (KeyValueContainerData) cont.getContainerData(); deleteKeyValueContainerBlocks(containerData, entry); - txResultBuilder.setSuccess(true); + txResultBuilder.setContainerID(containerId) + .setSuccess(true); break; default: LOG.error( @@ -136,9 +137,12 @@ public class DeleteBlocksCommandHandler implements CommandHandler { } catch (IOException e) { LOG.warn("Failed to delete blocks for container={}, TXID={}", entry.getContainerID(), entry.getTxID(), e); - txResultBuilder.setSuccess(false); + txResultBuilder.setContainerID(containerId) + .setSuccess(false); } - resultBuilder.addResults(txResultBuilder.build()); + resultBuilder.addResults(txResultBuilder.build()) + .setDnId(context.getParent().getDatanodeDetails() + .getUuid().toString()); }); ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index d89567b..0c52efb 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -229,9 +229,11 @@ message DeletedBlocksTransaction { message ContainerBlocksDeletionACKProto { message DeleteBlockTransactionResult { required int64 txID = 1; - required bool success = 2; + required int64 containerID = 2; + required bool success = 3; } repeated DeleteBlockTransactionResult results = 1; + required string dnId = 2; } // SendACK response returned by datanode to SCM, currently empty. http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 6825ca4..8e1c2cc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -112,7 +112,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this); // SCM block deleting transaction log and deleting service. - deletedBlockLog = new DeletedBlockLogImpl(conf); + deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager); long svcInterval = conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java index d71e7b0..e33a700 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java @@ -53,7 +53,8 @@ public class DatanodeDeletedBlockTransactions { this.nodeNum = nodeNum; } - public void addTransaction(DeletedBlocksTransaction tx) throws IOException { + public void addTransaction(DeletedBlocksTransaction tx, + Set dnsWithTransactionCommitted) throws IOException { Pipeline pipeline = null; try { pipeline = mappingService.getContainerWithPipeline(tx.getContainerID()) @@ -71,29 +72,37 @@ public class DatanodeDeletedBlockTransactions { for (DatanodeDetails dd : pipeline.getMachines()) { UUID dnID = dd.getUuid(); - if (transactions.containsKey(dnID)) { - List txs = transactions.get(dnID); - if (txs != null && txs.size() < maximumAllowedTXNum) { - boolean hasContained = false; - for (DeletedBlocksTransaction t : txs) { - if (t.getContainerID() == tx.getContainerID()) { - hasContained = true; - break; - } - } + if (dnsWithTransactionCommitted == null || + !dnsWithTransactionCommitted.contains(dnID)) { + // Transaction need not be sent to dns which have already committed it + addTransactionToDN(dnID, tx); + } + } + } - if (!hasContained) { - txs.add(tx); - currentTXNum++; + private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) { + if (transactions.containsKey(dnID)) { + List txs = transactions.get(dnID); + if (txs != null && txs.size() < maximumAllowedTXNum) { + boolean hasContained = false; + for (DeletedBlocksTransaction t : txs) { + if (t.getContainerID() == tx.getContainerID()) { + hasContained = true; + break; } } - } else { - currentTXNum++; - transactions.put(dnID, tx); + + if (!hasContained) { + txs.add(tx); + currentTXNum++; + } } - SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID, - tx.getTxID()); + } else { + currentTXNum++; + transactions.put(dnID, tx); } + SCMBlockDeletingService.LOG + .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID()); } Set getDatanodeIDs() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java index 28103be..2bb5686 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java @@ -18,12 +18,16 @@ package org.apache.hadoop.hdds.scm.block; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + .DeleteBlockTransactionResult; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.UUID; /** * The DeletedBlockLog is a persisted log in SCM to keep tracking @@ -34,18 +38,6 @@ import java.util.Map; public interface DeletedBlockLog extends Closeable { /** - * A limit size list of transactions. Note count is the max number - * of TXs to return, we might not be able to always return this - * number. and the processCount of those transactions - * should be [0, MAX_RETRY). - * - * @param count - number of transactions. - * @return a list of BlockDeletionTransaction. - */ - List getTransactions(int count) - throws IOException; - - /** * Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions. * Once DatanodeDeletedBlockTransactions is full, the scan behavior will * stop. @@ -81,10 +73,11 @@ public interface DeletedBlockLog extends Closeable { * Commits a transaction means to delete all footprints of a transaction * from the log. This method doesn't guarantee all transactions can be * successfully deleted, it tolerate failures and tries best efforts to. - * - * @param txIDs - transaction IDs. + * @param transactionResults - delete block transaction results. + * @param dnID - ID of datanode which acknowledges the delete block command. */ - void commitTransactions(List txIDs) throws IOException; + void commitTransactions(List transactionResults, + UUID dnID); /** * Creates a block deletion transaction and adds that into the log. http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index 48fa2eb..752c9c7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -21,27 +21,36 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + .DeleteBlockTransactionResult; +import org.apache.hadoop.hdds.scm.container.Mapping; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; import org.apache.hadoop.utils.MetadataStore; import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_BLOCK_DELETION_MAX_RETRY; @@ -74,12 +83,15 @@ public class DeletedBlockLogImpl implements DeletedBlockLog { private final int maxRetry; private final MetadataStore deletedStore; + private final Mapping containerManager; private final Lock lock; // The latest id of deleted blocks in the db. private long lastTxID; - private long lastReadTxID; + // Maps txId to set of DNs which are successful in committing the transaction + private Map> transactionToDNsCommitMap; - public DeletedBlockLogImpl(Configuration conf) throws IOException { + public DeletedBlockLogImpl(Configuration conf, Mapping containerManager) + throws IOException { maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT); @@ -95,11 +107,17 @@ public class DeletedBlockLogImpl implements DeletedBlockLog { .setDbFile(deletedLogDbPath) .setCacheSize(cacheSize * OzoneConsts.MB) .build(); + this.containerManager = containerManager; this.lock = new ReentrantLock(); // start from the head of deleted store. - lastReadTxID = 0; lastTxID = findLatestTxIDInStore(); + + // transactionToDNsCommitMap is updated only when + // transaction is added to the log and when it is removed. + + // maps transaction to dns which have committed it. + transactionToDNsCommitMap = new ConcurrentHashMap<>(); } @VisibleForTesting @@ -124,39 +142,6 @@ public class DeletedBlockLogImpl implements DeletedBlockLog { } @Override - public List getTransactions( - int count) throws IOException { - List result = new ArrayList<>(); - MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey) - -> Longs.fromByteArray(currentKey) > lastReadTxID; - MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey) - -> !Arrays.equals(LATEST_TXID, currentKey); - lock.lock(); - try { - deletedStore.iterate(null, (key, value) -> { - if (getNextTxID.filterKey(null, key, null) && - avoidInvalidTxid.filterKey(null, key, null)) { - DeletedBlocksTransaction block = DeletedBlocksTransaction - .parseFrom(value); - if (block.getCount() > -1 && block.getCount() <= maxRetry) { - result.add(block); - } - } - return result.size() < count; - }); - // Scan the metadata from the beginning. - if (result.size() < count || result.size() < 1) { - lastReadTxID = 0; - } else { - lastReadTxID = result.get(result.size() - 1).getTxID(); - } - } finally { - lock.unlock(); - } - return result; - } - - @Override public List getFailedTransactions() throws IOException { lock.lock(); @@ -235,18 +220,50 @@ public class DeletedBlockLogImpl implements DeletedBlockLog { /** * {@inheritDoc} * - * @param txIDs - transaction IDs. + * @param transactionResults - transaction IDs. + * @param dnID - Id of Datanode which has acknowledged a delete block command. * @throws IOException */ @Override - public void commitTransactions(List txIDs) throws IOException { + public void commitTransactions( + List transactionResults, UUID dnID) { lock.lock(); try { - for (Long txID : txIDs) { + Set dnsWithCommittedTxn; + for (DeleteBlockTransactionResult transactionResult : transactionResults) { + if (isTransactionFailed(transactionResult)) { + continue; + } try { - deletedStore.delete(Longs.toByteArray(txID)); - } catch (IOException ex) { - LOG.warn("Cannot commit txID " + txID, ex); + long txID = transactionResult.getTxID(); + // set of dns which have successfully committed transaction txId. + dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID); + Long containerId = transactionResult.getContainerID(); + if (dnsWithCommittedTxn == null || containerId == null) { + LOG.warn("Transaction txId={} commit by dnId={} failed." + + " Corresponding entry not found.", txID, dnID); + return; + } + + dnsWithCommittedTxn.add(dnID); + Collection containerDnsDetails = + containerManager.getContainerWithPipeline(containerId) + .getPipeline().getDatanodes().values(); + // The delete entry can be safely removed from the log if all the + // corresponding nodes commit the txn. + if (dnsWithCommittedTxn.size() >= containerDnsDetails.size()) { + List containerDns = containerDnsDetails.stream() + .map(dnDetails -> dnDetails.getUuid()) + .collect(Collectors.toList()); + if (dnsWithCommittedTxn.containsAll(containerDns)) { + transactionToDNsCommitMap.remove(txID); + LOG.debug("Purging txId={} from block deletion log", txID); + deletedStore.delete(Longs.toByteArray(txID)); + } + } + } catch (IOException e) { + LOG.warn("Could not commit delete block transaction: " + + transactionResult.getTxID(), e); } } } finally { @@ -254,6 +271,20 @@ public class DeletedBlockLogImpl implements DeletedBlockLog { } } + private boolean isTransactionFailed(DeleteBlockTransactionResult result) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Got block deletion ACK from datanode, TXIDs={}, " + "success={}", + result.getTxID(), result.getSuccess()); + } + if (!result.getSuccess()) { + LOG.warn("Got failed ACK for TXID={}, prepare to resend the " + + "TX in next interval", result.getTxID()); + return true; + } + return false; + } + /** * {@inheritDoc} * @@ -355,7 +386,9 @@ public class DeletedBlockLogImpl implements DeletedBlockLog { .parseFrom(value); if (block.getCount() > -1 && block.getCount() <= maxRetry) { - transactions.addTransaction(block); + Set dnsWithTransactionCommitted = transactionToDNsCommitMap + .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>()); + transactions.addTransaction(block, dnsWithTransactionCommitted); } return !transactions.isFull(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index aee64b9..0d34787 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -91,9 +91,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY; @@ -230,21 +230,8 @@ public class SCMDatanodeProtocolServer implements ContainerBlocksDeletionACKProto acks) throws IOException { if (acks.getResultsCount() > 0) { List resultList = acks.getResultsList(); - for (DeleteBlockTransactionResult result : resultList) { - if (LOG.isDebugEnabled()) { - LOG.debug("Got block deletion ACK from datanode, TXIDs={}, " - + "success={}", result.getTxID(), result.getSuccess()); - } - if (result.getSuccess()) { - LOG.debug("Purging TXID={} from block deletion log", - result.getTxID()); - scm.getScmBlockManager().getDeletedBlockLog() - .commitTransactions(Collections.singletonList(result.getTxID())); - } else { - LOG.warn("Got failed ACK for TXID={}, prepare to resend the " - + "TX in next interval", result.getTxID()); - } - } + scm.getScmBlockManager().getDeletedBlockLog() + .commitTransactions(resultList, UUID.fromString(acks.getDnId())); } return ContainerBlocksDeletionACKResponseProto.newBuilder() .getDefaultInstanceForType(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index 9255ec7..e86717b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -32,6 +32,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + .DeleteBlockTransactionResult; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.utils.MetadataKeyFilters; import org.apache.hadoop.utils.MetadataStore; @@ -45,6 +48,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -56,7 +60,8 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_BLOCK_DELETION_MAX_RETRY; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS; -import static org.mockito.Mockito.mock; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.when; /** * Tests for DeletedBlockLog. @@ -66,6 +71,8 @@ public class TestDeletedBlockLog { private static DeletedBlockLogImpl deletedBlockLog; private OzoneConfiguration conf; private File testDir; + private Mapping containerManager; + private List dnList; @Before public void setup() throws Exception { @@ -74,7 +81,36 @@ public class TestDeletedBlockLog { conf = new OzoneConfiguration(); conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20); conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath()); - deletedBlockLog = new DeletedBlockLogImpl(conf); + containerManager = Mockito.mock(ContainerMapping.class); + deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager); + dnList = new ArrayList<>(3); + setupContainerManager(); + } + + private void setupContainerManager() throws IOException { + dnList.add( + DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString()) + .build()); + dnList.add( + DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString()) + .build()); + dnList.add( + DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString()) + .build()); + + ContainerInfo containerInfo = + new ContainerInfo.Builder().setContainerID(1).build(); + Pipeline pipeline = + new Pipeline(null, LifeCycleState.CLOSED, ReplicationType.RATIS, + ReplicationFactor.THREE, null); + pipeline.addMember(dnList.get(0)); + pipeline.addMember(dnList.get(1)); + pipeline.addMember(dnList.get(2)); + ContainerWithPipeline containerWithPipeline = + new ContainerWithPipeline(containerInfo, pipeline); + when(containerManager.getContainerWithPipeline(anyLong())) + .thenReturn(containerWithPipeline); + when(containerManager.getContainer(anyLong())).thenReturn(containerInfo); } @After @@ -101,45 +137,50 @@ public class TestDeletedBlockLog { return blockMap; } - @Test - public void testGetTransactions() throws Exception { - List blocks = - deletedBlockLog.getTransactions(30); - Assert.assertEquals(0, blocks.size()); - - // Creates 40 TX in the log. - for (Map.Entry> entry : generateData(40).entrySet()){ - deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + private void commitTransactions( + List transactionResults, + DatanodeDetails... dns) { + for (DatanodeDetails dnDetails : dns) { + deletedBlockLog + .commitTransactions(transactionResults, dnDetails.getUuid()); } + } - // Get first 30 TXs. - blocks = deletedBlockLog.getTransactions(30); - Assert.assertEquals(30, blocks.size()); - for (int i = 0; i < 30; i++) { - Assert.assertEquals(i + 1, blocks.get(i).getTxID()); - } + private void commitTransactions( + List transactionResults) { + commitTransactions(transactionResults, + dnList.toArray(new DatanodeDetails[3])); + } - // Get another 30 TXs. - // The log only 10 left, so this time it will only return 10 TXs. - blocks = deletedBlockLog.getTransactions(30); - Assert.assertEquals(10, blocks.size()); - for (int i = 30; i < 40; i++) { - Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID()); - } + private void commitTransactions( + Collection deletedBlocksTransactions, + DatanodeDetails... dns) { + commitTransactions(deletedBlocksTransactions.stream() + .map(this::createDeleteBlockTransactionResult) + .collect(Collectors.toList()), dns); + } - // Get another 50 TXs. - // By now the position should have moved to the beginning, - // this call will return all 40 TXs. - blocks = deletedBlockLog.getTransactions(50); - Assert.assertEquals(40, blocks.size()); - for (int i = 0; i < 40; i++) { - Assert.assertEquals(i + 1, blocks.get(i).getTxID()); - } - List txIDs = new ArrayList<>(); - for (DeletedBlocksTransaction block : blocks) { - txIDs.add(block.getTxID()); - } - deletedBlockLog.commitTransactions(txIDs); + private void commitTransactions( + Collection deletedBlocksTransactions) { + commitTransactions(deletedBlocksTransactions.stream() + .map(this::createDeleteBlockTransactionResult) + .collect(Collectors.toList())); + } + + private DeleteBlockTransactionResult createDeleteBlockTransactionResult( + DeletedBlocksTransaction transaction) { + return DeleteBlockTransactionResult.newBuilder() + .setContainerID(transaction.getContainerID()).setSuccess(true) + .setTxID(transaction.getTxID()).build(); + } + + private List getTransactions( + int maximumAllowedTXNum) throws IOException { + DatanodeDeletedBlockTransactions transactions = + new DatanodeDeletedBlockTransactions(containerManager, + maximumAllowedTXNum, 3); + deletedBlockLog.getTransactions(transactions); + return transactions.getDatanodeTransactions(dnList.get(0).getUuid()); } @Test @@ -153,7 +194,7 @@ public class TestDeletedBlockLog { // This will return all TXs, total num 30. List blocks = - deletedBlockLog.getTransactions(40); + getTransactions(40); List txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID) .collect(Collectors.toList()); @@ -164,13 +205,13 @@ public class TestDeletedBlockLog { // Increment another time so it exceed the maxRetry. // On this call, count will be set to -1 which means TX eventually fails. deletedBlockLog.incrementCount(txIDs); - blocks = deletedBlockLog.getTransactions(40); + blocks = getTransactions(40); for (DeletedBlocksTransaction block : blocks) { Assert.assertEquals(-1, block.getCount()); } // If all TXs are failed, getTransactions call will always return nothing. - blocks = deletedBlockLog.getTransactions(40); + blocks = getTransactions(40); Assert.assertEquals(blocks.size(), 0); } @@ -180,16 +221,26 @@ public class TestDeletedBlockLog { deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); } List blocks = - deletedBlockLog.getTransactions(20); - List txIDs = new ArrayList<>(); - for (DeletedBlocksTransaction block : blocks) { - txIDs.add(block.getTxID()); - } - // Add an invalid txID. - txIDs.add(70L); - deletedBlockLog.commitTransactions(txIDs); - blocks = deletedBlockLog.getTransactions(50); + getTransactions(20); + // Add an invalid txn. + blocks.add( + DeletedBlocksTransaction.newBuilder().setContainerID(1).setTxID(70) + .setCount(0).addLocalID(0).build()); + commitTransactions(blocks); + blocks.remove(blocks.size() - 1); + + blocks = getTransactions(50); + Assert.assertEquals(30, blocks.size()); + commitTransactions(blocks, dnList.get(1), dnList.get(2), + DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString()) + .build()); + + blocks = getTransactions(50); Assert.assertEquals(30, blocks.size()); + commitTransactions(blocks, dnList.get(0)); + + blocks = getTransactions(50); + Assert.assertEquals(0, blocks.size()); } @Test @@ -213,20 +264,16 @@ public class TestDeletedBlockLog { } added += 10; } else if (state == 1) { - blocks = deletedBlockLog.getTransactions(20); + blocks = getTransactions(20); txIDs = new ArrayList<>(); for (DeletedBlocksTransaction block : blocks) { txIDs.add(block.getTxID()); } deletedBlockLog.incrementCount(txIDs); } else if (state == 2) { - txIDs = new ArrayList<>(); - for (DeletedBlocksTransaction block : blocks) { - txIDs.add(block.getTxID()); - } + commitTransactions(blocks); + committed += blocks.size(); blocks = new ArrayList<>(); - committed += txIDs.size(); - deletedBlockLog.commitTransactions(txIDs); } else { // verify the number of added and committed. List> result = @@ -234,6 +281,8 @@ public class TestDeletedBlockLog { Assert.assertEquals(added, result.size() + committed); } } + blocks = getTransactions(1000); + commitTransactions(blocks); } @Test @@ -244,16 +293,13 @@ public class TestDeletedBlockLog { // close db and reopen it again to make sure // transactions are stored persistently. deletedBlockLog.close(); - deletedBlockLog = new DeletedBlockLogImpl(conf); + deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager); List blocks = - deletedBlockLog.getTransactions(10); - List txIDs = new ArrayList<>(); - for (DeletedBlocksTransaction block : blocks) { - txIDs.add(block.getTxID()); - } - deletedBlockLog.commitTransactions(txIDs); - blocks = deletedBlockLog.getTransactions(10); - Assert.assertEquals(10, blocks.size()); + getTransactions(10); + commitTransactions(blocks); + blocks = getTransactions(100); + Assert.assertEquals(40, blocks.size()); + commitTransactions(blocks); } @Test @@ -262,32 +308,11 @@ public class TestDeletedBlockLog { int maximumAllowedTXNum = 5; List blocks = null; List containerIDs = new LinkedList<>(); + DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1); int count = 0; long containerID = 0L; - DatanodeDetails.Port containerPort = DatanodeDetails.newPort( - DatanodeDetails.Port.Name.STANDALONE, 0); - DatanodeDetails.Port ratisPort = DatanodeDetails.newPort( - DatanodeDetails.Port.Name.RATIS, 0); - DatanodeDetails.Port restPort = DatanodeDetails.newPort( - DatanodeDetails.Port.Name.REST, 0); - DatanodeDetails dnId1 = DatanodeDetails.newBuilder() - .setUuid(UUID.randomUUID().toString()) - .setIpAddress("127.0.0.1") - .setHostName("localhost") - .addPort(containerPort) - .addPort(ratisPort) - .addPort(restPort) - .build(); - DatanodeDetails dnId2 = DatanodeDetails.newBuilder() - .setUuid(UUID.randomUUID().toString()) - .setIpAddress("127.0.0.1") - .setHostName("localhost") - .addPort(containerPort) - .addPort(ratisPort) - .addPort(restPort) - .build(); - Mapping mappingService = mock(ContainerMapping.class); + // Creates {TXNum} TX in the log. for (Map.Entry> entry : generateData(txNum) .entrySet()) { @@ -298,29 +323,25 @@ public class TestDeletedBlockLog { // make TX[1-6] for datanode1; TX[7-10] for datanode2 if (count <= (maximumAllowedTXNum + 1)) { - mockContainerInfo(mappingService, containerID, dnId1); + mockContainerInfo(containerID, dnId1); } else { - mockContainerInfo(mappingService, containerID, dnId2); + mockContainerInfo(containerID, dnId2); } } DatanodeDeletedBlockTransactions transactions = - new DatanodeDeletedBlockTransactions(mappingService, + new DatanodeDeletedBlockTransactions(containerManager, maximumAllowedTXNum, 2); deletedBlockLog.getTransactions(transactions); - List txIDs = new LinkedList<>(); for (UUID id : transactions.getDatanodeIDs()) { List txs = transactions .getDatanodeTransactions(id); - for (DeletedBlocksTransaction tx : txs) { - txIDs.add(tx.getTxID()); - } + // delete TX ID + commitTransactions(txs); } - // delete TX ID - deletedBlockLog.commitTransactions(txIDs); - blocks = deletedBlockLog.getTransactions(txNum); + blocks = getTransactions(txNum); // There should be one block remained since dnID1 reaches // the maximum value (5). Assert.assertEquals(1, blocks.size()); @@ -337,7 +358,8 @@ public class TestDeletedBlockLog { builder.setTxID(11); builder.setContainerID(containerID); builder.setCount(0); - transactions.addTransaction(builder.build()); + transactions.addTransaction(builder.build(), + null); // The number of TX in dnID2 should not be changed. Assert.assertEquals(size, @@ -349,14 +371,14 @@ public class TestDeletedBlockLog { builder.setTxID(12); builder.setContainerID(containerID); builder.setCount(0); - mockContainerInfo(mappingService, containerID, dnId2); - transactions.addTransaction(builder.build()); + mockContainerInfo(containerID, dnId2); + transactions.addTransaction(builder.build(), + null); // Since all node are full, then transactions is full. Assert.assertTrue(transactions.isFull()); } - private void mockContainerInfo(Mapping mappingService, long containerID, - DatanodeDetails dd) throws IOException { + private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException { Pipeline pipeline = new Pipeline("fake", LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake"); @@ -370,9 +392,9 @@ public class TestDeletedBlockLog { ContainerInfo containerInfo = builder.build(); ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline( containerInfo, pipeline); - Mockito.doReturn(containerInfo).when(mappingService) + Mockito.doReturn(containerInfo).when(containerManager) .getContainer(containerID); - Mockito.doReturn(containerWithPipeline).when(mappingService) + Mockito.doReturn(containerWithPipeline).when(containerManager) .getContainerWithPipeline(containerID); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org