Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E3A9F18753 for ; Fri, 26 Jun 2015 18:05:37 +0000 (UTC) Received: (qmail 38826 invoked by uid 500); 26 Jun 2015 18:05:37 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 38761 invoked by uid 500); 26 Jun 2015 18:05:37 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 38752 invoked by uid 99); 26 Jun 2015 18:05:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Jun 2015 18:05:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8303BDFF6B; Fri, 26 Jun 2015 18:05:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jing9@apache.org To: common-commits@hadoop.apache.org Message-Id: <9e0a6610983f4b87a81216d121737b55@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang. Date: Fri, 26 Jun 2015 18:05:37 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 8552af91f -> 83d76151e HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang. 
(cherry picked from commit de480d6c8945bd8b5b00e8657b7a72ce8dd9b6b5) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/83d76151 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/83d76151 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/83d76151 Branch: refs/heads/branch-2 Commit: 83d76151e279ebd3f11f7b342350816e7dca6d76 Parents: 8552af9 Author: Jing Zhao Authored: Fri Jun 26 10:49:01 2015 -0700 Committer: Jing Zhao Committed: Fri Jun 26 11:04:05 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/blockmanagement/BlockInfo.java | 12 +- .../blockmanagement/BlockInfoContiguous.java | 2 +- .../BlockInfoUnderConstructionContiguous.java | 2 +- .../server/blockmanagement/BlockManager.java | 595 ++++++++++--------- .../blockmanagement/DatanodeStorageInfo.java | 15 +- .../hdfs/server/namenode/FSNamesystem.java | 18 +- .../hdfs/server/namenode/NamenodeFsck.java | 2 +- .../server/blockmanagement/TestBlockInfo.java | 2 +- .../blockmanagement/TestBlockManager.java | 4 +- .../server/blockmanagement/TestNodeCount.java | 2 +- .../TestOverReplicatedBlocks.java | 4 +- .../blockmanagement/TestReplicationPolicy.java | 2 +- 13 files changed, 361 insertions(+), 302 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 641d8bd..91088e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -343,6 +343,9 @@ Release 2.8.0 - UNRELEASED HDFS-8651. Make hadoop-hdfs-project Native code -Wall-clean (Alan Burlison via Colin P. McCabe) + HDFS-8623. 
Refactor NameNode handling of invalid, corrupt, and under-recovery + blocks. (Zhe Zhang via jing9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index 4459974..b048115 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -175,19 +175,23 @@ public abstract class BlockInfo extends Block public abstract int numNodes(); /** - * Add a {@link DatanodeStorageInfo} location for a block. + * Add a {@link DatanodeStorageInfo} location for a block + * @param storage The storage to add + * @param reportedBlock The block reported from the datanode. This is only + * used by erasure coded blocks, this block's id contains + * information indicating the index of the block in the + * corresponding block group. */ - abstract boolean addStorage(DatanodeStorageInfo storage); + abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock); /** * Remove {@link DatanodeStorageInfo} location for a block */ abstract boolean removeStorage(DatanodeStorageInfo storage); - /** * Replace the current BlockInfo with the new one in corresponding - * DatanodeStorageInfo's linked list + * DatanodeStorageInfo's linked list. 
*/ abstract void replaceBlock(BlockInfo newBlock); http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index b9abcd0..de64ad8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -45,7 +45,7 @@ public class BlockInfoContiguous extends BlockInfo { } @Override - boolean addStorage(DatanodeStorageInfo storage) { + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { return ContiguousBlockStorageOp.addStorage(this, storage); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java index c66675a..d3cb337 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java @@ -69,7 +69,7 @@ public class 
BlockInfoUnderConstructionContiguous extends } @Override - boolean addStorage(DatanodeStorageInfo storage) { + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { return ContiguousBlockStorageOp.addStorage(this, storage); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 8b685d3..419569e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -196,8 +196,8 @@ public class BlockManager { * Maps a StorageID to the set of blocks that are "extra" for this * DataNode. We'll eventually remove these extras. */ - public final Map> excessReplicateMap = - new TreeMap>(); + public final Map> excessReplicateMap = + new TreeMap<>(); /** * Store set of Blocks that need to be replicated 1 or more times. @@ -493,8 +493,8 @@ public class BlockManager { /** Dump meta data to out. 
*/ public void metaSave(PrintWriter out) { assert namesystem.hasWriteLock(); - final List live = new ArrayList(); - final List dead = new ArrayList(); + final List live = new ArrayList<>(); + final List dead = new ArrayList<>(); datanodeManager.fetchDatanodes(live, dead, false); out.println("Live Datanodes: " + live.size()); out.println("Dead Datanodes: " + dead.size()); @@ -533,8 +533,8 @@ public class BlockManager { List containingNodes = new ArrayList(); List containingLiveReplicasNodes = - new ArrayList(); - + new ArrayList<>(); + NumberReplicas numReplicas = new NumberReplicas(); // source node returned is not used chooseSourceDatanode(block, containingNodes, @@ -563,7 +563,7 @@ public class BlockManager { Collection corruptNodes = corruptReplicas.getNodes(block); - for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + for (DatanodeStorageInfo storage : getStorages(block)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); String state = ""; if (corruptNodes != null && corruptNodes.contains(node)) { @@ -586,11 +586,23 @@ public class BlockManager { return maxReplicationStreams; } + public int getDefaultStorageNum(BlockInfo block) { + return defaultReplication; + } + + public short getMinStorageNum(BlockInfo block) { + return minReplication; + } + /** - * @return true if the block has minimum replicas + * @return true if the block has minimum stored copies */ - public boolean checkMinReplication(BlockInfo block) { - return (countNodes(block).liveReplicas() >= minReplication); + public boolean hasMinStorage(BlockInfo block) { + return hasMinStorage(block, countNodes(block).liveReplicas()); + } + + public boolean hasMinStorage(BlockInfo block, int liveNum) { + return liveNum >= getMinStorageNum(block); } /** @@ -605,8 +617,9 @@ public class BlockManager { private static boolean commitBlock( final BlockInfoUnderConstruction block, final Block commitBlock) throws IOException { - if (block.getBlockUCState() == BlockUCState.COMMITTED) + 
if (block.getBlockUCState() == BlockUCState.COMMITTED) { return false; + } assert block.getNumBytes() <= commitBlock.getNumBytes() : "commitBlock length is less than the stored one " + commitBlock.getNumBytes() + " vs. " + block.getNumBytes(); @@ -626,18 +639,22 @@ public class BlockManager { */ public boolean commitOrCompleteLastBlock(BlockCollection bc, Block commitBlock) throws IOException { - if(commitBlock == null) + if (commitBlock == null) { return false; // not committing, this is a block allocation retry + } BlockInfo lastBlock = bc.getLastBlock(); - if(lastBlock == null) + if (lastBlock == null) { return false; // no blocks in file yet - if(lastBlock.isComplete()) + } + if (lastBlock.isComplete()) { return false; // already completed (e.g. by syncBlock) - + } + final boolean b = commitBlock( (BlockInfoUnderConstruction) lastBlock, commitBlock); - if(countNodes(lastBlock).liveReplicas() >= minReplication) + if(hasMinStorage(lastBlock)) { completeBlock(bc, bc.numBlocks()-1, false); + } return b; } @@ -650,20 +667,24 @@ public class BlockManager { */ private BlockInfo completeBlock(final BlockCollection bc, final int blkIndex, boolean force) throws IOException { - if(blkIndex < 0) + if (blkIndex < 0) { return null; + } BlockInfo curBlock = bc.getBlocks()[blkIndex]; - if(curBlock.isComplete()) + if(curBlock.isComplete()) { return curBlock; + } BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction) curBlock; int numNodes = ucBlock.numNodes(); - if (!force && numNodes < minReplication) + if (!force && !hasMinStorage(curBlock, numNodes)) { throw new IOException("Cannot complete block: " + "block does not satisfy minimal replication requirement."); - if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) + } + if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) { throw new IOException( "Cannot complete block: block has not been COMMITTED by the client"); + } BlockInfo completeBlock = ucBlock.convertToCompleteBlock(); // replace 
penultimate block in file bc.setBlock(blkIndex, completeBlock); @@ -748,7 +769,7 @@ public class BlockManager { // count in safe-mode. namesystem.adjustSafeModeBlockTotals( // decrement safe if we had enough - targets.length >= minReplication ? -1 : 0, + hasMinStorage(oldBlock, targets.length) ? -1 : 0, // always decrement total blocks -1); @@ -762,8 +783,8 @@ public class BlockManager { */ private List getValidLocations(Block block) { final List locations - = new ArrayList(blocksMap.numNodes(block)); - for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + = new ArrayList<>(blocksMap.numNodes(block)); + for(DatanodeStorageInfo storage : getStorages(block)) { // filter invalidate replicas if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) { locations.add(storage); @@ -776,7 +797,7 @@ public class BlockManager { final BlockInfo[] blocks, final long offset, final long length, final int nrBlocksToReturn, final AccessMode mode) throws IOException { - int curBlk = 0; + int curBlk; long curPos = 0, blkSize = 0; int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length; for (curBlk = 0; curBlk < nrBlocks; curBlk++) { @@ -789,10 +810,10 @@ public class BlockManager { } if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file - return Collections.emptyList(); + return Collections.emptyList(); long endOff = offset + length; - List results = new ArrayList(blocks.length); + List results = new ArrayList<>(blocks.length); do { results.add(createLocatedBlock(blocks[curBlk], curPos, mode)); curPos += blocks[curBlk].getNumBytes(); @@ -805,7 +826,7 @@ public class BlockManager { private LocatedBlock createLocatedBlock(final BlockInfo[] blocks, final long endPos, final AccessMode mode) throws IOException { - int curBlk = 0; + int curBlk; long curPos = 0; int nrBlocks = (blocks[0].getNumBytes() == 0) ? 
0 : blocks.length; for (curBlk = 0; curBlk < nrBlocks; curBlk++) { @@ -829,8 +850,8 @@ public class BlockManager { } /** @return a LocatedBlock for the given block */ - private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos - ) throws IOException { + private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) + throws IOException { if (blk instanceof BlockInfoUnderConstruction) { if (blk.isComplete()) { throw new IOException( @@ -840,7 +861,8 @@ public class BlockManager { final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction) blk; final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); - final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); + final ExtendedBlock eb = + new ExtendedBlock(namesystem.getBlockPoolId(), blk); return newLocatedBlock(eb, storages, pos, false); } @@ -859,11 +881,12 @@ public class BlockManager { final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines]; int j = 0; if (numMachines > 0) { - for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { + for(DatanodeStorageInfo storage : getStorages(blk)) { final DatanodeDescriptor d = storage.getDatanodeDescriptor(); final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d); - if (isCorrupt || (!replicaCorrupt)) + if (isCorrupt || (!replicaCorrupt)) { machines[j++] = storage; + } } } assert j == machines.length : @@ -1037,7 +1060,7 @@ public class BlockManager { for(int i=0; i results = new ArrayList(); + List results = new ArrayList<>(); long totalSize = 0; BlockInfo curBlock; while(totalSize it = node.getBlockIterator(); + final Iterator it = node.getBlockIterator(); while(it.hasNext()) { removeStoredBlock(it.next(), node); } @@ -1075,10 +1098,10 @@ public class BlockManager { /** Remove the blocks associated to the given DatanodeStorageInfo. 
*/ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) { assert namesystem.hasWriteLock(); - final Iterator it = storageInfo.getBlockIterator(); + final Iterator it = storageInfo.getBlockIterator(); DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); while(it.hasNext()) { - Block block = it.next(); + BlockInfo block = it.next(); removeStoredBlock(block, node); invalidateBlocks.remove(node, block); } @@ -1100,18 +1123,20 @@ public class BlockManager { * Adds block to list of blocks which will be invalidated on all its * datanodes. */ - private void addToInvalidates(Block b) { + private void addToInvalidates(BlockInfo storedBlock) { if (!namesystem.isPopulatingReplQueues()) { return; } StringBuilder datanodes = new StringBuilder(); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock, + State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - invalidateBlocks.add(b, node, false); + invalidateBlocks.add(storedBlock, node, false); datanodes.append(node).append(" "); } if (datanodes.length() != 0) { - blockLog.info("BLOCK* addToInvalidates: {} {}", b, datanodes.toString()); + blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock, + datanodes.toString()); } } @@ -1138,7 +1163,8 @@ public class BlockManager { public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, final DatanodeInfo dn, String storageID, String reason) throws IOException { assert namesystem.hasWriteLock(); - final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock()); + final Block reportedBlock = blk.getLocalBlock(); + final BlockInfo storedBlock = getStoredBlock(reportedBlock); if (storedBlock == null) { // Check if the replica is in the blockMap, if not // ignore the request for now. 
This could happen when BlockScanner @@ -1154,8 +1180,8 @@ public class BlockManager { + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid() + ") does not exist"); } - - markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, + + markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock, blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), storageID == null ? null : node.getStorageInfo(storageID), node); @@ -1171,18 +1197,18 @@ public class BlockManager { DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException { - if (b.corrupted.isDeleted()) { + if (b.stored.isDeleted()) { blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" + " corrupt as it does not belong to any file", b); addToInvalidates(b.corrupted, node); return; } short expectedReplicas = - b.corrupted.getBlockCollection().getPreferredBlockReplication(); + getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored); // Add replica to the data-node if it is not already there if (storageInfo != null) { - storageInfo.addBlock(b.stored); + storageInfo.addBlock(b.stored, b.corrupted); } // Add this replica to corruptReplicas Map @@ -1192,8 +1218,8 @@ public class BlockManager { NumberReplicas numberOfReplicas = countNodes(b.stored); boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= expectedReplicas; - boolean minReplicationSatisfied = - numberOfReplicas.liveReplicas() >= minReplication; + boolean minReplicationSatisfied = hasMinStorage(b.stored, + numberOfReplicas.liveReplicas()); boolean hasMoreCorruptReplicas = minReplicationSatisfied && (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > expectedReplicas; @@ -1336,7 +1362,7 @@ public class BlockManager { int additionalReplRequired; int scheduledWork = 0; - List work = new LinkedList(); + List work = new LinkedList<>(); namesystem.writeLock(); try { @@ -1351,11 +1377,11 @@ public class BlockManager { continue; } - requiredReplication = 
bc.getPreferredBlockReplication(); + requiredReplication = getExpectedReplicaNum(bc, block); // get a source data-node - containingNodes = new ArrayList(); - List liveReplicaNodes = new ArrayList(); + containingNodes = new ArrayList<>(); + List liveReplicaNodes = new ArrayList<>(); NumberReplicas numReplicas = new NumberReplicas(); srcNode = chooseSourceDatanode( block, containingNodes, liveReplicaNodes, numReplicas, @@ -1375,7 +1401,7 @@ public class BlockManager { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (blockHasEnoughRacks(block, requiredReplication)) ) { neededReplications.remove(block, priority); // remove from neededReplications blockLog.info("BLOCK* Removing {} from neededReplications as" + " it has enough replicas", block); @@ -1399,7 +1425,7 @@ public class BlockManager { namesystem.writeUnlock(); } - final Set excludedNodes = new HashSet(); + final Set excludedNodes = new HashSet<>(); for(ReplicationWork rw : work){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. 
@@ -1435,7 +1461,7 @@ public class BlockManager { rw.targets = null; continue; } - requiredReplication = bc.getPreferredBlockReplication(); + requiredReplication = getExpectedReplicaNum(bc, block); // do not schedule more if enough replicas is already pending NumberReplicas numReplicas = countNodes(block); @@ -1444,7 +1470,7 @@ public class BlockManager { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (blockHasEnoughRacks(block, requiredReplication)) ) { neededReplications.remove(block, priority); // remove from neededReplications rw.targets = null; blockLog.info("BLOCK* Removing {} from neededReplications as" + @@ -1454,7 +1480,7 @@ public class BlockManager { } if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block)) ) { + (!blockHasEnoughRacks(block, requiredReplication)) ) { if (rw.srcNode.getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case @@ -1569,7 +1595,7 @@ public class BlockManager { List getDatanodeDescriptors(List nodes) { List datanodeDescriptors = null; if (nodes != null) { - datanodeDescriptors = new ArrayList(nodes.size()); + datanodeDescriptors = new ArrayList<>(nodes.size()); for (int i = 0; i < nodes.size(); i++) { DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i)); if (node != null) { @@ -1625,9 +1651,9 @@ public class BlockManager { int excess = 0; Collection nodesCorrupt = corruptReplicas.getNodes(block); - for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + for(DatanodeStorageInfo storage : getStorages(block)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - LightWeightLinkedSet excessBlocks = + LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); int countableReplica = storage.getState() == State.NORMAL ? 
1 : 0; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) @@ -1695,7 +1721,7 @@ public class BlockManager { * Use the blockinfo from the blocksmap to be certain we're working * with the most up-to-date block information (e.g. genstamp). */ - BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]); + BlockInfo bi = getStoredBlock(timedOutItems[i]); if (bi == null) { continue; } @@ -1745,7 +1771,7 @@ public class BlockManager { final BlockInfoUnderConstruction storedBlock; final Block reportedBlock; final ReplicaState reportedState; - + StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, Block reportedBlock, ReplicaState reportedState) { this.storedBlock = storedBlock; @@ -1753,14 +1779,34 @@ public class BlockManager { this.reportedState = reportedState; } } - + + private static class BlockInfoToAdd { + private final BlockInfo stored; + private final Block reported; + + BlockInfoToAdd(BlockInfo stored, Block reported) { + this.stored = stored; + this.reported = reported; + } + + public BlockInfo getStored() { + return stored; + } + + public Block getReported() { + return reported; + } + } + /** * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a * list of blocks that should be considered corrupt due to a block report. */ private static class BlockToMarkCorrupt { - /** The corrupted block in a datanode. */ - final BlockInfo corrupted; + /** The corrupted block in a datanode. This is the one reported by the + * datanode. + */ + final Block corrupted; /** The corresponding block stored in the BlockManager. */ final BlockInfo stored; /** The reason to mark corrupt. 
*/ @@ -1768,7 +1814,7 @@ public class BlockManager { /** The reason code to be stored */ final Reason reasonCode; - BlockToMarkCorrupt(BlockInfo corrupted, + BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason, Reason reasonCode) { Preconditions.checkNotNull(corrupted, "corrupted is null"); @@ -1780,15 +1826,9 @@ public class BlockManager { this.reasonCode = reasonCode; } - BlockToMarkCorrupt(BlockInfo stored, String reason, - Reason reasonCode) { - this(stored, stored, reason, reasonCode); - } - - BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, - Reason reasonCode) { - this(new BlockInfoContiguous(stored), stored, - reason, reasonCode); + BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs, + String reason, Reason reasonCode) { + this(corrupted, stored, reason, reasonCode); //the corrupted block in datanode has a different generation stamp corrupted.setGenerationStamp(gs); } @@ -1975,7 +2015,7 @@ public class BlockManager { break; } - BlockInfo bi = blocksMap.getStoredBlock(b); + BlockInfo bi = getStoredBlock(b); if (bi == null) { if (LOG.isDebugEnabled()) { LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " + @@ -2007,7 +2047,7 @@ public class BlockManager { endPostponedMisReplicatedBlocksCount) + " blocks are removed."); } } - + private Collection processReport( final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { @@ -2015,25 +2055,26 @@ public class BlockManager { // Modify the (block-->datanode) map, according to the difference // between the old and new block report. 
// - Collection toAdd = new LinkedList(); - Collection toRemove = new TreeSet(); - Collection toInvalidate = new LinkedList(); - Collection toCorrupt = new LinkedList(); - Collection toUC = new LinkedList(); + Collection toAdd = new LinkedList<>(); + Collection toRemove = new TreeSet<>(); + Collection toInvalidate = new LinkedList<>(); + Collection toCorrupt = new LinkedList<>(); + Collection toUC = new LinkedList<>(); reportDiff(storageInfo, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); - + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue - for (StatefulBlockInfo b : toUC) { + for (StatefulBlockInfo b : toUC) { addStoredBlockUnderConstruction(b, storageInfo); } - for (Block b : toRemove) { + for (BlockInfo b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; - for (BlockInfo b : toAdd) { - addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); + for (BlockInfoToAdd b : toAdd) { + addStoredBlock(b.getStored(), b.getReported(), storageInfo, null, + numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2054,17 +2095,17 @@ public class BlockManager { * Mark block replicas as corrupt except those on the storages in * newStorages list. 
*/ - public void markBlockReplicasAsCorrupt(BlockInfo block, - long oldGenerationStamp, long oldNumBytes, + public void markBlockReplicasAsCorrupt(Block oldBlock, BlockInfo block, + long oldGenerationStamp, long oldNumBytes, DatanodeStorageInfo[] newStorages) throws IOException { assert namesystem.hasWriteLock(); BlockToMarkCorrupt b = null; if (block.getGenerationStamp() != oldGenerationStamp) { - b = new BlockToMarkCorrupt(block, oldGenerationStamp, + b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp, "genstamp does not match " + oldGenerationStamp + " : " + block.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else if (block.getNumBytes() != oldNumBytes) { - b = new BlockToMarkCorrupt(block, + b = new BlockToMarkCorrupt(oldBlock, block, "length does not match " + oldNumBytes + " : " + block.getNumBytes(), Reason.SIZE_MISMATCH); } else { @@ -2123,7 +2164,7 @@ public class BlockManager { continue; } - BlockInfo storedBlock = blocksMap.getStoredBlock(iblk); + BlockInfo storedBlock = getStoredBlock(iblk); // If block does not belong to any file, we are done. 
if (storedBlock == null) continue; @@ -2161,24 +2202,26 @@ public class BlockManager { } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, storageInfo); + addStoredBlockImmediate(storedBlock, iblk, storageInfo); } } } - private void reportDiff(DatanodeStorageInfo storageInfo, - BlockListAsLongs newReport, - Collection toAdd, // add to DatanodeDescriptor - Collection toRemove, // remove from DatanodeDescriptor + private void reportDiff(DatanodeStorageInfo storageInfo, + BlockListAsLongs newReport, + Collection toAdd, // add to DatanodeDescriptor + Collection toRemove, // remove from DatanodeDescriptor Collection toInvalidate, // should be removed from DN Collection toCorrupt, // add to corrupt replicas list Collection toUC) { // add to under-construction list - // place a delimiter in the list which separates blocks + // place a delimiter in the list which separates blocks // that have been reported from those that have not - BlockInfo delimiter = new BlockInfoContiguous(new Block(), (short) 1); - AddBlockResult result = storageInfo.addBlock(delimiter); - assert result == AddBlockResult.ADDED + Block delimiterBlock = new Block(); + BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock, + (short) 1); + AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); + assert result == AddBlockResult.ADDED : "Delimiting block cannot be present in the node"; int headIndex = 0; //currently the delimiter is in the head of the list int curIndex; @@ -2195,7 +2238,8 @@ public class BlockManager { // move block to the head of the list if (storedBlock != null && (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) { - headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); + headIndex = + storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); } } @@ -2203,8 +2247,9 @@ public class BlockManager { // all of them are next to the delimiter Iterator it = 
storageInfo.new BlockIterator(delimiter.getNext(0)); - while(it.hasNext()) + while (it.hasNext()) { toRemove.add(it.next()); + } storageInfo.removeBlock(delimiter); } @@ -2241,12 +2286,12 @@ public class BlockManager { */ private BlockInfo processReportedBlock( final DatanodeStorageInfo storageInfo, - final Block block, final ReplicaState reportedState, - final Collection toAdd, - final Collection toInvalidate, + final Block block, final ReplicaState reportedState, + final Collection toAdd, + final Collection toInvalidate, final Collection toCorrupt, final Collection toUC) { - + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); if(LOG.isDebugEnabled()) { @@ -2254,16 +2299,16 @@ public class BlockManager { + " on " + dn + " size " + block.getNumBytes() + " replicaState = " + reportedState); } - + if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block)) { queueReportedBlock(storageInfo, block, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); return null; } - + // find block by blockId - BlockInfo storedBlock = blocksMap.getStoredBlock(block); + BlockInfo storedBlock = getStoredBlock(block); if(storedBlock == null) { // If blocksMap does not contain reported block id, // the replica should be removed from the data-node. @@ -2271,7 +2316,7 @@ public class BlockManager { return null; } BlockUCState ucState = storedBlock.getBlockUCState(); - + // Block is on the NN if(LOG.isDebugEnabled()) { LOG.debug("In memory blockUCState = " + ucState); @@ -2316,8 +2361,8 @@ public class BlockManager { // but now okay, it might need to be updated. 
if (reportedState == ReplicaState.FINALIZED && (storedBlock.findStorageInfo(storageInfo) == -1 || - corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { - toAdd.add(storedBlock); + corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { + toAdd.add(new BlockInfoToAdd(storedBlock, block)); } return storedBlock; } @@ -2363,7 +2408,7 @@ public class BlockManager { if (rbi.getReportedState() == null) { // This is a DELETE_BLOCK request DatanodeStorageInfo storageInfo = rbi.getStorageInfo(); - removeStoredBlock(rbi.getBlock(), + removeStoredBlock(getStoredBlock(rbi.getBlock()), storageInfo.getDatanodeDescriptor()); } else { processAndHandleReportedBlock(rbi.getStorageInfo(), @@ -2411,15 +2456,15 @@ public class BlockManager { case COMMITTED: if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, + return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS, "block is " + ucState + " and reported genstamp " + reportedGS - + " does not match genstamp in block map " - + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); + + " does not match genstamp in block map " + + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else if (storedBlock.getNumBytes() != reported.getNumBytes()) { - return new BlockToMarkCorrupt(storedBlock, + return new BlockToMarkCorrupt(new Block(reported), storedBlock, "block is " + ucState + " and reported length " + - reported.getNumBytes() + " does not match " + - "length in block map " + storedBlock.getNumBytes(), + reported.getNumBytes() + " does not match " + + "length in block map " + storedBlock.getNumBytes(), Reason.SIZE_MISMATCH); } else { return null; // not corrupt @@ -2427,11 +2472,12 @@ public class BlockManager { case UNDER_CONSTRUCTION: if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new 
BlockToMarkCorrupt(storedBlock, reportedGS, "block is " - + ucState + " and reported state " + reportedState - + ", But reported genstamp " + reportedGS + return new BlockToMarkCorrupt(new Block(reported), storedBlock, + reportedGS, "block is " + ucState + " and reported state " + + reportedState + ", But reported genstamp " + reportedGS + " does not match genstamp in block map " - + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); + + storedBlock.getGenerationStamp(), + Reason.GENSTAMP_MISMATCH); } return null; default: @@ -2441,12 +2487,15 @@ public class BlockManager { case RWR: if (!storedBlock.isComplete()) { return null; // not corrupt - } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { + } else if (storedBlock.getGenerationStamp() != + reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, - "reported " + reportedState + " replica with genstamp " + reportedGS - + " does not match COMPLETE block's genstamp in block map " - + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); + return new BlockToMarkCorrupt( + new Block(reported), storedBlock, reportedGS, + "reported " + reportedState + + " replica with genstamp " + reportedGS + + " does not match COMPLETE block's genstamp in block map " + + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else { // COMPLETE block, same genstamp if (reportedState == ReplicaState.RBW) { // If it's a RBW report for a COMPLETE block, it may just be that @@ -2458,7 +2507,7 @@ public class BlockManager { "complete with the same genstamp"); return null; } else { - return new BlockToMarkCorrupt(storedBlock, + return new BlockToMarkCorrupt(new Block(reported), storedBlock, "reported replica has invalid state " + reportedState, Reason.INVALID_STATE); } @@ -2471,7 +2520,8 @@ public class BlockManager { " on " + dn + " size " + storedBlock.getNumBytes(); // log here at WARN level 
since this is really a broken HDFS invariant LOG.warn(msg); - return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE); + return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg, + Reason.INVALID_STATE); } } @@ -2504,7 +2554,7 @@ public class BlockManager { if (ucBlock.reportedState == ReplicaState.FINALIZED && (block.findStorageInfo(storageInfo) < 0)) { - addStoredBlock(block, storageInfo, null, true); + addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true); } } @@ -2519,23 +2569,23 @@ public class BlockManager { * * @throws IOException */ - private void addStoredBlockImmediate(BlockInfo storedBlock, + private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, DatanodeStorageInfo storageInfo) - throws IOException { + throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); - if (!namesystem.isInStartupSafeMode() + if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, storageInfo, null, false); + addStoredBlock(storedBlock, reported, storageInfo, null, false); return; } // just add it - AddBlockResult result = storageInfo.addBlock(storedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock, reported); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED - && numCurrentReplica >= minReplication) { + && hasMinStorage(storedBlock, numCurrentReplica)) { completeBlock(storedBlock.getBlockCollection(), storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -2549,19 +2599,20 @@ public class BlockManager { /** * Modify (block-->datanode) map. Remove block from set of * needed replications if this takes care of the problem. - * @return the block that is stored in blockMap. 
+ * @return the block that is stored in blocksMap. */ private Block addStoredBlock(final BlockInfo block, - DatanodeStorageInfo storageInfo, - DatanodeDescriptor delNodeHint, - boolean logEveryBlock) - throws IOException { + final Block reportedBlock, + DatanodeStorageInfo storageInfo, + DatanodeDescriptor delNodeHint, + boolean logEveryBlock) + throws IOException { assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (block instanceof BlockInfoUnderConstruction) { //refresh our copy in case the block got completed in another thread - storedBlock = blocksMap.getStoredBlock(block); + storedBlock = getStoredBlock(block); } else { storedBlock = block; } @@ -2575,10 +2626,9 @@ public class BlockManager { return block; } BlockCollection bc = storedBlock.getBlockCollection(); - assert bc != null : "Block must belong to a file"; // add block to the datanode - AddBlockResult result = storageInfo.addBlock(storedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock); int curReplicaDelta; if (result == AddBlockResult.ADDED) { @@ -2606,10 +2656,10 @@ public class BlockManager { NumberReplicas num = countNodes(storedBlock); int numLiveReplicas = num.liveReplicas(); int numCurrentReplica = numLiveReplicas - + pendingReplications.getNumReplicas(storedBlock); + + pendingReplications.getNumReplicas(storedBlock); if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && - numLiveReplicas >= minReplication) { + hasMinStorage(storedBlock, numLiveReplicas)) { storedBlock = completeBlock(bc, storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -2619,7 +2669,7 @@ public class BlockManager { // handles the safe block count maintenance. 
namesystem.incrementSafeBlockCount(numCurrentReplica); } - + // if file is under construction, then done for now if (bc.isUnderConstruction()) { return storedBlock; @@ -2631,7 +2681,7 @@ public class BlockManager { } // handle underReplication/overReplication - short fileReplication = bc.getPreferredBlockReplication(); + short fileReplication = getExpectedReplicaNum(bc, storedBlock); if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) { neededReplications.remove(storedBlock, numCurrentReplica, num.decommissionedAndDecommissioning(), fileReplication); @@ -2647,11 +2697,12 @@ public class BlockManager { int numCorruptNodes = num.corruptReplicas(); if (numCorruptNodes != corruptReplicasCount) { LOG.warn("Inconsistent number of corrupt replicas for " + - storedBlock + "blockMap has " + numCorruptNodes + + storedBlock + ". blockMap has " + numCorruptNodes + " but corrupt replicas map has " + corruptReplicasCount); } - if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) - invalidateCorruptReplicas(storedBlock); + if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) { + invalidateCorruptReplicas(storedBlock, reportedBlock); + } return storedBlock; } @@ -2683,7 +2734,7 @@ public class BlockManager { * * @param blk Block whose corrupt replicas need to be invalidated */ - private void invalidateCorruptReplicas(BlockInfo blk) { + private void invalidateCorruptReplicas(BlockInfo blk, Block reported) { Collection nodes = corruptReplicas.getNodes(blk); boolean removedFromBlocksMap = true; if (nodes == null) @@ -2693,8 +2744,8 @@ public class BlockManager { DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]); for (DatanodeDescriptor node : nodesCopy) { try { - if (!invalidateBlock(new BlockToMarkCorrupt(blk, null, - Reason.ANY), node)) { + if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null, + Reason.ANY), node)) { removedFromBlocksMap = false; } } catch (IOException e) { @@ -2862,7 
+2913,7 @@ public class BlockManager { } // calculate current replication short expectedReplication = - block.getBlockCollection().getPreferredBlockReplication(); + getExpectedReplicaNum(block.getBlockCollection(), block); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); // add to under-replicated queue if need to be @@ -2921,14 +2972,14 @@ public class BlockManager { * If there are any extras, call chooseExcessReplicates() to * mark them in the excessReplicateMap. */ - private void processOverReplicatedBlock(final Block block, + private void processOverReplicatedBlock(final BlockInfo block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { assert namesystem.hasWriteLock(); if (addedNode == delNodeHint) { delNodeHint = null; } - Collection nonExcess = new ArrayList(); + Collection nonExcess = new ArrayList<>(); Collection corruptNodes = corruptReplicas .getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) { @@ -2942,8 +2993,8 @@ public class BlockManager { postponeBlock(block); return; } - LightWeightLinkedSet excessBlocks = excessReplicateMap.get(cur - .getDatanodeUuid()); + LightWeightLinkedSet excessBlocks = excessReplicateMap.get( + cur.getDatanodeUuid()); if (excessBlocks == null || !excessBlocks.contains(block)) { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { // exclude corrupt replicas @@ -2953,7 +3004,7 @@ public class BlockManager { } } } - chooseExcessReplicates(nonExcess, block, replication, + chooseExcessReplicates(nonExcess, block, replication, addedNode, delNodeHint, blockplacement); } @@ -2972,29 +3023,29 @@ public class BlockManager { * If no such a node is available, * then pick a node with least free space */ - private void chooseExcessReplicates(final Collection nonExcess, - Block b, short replication, - DatanodeDescriptor addedNode, - DatanodeDescriptor delNodeHint, - BlockPlacementPolicy replicator) { + 
private void chooseExcessReplicates( + final Collection nonExcess, + BlockInfo storedBlock, short replication, + DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint, + BlockPlacementPolicy replicator) { assert namesystem.hasWriteLock(); // first form a rack to datanodes map and - BlockCollection bc = getBlockCollection(b); - final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID()); + BlockCollection bc = getBlockCollection(storedBlock); + final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy( + bc.getStoragePolicyID()); final List excessTypes = storagePolicy.chooseExcess( replication, DatanodeStorageInfo.toStorageTypes(nonExcess)); + final Map> rackMap = new HashMap<>(); + final List moreThanOne = new ArrayList<>(); + final List exactlyOne = new ArrayList<>(); - final Map> rackMap - = new HashMap>(); - final List moreThanOne = new ArrayList(); - final List exactlyOne = new ArrayList(); - // split nodes into two sets // moreThanOne contains nodes on rack with more than one replica // exactlyOne contains the remaining nodes replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne); - + // pick one node to delete that favors the delete hint // otherwise pick one with least space from priSet if it is not empty // otherwise one node with least space from remains @@ -3009,7 +3060,7 @@ public class BlockManager { moreThanOne, excessTypes)) { cur = delNodeHintStorage; } else { // regular excessive replica removal - cur = replicator.chooseReplicaToDelete(bc, b, replication, + cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication, moreThanOne, exactlyOne, excessTypes); } firstOne = false; @@ -3018,24 +3069,29 @@ public class BlockManager { replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur); - nonExcess.remove(cur); - addToExcessReplicate(cur.getDatanodeDescriptor(), b); - - // - // The 'excessblocks' tracks blocks until we get confirmation - // that the 
datanode has deleted them; the only way we remove them - // is when we get a "removeBlock" message. - // - // The 'invalidate' list is used to inform the datanode the block - // should be deleted. Items are removed from the invalidate list - // upon giving instructions to the namenode. - // - addToInvalidates(b, cur.getDatanodeDescriptor()); - blockLog.info("BLOCK* chooseExcessReplicates: " - +"({}, {}) is added to invalidated blocks set", cur, b); + processChosenExcessReplica(nonExcess, cur, storedBlock); } } + private void processChosenExcessReplica( + final Collection nonExcess, + final DatanodeStorageInfo chosen, BlockInfo storedBlock) { + nonExcess.remove(chosen); + addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock); + // + // The 'excessblocks' tracks blocks until we get confirmation + // that the datanode has deleted them; the only way we remove them + // is when we get a "removeBlock" message. + // + // The 'invalidate' list is used to inform the datanode the block + // should be deleted. Items are removed from the invalidate list + // upon giving instructions to the datanodes. 
+ // + addToInvalidates(storedBlock, chosen.getDatanodeDescriptor()); + blockLog.info("BLOCK* chooseExcessReplicates: " + +"({}, {}) is added to invalidated blocks set", chosen, storedBlock); + } + /** Check if we can use delHint */ static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, DatanodeStorageInfo added, List moreThan1Racks, @@ -3057,17 +3113,18 @@ public class BlockManager { } } - private void addToExcessReplicate(DatanodeInfo dn, Block block) { + private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) { assert namesystem.hasWriteLock(); - LightWeightLinkedSet excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid()); + LightWeightLinkedSet excessBlocks = excessReplicateMap.get( + dn.getDatanodeUuid()); if (excessBlocks == null) { - excessBlocks = new LightWeightLinkedSet(); + excessBlocks = new LightWeightLinkedSet<>(); excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks); } - if (excessBlocks.add(block)) { + if (excessBlocks.add(storedBlock)) { excessBlocksCount.incrementAndGet(); blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to" - + " excessReplicateMap", dn, block); + + " excessReplicateMap", dn, storedBlock); } } @@ -3079,26 +3136,26 @@ public class BlockManager { QUEUE_REASON_FUTURE_GENSTAMP); return; } - removeStoredBlock(block, node); + removeStoredBlock(getStoredBlock(block), node); } /** * Modify (block-->datanode) map. Possibly generate replication tasks, if the * removed block is still valid. 
*/ - public void removeStoredBlock(Block block, DatanodeDescriptor node) { - blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node); + public void removeStoredBlock(BlockInfo storedBlock, + DatanodeDescriptor node) { + blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node); assert (namesystem.hasWriteLock()); { - BlockInfo storedBlock = getStoredBlock(block); if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) { blockLog.debug("BLOCK* removeStoredBlock: {} has already been" + - " removed from node {}", block, node); + " removed from node {}", storedBlock, node); return; } CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks() - .get(new CachedBlock(block.getBlockId(), (short) 0, false)); + .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false)); if (cblock != null) { boolean removed = false; removed |= node.getPendingCached().remove(cblock); @@ -3106,7 +3163,7 @@ public class BlockManager { removed |= node.getPendingUncached().remove(cblock); if (removed) { blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching " - + "related lists on node {}", block, node); + + "related lists on node {}", storedBlock, node); } } @@ -3116,7 +3173,7 @@ public class BlockManager { // necessary. In that case, put block on a possibly-will- // be-replicated list. // - BlockCollection bc = blocksMap.getBlockCollection(block); + BlockCollection bc = storedBlock.getBlockCollection(); if (bc != null) { namesystem.decrementSafeBlockCount(storedBlock); updateNeededReplications(storedBlock, -1, 0); @@ -3126,13 +3183,13 @@ public class BlockManager { // We've removed a block from a node, so it's definitely no longer // in "excess" there. 
// - LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node - .getDatanodeUuid()); + LightWeightLinkedSet excessBlocks = excessReplicateMap.get( + node.getDatanodeUuid()); if (excessBlocks != null) { - if (excessBlocks.remove(block)) { + if (excessBlocks.remove(storedBlock)) { excessBlocksCount.decrementAndGet(); blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " + - "excessBlocks", block); + "excessBlocks", storedBlock); if (excessBlocks.size() == 0) { excessReplicateMap.remove(node.getDatanodeUuid()); } @@ -3140,7 +3197,7 @@ public class BlockManager { } // Remove the replica from corruptReplicas - corruptReplicas.removeFromCorruptReplicasMap(block, node); + corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node); } } @@ -3148,7 +3205,7 @@ public class BlockManager { * Get all valid locations of the block & add the block to results * return the length of the added block; 0 if the block is not added */ - private long addBlock(Block block, List results) { + private long addBlock(BlockInfo block, List results) { final List locations = getValidLocations(block); if(locations.size() == 0) { return 0; @@ -3200,31 +3257,32 @@ public class BlockManager { processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } - + private void processAndHandleReportedBlock( DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block - Collection toAdd = new LinkedList(); - Collection toInvalidate = new LinkedList(); - Collection toCorrupt = new LinkedList(); - Collection toUC = new LinkedList(); + Collection toAdd = new LinkedList<>(); + Collection toInvalidate = new LinkedList<>(); + Collection toCorrupt = new LinkedList<>(); + Collection toUC = new LinkedList<>(); final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - processReportedBlock(storageInfo, block, reportedState, - toAdd, toInvalidate, 
toCorrupt, toUC); + processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, + toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1 - : "The block should be only in one of the lists."; + : "The block should be only in one of the lists."; - for (StatefulBlockInfo b : toUC) { + for (StatefulBlockInfo b : toUC) { addStoredBlockUnderConstruction(b, storageInfo); } long numBlocksLogged = 0; - for (BlockInfo b : toAdd) { - addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog); + for (BlockInfoToAdd b : toAdd) { + addStoredBlock(b.getStored(), b.getReported(), storageInfo, delHintNode, + numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -3289,7 +3347,7 @@ public class BlockManager { ReplicaState.RBW, null); break; default: - String msg = + String msg = "Unknown block status code reported by " + nodeID + ": " + rdbi; blockLog.warn(msg); @@ -3325,8 +3383,8 @@ public class BlockManager { } else if (node.isDecommissioned()) { decommissioned++; } else { - LightWeightLinkedSet blocksExcess = excessReplicateMap.get(node - .getDatanodeUuid()); + LightWeightLinkedSet blocksExcess = + excessReplicateMap.get(node.getDatanodeUuid()); if (blocksExcess != null && blocksExcess.contains(b)) { excess++; } else { @@ -3379,13 +3437,13 @@ public class BlockManager { int numOverReplicated = 0; while(it.hasNext()) { final BlockInfo block = it.next(); - BlockCollection bc = blocksMap.getBlockCollection(block); - short expectedReplication = bc.getPreferredBlockReplication(); + int expectedReplication = this.getReplication(block); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); if (numCurrentReplica > expectedReplication) { // over-replicated block - processOverReplicatedBlock(block, expectedReplication, null, null); + 
processOverReplicatedBlock(block, (short) expectedReplication, null, + null); numOverReplicated++; } } @@ -3411,7 +3469,7 @@ public class BlockManager { if (pendingReplicationBlocksCount == 0 && underReplicatedBlocksCount == 0) { LOG.info("Node {} is dead and there are no under-replicated" + - " blocks or blocks pending replication. Safe to decommission.", + " blocks or blocks pending replication. Safe to decommission.", node); return true; } @@ -3429,6 +3487,12 @@ public class BlockManager { return blocksMap.size(); } + + /** @return an iterator of the datanodes. */ + public Iterable getStorages(final Block block) { + return blocksMap.getStorages(block); + } + public DatanodeStorageInfo[] getStorages(BlockInfo block) { final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()]; int i = 0; @@ -3517,10 +3581,12 @@ public class BlockManager { final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)b; final int numNodes = b.numNodes(); - LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " - + uc.getBlockUCState() + ", replication# = " + numNodes - + (numNodes < minReplication ? " < ": " >= ") - + " minimum = " + minReplication + ") in file " + src); + final int min = getMinStorageNum(b); + final BlockUCState state = b.getBlockUCState(); + LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + state + + ", replication# = " + numNodes + + (numNodes < min ? " < " : " >= ") + + " minimum = " + min + ") in file " + src); return false; } } @@ -3531,15 +3597,15 @@ public class BlockManager { * @return 0 if the block is not found; * otherwise, return the replication factor of the block. */ - private int getReplication(Block block) { + private int getReplication(BlockInfo block) { final BlockCollection bc = blocksMap.getBlockCollection(block); - return bc == null? 0: bc.getPreferredBlockReplication(); + return bc == null? 0: getExpectedReplicaNum(bc, block); } /** - * Get blocks to invalidate for nodeId - * in {@link #invalidateBlocks}. 
+ * Get blocks to invalidate for nodeId. + * in {@link #invalidateBlocks}.boolean blockHasEnoughRacks * * @return number of blocks scheduled for removal during this iteration. */ @@ -3577,22 +3643,20 @@ public class BlockManager { return toInvalidate.size(); } - boolean blockHasEnoughRacks(Block b) { + boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) { if (!this.shouldCheckForEnoughRacks) { return true; } - boolean enoughRacks = false;; - Collection corruptNodes = - corruptReplicas.getNodes(b); - int numExpectedReplicas = getReplication(b); + boolean enoughRacks = false; + Collection corruptNodes = + corruptReplicas.getNodes(storedBlock); String rackName = null; - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + for(DatanodeStorageInfo storage : getStorages(storedBlock)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { if ((corruptNodes == null ) || !corruptNodes.contains(cur)) { - if (numExpectedReplicas == 1 || - (numExpectedReplicas > 1 && - !datanodeManager.hasClusterEverBeenMultiRack())) { + if (expectedStorageNum == 1 || (expectedStorageNum > 1 && + !datanodeManager.hasClusterEverBeenMultiRack())) { enoughRacks = true; break; } @@ -3613,8 +3677,13 @@ public class BlockManager { * A block needs replication if the number of replicas is less than expected * or if it does not have enough racks. 
*/ - boolean isNeededReplication(Block b, int expected, int current) { - return current < expected || !blockHasEnoughRacks(b); + boolean isNeededReplication(BlockInfo storedBlock, int expected, + int current) { + return current < expected || !blockHasEnoughRacks(storedBlock, expected); + } + + public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) { + return bc.getPreferredBlockReplication(); } public long getMissingBlocksCount() { @@ -3636,11 +3705,6 @@ public class BlockManager { return blocksMap.getBlockCollection(b); } - /** @return an iterator of the datanodes. */ - public Iterable getStorages(final Block block) { - return blocksMap.getStorages(block); - } - public int numCorruptReplicas(Block block) { return corruptReplicas.numCorruptReplicas(block); } @@ -3656,9 +3720,10 @@ public class BlockManager { * If a block is removed from blocksMap, remove it from excessReplicateMap. */ private void removeFromExcessReplicateMap(Block block) { - for (DatanodeStorageInfo info : blocksMap.getStorages(block)) { + for (DatanodeStorageInfo info : getStorages(block)) { String uuid = info.getDatanodeDescriptor().getDatanodeUuid(); - LightWeightLinkedSet excessReplicas = excessReplicateMap.get(uuid); + LightWeightLinkedSet excessReplicas = + excessReplicateMap.get(uuid); if (excessReplicas != null) { if (excessReplicas.remove(block)) { excessBlocksCount.decrementAndGet(); @@ -3673,26 +3738,6 @@ public class BlockManager { public int getCapacity() { return blocksMap.getCapacity(); } - - /** - * Return a range of corrupt replica block ids. Up to numExpectedBlocks - * blocks starting at the next block after startingBlockId are returned - * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId - * is null, up to numExpectedBlocks blocks are returned from the beginning. - * If startingBlockId cannot be found, null is returned. - * - * @param numExpectedBlocks Number of block ids to return. 
- * 0 <= numExpectedBlocks <= 100 - * @param startingBlockId Block id from which to start. If null, start at - * beginning. - * @return Up to numExpectedBlocks blocks from startingBlockId if it exists - * - */ - public long[] getCorruptReplicaBlockIds(int numExpectedBlocks, - Long startingBlockId) { - return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks, - startingBlockId); - } /** * Return an iterator over the set of blocks for which there are no replicas. @@ -3867,7 +3912,7 @@ public class BlockManager { /** * A simple result enum for the result of - * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}. + * {@link BlockManager#processMisReplicatedBlock}. */ enum MisReplicationResult { /** The block should be invalidated since it belongs to a deleted file. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 57e81b4..65b83e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -24,6 +24,7 @@ import java.util.List; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; @@ -226,7 +227,7 @@ public class DatanodeStorageInfo { return 
blockPoolUsed; } - public AddBlockResult addBlock(BlockInfo b) { + public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) { // First check whether the block belongs to a different storage // on the same DN. AddBlockResult result = AddBlockResult.ADDED; @@ -245,10 +246,18 @@ public class DatanodeStorageInfo { } // add to the head of the data-node list - b.addStorage(this); + b.addStorage(this, reportedBlock); + insertToList(b); + return result; + } + + AddBlockResult addBlock(BlockInfo b) { + return addBlock(b, b); + } + + public void insertToList(BlockInfo b) { blockList = b.listInsert(blockList, this); numBlocks++; - return result; } public boolean removeBlock(BlockInfo b) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 80d018e..c23d132 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -147,7 +147,6 @@ import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; @@ -3133,7 +3132,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, if (trackBlockCounts) { if (b.isComplete()) { numRemovedComplete++; - if (blockManager.checkMinReplication(b)) { + if 
(blockManager.hasMinStorage(b)) { numRemovedSafe++; } } @@ -3365,7 +3364,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, curBlock = blocks[nrCompleteBlocks]; if(!curBlock.isComplete()) break; - assert blockManager.checkMinReplication(curBlock) : + assert blockManager.hasMinStorage(curBlock) : "A COMPLETE block is not minimally replicated in " + src; } @@ -3401,7 +3400,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // If penultimate block doesn't exist then its minReplication is met boolean penultimateBlockMinReplication = penultimateBlock == null ? true : - blockManager.checkMinReplication(penultimateBlock); + blockManager.hasMinStorage(penultimateBlock); switch(lastBlockState) { case COMPLETE: @@ -3410,7 +3409,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, case COMMITTED: // Close file if committed blocks are minimally replicated if(penultimateBlockMinReplication && - blockManager.checkMinReplication(lastBlock)) { + blockManager.hasMinStorage(lastBlock)) { finalizeINodeFileUnderConstruction(src, pendingFile, iip.getLatestSnapshotId()); NameNode.stateChangeLog.warn("BLOCK*" @@ -3702,9 +3701,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i)); if (storageInfo != null) { if(copyTruncate) { - storageInfo.addBlock(truncatedBlock); + storageInfo.addBlock(truncatedBlock, truncatedBlock); } else { - storageInfo.addBlock(storedBlock); + storageInfo.addBlock(storedBlock, storedBlock); } } } @@ -3720,8 +3719,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } else { iFile.setLastBlock(storedBlock, trimmedStorageInfos); if (closeFile) { - blockManager.markBlockReplicasAsCorrupt(storedBlock, - oldGenerationStamp, oldNumBytes, trimmedStorageInfos); + blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(), + storedBlock, oldGenerationStamp, oldNumBytes, + trimmedStorageInfos); } } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index ca0ef85..846abcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -636,7 +636,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { .getStorageType())); } if (showReplicaDetails) { - LightWeightLinkedSet<Block> blocksExcess = + LightWeightLinkedSet<BlockInfo> blocksExcess = bm.excessReplicateMap.get(dnDesc.getDatanodeUuid()); Collection corruptReplicas = bm.getCorruptReplicas(block.getLocalBlock()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java index 5126aa7..bae4f1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java @@ -63,7 +63,7 @@ public class TestBlockInfo { final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1"); - boolean added = blockInfo.addStorage(storage); + boolean added =
blockInfo.addStorage(storage, blockInfo); Assert.assertTrue(added); Assert.assertEquals(storage, blockInfo.getStorageInfo(0)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 396dff3..9e31670 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -383,7 +383,7 @@ public class TestBlockManager { for (int i = 1; i < pipeline.length; i++) { DatanodeStorageInfo storage = pipeline[i]; bm.addBlock(storage, blockInfo, null); - blockInfo.addStorage(storage); + blockInfo.addStorage(storage, blockInfo); } } @@ -393,7 +393,7 @@ public class TestBlockManager { for (DatanodeDescriptor dn : nodes) { for (DatanodeStorageInfo storage : dn.getStorageInfos()) { - blockInfo.addStorage(storage); + blockInfo.addStorage(storage, blockInfo); } } return blockInfo; http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java index 1c3f075..c33667d 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java @@ -100,7 +100,7 @@ public class TestNodeCount { DatanodeDescriptor nonExcessDN = null; for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) { final DatanodeDescriptor dn = storage.getDatanodeDescriptor(); - Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid()); + Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid()); if (blocks == null || !blocks.contains(block.getLocalBlock()) ) { nonExcessDN = dn; break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java index 2d7bb44..83b3aa0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -42,7 +41,6 @@ import
org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.util.Time; import org.junit.Test; public class TestOverReplicatedBlocks { @@ -185,7 +183,7 @@ public class TestOverReplicatedBlocks { // All replicas for deletion should be scheduled on lastDN. // And should not actually be deleted, because lastDN does not heartbeat. namesystem.readLock(); - Collection<Block> dnBlocks = + Collection<BlockInfo> dnBlocks = namesystem.getBlockManager().excessReplicateMap.get(lastDNid); assertEquals("Replicas on node " + lastDNid + " should have been deleted", SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/83d76151/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 2812957..44f0e65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -1250,7 +1250,7 @@ public class TestReplicationPolicy { when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true); when(storage.addBlock(any(BlockInfo.class))).thenReturn (DatanodeStorageInfo.AddBlockResult.ADDED); - ucBlock.addStorage(storage); + ucBlock.addStorage(storage, ucBlock); when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any())) .thenReturn(ucBlock);