Subject: svn commit: r1556076 [2/8] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org...
Date: Mon, 06 Jan 2014 23:58:36 -0000
To: hdfs-commits@hadoop.apache.org
From: arp@apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Message-Id: <20140106235841.572A9238890D@eris.apache.org>

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Mon Jan  6 23:58:33 2014
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.util.LightWeightGSet;
@@ -42,11 +43,11 @@ public class BlockInfo extends Block imp
   private LightWeightGSet.LinkedElement nextLinkedElement;
 
   /**
-   * This array contains triplets of references. For each i-th datanode the
-   * block belongs to triplets[3*i] is the reference to the DatanodeDescriptor
-   * and triplets[3*i+1] and triplets[3*i+2] are references to the previous and
-   * the next blocks, respectively, in the list of blocks belonging to this
-   * data-node.
+   * This array contains triplets of references. For each i-th storage the
+   * block belongs to, triplets[3*i] is the reference to the
+   * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
+   * references to the previous and the next blocks, respectively, in the list
+   * of blocks belonging to this storage.
    *
    * Using previous and next in Object triplets is done instead of a
    * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
@@ -88,10 +89,15 @@ public class BlockInfo extends Block imp
     this.bc = bc;
   }
 
-  DatanodeDescriptor getDatanode(int index) {
+  public DatanodeDescriptor getDatanode(int index) {
+    DatanodeStorageInfo storage = getStorageInfo(index);
+    return storage == null ? null : storage.getDatanodeDescriptor();
+  }
+
+  DatanodeStorageInfo getStorageInfo(int index) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    return (DatanodeDescriptor)triplets[index*3];
+    return (DatanodeStorageInfo)triplets[index*3];
   }
 
   private BlockInfo getPrevious(int index) {
@@ -114,14 +120,10 @@ public class BlockInfo extends Block imp
     return info;
   }
 
-  private void setDatanode(int index, DatanodeDescriptor node, BlockInfo previous,
-      BlockInfo next) {
+  private void setStorageInfo(int index, DatanodeStorageInfo storage) {
     assert this.triplets != null : "BlockInfo is not initialized";
-    int i = index * 3;
-    assert index >= 0 && i+2 < triplets.length : "Index is out of bound";
-    triplets[i] = node;
-    triplets[i+1] = previous;
-    triplets[i+2] = next;
+    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+    triplets[index*3] = storage;
   }
 
   /**
@@ -193,22 +195,34 @@ public class BlockInfo extends Block imp
   }
 
   /**
-   * Add data-node this block belongs to.
+   * Add a {@link DatanodeStorageInfo} location for a block
    */
-  public boolean addNode(DatanodeDescriptor node) {
-    if(findDatanode(node) >= 0) // the node is already there
-      return false;
+  boolean addStorage(DatanodeStorageInfo storage) {
+    boolean added = true;
+    int idx = findDatanode(storage.getDatanodeDescriptor());
+    if(idx >= 0) {
+      if (getStorageInfo(idx) == storage) { // the storage is already there
+        return false;
+      } else {
+        // The block is on the DN but belongs to a different storage.
+        // Update our state.
+        removeStorage(storage);
+        added = false;      // Just updating storage. Return false.
+      }
+    }
     // find the last null node
     int lastNode = ensureCapacity(1);
-    setDatanode(lastNode, node, null, null);
-    return true;
+    setStorageInfo(lastNode, storage);
+    setNext(lastNode, null);
+    setPrevious(lastNode, null);
+    return added;
   }
 
   /**
-   * Remove data-node from the block.
+   * Remove {@link DatanodeStorageInfo} location for a block
    */
-  public boolean removeNode(DatanodeDescriptor node) {
-    int dnIndex = findDatanode(node);
+  boolean removeStorage(DatanodeStorageInfo storage) {
+    int dnIndex = findStorageInfo(storage);
     if(dnIndex < 0) // the node is not found
       return false;
     assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
@@ -216,10 +230,13 @@ public class BlockInfo extends Block imp
     // find the last not null node
     int lastNode = numNodes()-1;
     // replace current node triplet by the lastNode one
-    setDatanode(dnIndex, getDatanode(lastNode), getPrevious(lastNode),
-        getNext(lastNode));
+    setStorageInfo(dnIndex, getStorageInfo(lastNode));
+    setNext(dnIndex, getNext(lastNode));
+    setPrevious(dnIndex, getPrevious(lastNode));
     // set the last triplet to null
-    setDatanode(lastNode, null, null, null);
+    setStorageInfo(lastNode, null);
+    setNext(lastNode, null);
+    setPrevious(lastNode, null);
     return true;
   }
@@ -239,37 +256,70 @@ public class BlockInfo extends Block imp
     }
     return -1;
   }
+
+  /**
+   * Find specified DatanodeStorageInfo.
+   * @param dn
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeInfo dn) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur == null)
+        break;
+      if(cur.getDatanodeDescriptor() == dn)
+        return idx;
+    }
+    return -1;
+  }
+
+  /**
+   * Find specified DatanodeStorageInfo.
+   * @param storageInfo
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeStorageInfo storageInfo) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur == storageInfo)
+        return idx;
+      if(cur == null)
+        break;
+    }
+    return -1;
+  }
 
   /**
    * Insert this block into the head of the list of blocks
-   * related to the specified DatanodeDescriptor.
+   * related to the specified DatanodeStorageInfo.
    * If the head is null then form a new list.
    * @return current block as the new head of the list.
    */
-  public BlockInfo listInsert(BlockInfo head, DatanodeDescriptor dn) {
-    int dnIndex = this.findDatanode(dn);
+  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
+    int dnIndex = this.findStorageInfo(storage);
     assert dnIndex >= 0 : "Data node is not found: current";
     assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
             "Block is already in the list and cannot be inserted.";
     this.setPrevious(dnIndex, null);
     this.setNext(dnIndex, head);
     if(head != null)
-      head.setPrevious(head.findDatanode(dn), this);
+      head.setPrevious(head.findStorageInfo(storage), this);
     return this;
   }
 
   /**
    * Remove this block from the list of blocks
-   * related to the specified DatanodeDescriptor.
+   * related to the specified DatanodeStorageInfo.
    * If this block is the head of the list then return the next block as
    * the new head.
    * @return the new head of the list or null if the list becomes
    * empty after deletion.
    */
-  public BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) {
+  BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
     if(head == null)
       return null;
-    int dnIndex = this.findDatanode(dn);
+    int dnIndex = this.findStorageInfo(storage);
     if(dnIndex < 0) // this block is not on the data-node list
       return head;
@@ -278,9 +328,9 @@ public class BlockInfo extends Block imp
     this.setNext(dnIndex, null);
     this.setPrevious(dnIndex, null);
     if(prev != null)
-      prev.setNext(prev.findDatanode(dn), next);
+      prev.setNext(prev.findStorageInfo(storage), next);
     if(next != null)
-      next.setPrevious(next.findDatanode(dn), prev);
+      next.setPrevious(next.findStorageInfo(storage), prev);
     if(this == head)  // removing the head
       head = next;
     return head;
   }
@@ -292,7 +342,7 @@ public class BlockInfo extends Block imp
    *
    * @return the new head of the list.
    */
-  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeDescriptor dn,
+  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
       int curIndex, int headIndex) {
     if (head == this) {
       return this;
     }
     BlockInfo next = this.setNext(curIndex, head);
     BlockInfo prev = this.setPrevious(curIndex, null);
 
     head.setPrevious(headIndex, this);
-    prev.setNext(prev.findDatanode(dn), next);
+    prev.setNext(prev.findStorageInfo(storage), next);
     if (next != null)
-      next.setPrevious(next.findDatanode(dn), prev);
+      next.setPrevious(next.findStorageInfo(storage), prev);
     return this;
   }
@@ -331,10 +381,10 @@ public class BlockInfo extends Block imp
    * @return BlockInfoUnderConstruction - an under construction block.
    */
   public BlockInfoUnderConstruction convertToBlockUnderConstruction(
-      BlockUCState s, DatanodeDescriptor[] targets) {
+      BlockUCState s, DatanodeStorageInfo[] targets) {
     if(isComplete()) {
-      return new BlockInfoUnderConstruction(
-          this, getBlockCollection().getBlockReplication(), s, targets);
+      return new BlockInfoUnderConstruction(this,
+          getBlockCollection().getBlockReplication(), s, targets);
     }
     // the block is already under construction
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
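[The triplets layout described in the patched javadoc above is easier to see outside the diff. Below is a hedged, self-contained Java sketch of the same encoding -- ToyBlockInfo and the plain Object storage slots are hypothetical stand-ins, not the real BlockInfo or DatanodeStorageInfo -- showing how one Object[] packs, per storage, the storage reference plus that storage's previous/next block links:]

  /**
   * Toy model of the triplets array: for the i-th storage holding the block,
   *   triplets[3*i]   = storage reference,
   *   triplets[3*i+1] = previous block in that storage's block list,
   *   triplets[3*i+2] = next block in that storage's block list.
   */
  class ToyBlockInfo {
    private final Object[] triplets;

    ToyBlockInfo(int replication) {
      this.triplets = new Object[3 * replication];
    }

    Object getStorage(int i)                { return triplets[3 * i]; }
    void setStorage(int i, Object storage)  { triplets[3 * i] = storage; }
    ToyBlockInfo getPrevious(int i)         { return (ToyBlockInfo) triplets[3 * i + 1]; }
    ToyBlockInfo getNext(int i)             { return (ToyBlockInfo) triplets[3 * i + 2]; }
    void setPrevious(int i, ToyBlockInfo b) { triplets[3 * i + 1] = b; }
    void setNext(int i, ToyBlockInfo b)     { triplets[3 * i + 2] = b; }

    /** Slot index of the given storage, or -1 if this block is not on it. */
    int findStorage(Object storage) {
      for (int i = 0; i * 3 < triplets.length; i++) {
        if (triplets[3 * i] == storage) {
          return i;
        }
      }
      return -1;
    }
  }

[Packing the links into the block's own array instead of per-node LinkedList entries is what the javadoc's memory argument refers to: one object and one array per block, rather than one list-node object per (block, storage) pair.]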
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Mon Jan  6 23:58:33 2014
@@ -63,12 +63,12 @@ public class BlockInfoUnderConstruction
    * corresponding replicas.
    */
   static class ReplicaUnderConstruction extends Block {
-    private DatanodeDescriptor expectedLocation;
+    private final DatanodeStorageInfo expectedLocation;
     private ReplicaState state;
     private boolean chosenAsPrimary;
 
     ReplicaUnderConstruction(Block block,
-                             DatanodeDescriptor target,
+                             DatanodeStorageInfo target,
                              ReplicaState state) {
       super(block);
       this.expectedLocation = target;
@@ -82,7 +82,7 @@ public class BlockInfoUnderConstruction
      * It is not guaranteed, but expected, that the data-node actually has
      * the replica.
      */
-    DatanodeDescriptor getExpectedLocation() {
+    private DatanodeStorageInfo getExpectedStorageLocation() {
       return expectedLocation;
     }
@@ -118,7 +118,7 @@ public class BlockInfoUnderConstruction
      * Is data-node the replica belongs to alive.
      */
     boolean isAlive() {
-      return expectedLocation.isAlive;
+      return expectedLocation.getDatanodeDescriptor().isAlive;
     }
 
     @Override // Block
@@ -162,7 +162,7 @@ public class BlockInfoUnderConstruction
    */
   public BlockInfoUnderConstruction(Block blk, int replication,
                              BlockUCState state,
-                             DatanodeDescriptor[] targets) {
+                             DatanodeStorageInfo[] targets) {
     super(blk, replication);
     assert getBlockUCState() != BlockUCState.COMPLETE :
       "BlockInfoUnderConstruction cannot be in COMPLETE state";
@@ -186,7 +186,7 @@ public class BlockInfoUnderConstruction
   }
 
   /** Set expected locations */
-  public void setExpectedLocations(DatanodeDescriptor[] targets) {
+  public void setExpectedLocations(DatanodeStorageInfo[] targets) {
     int numLocations = targets == null ? 0 : targets.length;
     this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
     for(int i = 0; i < numLocations; i++)
@@ -198,12 +198,12 @@ public class BlockInfoUnderConstruction
    * Create array of expected replica locations
    * (as has been assigned by chooseTargets()).
    */
-  public DatanodeDescriptor[] getExpectedLocations() {
+  public DatanodeStorageInfo[] getExpectedStorageLocations() {
     int numLocations = replicas == null ? 0 : replicas.size();
-    DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
+    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
     for(int i = 0; i < numLocations; i++)
-      locations[i] = replicas.get(i).getExpectedLocation();
-    return locations;
+      storages[i] = replicas.get(i).getExpectedStorageLocation();
+    return storages;
   }
 
   /** Get the number of expected locations */
@@ -244,9 +244,9 @@ public class BlockInfoUnderConstruction
     // The replica list is unchanged.
     for (ReplicaUnderConstruction r : replicas) {
       if (genStamp != r.getGenerationStamp()) {
-        r.getExpectedLocation().removeBlock(this);
+        r.getExpectedStorageLocation().removeBlock(this);
         NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
-            + "from location: " + r.getExpectedLocation());
+            + "from location: " + r.getExpectedStorageLocation());
       }
     }
   }
@@ -302,31 +302,44 @@ public class BlockInfoUnderConstruction
       if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
         continue;
       }
-      if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
-        primary = replicas.get(i);
+      final ReplicaUnderConstruction ruc = replicas.get(i);
+      final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate();
+      if (lastUpdate > mostRecentLastUpdate) {
         primaryNodeIndex = i;
-        mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
+        primary = ruc;
+        mostRecentLastUpdate = lastUpdate;
       }
     }
     if (primary != null) {
-      primary.getExpectedLocation().addBlockToBeRecovered(this);
+      primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
       primary.setChosenAsPrimary(true);
       NameNode.blockStateChangeLog.info("BLOCK* " + this
         + " recovery started, primary=" + primary);
     }
   }
 
-  void addReplicaIfNotPresent(DatanodeDescriptor dn,
+  void addReplicaIfNotPresent(DatanodeStorageInfo storage,
                      Block block,
                      ReplicaState rState) {
-    for (ReplicaUnderConstruction r : replicas) {
-      if (r.getExpectedLocation() == dn) {
+    Iterator<ReplicaUnderConstruction> it = replicas.iterator();
+    while (it.hasNext()) {
+      ReplicaUnderConstruction r = it.next();
+      if(r.getExpectedStorageLocation() == storage) {
         // Record the gen stamp from the report
         r.setGenerationStamp(block.getGenerationStamp());
         return;
+      } else if (r.getExpectedStorageLocation().getDatanodeDescriptor() ==
+          storage.getDatanodeDescriptor()) {
+
+        // The Datanode reported that the block is on a different storage
+        // than the one chosen by BlockPlacementPolicy. This can occur as
+        // we allow Datanodes to choose the target storage. Update our
+        // state by removing the stale entry and adding a new one.
+        it.remove();
+        break;
       }
     }
-    replicas.add(new ReplicaUnderConstruction(block, dn, rState));
+    replicas.add(new ReplicaUnderConstruction(block, storage, rState));
   }
 
   @Override // BlockInfo
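[The new addReplicaIfNotPresent() above encodes a simple rule: deduplicate expected replicas by datanode, preferring the most recently reported storage. A minimal hedged sketch of that rule, with hypothetical String ids standing in for DatanodeStorageInfo/DatanodeDescriptor:]

  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;

  class ToyReplicaList {
    static final class Replica {
      final String datanodeUuid;   // which datanode holds it
      final String storageId;      // which storage on that datanode
      long genStamp;
      Replica(String dn, String s, long gs) { datanodeUuid = dn; storageId = s; genStamp = gs; }
    }

    private final List<Replica> replicas = new ArrayList<Replica>();

    /** Same shape as the patched method: an exact storage match just refreshes
     *  the gen stamp; a match on datanode only drops the stale entry first. */
    void addReplicaIfNotPresent(String dn, String storage, long genStamp) {
      for (Iterator<Replica> it = replicas.iterator(); it.hasNext(); ) {
        Replica r = it.next();
        if (r.datanodeUuid.equals(dn) && r.storageId.equals(storage)) {
          r.genStamp = genStamp;   // already known on this exact storage
          return;
        } else if (r.datanodeUuid.equals(dn)) {
          it.remove();             // same datanode, different storage: stale
          break;
        }
      }
      replicas.add(new Replica(dn, storage, genStamp));
    }
  }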
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Jan  6 23:58:33 2014
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -44,6 +45,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
@@ -70,8 +72,10 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -470,8 +474,8 @@ public class BlockManager {
 
   private void dumpBlockMeta(Block block, PrintWriter out) {
     List<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>();
-    List<DatanodeDescriptor> containingLiveReplicasNodes =
-      new ArrayList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> containingLiveReplicasNodes =
+      new ArrayList<DatanodeStorageInfo>();
 
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
@@ -498,9 +502,8 @@ public class BlockManager {
     Collection<DatanodeDescriptor> corruptNodes =
                                   corruptReplicas.getNodes(block);
 
-    for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-         jt.hasNext();) {
-      DatanodeDescriptor node = jt.next();
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       String state = "";
       if (corruptNodes != null && corruptNodes.contains(node)) {
         state = "(corrupt)";
@@ -509,7 +512,7 @@ public class BlockManager {
         state = "(decommissioned)";
       }
 
-      if (node.areBlockContentsStale()) {
+      if (storage.areBlockContentsStale()) {
         state += " (block deletions maybe out of date)";
       }
       out.print(" " + node + state + " : ");
@@ -660,10 +663,9 @@ public class BlockManager {
     assert oldBlock == getStoredBlock(oldBlock) :
       "last block of the file is not in blocksMap";
 
-    DatanodeDescriptor[] targets = getNodes(oldBlock);
+    DatanodeStorageInfo[] targets = getStorages(oldBlock);
 
-    BlockInfoUnderConstruction ucBlock =
-      bc.setLastBlock(oldBlock, targets);
+    BlockInfoUnderConstruction ucBlock = bc.setLastBlock(oldBlock, targets);
     blocksMap.replaceBlock(ucBlock);
 
     // Remove block from replication queue.
@@ -673,9 +675,8 @@ public class BlockManager {
     pendingReplications.remove(ucBlock);
 
     // remove this block from the list of pending blocks to be deleted.
-    for (DatanodeDescriptor dd : targets) {
-      String datanodeId = dd.getStorageID();
-      invalidateBlocks.remove(datanodeId, oldBlock);
+    for (DatanodeStorageInfo storage : targets) {
+      invalidateBlocks.remove(storage.getStorageID(), oldBlock);
     }
 
     // Adjust safe-mode totals, since under-construction blocks don't
@@ -694,18 +695,17 @@ public class BlockManager {
   /**
    * Get all valid locations of the block
    */
-  private List<String> getValidLocations(Block block) {
-    ArrayList<String> machineSet =
-      new ArrayList<String>(blocksMap.numNodes(block));
-    for(Iterator<DatanodeDescriptor> it =
-      blocksMap.nodeIterator(block); it.hasNext();) {
-      String storageID = it.next().getStorageID();
+  private List<DatanodeStorageInfo> getValidLocations(Block block) {
+    final List<DatanodeStorageInfo> locations
+        = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final String storageID = storage.getStorageID();
       // filter invalidate replicas
       if(!invalidateBlocks.contains(storageID, block)) {
-        machineSet.add(storageID);
+        locations.add(storage);
       }
     }
-    return machineSet;
+    return locations;
   }
 
   private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
@@ -773,9 +773,9 @@ public class BlockManager {
           + ", blk=" + blk);
     }
     final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
-    final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+    final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
     final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
-    return new LocatedBlock(eb, locations, pos, false);
+    return new LocatedBlock(eb, storages, pos, false);
   }
 
   // get block locations
@@ -790,15 +790,14 @@ public class BlockManager {
     final int numNodes = blocksMap.numNodes(blk);
     final boolean isCorrupt = numCorruptNodes == numNodes;
     final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
-    final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     int j = 0;
     if (numMachines > 0) {
-      for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
-          it.hasNext();) {
-        final DatanodeDescriptor d = it.next();
+      for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
+        final DatanodeDescriptor d = storage.getDatanodeDescriptor();
         final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
         if (isCorrupt || (!isCorrupt && !replicaCorrupt))
-          machines[j++] = d;
+          machines[j++] = storage;
       }
     }
     assert j == machines.length :
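[getValidLocations() above is a straightforward filter: of all storages that hold the block, keep only those without a pending invalidation for it. A hedged standalone sketch of the same filter, with plain Strings and a Map standing in for DatanodeStorageInfo and InvalidateBlocks:]

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;

  class ToyLocationFilter {
    /** Keep only storages with no pending delete queued for this block. */
    static List<String> validLocations(String block,
        Iterable<String> storagesHoldingBlock,
        Map<String, Set<String>> pendingInvalidations) {
      List<String> valid = new ArrayList<String>();
      for (String storageId : storagesHoldingBlock) {
        Set<String> pending = pendingInvalidations.get(storageId);
        if (pending == null || !pending.contains(block)) {
          valid.add(storageId);    // replica is not about to be deleted
        }
      }
      return valid;
    }
  }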
@@ -990,13 +989,20 @@ public class BlockManager {
     }
 
     node.resetBlocks();
-    invalidateBlocks.remove(node.getStorageID());
+    invalidateBlocks.remove(node.getDatanodeUuid());
 
     // If the DN hasn't block-reported since the most recent
     // failover, then we may have been holding up on processing
     // over-replicated blocks because of it. But we can now
     // process those blocks.
-    if (node.areBlockContentsStale()) {
+    boolean stale = false;
+    for(DatanodeStorageInfo storage : node.getStorageInfos()) {
+      if (storage.areBlockContentsStale()) {
+        stale = true;
+        break;
+      }
+    }
+    if (stale) {
       rescanPostponedMisreplicatedBlocks();
     }
   }
@@ -1015,9 +1021,8 @@ public class BlockManager {
    */
   private void addToInvalidates(Block b) {
     StringBuilder datanodes = new StringBuilder();
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
-        .hasNext();) {
-      DatanodeDescriptor node = it.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       invalidateBlocks.add(b, node, false);
       datanodes.append(node).append(" ");
     }
@@ -1035,7 +1040,7 @@ public class BlockManager {
    *                  for logging purposes
    */
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
-      final DatanodeInfo dn, String reason) throws IOException {
+      final DatanodeInfo dn, String storageID, String reason) throws IOException {
     assert namesystem.hasWriteLock();
     final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
     if (storedBlock == null) {
@@ -1048,11 +1053,11 @@ public class BlockManager {
       return;
     }
     markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
-        Reason.CORRUPTION_REPORTED), dn);
+        Reason.CORRUPTION_REPORTED), dn, storageID);
   }
 
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
-                                  DatanodeInfo dn) throws IOException {
+      DatanodeInfo dn, String storageID) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot mark " + b
@@ -1068,7 +1073,7 @@ public class BlockManager {
     }
 
     // Add replica to the data-node if it is not already there
-    node.addBlock(b.stored);
+    node.addBlock(storageID, b.stored);
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
@@ -1193,7 +1198,7 @@ public class BlockManager {
   @VisibleForTesting
   int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
     int requiredReplication, numEffectiveReplicas;
-    List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
+    List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
     BlockCollection bc = null;
     int additionalReplRequired;
@@ -1219,7 +1224,7 @@ public class BlockManager {
 
           // get a source data-node
           containingNodes = new ArrayList<DatanodeDescriptor>();
-          liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+          List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
           NumberReplicas numReplicas = new NumberReplicas();
           srcNode = chooseSourceDatanode(
               block, containingNodes, liveReplicaNodes, numReplicas,
@@ -1279,7 +1284,7 @@ public class BlockManager {
     namesystem.writeLock();
     try {
       for(ReplicationWork rw : work){
-        DatanodeDescriptor[] targets = rw.targets;
+        final DatanodeStorageInfo[] targets = rw.targets;
         if(targets == null || targets.length == 0){
           rw.targets = null;
           continue;
         }
@@ -1319,7 +1324,8 @@ public class BlockManager {
           if ( (numReplicas.liveReplicas() >= requiredReplication) &&
                (!blockHasEnoughRacks(block)) ) {
-            if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+            if (rw.srcNode.getNetworkLocation().equals(
+                targets[0].getDatanodeDescriptor().getNetworkLocation())) {
               //No use continuing, unless a new rack in this case
               continue;
             }
@@ -1328,15 +1334,13 @@ public class BlockManager {
           // Add block to the to be replicated list
           rw.srcNode.addBlockToBeReplicated(block, targets);
           scheduledWork++;
-
-          for (DatanodeDescriptor dn : targets) {
-            dn.incBlocksScheduled();
-          }
+          DatanodeStorageInfo.incrementBlocksScheduled(targets);
into a "pending" state. // The reason we use 'pending' is so we can retry // replications that fail after an appropriate amount of time. - pendingReplications.increment(block, targets); + pendingReplications.increment(block, + DatanodeStorageInfo.toDatanodeDescriptors(targets)); if(blockLog.isDebugEnabled()) { blockLog.debug( "BLOCK* block " + block @@ -1357,12 +1361,12 @@ public class BlockManager { if (blockLog.isInfoEnabled()) { // log which blocks have been scheduled for replication for(ReplicationWork rw : work){ - DatanodeDescriptor[] targets = rw.targets; + DatanodeStorageInfo[] targets = rw.targets; if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); for (int k = 0; k < targets.length; k++) { targetList.append(' '); - targetList.append(targets[k]); + targetList.append(targets[k].getDatanodeDescriptor()); } blockLog.info("BLOCK* ask " + rw.srcNode + " to replicate " + rw.block + " to " + targetList); @@ -1386,15 +1390,16 @@ public class BlockManager { * @see BlockPlacementPolicy#chooseTarget(String, int, Node, * List, boolean, Set, long) */ - public DatanodeDescriptor[] chooseTarget(final String src, + public DatanodeStorageInfo[] chooseTarget(final String src, final int numOfReplicas, final DatanodeDescriptor client, final Set excludedNodes, final long blocksize, List favoredNodes) throws IOException { List favoredDatanodeDescriptors = getDatanodeDescriptors(favoredNodes); - final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src, + final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src, numOfReplicas, client, excludedNodes, blocksize, - favoredDatanodeDescriptors); + // TODO: get storage type from file + favoredDatanodeDescriptors, StorageType.DEFAULT); if (targets.length < minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes instead of minReplication (=" @@ -1455,12 +1460,11 @@ public class BlockManager { * the given block */ @VisibleForTesting - DatanodeDescriptor chooseSourceDatanode( - Block block, - List containingNodes, - List nodesContainingLiveReplicas, - NumberReplicas numReplicas, - int priority) { + DatanodeDescriptor chooseSourceDatanode(Block block, + List containingNodes, + List nodesContainingLiveReplicas, + NumberReplicas numReplicas, + int priority) { containingNodes.clear(); nodesContainingLiveReplicas.clear(); DatanodeDescriptor srcNode = null; @@ -1468,12 +1472,12 @@ public class BlockManager { int decommissioned = 0; int corrupt = 0; int excess = 0; - Iterator it = blocksMap.nodeIterator(block); + Collection nodesCorrupt = corruptReplicas.getNodes(block); - while(it.hasNext()) { - DatanodeDescriptor node = it.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); LightWeightLinkedSet excessBlocks = - excessReplicateMap.get(node.getStorageID()); + excessReplicateMap.get(node.getDatanodeUuid()); if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) corrupt++; else if (node.isDecommissionInProgress() || node.isDecommissioned()) @@ -1481,7 +1485,7 @@ public class BlockManager { else if (excessBlocks != null && excessBlocks.contains(block)) { excess++; } else { - nodesContainingLiveReplicas.add(node); + nodesContainingLiveReplicas.add(storage); live++; } containingNodes.add(node); @@ -1613,10 +1617,11 @@ public class BlockManager { } /** - * The given datanode is reporting all its blocks. 
@@ -1613,10 +1617,11 @@ public class BlockManager {
   }
 
   /**
-   * The given datanode is reporting all its blocks.
-   * Update the (machine-->blocklist) and (block-->machinelist) maps.
+   * The given storage is reporting all its blocks.
+   * Update the (storage-->block list) and (block-->storage list) maps.
    */
-  public void processReport(final DatanodeID nodeID, final String poolId,
+  public void processReport(final DatanodeID nodeID,
+      final DatanodeStorage storage, final String poolId,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
@@ -1630,26 +1635,28 @@ public class BlockManager {
 
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
-      if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
+      final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
+      if (namesystem.isInStartupSafeMode()
+          && storageInfo.getBlockReportCount() > 0) {
         blockLog.info("BLOCK* processReport: "
             + "discarded non-initial block report from " + nodeID
             + " because namenode still in startup phase");
         return;
       }
 
-      if (node.numBlocks() == 0) {
+      if (storageInfo.numBlocks() == 0) {
         // The first block report can be processed a lot more efficiently than
         // ordinary block reports.  This shortens restart times.
-        processFirstBlockReport(node, newReport);
+        processFirstBlockReport(node, storage.getStorageID(), newReport);
       } else {
-        processReport(node, newReport);
+        processReport(node, storage, newReport);
       }
 
       // Now that we have an up-to-date block report, we know that any
      // deletions from a previous NN iteration have been accounted for.
-      boolean staleBefore = node.areBlockContentsStale();
-      node.receivedBlockReport();
-      if (staleBefore && !node.areBlockContentsStale()) {
+      boolean staleBefore = storageInfo.areBlockContentsStale();
+      storageInfo.receivedBlockReport();
+      if (staleBefore && !storageInfo.areBlockContentsStale()) {
         LOG.info("BLOCK* processReport: Received first block report from "
             + node + " after starting up or becoming active. Its block "
             + "contents are no longer considered stale");
@@ -1703,28 +1710,30 @@ public class BlockManager {
   }
 
   private void processReport(final DatanodeDescriptor node,
+      final DatanodeStorage storage,
       final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
-    Collection<Block> toRemove = new LinkedList<Block>();
+    Collection<Block> toRemove = new TreeSet<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+    reportDiff(node, storage, report,
+        toAdd, toRemove, toInvalidate, toCorrupt, toUC);
 
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) {
-      addStoredBlockUnderConstruction(b, node);
+      addStoredBlockUnderConstruction(b, node, storage.getStorageID());
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1738,7 +1747,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
+      markBlockAsCorrupt(b, node, storage.getStorageID());
     }
   }
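[processReport() above never mutates state while diffing; it first sorts every reported block into one of five queues and then drains them in a fixed order. A hedged sketch of that shape (String blocks, stub handlers in comments):]

  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.TreeSet;

  class ToyReportQueues {
    final Collection<String> toUC = new ArrayList<String>();
    final Collection<String> toRemove = new TreeSet<String>(); // sorted, as in the patch
    final Collection<String> toAdd = new ArrayList<String>();
    final Collection<String> toInvalidate = new ArrayList<String>();
    final Collection<String> toCorrupt = new ArrayList<String>();

    /** Drain in the same order as processReport(): under-construction first,
     *  then removals, additions, invalidations, and corruption marking. */
    void apply() {
      for (String b : toUC)         { /* addStoredBlockUnderConstruction(b, ...) */ }
      for (String b : toRemove)     { /* removeStoredBlock(b, ...) */ }
      for (String b : toAdd)        { /* addStoredBlock(b, ...) */ }
      for (String b : toInvalidate) { /* addToInvalidates(b, ...) */ }
      for (String b : toCorrupt)    { /* markBlockAsCorrupt(b, ...) */ }
    }
  }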
@@ -1754,10 +1763,11 @@ public class BlockManager {
    * @throws IOException
    */
   private void processFirstBlockReport(final DatanodeDescriptor node,
+      final String storageID,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
-    assert (node.numBlocks() == 0);
+    assert (node.getStorageInfo(storageID).numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
     while(itBR.hasNext()) {
@@ -1766,7 +1776,7 @@ public class BlockManager {
 
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
-        queueReportedBlock(node, iblk, reportedState,
+        queueReportedBlock(node, storageID, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
       }
@@ -1783,10 +1793,10 @@ public class BlockManager {
         if (shouldPostponeBlocksFromFuture) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
-          queueReportedBlock(node, iblk, reportedState,
+          queueReportedBlock(node, storageID, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(c, node);
+          markBlockAsCorrupt(c, node, storageID);
         }
         continue;
       }
@@ -1794,7 +1804,7 @@ public class BlockManager {
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
         ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-            node, iblk, reportedState);
+            node.getStorageInfo(storageID), iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
       }
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, node);
+        addStoredBlockImmediate(storedBlock, node, storageID);
       }
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn,
+  private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
       BlockListAsLongs newReport,
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,               // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,           // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) {     // add to under-construction list
+
+    final DatanodeStorageInfo storageInfo = dn.updateStorage(storage);
+
     // place a delimiter in the list which separates blocks
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
-    boolean added = dn.addBlock(delimiter);
+    boolean added = storageInfo.addBlock(delimiter);
     assert added : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
     int curIndex;
 
@@ -1834,20 +1847,21 @@ public class BlockManager {
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
-                                  toAdd, toInvalidate, toCorrupt, toUC);
+      BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+          iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
+
       // move block to the head of the list
       if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
-        headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
+        headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
       }
     }
+
     // collect blocks that have not been reported
     // all of them are next to the delimiter
-    Iterator<BlockInfo> it = new DatanodeDescriptor.BlockIterator(
-        delimiter.getNext(0), dn);
+    Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
     while(it.hasNext())
       toRemove.add(it.next());
-    dn.removeBlock(delimiter);
+    storageInfo.removeBlock(delimiter);
   }
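[The delimiter trick in reportDiff() deserves a standalone illustration: a sentinel block is pushed onto the head of the storage's block list, every reported block is moved in front of it, and whatever is still behind the sentinel afterwards was never mentioned in the report and must be removed. The hedged sketch below reproduces the idea with a plain LinkedList instead of the intrusive triplets list:]

  import java.util.ArrayList;
  import java.util.LinkedList;
  import java.util.List;

  class ToyDelimiterDiff {
    /** Returns the blocks the storage holds but did not report. */
    static List<String> unreported(LinkedList<String> storageBlocks,
        Iterable<String> reported) {
      final String delimiter = "<delimiter>";   // sentinel, never a real block
      storageBlocks.addFirst(delimiter);
      for (String b : reported) {
        if (storageBlocks.remove(b)) {          // known block was reported:
          storageBlocks.addFirst(b);            // move it ahead of the sentinel
        }
      }
      // everything after the sentinel was never mentioned in the report
      int cut = storageBlocks.indexOf(delimiter);
      List<String> toRemove =
          new ArrayList<String>(storageBlocks.subList(cut + 1, storageBlocks.size()));
      storageBlocks.remove(cut);                // drop the sentinel again
      return toRemove;
    }
  }

[In the real code the move-to-head is O(1) because each block carries its own prev/next links per storage; the LinkedList version is only meant to make the ordering argument visible.]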
 
   /**
@@ -1881,7 +1895,8 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
+  private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
+      final String storageID,
       final Block block, final ReplicaState reportedState,
       final Collection<BlockInfo> toAdd,
       final Collection<Block> toInvalidate,
@@ -1896,7 +1911,7 @@ public class BlockManager {
 
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
-      queueReportedBlock(dn, block, reportedState,
+      queueReportedBlock(dn, storageID, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
@@ -1917,7 +1932,7 @@ public class BlockManager {
 
     // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(dn.getStorageID(), block)) {
+    if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
 /*  TODO: following assertion is incorrect, see HDFS-2668
assert storedBlock.findDatanode(dn) < 0 : "Block " + block
        + " in recentInvalidatesSet should not appear in DN " + dn;
 */
@@ -1931,7 +1946,7 @@ assert storedBlock.findDatanode(dn) < 0
       // If the block is an out-of-date generation stamp or state,
       // but we're the standby, we shouldn't treat it as corrupt,
       // but instead just queue it for later processing.
-      queueReportedBlock(dn, storedBlock, reportedState,
+      queueReportedBlock(dn, storageID, storedBlock, reportedState,
           QUEUE_REASON_CORRUPT_STATE);
     } else {
       toCorrupt.add(c);
     }
@@ -1960,7 +1975,7 @@ assert storedBlock.findDatanode(dn) < 0
    * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
-  private void queueReportedBlock(DatanodeDescriptor dn, Block block,
+  private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
       ReplicaState reportedState, String reason) {
     assert shouldPostponeBlocksFromFuture;
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queueing reported block " + block +
           " in state " + reportedState +
           " from datanode " + dn + " for later processing " +
           "because " + reason + ".");
     }
-    pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
+    pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
   }
 
   /**
@@ -1993,8 +2008,8 @@ assert storedBlock.findDatanode(dn) < 0
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing previously queued message " + rbi);
       }
-      processAndHandleReportedBlock(
-          rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
+      processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(),
+          rbi.getBlock(), rbi.getReportedState(), null);
     }
   }
@@ -2111,19 +2126,21 @@ assert storedBlock.findDatanode(dn) < 0
       return false;
     }
   }
-  
+
   void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
-      DatanodeDescriptor node) throws IOException {
+      DatanodeDescriptor node, String storageID) throws IOException {
     BlockInfoUnderConstruction block = ucBlock.storedBlock;
-    block.addReplicaIfNotPresent(node, ucBlock.reportedBlock, ucBlock.reportedState);
+    block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
+        ucBlock.reportedBlock, ucBlock.reportedState);
+
     if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
-      addStoredBlock(block, node, null, true);
+      addStoredBlock(block, node, storageID, null, true);
     }
-  }
-  
+  }
+
   /**
    * Faster version of
-   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
+   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
    * , intended for use with initial block report at startup. If not in startup
    * safe mode, will call standard addStoredBlock().
    * Assumes this method is called "immediately" so there is no need to
    * refresh the storedBlock from blocksMap.
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
-      DatanodeDescriptor node)
+      DatanodeDescriptor node, String storageID)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode()
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, node, null, false);
+      addStoredBlock(storedBlock, node, storageID, null, false);
       return;
     }
 
     // just add it
-    node.addBlock(storedBlock);
+    node.addBlock(storageID, storedBlock);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2167,6 +2184,7 @@ assert storedBlock.findDatanode(dn) < 0
    */
   private Block addStoredBlock(final BlockInfo block,
                                DatanodeDescriptor node,
+                               String storageID,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
@@ -2192,7 +2210,7 @@ assert storedBlock.findDatanode(dn) < 0
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = node.addBlock(storedBlock);
+    boolean added = node.addBlock(storageID, storedBlock);
 
     int curReplicaDelta;
     if (added) {
@@ -2452,19 +2470,19 @@ assert storedBlock.findDatanode(dn) < 0
     Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
-      if (cur.areBlockContentsStale()) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+      if (storage.areBlockContentsStale()) {
         LOG.info("BLOCK* processOverReplicatedBlock: " +
             "Postponing processing of over-replicated " +
-            block + " since datanode " + cur + " does not yet have up-to-date " +
+            block + " since storage " + storage
+            + " on datanode " + cur + " does not yet have up-to-date " +
             "block information.");
         postponeBlock(block);
         return;
       }
       LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
-          .getStorageID());
+          .getDatanodeUuid());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
           // exclude corrupt replicas
@@ -2553,10 +2571,10 @@ assert storedBlock.findDatanode(dn) < 0
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
-    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
+    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
     if (excessBlocks == null) {
       excessBlocks = new LightWeightLinkedSet<Block>();
-      excessReplicateMap.put(dn.getStorageID(), excessBlocks);
+      excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
     }
     if (excessBlocks.add(block)) {
       excessBlocksCount.incrementAndGet();
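[The excess-replica bookkeeping above (now keyed by datanode UUID rather than storage ID) pairs a per-node set with a global counter, and the two must move together. A hedged sketch of the add/remove pair with simplified String/HashSet types:]

  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Set;

  class ToyExcessMap {
    private final Map<String, Set<String>> excess = new HashMap<String, Set<String>>();
    private long excessBlocksCount = 0;

    void add(String datanodeUuid, String block) {
      Set<String> blocks = excess.get(datanodeUuid);
      if (blocks == null) {
        blocks = new HashSet<String>();
        excess.put(datanodeUuid, blocks);
      }
      if (blocks.add(block)) {
        excessBlocksCount++;              // count only the first insertion
      }
    }

    void remove(String datanodeUuid, String block) {
      Set<String> blocks = excess.get(datanodeUuid);
      if (blocks != null && blocks.remove(block)) {
        excessBlocksCount--;
        if (blocks.isEmpty()) {
          excess.remove(datanodeUuid);    // drop empty per-node sets, as above
        }
      }
    }
  }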
@@ -2604,7 +2622,7 @@ assert storedBlock.findDatanode(dn) < 0
     // in "excess" there.
     //
     LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
-        .getStorageID());
+        .getDatanodeUuid());
     if (excessBlocks != null) {
       if (excessBlocks.remove(block)) {
         excessBlocksCount.decrementAndGet();
         if(blockLog.isDebugEnabled()) {
           blockLog.debug("BLOCK* removeStoredBlock: "
               + block + " is removed from excessBlocks");
         }
         if (excessBlocks.size() == 0) {
-          excessReplicateMap.remove(node.getStorageID());
+          excessReplicateMap.remove(node.getDatanodeUuid());
         }
       }
     }
@@ -2628,12 +2646,18 @@ assert storedBlock.findDatanode(dn) < 0
    * return the length of the added block; 0 if the block is not added
    */
   private long addBlock(Block block, List<BlockWithLocations> results) {
-    final List<String> machineSet = getValidLocations(block);
-    if(machineSet.size() == 0) {
+    final List<DatanodeStorageInfo> locations = getValidLocations(block);
+    if(locations.size() == 0) {
       return 0;
     } else {
-      results.add(new BlockWithLocations(block,
-          machineSet.toArray(new String[machineSet.size()])));
+      final String[] datanodeUuids = new String[locations.size()];
+      final String[] storageIDs = new String[datanodeUuids.length];
+      for(int i = 0; i < locations.size(); i++) {
+        final DatanodeStorageInfo s = locations.get(i);
+        datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
+        storageIDs[i] = s.getStorageID();
+      }
+      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
       return block.getNumBytes();
     }
   }
@@ -2642,12 +2666,12 @@ assert storedBlock.findDatanode(dn) < 0
    * The given node is reporting that it received a certain block.
    */
   @VisibleForTesting
-  void addBlock(DatanodeDescriptor node, Block block, String delHint)
+  void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
       throws IOException {
-    // decrement number of blocks scheduled to this datanode.
+    // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
     // RECEIVED_BLOCK), we currently also decrease the approximate number.
-    node.decBlocksScheduled();
+    node.decrementBlocksScheduled();
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
@@ -2663,11 +2687,12 @@ assert storedBlock.findDatanode(dn) < 0
 
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.decrement(block, node);
-    processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
+    processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
         delHintNode);
   }
 
-  private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
+  private void processAndHandleReportedBlock(DatanodeDescriptor node,
+      String storageID, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, block, reportedState,
+    processReportedBlock(node, storageID, block, reportedState,
         toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
     assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) {
-      addStoredBlockUnderConstruction(b, node);
+      addStoredBlockUnderConstruction(b, node, storageID);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2701,7 +2726,7 @@ assert storedBlock.findDatanode(dn) < 0
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
+      markBlockAsCorrupt(b, node, storageID);
     }
   }
 
@@ -2713,7 +2738,7 @@ assert storedBlock.findDatanode(dn) < 0
    * This method must be called with FSNamesystem lock held.
    */
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      final String poolId, final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     assert namesystem.hasWriteLock();
     int received = 0;
     int deleted = 0;
     int receiving = 0;
     ...
           "Got incremental block report from unregistered or dead node");
     }
 
-    for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+    for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
       switch (rdbi.getStatus()) {
       case DELETED_BLOCK:
         removeStoredBlock(rdbi.getBlock(), node);
         deleted++;
         break;
       case RECEIVED_BLOCK:
-        addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+        addBlock(node, srdb.getStorageID(), rdbi.getBlock(), rdbi.getDelHints());
         received++;
         break;
       case RECEIVING_BLOCK:
         receiving++;
-        processAndHandleReportedBlock(node, rdbi.getBlock(),
+        processAndHandleReportedBlock(node, srdb.getStorageID(), rdbi.getBlock(),
             ReplicaState.RBW, null);
         break;
       default:
@@ -2773,24 +2798,23 @@ assert storedBlock.findDatanode(dn) < 0
     int corrupt = 0;
     int excess = 0;
     int stale = 0;
-    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    while (nodeIter.hasNext()) {
-      DatanodeDescriptor node = nodeIter.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
         corrupt++;
       } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         decommissioned++;
       } else {
         LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
-            .getStorageID());
+            .getDatanodeUuid());
         if (blocksExcess != null && blocksExcess.contains(b)) {
           excess++;
         } else {
           live++;
         }
       }
-      if (node.areBlockContentsStale()) {
+      if (storage.areBlockContentsStale()) {
         stale++;
       }
     }
@@ -2813,10 +2837,9 @@ assert storedBlock.findDatanode(dn) < 0
     }
     // else proceed with fast case
     int live = 0;
-    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    while (nodeIter.hasNext()) {
-      DatanodeDescriptor node = nodeIter.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
         live++;
     }
@@ -2828,10 +2851,9 @@ assert storedBlock.findDatanode(dn) < 0
     int curReplicas = num.liveReplicas();
     int curExpectedReplicas = getReplication(block);
     BlockCollection bc = blocksMap.getBlockCollection(block);
-    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
     StringBuilder nodeList = new StringBuilder();
-    while (nodeIter.hasNext()) {
-      DatanodeDescriptor node = nodeIter.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       nodeList.append(node);
       nodeList.append(" ");
     }
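[An incremental report, unlike a full one, is a stream of per-block status changes, and the switch in processIncrementalBlockReport() above dispatches each to a different handler. A minimal hedged sketch of that dispatch with stub handlers and a hypothetical Status enum mirroring ReceivedDeletedBlockInfo:]

  class ToyIncrementalReportHandler {
    enum Status { DELETED_BLOCK, RECEIVED_BLOCK, RECEIVING_BLOCK }

    int received, deleted, receiving;

    void handle(String storageId, String block, Status status) {
      switch (status) {
      case DELETED_BLOCK:
        deleted++;     // removeStoredBlock(block, node)
        break;
      case RECEIVED_BLOCK:
        received++;    // addBlock(node, storageId, block, delHint)
        break;
      case RECEIVING_BLOCK:
        receiving++;   // processAndHandleReportedBlock(..., ReplicaState.RBW, null)
        break;
      default:
        throw new IllegalStateException("Unexpected status: " + status);
      }
    }
  }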
@@ -2936,14 +2958,13 @@ assert storedBlock.findDatanode(dn) < 0
     return blocksMap.size();
   }
 
-  public DatanodeDescriptor[] getNodes(BlockInfo block) {
-    DatanodeDescriptor[] nodes =
-      new DatanodeDescriptor[block.numNodes()];
-    Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-    for (int i = 0; it != null && it.hasNext(); i++) {
-      nodes[i] = it.next();
+  public DatanodeStorageInfo[] getStorages(BlockInfo block) {
+    final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
+    int i = 0;
+    for(DatanodeStorageInfo s : blocksMap.getStorages(block)) {
+      storages[i++] = s;
     }
-    return nodes;
+    return storages;
   }
 
   public int getTotalBlocks() {
@@ -3056,9 +3077,8 @@ assert storedBlock.findDatanode(dn) < 0
         corruptReplicas.getNodes(b);
     int numExpectedReplicas = getReplication(b);
     String rackName = null;
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
           if (numExpectedReplicas == 1 ||
@@ -3102,8 +3122,8 @@ assert storedBlock.findDatanode(dn) < 0
   }
 
   /** @return an iterator of the datanodes. */
-  public Iterator<DatanodeDescriptor> datanodeIterator(final Block block) {
-    return blocksMap.nodeIterator(block);
+  public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
+    return blocksMap.getStorages(block);
   }
 
   public int numCorruptReplicas(Block block) {
@@ -3247,24 +3267,24 @@ assert storedBlock.findDatanode(dn) < 0
     private DatanodeDescriptor srcNode;
     private List<DatanodeDescriptor> containingNodes;
-    private List<DatanodeDescriptor> liveReplicaNodes;
+    private List<DatanodeStorageInfo> liveReplicaStorages;
     private int additionalReplRequired;
 
-    private DatanodeDescriptor targets[];
+    private DatanodeStorageInfo targets[];
     private int priority;
 
     public ReplicationWork(Block block,
         BlockCollection bc,
         DatanodeDescriptor srcNode,
         List<DatanodeDescriptor> containingNodes,
-        List<DatanodeDescriptor> liveReplicaNodes,
+        List<DatanodeStorageInfo> liveReplicaStorages,
         int additionalReplRequired,
         int priority) {
       this.block = block;
       this.bc = bc;
       this.srcNode = srcNode;
       this.containingNodes = containingNodes;
-      this.liveReplicaNodes = liveReplicaNodes;
+      this.liveReplicaStorages = liveReplicaStorages;
       this.additionalReplRequired = additionalReplRequired;
       this.priority = priority;
       this.targets = null;
@@ -3273,8 +3293,8 @@ assert storedBlock.findDatanode(dn) < 0
     private void chooseTargets(BlockPlacementPolicy blockplacement,
         Set<Node> excludedNodes) {
       targets = blockplacement.chooseTarget(bc.getName(),
-          additionalReplRequired, srcNode, liveReplicaNodes, false,
-          excludedNodes, block.getNumBytes());
+          additionalReplRequired, srcNode, liveReplicaStorages, false,
+          excludedNodes, block.getNumBytes(), StorageType.DEFAULT);
     }
   }
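[ReplicationWork above is a small carrier object: the scan phase records the block, its source, and the live replica storages, and target selection runs later against that snapshot, leaving targets null when placement fails so the apply pass skips the item. A hedged sketch of the pattern with hypothetical types:]

  import java.util.List;

  class ToyReplicationWork {
    final String block;
    final String srcNode;
    final List<String> liveReplicaStorages;  // snapshot taken under the lock
    final int additionalReplRequired;
    String[] targets;                        // filled in later, outside the lock

    ToyReplicationWork(String block, String srcNode,
        List<String> liveReplicaStorages, int additionalReplRequired) {
      this.block = block;
      this.srcNode = srcNode;
      this.liveReplicaStorages = liveReplicaStorages;
      this.additionalReplRequired = additionalReplRequired;
    }

    /** Placement runs against the snapshot; a failed choice leaves
     *  targets null so the work item is skipped on the apply pass. */
    void chooseTargets(ToyPlacement placement) {
      targets = placement.choose(additionalReplRequired, srcNode, liveReplicaStorages);
    }
  }

  interface ToyPlacement {
    String[] choose(int n, String writer, List<String> alreadyChosen);
  }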
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Mon Jan  6 23:58:33 2014
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -67,13 +68,14 @@ public abstract class BlockPlacementPoli
    * @return array of DatanodeDescriptor instances chosen as target
    * and sorted as a pipeline.
    */
-  public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+  public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
                                              int numOfReplicas,
                                              Node writer,
-                                             List<DatanodeDescriptor> chosenNodes,
+                                             List<DatanodeStorageInfo> chosen,
                                              boolean returnChosenNodes,
                                              Set<Node> excludedNodes,
-                                             long blocksize);
+                                             long blocksize,
+                                             StorageType storageType);
 
   /**
    * Same as {@link #chooseTarget(String, int, Node, List, boolean,
   * ...
    * is only a hint and due to cluster state, namenode may not be
    * able to place the blocks on these datanodes.
    */
-  DatanodeDescriptor[] chooseTarget(String src,
+  DatanodeStorageInfo[] chooseTarget(String src,
       int numOfReplicas, Node writer,
       Set<Node> excludedNodes,
-      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+      long blocksize,
+      List<DatanodeDescriptor> favoredNodes,
+      StorageType storageType) {
     // This class does not provide the functionality of placing
     // a block in favored datanodes. The implementations of this class
     // are expected to provide this functionality
+
     return chooseTarget(src, numOfReplicas, writer,
-        new ArrayList<DatanodeDescriptor>(numOfReplicas), false, excludedNodes,
-        blocksize);
+        new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
+        excludedNodes, blocksize, storageType);
   }
 
   /**
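[The favored-nodes overload above is a default that simply forwards to the abstract method with an empty chosen list; concrete policies may override it to honor the hint. A hedged sketch of a subclass relying on that default -- ToyPlacementPolicy and its simplified String-based signatures are hypothetical, not the real BlockPlacementPolicy API:]

  import java.util.ArrayList;
  import java.util.List;

  abstract class ToyPlacementPolicy {
    abstract String[] chooseTarget(String src, int numOfReplicas,
        List<String> chosen, String storageType);

    /** Default favored-nodes variant: ignore the hint, delegate. */
    String[] chooseTarget(String src, int numOfReplicas,
        List<String> favoredNodes, List<String> excludes, String storageType) {
      return chooseTarget(src, numOfReplicas,
          new ArrayList<String>(numOfReplicas), storageType);
    }
  }

  class FirstComeFirstServedPolicy extends ToyPlacementPolicy {
    private final List<String> allStorages;

    FirstComeFirstServedPolicy(List<String> allStorages) {
      this.allStorages = allStorages;
    }

    @Override
    String[] chooseTarget(String src, int numOfReplicas,
        List<String> chosen, String storageType) {
      List<String> picked = new ArrayList<String>();
      for (String s : allStorages) {
        if (picked.size() == numOfReplicas) break;
        if (!chosen.contains(s)) picked.add(s);  // skip already-used storages
      }
      return picked.toArray(new String[picked.size()]);
    }
  }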