From: cmccabe@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 02 Feb 2016 19:28:55 -0000
Subject: [2/2] hadoop git commit: HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)

HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd9ebf6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd9ebf6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd9ebf6e

Branch: refs/heads/trunk
Commit: dd9ebf6eedfd4ff8b3486eae2a446de6b0c7fa8a
Parents: 2da03b4
Author: Colin Patrick Mccabe
Authored: Tue Feb 2 11:23:00 2016 -0800
Committer: Colin Patrick Mccabe
Committed: Tue Feb 2 11:23:00 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |    3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   12 +
 .../DatanodeProtocolClientSideTranslatorPB.java |    5 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |    5 +-
 .../hdfs/server/blockmanagement/BlockInfo.java  |  192 +--
 .../blockmanagement/BlockInfoContiguous.java    |   29 +-
 .../blockmanagement/BlockInfoStriped.java       |   30 +-
 .../server/blockmanagement/BlockManager.java    |  458 +++++--
 .../hdfs/server/blockmanagement/BlocksMap.java  |   66 +-
 .../blockmanagement/DatanodeStorageInfo.java    |  123 +-
 .../hdfs/server/datanode/BPServiceActor.java    |    4 +-
 .../datanode/fsdataset/impl/ReplicaMap.java     |   71 +-
 .../server/protocol/BlockReportContext.java     |   10 +-
 .../hdfs/server/protocol/DatanodeProtocol.java  |    1 -
 .../apache/hadoop/hdfs/util/FoldedTreeSet.java  | 1285 ++++++++++++++++++
 .../src/main/proto/DatanodeProtocol.proto       |    3 +
 .../hdfs/protocol/TestBlockListAsLongs.java     |    4 +-
 .../server/blockmanagement/TestBlockInfo.java   |   88 --
 .../blockmanagement/TestBlockManager.java       |   71 +-
 .../server/datanode/SimulatedFSDataset.java     |    5 +-
 .../TestBlockHasMultipleReplicasOnSameDN.java   |   20 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    |    1 +
 .../datanode/TestDataNodeVolumeFailure.java     |    4 +-
...TestDnRespectsBlockReportSplitThreshold.java | 4 +- .../TestNNHandlesBlockReportPerStorage.java | 3 +- .../TestNNHandlesCombinedBlockReport.java | 2 +- .../server/datanode/TestTriggerBlockReport.java | 1 + .../server/namenode/NNThroughputBenchmark.java | 4 +- .../server/namenode/TestAddStripedBlocks.java | 4 +- .../hdfs/server/namenode/TestDeadDatanode.java | 2 +- .../hadoop/hdfs/util/FoldedTreeSetTest.java | 644 +++++++++ 31 files changed, 2565 insertions(+), 589 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2eac881..38cb3df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -971,6 +971,9 @@ Release 2.9.0 - UNRELEASED HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had an error (Rakesh R via cmccabe) + HDFS-9260. Improve the performance and GC friendliness of NameNode startup + and full block reports (Staffan Friberg via cmccabe) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 5217740..76915cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -219,6 +219,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2; public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit"; public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4; + public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY + = "dfs.namenode.storageinfo.defragment.interval.ms"; + public static final int + DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT = 10 * 60 * 1000; + public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY + = "dfs.namenode.storageinfo.defragment.timeout.ms"; + public static final int + DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT = 4; + public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY + = "dfs.namenode.storageinfo.defragment.ratio"; + public static final double + DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75; public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter"; /* Phrased as below to avoid javac inlining as a constant, to match the behavior when this was AuthFilter.class.getName(). 
Note that if you change the import for AuthFilter, you http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 81c23e1..79113dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -174,12 +174,13 @@ public class DatanodeProtocolClientSideTranslatorPB implements @Override public DatanodeCommand blockReport(DatanodeRegistration registration, - String poolId, StorageBlockReport[] reports, BlockReportContext context) + String poolId, StorageBlockReport[] reports, + BlockReportContext context) throws IOException { BlockReportRequestProto.Builder builder = BlockReportRequestProto .newBuilder().setRegistration(PBHelper.convert(registration)) .setBlockPoolId(poolId); - + boolean useBlocksBuffer = registration.getNamespaceInfo() .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 4b6baf2..e70cdf0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -824,8 +824,8 @@ public class PBHelper { public static BlockReportContext convert(BlockReportContextProto proto) { - return new BlockReportContext(proto.getTotalRpcs(), - proto.getCurRpc(), proto.getId(), proto.getLeaseId()); + return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(), + proto.getId(), proto.getLeaseId(), proto.getSorted()); } public static BlockReportContextProto convert(BlockReportContext context) { @@ -834,6 +834,7 @@ public class PBHelper { setCurRpc(context.getCurRpc()). setId(context.getReportId()). setLeaseId(context.getLeaseId()). + setSorted(context.isSorted()). 
build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index e9fa123..5da2140 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -18,8 +18,9 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; @@ -55,19 +56,9 @@ public abstract class BlockInfo extends Block /** For implementing {@link LightWeightGSet.LinkedElement} interface. */ private LightWeightGSet.LinkedElement nextLinkedElement; - /** - * This array contains triplets of references. For each i-th storage, the - * block belongs to triplets[3*i] is the reference to the - * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are - * references to the previous and the next blocks, respectively, in the list - * of blocks belonging to this storage. - * - * Using previous and next in Object triplets is done instead of a - * {@link LinkedList} list to efficiently use memory. With LinkedList the cost - * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16 - * bytes using the triplets. - */ - protected Object[] triplets; + + // Storages this block is replicated on + protected DatanodeStorageInfo[] storages; private BlockUnderConstructionFeature uc; @@ -77,14 +68,14 @@ public abstract class BlockInfo extends Block * in the block group */ public BlockInfo(short size) { - this.triplets = new Object[3 * size]; + this.storages = new DatanodeStorageInfo[size]; this.bcId = INVALID_INODE_ID; this.replication = isStriped() ? 0 : size; } public BlockInfo(Block blk, short size) { super(blk); - this.triplets = new Object[3*size]; + this.storages = new DatanodeStorageInfo[size]; this.bcId = INVALID_INODE_ID; this.replication = isStriped() ? 0 : size; } @@ -109,79 +100,52 @@ public abstract class BlockInfo extends Block return bcId == INVALID_INODE_ID; } - public DatanodeDescriptor getDatanode(int index) { - DatanodeStorageInfo storage = getStorageInfo(index); - return storage == null ? 
null : storage.getDatanodeDescriptor(); - } + public Iterator getStorageInfos() { + return new Iterator() { - DatanodeStorageInfo getStorageInfo(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; - return (DatanodeStorageInfo)triplets[index*3]; - } + private int index = 0; - BlockInfo getPrevious(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; - BlockInfo info = (BlockInfo)triplets[index*3+1]; - assert info == null || - info.getClass().getName().startsWith(BlockInfo.class.getName()) : - "BlockInfo is expected at " + index*3; - return info; - } + @Override + public boolean hasNext() { + while (index < storages.length && storages[index] == null) { + index++; + } + return index < storages.length; + } - BlockInfo getNext(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; - BlockInfo info = (BlockInfo)triplets[index*3+2]; - assert info == null || info.getClass().getName().startsWith( - BlockInfo.class.getName()) : - "BlockInfo is expected at " + index*3; - return info; + @Override + public DatanodeStorageInfo next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return storages[index++]; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Sorry. can't remove."); + } + }; } - void setStorageInfo(int index, DatanodeStorageInfo storage) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; - triplets[index*3] = storage; + public DatanodeDescriptor getDatanode(int index) { + DatanodeStorageInfo storage = getStorageInfo(index); + return storage == null ? null : storage.getDatanodeDescriptor(); } - /** - * Return the previous block on the block list for the datanode at - * position index. Set the previous block on the list to "to". - * - * @param index - the datanode index - * @param to - block to be set to previous on the list of blocks - * @return current previous block on the list of blocks - */ - BlockInfo setPrevious(int index, BlockInfo to) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; - BlockInfo info = (BlockInfo) triplets[index*3+1]; - triplets[index*3+1] = to; - return info; + DatanodeStorageInfo getStorageInfo(int index) { + assert this.storages != null : "BlockInfo is not initialized"; + return storages[index]; } - /** - * Return the next block on the block list for the datanode at - * position index. Set the next block on the list to "to". 
- * - * @param index - the datanode index - * @param to - block to be set to next on the list of blocks - * @return current next block on the list of blocks - */ - BlockInfo setNext(int index, BlockInfo to) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; - BlockInfo info = (BlockInfo) triplets[index*3+2]; - triplets[index*3+2] = to; - return info; + void setStorageInfo(int index, DatanodeStorageInfo storage) { + assert this.storages != null : "BlockInfo is not initialized"; + this.storages[index] = storage; } public int getCapacity() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; - return triplets.length / 3; + assert this.storages != null : "BlockInfo is not initialized"; + return storages.length; } /** @@ -240,80 +204,6 @@ public abstract class BlockInfo extends Block return -1; } - /** - * Insert this block into the head of the list of blocks - * related to the specified DatanodeStorageInfo. - * If the head is null then form a new list. - * @return current block as the new head of the list. - */ - BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) { - int dnIndex = this.findStorageInfo(storage); - assert dnIndex >= 0 : "Data node is not found: current"; - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is already in the list and cannot be inserted."; - this.setPrevious(dnIndex, null); - this.setNext(dnIndex, head); - if (head != null) { - head.setPrevious(head.findStorageInfo(storage), this); - } - return this; - } - - /** - * Remove this block from the list of blocks - * related to the specified DatanodeStorageInfo. - * If this block is the head of the list then return the next block as - * the new head. - * @return the new head of the list or null if the list becomes - * empy after deletion. - */ - BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) { - if (head == null) { - return null; - } - int dnIndex = this.findStorageInfo(storage); - if (dnIndex < 0) { // this block is not on the data-node list - return head; - } - - BlockInfo next = this.getNext(dnIndex); - BlockInfo prev = this.getPrevious(dnIndex); - this.setNext(dnIndex, null); - this.setPrevious(dnIndex, null); - if (prev != null) { - prev.setNext(prev.findStorageInfo(storage), next); - } - if (next != null) { - next.setPrevious(next.findStorageInfo(storage), prev); - } - if (this == head) { // removing the head - head = next; - } - return head; - } - - /** - * Remove this block from the list of blocks related to the specified - * DatanodeDescriptor. Insert it into the head of the list of blocks. - * - * @return the new head of the list. 
- */ - public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage, - int curIndex, int headIndex) { - if (head == this) { - return this; - } - BlockInfo next = this.setNext(curIndex, head); - BlockInfo prev = this.setPrevious(curIndex, null); - - head.setPrevious(headIndex, this); - prev.setNext(prev.findStorageInfo(storage), next); - if (next != null) { - next.setPrevious(next.findStorageInfo(storage), prev); - } - return this; - } - @Override public int hashCode() { // Super implementation is sufficient http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index 746e298..f729c4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -35,20 +35,20 @@ public class BlockInfoContiguous extends BlockInfo { } /** - * Ensure that there is enough space to include num more triplets. - * @return first free triplet index. + * Ensure that there is enough space to include num more storages. + * @return first free storage index. */ private int ensureCapacity(int num) { - assert this.triplets != null : "BlockInfo is not initialized"; + assert this.storages != null : "BlockInfo is not initialized"; int last = numNodes(); - if (triplets.length >= (last+num)*3) { + if (storages.length >= (last+num)) { return last; } /* Not enough space left. Create a new array. Should normally * happen only when replication is manually increased by the user. 
*/ - Object[] old = triplets; - triplets = new Object[(last+num)*3]; - System.arraycopy(old, 0, triplets, 0, last * 3); + DatanodeStorageInfo[] old = storages; + storages = new DatanodeStorageInfo[(last+num)]; + System.arraycopy(old, 0, storages, 0, last); return last; } @@ -57,8 +57,6 @@ public class BlockInfoContiguous extends BlockInfo { // find the last null node int lastNode = ensureCapacity(1); setStorageInfo(lastNode, storage); - setNext(lastNode, null); - setPrevious(lastNode, null); return true; } @@ -68,25 +66,18 @@ public class BlockInfoContiguous extends BlockInfo { if (dnIndex < 0) { // the node is not found return false; } - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is still in the list and must be removed first."; // find the last not null node int lastNode = numNodes()-1; - // replace current node triplet by the lastNode one + // replace current node entry by the lastNode one setStorageInfo(dnIndex, getStorageInfo(lastNode)); - setNext(dnIndex, getNext(lastNode)); - setPrevious(dnIndex, getPrevious(lastNode)); - // set the last triplet to null + // set the last entry to null setStorageInfo(lastNode, null); - setNext(lastNode, null); - setPrevious(lastNode, null); return true; } @Override public int numNodes() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + assert this.storages != null : "BlockInfo is not initialized"; for (int idx = getCapacity()-1; idx >= 0; idx--) { if (getDatanode(idx) != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 20d5858..c6e26ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -26,21 +26,20 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; /** * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. * - * We still use triplets to store DatanodeStorageInfo for each block in the - * block group, as well as the previous/next block in the corresponding - * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units + * We still use a storage array to store DatanodeStorageInfo for each block in + * the block group. For a (m+k) block group, the first (m+k) storage units * are sorted and strictly mapped to the corresponding block. * * Normally each block belonging to group is stored in only one DataNode. - * However, it is possible that some block is over-replicated. Thus the triplet + * However, it is possible that some block is over-replicated. Thus the storage * array's size can be larger than (m+k). Thus currently we use an extra byte - * array to record the block index for each triplet. + * array to record the block index for each entry. */ @InterfaceAudience.Private public class BlockInfoStriped extends BlockInfo { private final ErasureCodingPolicy ecPolicy; /** - * Always the same size with triplets. 
Record the block index for each triplet + * Always the same size with storage. Record the block index for each entry * TODO: actually this is only necessary for over-replicated block. Thus can * be further optimized to save memory usage. */ @@ -104,7 +103,7 @@ public class BlockInfoStriped extends BlockInfo { return i; } } - // need to expand the triplet size + // need to expand the storage size ensureCapacity(i + 1, true); return i; } @@ -130,8 +129,6 @@ public class BlockInfoStriped extends BlockInfo { private void addStorage(DatanodeStorageInfo storage, int index, int blockIndex) { setStorageInfo(index, storage); - setNext(index, null); - setPrevious(index, null); indices[index] = (byte) blockIndex; } @@ -173,26 +170,22 @@ public class BlockInfoStriped extends BlockInfo { if (dnIndex < 0) { // the node is not found return false; } - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is still in the list and must be removed first."; - // set the triplet to null + // set the entry to null setStorageInfo(dnIndex, null); - setNext(dnIndex, null); - setPrevious(dnIndex, null); indices[dnIndex] = -1; return true; } private void ensureCapacity(int totalSize, boolean keepOld) { if (getCapacity() < totalSize) { - Object[] old = triplets; + DatanodeStorageInfo[] old = storages; byte[] oldIndices = indices; - triplets = new Object[totalSize * 3]; + storages = new DatanodeStorageInfo[totalSize]; indices = new byte[totalSize]; initIndices(); if (keepOld) { - System.arraycopy(old, 0, triplets, 0, old.length); + System.arraycopy(old, 0, storages, 0, old.length); System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length); } } @@ -214,8 +207,7 @@ public class BlockInfoStriped extends BlockInfo { @Override public int numNodes() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + assert this.storages != null : "BlockInfo is not initialized"; int num = 0; for (int idx = getCapacity()-1; idx >= 0; idx--) { if (getStorageInfo(idx) != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 587e6b6..25cec8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; @@ -93,6 +94,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import 
org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.util.FoldedTreeSet; import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -106,6 +108,7 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.VersionInfo; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -195,7 +198,12 @@ public class BlockManager implements BlockStatsMXBean { /**replicationRecheckInterval is how often namenode checks for new replication work*/ private final long replicationRecheckInterval; - + + /** How often to check and the limit for the storageinfo efficiency. */ + private final long storageInfoDefragmentInterval; + private final long storageInfoDefragmentTimeout; + private final double storageInfoDefragmentRatio; + /** * Mapping: Block -> { BlockCollection, datanodes, self ref } * Updated only in response to client-sent information. @@ -204,6 +212,10 @@ public class BlockManager implements BlockStatsMXBean { /** Replication thread. */ final Daemon replicationThread = new Daemon(new ReplicationMonitor()); + + /** StorageInfoDefragmenter thread. */ + private final Daemon storageInfoDefragmenterThread = + new Daemon(new StorageInfoDefragmenter()); /** Block report thread for handling async reports. */ private final BlockReportProcessingThread blockReportThread = @@ -376,7 +388,20 @@ public class BlockManager implements BlockStatsMXBean { this.replicationRecheckInterval = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; - + + this.storageInfoDefragmentInterval = + conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY, + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT); + this.storageInfoDefragmentTimeout = + conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY, + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT); + this.storageInfoDefragmentRatio = + conf.getDouble( + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY, + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT); + this.encryptDataTransfer = conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); @@ -508,6 +533,8 @@ public class BlockManager implements BlockStatsMXBean { datanodeManager.activate(conf); this.replicationThread.setName("ReplicationMonitor"); this.replicationThread.start(); + storageInfoDefragmenterThread.setName("StorageInfoMonitor"); + storageInfoDefragmenterThread.start(); this.blockReportThread.start(); mxBeanName = MBeans.register("NameNode", "BlockStats", this); bmSafeMode.activate(blockTotal); @@ -517,8 +544,10 @@ public class BlockManager implements BlockStatsMXBean { bmSafeMode.close(); try { replicationThread.interrupt(); + storageInfoDefragmenterThread.interrupt(); blockReportThread.interrupt(); replicationThread.join(3000); + storageInfoDefragmenterThread.join(3000); blockReportThread.join(3000); } catch (InterruptedException ie) { } @@ -1165,9 +1194,15 @@ public class BlockManager implements BlockStatsMXBean { /** Remove the blocks associated to the given datanode. 
*/ void removeBlocksAssociatedTo(final DatanodeDescriptor node) { - final Iterator it = node.getBlockIterator(); - while(it.hasNext()) { - removeStoredBlock(it.next(), node); + for (DatanodeStorageInfo storage : node.getStorageInfos()) { + final Iterator it = storage.getBlockIterator(); + while (it.hasNext()) { + BlockInfo block = it.next(); + // DatanodeStorageInfo must be removed using the iterator to avoid + // ConcurrentModificationException in the underlying storage + it.remove(); + removeStoredBlock(block, node); + } } // Remove all pending DN messages referencing this DN. pendingDNMessages.removeAllMessagesForDatanode(node); @@ -1183,6 +1218,9 @@ public class BlockManager implements BlockStatsMXBean { DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); while(it.hasNext()) { BlockInfo block = it.next(); + // DatanodeStorageInfo must be removed using the iterator to avoid + // ConcurrentModificationException in the underlying storage + it.remove(); removeStoredBlock(block, node); final Block b = getBlockOnStorage(block, storageInfo); if (b != null) { @@ -2033,8 +2071,8 @@ public class BlockManager implements BlockStatsMXBean { */ public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, - final BlockListAsLongs newReport, BlockReportContext context, - boolean lastStorageInRpc) throws IOException { + final BlockListAsLongs newReport, + BlockReportContext context, boolean lastStorageInRpc) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; @@ -2079,7 +2117,8 @@ public class BlockManager implements BlockStatsMXBean { nodeID.getDatanodeUuid()); processFirstBlockReport(storageInfo, newReport); } else { - invalidatedBlocks = processReport(storageInfo, newReport); + invalidatedBlocks = processReport(storageInfo, newReport, + context != null ? context.isSorted() : false); } storageInfo.receivedBlockReport(); @@ -2149,6 +2188,9 @@ public class BlockManager implements BlockStatsMXBean { // TODO: remove this assumption in case we want to put a block on // more than one storage on a datanode (and because it's a difficult // assumption to really enforce) + // DatanodeStorageInfo must be removed using the iterator to avoid + // ConcurrentModificationException in the underlying storage + iter.remove(); removeStoredBlock(block, zombie.getDatanodeDescriptor()); Block b = getBlockOnStorage(block, zombie); if (b != null) { @@ -2238,7 +2280,7 @@ public class BlockManager implements BlockStatsMXBean { private Collection processReport( final DatanodeStorageInfo storageInfo, - final BlockListAsLongs report) throws IOException { + final BlockListAsLongs report, final boolean sorted) throws IOException { // Normal case: // Modify the (block-->datanode) map, according to the difference // between the old and new block report. @@ -2248,9 +2290,29 @@ public class BlockManager implements BlockStatsMXBean { Collection toInvalidate = new LinkedList<>(); Collection toCorrupt = new LinkedList<>(); Collection toUC = new LinkedList<>(); - reportDiff(storageInfo, report, - toAdd, toRemove, toInvalidate, toCorrupt, toUC); - + + Iterable sortedReport; + if (!sorted) { + blockLog.warn("BLOCK* processReport: Report from the DataNode ({}) is " + + "unsorted. This will cause overhead on the NameNode " + + "which needs to sort the Full BR. 
Please update the " + + "DataNode to the same version of Hadoop HDFS as the " + + "NameNode ({}).", + storageInfo.getDatanodeDescriptor().getDatanodeUuid(), + VersionInfo.getVersion()); + Set set = new FoldedTreeSet<>(); + for (BlockReportReplica iblk : report) { + set.add(new BlockReportReplica(iblk)); + } + sortedReport = set; + } else { + sortedReport = report; + } + + reportDiffSorted(storageInfo, sortedReport, + toAdd, toRemove, toInvalidate, toCorrupt, toUC); + + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue for (StatefulBlockInfo b : toUC) { @@ -2399,126 +2461,111 @@ public class BlockManager implements BlockStatsMXBean { } } - private void reportDiff(DatanodeStorageInfo storageInfo, - BlockListAsLongs newReport, + private void reportDiffSorted(DatanodeStorageInfo storageInfo, + Iterable newReport, Collection toAdd, // add to DatanodeDescriptor Collection toRemove, // remove from DatanodeDescriptor Collection toInvalidate, // should be removed from DN Collection toCorrupt, // add to corrupt replicas list Collection toUC) { // add to under-construction list - // place a delimiter in the list which separates blocks - // that have been reported from those that have not - Block delimiterBlock = new Block(); - BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock, - (short) 1); - AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); - assert result == AddBlockResult.ADDED - : "Delimiting block cannot be present in the node"; - int headIndex = 0; //currently the delimiter is in the head of the list - int curIndex; - - if (newReport == null) { - newReport = BlockListAsLongs.EMPTY; - } - // scan the report and process newly reported blocks - for (BlockReportReplica iblk : newReport) { - ReplicaState iState = iblk.getState(); - BlockInfo storedBlock = processReportedBlock(storageInfo, - iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); - - // move block to the head of the list - if (storedBlock != null && - (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) { - headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); + // The blocks must be sorted and the storagenodes blocks must be sorted + Iterator storageBlocksIterator = storageInfo.getBlockIterator(); + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); + BlockInfo storageBlock = null; + + for (BlockReportReplica replica : newReport) { + + long replicaID = replica.getBlockId(); + if (BlockIdManager.isStripedBlockID(replicaID) + && (!hasNonEcBlockUsingStripedID || + !blocksMap.containsBlock(replica))) { + replicaID = BlockIdManager.convertToStripedID(replicaID); + } + + ReplicaState reportedState = replica.getState(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Reported block " + replica + + " on " + dn + " size " + replica.getNumBytes() + + " replicaState = " + reportedState); + } + + if (shouldPostponeBlocksFromFuture + && isGenStampInFuture(replica)) { + queueReportedBlock(storageInfo, replica, reportedState, + QUEUE_REASON_FUTURE_GENSTAMP); + continue; + } + + if (storageBlock == null && storageBlocksIterator.hasNext()) { + storageBlock = storageBlocksIterator.next(); } + + do { + int cmp; + if (storageBlock == null || + (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) { + // Check if block is available in NN but not yet on this storage + BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID)); + if (nnBlock != null) { + reportDiffSortedInner(storageInfo, replica, reportedState, + nnBlock, 
toAdd, toCorrupt, toUC); + } else { + // Replica not found anywhere so it should be invalidated + toInvalidate.add(new Block(replica)); + } + break; + } else if (cmp == 0) { + // Replica matched current storageblock + reportDiffSortedInner(storageInfo, replica, reportedState, + storageBlock, toAdd, toCorrupt, toUC); + storageBlock = null; + } else { + // replica has higher ID than storedBlock + // Remove all stored blocks with IDs lower than replica + do { + toRemove.add(storageBlock); + storageBlock = storageBlocksIterator.hasNext() + ? storageBlocksIterator.next() : null; + } while (storageBlock != null && + Long.compare(replicaID, storageBlock.getBlockId()) > 0); + } + } while (storageBlock != null); } - // collect blocks that have not been reported - // all of them are next to the delimiter - Iterator it = - storageInfo.new BlockIterator(delimiter.getNext(0)); - while (it.hasNext()) { - toRemove.add(it.next()); + // Iterate any remaing blocks that have not been reported and remove them + while (storageBlocksIterator.hasNext()) { + toRemove.add(storageBlocksIterator.next()); } - storageInfo.removeBlock(delimiter); } - /** - * Process a block replica reported by the data-node. - * No side effects except adding to the passed-in Collections. - * - *
- * 1. If the block is not known to the system (not in blocksMap) then the
- *    data-node should be notified to invalidate this block.
- * 2. If the reported replica is valid, that is, it has the same generation
- *    stamp and length as recorded on the name-node, then the replica
- *    location should be added to the name-node.
- * 3. If the reported replica is not valid, then it is marked as corrupt,
- *    which triggers replication of the existing valid replicas.
- *    Corrupt replicas are removed from the system when the block
- *    is fully replicated.
- * 4. If the reported replica is for a block currently marked "under
- *    construction" in the NN, then it should be added to the
- *    BlockUnderConstructionFeature's list of replicas.
- * - * @param storageInfo DatanodeStorageInfo that sent the report. - * @param block reported block replica - * @param reportedState reported replica state - * @param toAdd add to DatanodeDescriptor - * @param toInvalidate missing blocks (not in the blocks map) - * should be removed from the data-node - * @param toCorrupt replicas with unexpected length or generation stamp; - * add to corrupt replicas - * @param toUC replicas of blocks currently under construction - * @return the up-to-date stored block, if it should be kept. - * Otherwise, null. - */ - private BlockInfo processReportedBlock( + private void reportDiffSortedInner( final DatanodeStorageInfo storageInfo, - final Block block, final ReplicaState reportedState, + final BlockReportReplica replica, final ReplicaState reportedState, + final BlockInfo storedBlock, final Collection toAdd, - final Collection toInvalidate, final Collection toCorrupt, final Collection toUC) { - - DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); - if(LOG.isDebugEnabled()) { - LOG.debug("Reported block " + block - + " on " + dn + " size " + block.getNumBytes() - + " replicaState = " + reportedState); - } - - if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) { - queueReportedBlock(storageInfo, block, reportedState, - QUEUE_REASON_FUTURE_GENSTAMP); - return null; - } - - // find block by blockId - BlockInfo storedBlock = getStoredBlock(block); - if(storedBlock == null) { - // If blocksMap does not contain reported block id, - // the replica should be removed from the data-node. - toInvalidate.add(new Block(block)); - return null; - } + assert replica != null; + assert storedBlock != null; + + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); BlockUCState ucState = storedBlock.getBlockUCState(); - + // Block is on the NN - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("In memory blockUCState = " + ucState); } // Ignore replicas already scheduled to be removed from the DN - if(invalidateBlocks.contains(dn, block)) { - return storedBlock; + if (invalidateBlocks.contains(dn, replica)) { + return; } - BlockToMarkCorrupt c = checkReplicaCorrupt( - block, reportedState, storedBlock, ucState, dn); + BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState, + storedBlock, ucState, dn); if (c != null) { if (shouldPostponeBlocksFromFuture) { // If the block is an out-of-date generation stamp or state, @@ -2532,23 +2579,16 @@ public class BlockManager implements BlockStatsMXBean { } else { toCorrupt.add(c); } - return storedBlock; - } - - if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - toUC.add(new StatefulBlockInfo(storedBlock, - new Block(block), reportedState)); - return storedBlock; - } - - // Add replica if appropriate. If the replica was previously corrupt - // but now okay, it might need to be updated. - if (reportedState == ReplicaState.FINALIZED - && (storedBlock.findStorageInfo(storageInfo) == -1 || - corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { - toAdd.add(new BlockInfoToAdd(storedBlock, block)); + } else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { + toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica), + reportedState)); + } else if (reportedState == ReplicaState.FINALIZED && + (storedBlock.findStorageInfo(storageInfo) == -1 || + corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { + // Add replica if appropriate. If the replica was previously corrupt + // but now okay, it might need to be updated. 
+ toAdd.add(new BlockInfoToAdd(storedBlock, replica)); } - return storedBlock; } /** @@ -2774,7 +2814,7 @@ public class BlockManager implements BlockStatsMXBean { } // just add it - AddBlockResult result = storageInfo.addBlock(storedBlock, reported); + AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -3497,40 +3537,75 @@ public class BlockManager implements BlockStatsMXBean { DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { - // blockReceived reports a finalized block - Collection toAdd = new LinkedList<>(); - Collection toInvalidate = new LinkedList(); - Collection toCorrupt = new LinkedList(); - Collection toUC = new LinkedList(); + final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, - toCorrupt, toUC); - // the block is only in one of the to-do lists - // if it is in none then data-node already has it - assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1 - : "The block should be only in one of the lists."; + if(LOG.isDebugEnabled()) { + LOG.debug("Reported block " + block + + " on " + node + " size " + block.getNumBytes() + + " replicaState = " + reportedState); + } - for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, storageInfo); + if (shouldPostponeBlocksFromFuture && + isGenStampInFuture(block)) { + queueReportedBlock(storageInfo, block, reportedState, + QUEUE_REASON_FUTURE_GENSTAMP); + return; } - long numBlocksLogged = 0; - for (BlockInfoToAdd b : toAdd) { - addStoredBlock(b.stored, b.reported, storageInfo, delHintNode, - numBlocksLogged < maxNumBlocksToLog); - numBlocksLogged++; + + // find block by blockId + BlockInfo storedBlock = getStoredBlock(block); + if(storedBlock == null) { + // If blocksMap does not contain reported block id, + // the replica should be removed from the data-node. + blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " + + "belong to any file", block, node, block.getNumBytes()); + addToInvalidates(new Block(block), node); + return; } - if (numBlocksLogged > maxNumBlocksToLog) { - blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.", - maxNumBlocksToLog, numBlocksLogged); + + BlockUCState ucState = storedBlock.getBlockUCState(); + // Block is on the NN + if(LOG.isDebugEnabled()) { + LOG.debug("In memory blockUCState = " + ucState); } - for (Block b : toInvalidate) { - blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " + - "belong to any file", b, node, b.getNumBytes()); - addToInvalidates(b, node); + + // Ignore replicas already scheduled to be removed from the DN + if(invalidateBlocks.contains(node, block)) { + return; } - for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, storageInfo, node); + + BlockToMarkCorrupt c = checkReplicaCorrupt( + block, reportedState, storedBlock, ucState, node); + if (c != null) { + if (shouldPostponeBlocksFromFuture) { + // If the block is an out-of-date generation stamp or state, + // but we're the standby, we shouldn't treat it as corrupt, + // but instead just queue it for later processing. + // TODO: Pretty confident this should be s/storedBlock/block below, + // since we should be postponing the info of the reported block, not + // the stored block. See HDFS-6289 for more context. 
+ queueReportedBlock(storageInfo, storedBlock, reportedState, + QUEUE_REASON_CORRUPT_STATE); + } else { + markBlockAsCorrupt(c, storageInfo, node); + } + return; + } + + if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { + addStoredBlockUnderConstruction( + new StatefulBlockInfo(storedBlock, new Block(block), reportedState), + storageInfo); + return; + } + + // Add replica if appropriate. If the replica was previously corrupt + // but now okay, it might need to be updated. + if (reportedState == ReplicaState.FINALIZED + && (storedBlock.findStorageInfo(storageInfo) == -1 || + corruptReplicas.isReplicaCorrupt(storedBlock, node))) { + addStoredBlock(storedBlock, block, storageInfo, delHintNode, true); } } @@ -4060,6 +4135,87 @@ public class BlockManager implements BlockStatsMXBean { } } + /** + * Runnable that monitors the fragmentation of the StorageInfo TreeSet and + * compacts it when it falls under a certain threshold. + */ + private class StorageInfoDefragmenter implements Runnable { + + @Override + public void run() { + while (namesystem.isRunning()) { + try { + // Check storage efficiency only when active NN is out of safe mode. + if (isPopulatingReplQueues()) { + scanAndCompactStorages(); + } + Thread.sleep(storageInfoDefragmentInterval); + } catch (Throwable t) { + if (!namesystem.isRunning()) { + LOG.info("Stopping thread."); + if (!(t instanceof InterruptedException)) { + LOG.info("Received an exception while shutting down.", t); + } + break; + } else if (!checkNSRunning && t instanceof InterruptedException) { + LOG.info("Stopping for testing."); + break; + } + LOG.error("Thread received Runtime exception.", t); + terminate(1, t); + } + } + } + + private void scanAndCompactStorages() throws InterruptedException { + ArrayList datanodesAndStorages = new ArrayList<>(); + for (DatanodeDescriptor node + : datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) { + for (DatanodeStorageInfo storage : node.getStorageInfos()) { + try { + namesystem.readLock(); + double ratio = storage.treeSetFillRatio(); + if (ratio < storageInfoDefragmentRatio) { + datanodesAndStorages.add(node.getDatanodeUuid()); + datanodesAndStorages.add(storage.getStorageID()); + } + LOG.info("StorageInfo TreeSet fill ratio {} : {}{}", + storage.getStorageID(), ratio, + (ratio < storageInfoDefragmentRatio) + ? " (queued for defragmentation)" : ""); + } finally { + namesystem.readUnlock(); + } + } + } + if (!datanodesAndStorages.isEmpty()) { + for (int i = 0; i < datanodesAndStorages.size(); i += 2) { + namesystem.writeLock(); + try { + DatanodeStorageInfo storage = datanodeManager. + getDatanode(datanodesAndStorages.get(i)). + getStorageInfo(datanodesAndStorages.get(i + 1)); + if (storage != null) { + boolean aborted = + !storage.treeSetCompact(storageInfoDefragmentTimeout); + if (aborted) { + // Compaction timed out, reset iterator to continue with + // the same storage next iteration. + i -= 2; + } + LOG.info("StorageInfo TreeSet defragmented {} : {}{}", + storage.getStorageID(), storage.treeSetFillRatio(), + aborted ? 
" (aborted)" : ""); + } + } finally { + namesystem.writeUnlock(); + } + // Wait between each iteration + Thread.sleep(1000); + } + } + } + } /** * Compute block replication and block invalidation work that can be scheduled http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 47a21fe..71d0598 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import java.util.Collections; import java.util.Iterator; import org.apache.hadoop.hdfs.protocol.Block; @@ -30,37 +31,6 @@ import org.apache.hadoop.util.LightWeightGSet; * the datanodes that store the block. */ class BlocksMap { - private static class StorageIterator implements Iterator { - private final BlockInfo blockInfo; - private int nextIdx = 0; - - StorageIterator(BlockInfo blkInfo) { - this.blockInfo = blkInfo; - } - - @Override - public boolean hasNext() { - if (blockInfo == null) { - return false; - } - while (nextIdx < blockInfo.getCapacity() && - blockInfo.getDatanode(nextIdx) == null) { - // note that for striped blocks there may be null in the triplets - nextIdx++; - } - return nextIdx < blockInfo.getCapacity(); - } - - @Override - public DatanodeStorageInfo next() { - return blockInfo.getStorageInfo(nextIdx++); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Sorry. can't remove."); - } - } /** Constant {@link LightWeightGSet} capacity. */ private final int capacity; @@ -132,6 +102,16 @@ class BlocksMap { } } + /** + * Check if BlocksMap contains the block. + * + * @param b Block to check + * @return true if block is in the map, otherwise false + */ + boolean containsBlock(Block b) { + return blocks.contains(b); + } + /** Returns the block object if it exists in the map. */ BlockInfo getStoredBlock(Block b) { return blocks.get(b); @@ -142,7 +122,9 @@ class BlocksMap { * returns {@link Iterable} of the storages the block belongs to. */ Iterable getStorages(Block b) { - return getStorages(blocks.get(b)); + BlockInfo block = blocks.get(b); + return block != null ? getStorages(block) + : Collections.emptyList(); } /** @@ -150,12 +132,16 @@ class BlocksMap { * returns {@link Iterable} of the storages the block belongs to. */ Iterable getStorages(final BlockInfo storedBlock) { - return new Iterable() { - @Override - public Iterator iterator() { - return new StorageIterator(storedBlock); - } - }; + if (storedBlock == null) { + return Collections.emptyList(); + } else { + return new Iterable() { + @Override + public Iterator iterator() { + return storedBlock.getStorageInfos(); + } + }; + } } /** counts number of containing nodes. Better than using iterator. 
*/ @@ -174,7 +160,7 @@ class BlocksMap { if (info == null) return false; - // remove block from the data-node list and the node from the block info + // remove block from the data-node set and the node from the block info boolean removed = removeBlock(node, info); if (info.hasNoStorage() // no datanodes left @@ -185,7 +171,7 @@ class BlocksMap { } /** - * Remove block from the list of blocks belonging to the data-node. Remove + * Remove block from the set of blocks belonging to the data-node. Remove * data-node from the block. */ static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 1f1b24b..c4729ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.util.FoldedTreeSet; import com.google.common.annotations.VisibleForTesting; @@ -85,31 +86,6 @@ public class DatanodeStorageInfo { storageType = storage.getStorageType(); } - /** - * Iterates over the list of blocks belonging to the data-node. - */ - class BlockIterator implements Iterator { - private BlockInfo current; - - BlockIterator(BlockInfo head) { - this.current = head; - } - - public boolean hasNext() { - return current != null; - } - - public BlockInfo next() { - BlockInfo res = current; - current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this)); - return res; - } - - public void remove() { - throw new UnsupportedOperationException("Sorry. can't remove."); - } - } - private final DatanodeDescriptor dn; private final String storageID; private StorageType storageType; @@ -120,8 +96,7 @@ public class DatanodeStorageInfo { private volatile long remaining; private long blockPoolUsed; - private volatile BlockInfo blockList = null; - private int numBlocks = 0; + private final FoldedTreeSet blocks = new FoldedTreeSet<>(); // The ID of the last full block report which updated this storage. private long lastBlockReportId = 0; @@ -207,7 +182,7 @@ public class DatanodeStorageInfo { } boolean areBlocksOnFailedStorage() { - return getState() == State.FAILED && numBlocks != 0; + return getState() == State.FAILED && !blocks.isEmpty(); } @VisibleForTesting @@ -234,6 +209,36 @@ public class DatanodeStorageInfo { long getBlockPoolUsed() { return blockPoolUsed; } + /** + * For use during startup. 
Expects block to be added in sorted order + * to enable fast insert in to the DatanodeStorageInfo + * + * @param b Block to add to DatanodeStorageInfo + * @param reportedBlock The reported replica + * @return Enum describing if block was added, replaced or already existed + */ + public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) { + // First check whether the block belongs to a different storage + // on the same DN. + AddBlockResult result = AddBlockResult.ADDED; + DatanodeStorageInfo otherStorage = + b.findStorageInfo(getDatanodeDescriptor()); + + if (otherStorage != null) { + if (otherStorage != this) { + // The block belongs to a different storage. Remove it first. + otherStorage.removeBlock(b); + result = AddBlockResult.REPLACED; + } else { + // The block is already associated with this storage. + return AddBlockResult.ALREADY_EXIST; + } + } + + b.addStorage(this, reportedBlock); + blocks.addSortedLast(b); + return result; + } public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) { // First check whether the block belongs to a different storage @@ -253,9 +258,8 @@ public class DatanodeStorageInfo { } } - // add to the head of the data-node list b.addStorage(this, reportedBlock); - insertToList(b); + blocks.add(b); return result; } @@ -263,45 +267,17 @@ public class DatanodeStorageInfo { return addBlock(b, b); } - public void insertToList(BlockInfo b) { - blockList = b.listInsert(blockList, this); - numBlocks++; - } - - public boolean removeBlock(BlockInfo b) { - blockList = b.listRemove(blockList, this); - if (b.removeStorage(this)) { - numBlocks--; - return true; - } else { - return false; - } + boolean removeBlock(BlockInfo b) { + blocks.remove(b); + return b.removeStorage(this); } int numBlocks() { - return numBlocks; + return blocks.size(); } Iterator getBlockIterator() { - return new BlockIterator(blockList); - } - - /** - * Move block to the head of the list of blocks belonging to the data-node. - * @return the index of the head of the blockList - */ - int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) { - blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex); - return curIndex; - } - - /** - * Used for testing only - * @return the head of the blockList - */ - @VisibleForTesting - BlockInfo getBlockListHeadForTesting(){ - return blockList; + return blocks.iterator(); } void updateState(StorageReport r) { @@ -349,6 +325,27 @@ public class DatanodeStorageInfo { false, capacity, dfsUsed, remaining, blockPoolUsed); } + /** + * The fill ratio of the underlying TreeSet holding blocks. + * + * @return the fill ratio of the tree + */ + public double treeSetFillRatio() { + return blocks.fillRatio(); + } + + /** + * Compact the underlying TreeSet holding blocks. + * + * @param timeout Maximum time to spend compacting the tree set in + * milliseconds. 
+ * + * @return true if compaction completed, false if aborted + */ + public boolean treeSetCompact(long timeout) { + return blocks.compact(timeout); + } + static Iterable toStorageTypes( final Iterable infos) { return new Iterable() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 1b72961..bc4f2d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -461,7 +461,7 @@ class BPServiceActor implements Runnable { // Below split threshold, send all reports in a single message. DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), reports, - new BlockReportContext(1, 0, reportId, fullBrLeaseId)); + new BlockReportContext(1, 0, reportId, fullBrLeaseId, true)); numRPCs = 1; numReportsSent = reports.length; if (cmd != null) { @@ -474,7 +474,7 @@ class BPServiceActor implements Runnable { DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), singleReport, new BlockReportContext(reports.length, r, reportId, - fullBrLeaseId)); + fullBrLeaseId, true)); numReportsSent++; numRPCs++; if (cmd != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java index 6f0b8a7..34c9f2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java @@ -18,13 +18,14 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; -import org.apache.hadoop.util.LightWeightResizableGSet; +import org.apache.hadoop.hdfs.util.FoldedTreeSet; /** * Maintains the replica map. @@ -33,9 +34,20 @@ class ReplicaMap { // Object using which this class is synchronized private final Object mutex; - // Map of block pool Id to another map of block Id to ReplicaInfo. - private final Map> map = - new HashMap>(); + // Map of block pool Id to a set of ReplicaInfo. + private final Map> map = new HashMap<>(); + + // Special comparator used to compare Long to Block ID in the TreeSet. 
+ private static final Comparator LONG_AND_BLOCK_COMPARATOR + = new Comparator() { + + @Override + public int compare(Object o1, Object o2) { + long lookup = (long) o1; + long stored = ((Block) o2).getBlockId(); + return lookup > stored ? 1 : lookup < stored ? -1 : 0; + } + }; ReplicaMap(Object mutex) { if (mutex == null) { @@ -92,11 +104,14 @@ class ReplicaMap { ReplicaInfo get(String bpid, long blockId) { checkBlockPool(bpid); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - return m != null ? m.get(new Block(blockId)) : null; + FoldedTreeSet set = map.get(bpid); + if (set == null) { + return null; + } + return set.get(blockId, LONG_AND_BLOCK_COMPARATOR); } } - + /** * Add a replica's meta information into the map * @@ -109,13 +124,13 @@ class ReplicaMap { checkBlockPool(bpid); checkBlock(replicaInfo); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - if (m == null) { + FoldedTreeSet set = map.get(bpid); + if (set == null) { // Add an entry for block pool if it does not exist already - m = new LightWeightResizableGSet(); - map.put(bpid, m); + set = new FoldedTreeSet<>(); + map.put(bpid, set); } - return m.put(replicaInfo); + return set.addOrReplace(replicaInfo); } } @@ -138,12 +153,13 @@ class ReplicaMap { checkBlockPool(bpid); checkBlock(block); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - if (m != null) { - ReplicaInfo replicaInfo = m.get(block); + FoldedTreeSet set = map.get(bpid); + if (set != null) { + ReplicaInfo replicaInfo = + set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR); if (replicaInfo != null && block.getGenerationStamp() == replicaInfo.getGenerationStamp()) { - return m.remove(block); + return set.removeAndGet(replicaInfo); } } } @@ -160,9 +176,9 @@ class ReplicaMap { ReplicaInfo remove(String bpid, long blockId) { checkBlockPool(bpid); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - if (m != null) { - return m.remove(new Block(blockId)); + FoldedTreeSet set = map.get(bpid); + if (set != null) { + return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR); } } return null; @@ -174,10 +190,9 @@ class ReplicaMap { * @return the number of replicas in the map */ int size(String bpid) { - LightWeightResizableGSet m = null; synchronized(mutex) { - m = map.get(bpid); - return m != null ? m.size() : 0; + FoldedTreeSet set = map.get(bpid); + return set != null ? set.size() : 0; } } @@ -192,19 +207,17 @@ class ReplicaMap { * @return a collection of the replicas belonging to the block pool */ Collection replicas(String bpid) { - LightWeightResizableGSet m = null; - m = map.get(bpid); - return m != null ? 
m.values() : null; + return map.get(bpid); } void initBlockPool(String bpid) { checkBlockPool(bpid); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - if (m == null) { + FoldedTreeSet set = map.get(bpid); + if (set == null) { // Add an entry for block pool if it does not exist already - m = new LightWeightResizableGSet(); - map.put(bpid, m); + set = new FoldedTreeSet<>(); + map.put(bpid, set); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java index 5bcd719..94749e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java @@ -52,12 +52,16 @@ public class BlockReportContext { */ private final long leaseId; + private final boolean sorted; + public BlockReportContext(int totalRpcs, int curRpc, - long reportId, long leaseId) { + long reportId, long leaseId, + boolean sorted) { this.totalRpcs = totalRpcs; this.curRpc = curRpc; this.reportId = reportId; this.leaseId = leaseId; + this.sorted = sorted; } public int getTotalRpcs() { @@ -75,4 +79,8 @@ public class BlockReportContext { public long getLeaseId() { return leaseId; } + + public boolean isSorted() { + return sorted; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index add4e73..b962855 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -131,7 +131,6 @@ public interface DatanodeProtocol { * Each finalized block is represented as 3 longs. Each under- * construction replica is represented as 4 longs. * This is done instead of Block[] to reduce memory used by block reports. - * @param reports report of blocks per storage * @param context Context information for this block report. * * @return - the next command for DN to process.
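
The rewritten ReplicaMap above keys each block pool's FoldedTreeSet by block ID through LONG_AND_BLOCK_COMPARATOR, so a lookup can pass the primitive block ID straight to set.get(blockId, LONG_AND_BLOCK_COMPARATOR) instead of allocating a throwaway Block the way the old GSet-based path did with map.get(new Block(blockId)). The sketch below only illustrates that lookup pattern; the Replica placeholder, the plain sorted list, and the hand-rolled binary search are stand-ins for illustration and are not Hadoop's FoldedTreeSet implementation.

import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch (not the Hadoop code): look up a replica in a list kept
 * sorted by block ID, comparing the primitive long key directly against each
 * stored element, so no temporary Block-like object is created per lookup.
 */
public class SortedReplicaLookupSketch {

  /** Stand-in for ReplicaInfo; only the block ID matters here. */
  static final class Replica {
    final long blockId;
    Replica(long blockId) { this.blockId = blockId; }
  }

  /** Binary search by primitive key; returns null if the ID is absent. */
  static Replica get(List<Replica> sortedByBlockId, long blockId) {
    int lo = 0;
    int hi = sortedByBlockId.size() - 1;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      long stored = sortedByBlockId.get(mid).blockId;
      if (stored < blockId) {
        lo = mid + 1;
      } else if (stored > blockId) {
        hi = mid - 1;
      } else {
        return sortedByBlockId.get(mid);
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<Replica> replicas = new ArrayList<>();
    for (long id = 0; id < 10; id += 2) {
      replicas.add(new Replica(id));   // inserted in sorted order
    }
    System.out.println(get(replicas, 6) != null);  // true
    System.out.println(get(replicas, 7) != null);  // false
  }
}

Keeping replicas and storage block sets in sorted order is also presumably why BlockReportContext gains the sorted flag above: a full block report that arrives in block ID order can be consumed with sequential inserts such as addSortedLast rather than per-block probes.
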