From: vinayakumarb@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 14 Aug 2015 10:54:41 -0000
Message-Id: <2a212e70884848fbbb9532b9aace7171@git.apache.org>
In-Reply-To: <0e8188f9427546bfa5c0a42a5f7f5505@git.apache.org>
References: <0e8188f9427546bfa5c0a42a5f7f5505@git.apache.org>
Subject: [27/50] hadoop git commit: HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for striped block. Contributed by Walter Su.

HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for
striped block. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4bed451b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4bed451b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4bed451b

Branch: refs/heads/HDFS-7285-REBASE
Commit: 4bed451bcd1bbc71ab382db4e8cda44884c6cda7
Parents: 904bd7d
Author: Jing Zhao
Authored: Mon Jul 6 16:39:47 2015 -0700
Committer: Vinayakumar B
Committed: Thu Aug 13 17:17:13 2015 +0530

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../blockmanagement/UnderReplicatedBlocks.java  |  51 ++++++-
 .../blockmanagement/BlockManagerTestUtil.java   |   8 +
 .../TestUnderReplicatedBlockQueues.java         |  62 ++++++++
 .../namenode/TestRecoverStripedBlocks.java      | 151 ++++++++++++-------
 5 files changed, 214 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
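Reading guide for the UnderReplicatedBlocks change below. Assuming the default RS-6-3 schema used on this branch (6 data plus 3 parity internal blocks per block group, so a full group spans 9 storages), the new rule maps the number of live internal blocks to a queue roughly as follows; the arithmetic is only an illustration, the authoritative logic is the getPriorityStriped() method in the diff:

    live = 5, no copies on decommissioned nodes      -> QUEUE_WITH_CORRUPT_BLOCKS (data unreadable)
    live = 5, missing pieces on decommissioned nodes -> QUEUE_HIGHEST_PRIORITY
    live = 6 (exactly the data-block count)          -> QUEUE_HIGHEST_PRIORITY (no margin left)
    live = 7: (7 - 6) * 3 = 3 <  3 + 1               -> QUEUE_VERY_UNDER_REPLICATED
    live = 8: (8 - 6) * 3 = 6 >= 3 + 1               -> QUEUE_UNDER_REPLICATED
    live = 9 but spread over too few racks           -> QUEUE_REPLICAS_BADLY_DISTRIBUTED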
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bed451b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 8f720fc..58b91b6 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -329,3 +329,6 @@
 
     HDFS-8684. Erasure Coding: fix some block number calculation for striped
     block. (yliu)
+
+    HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for
+    striped block. (Walter Su via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bed451b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index ebc15b8..7e8f479 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
  *
  * <p/>
  * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
  * <p/>
  * <p>The queue order is as follows:</p>
  * <ol>
@@ -145,14 +145,28 @@ class UnderReplicatedBlocks implements Iterable {
    * @param expectedReplicas expected number of replicas of the block
    * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
    */
-  private int getPriority(int curReplicas,
+  private int getPriority(BlockInfo block,
+                          int curReplicas,
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
       // Block has enough copies, but not enough racks
       return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
-    } else if (curReplicas == 0) {
+    }
+    if (block.isStriped()) {
+      BlockInfoStriped sblk = (BlockInfoStriped) block;
+      return getPriorityStriped(curReplicas, decommissionedReplicas,
+          sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
+    } else {
+      return getPriorityContiguous(curReplicas, decommissionedReplicas,
+          expectedReplicas);
+    }
+  }
+
+  private int getPriorityContiguous(int curReplicas, int decommissionedReplicas,
+      int expectedReplicas) {
+    if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
@@ -161,7 +175,7 @@ class UnderReplicatedBlocks implements Iterable {
       //all we have are corrupt blocks
       return QUEUE_WITH_CORRUPT_BLOCKS;
     } else if (curReplicas == 1) {
-      //only on replica -risk of loss
+      // only one replica, highest risk of loss
       // highest priority
       return QUEUE_HIGHEST_PRIORITY;
     } else if ((curReplicas * 3) < expectedReplicas) {
@@ -174,6 +188,27 @@ class UnderReplicatedBlocks implements Iterable {
     }
   }
 
+  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+      short dataBlkNum, short parityBlkNum) {
+    if (curReplicas < dataBlkNum) {
+      // There are some replicas on decommissioned nodes so it's not corrupted
+      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+        return QUEUE_HIGHEST_PRIORITY;
+      }
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == dataBlkNum) {
+      // highest risk of loss, highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
+      // can only afford one replica loss
+      // this is considered very under-replicated
+      return QUEUE_VERY_UNDER_REPLICATED;
+    } else {
+      // add to the normal queue for under replicated blocks
+      return QUEUE_UNDER_REPLICATED;
+    }
+  }
+
   /** add a block to a under replication queue according to its priority
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
@@ -186,7 +221,7 @@ class UnderReplicatedBlocks implements Iterable {
                            int decomissionedReplicas,
                            int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
-    int priLevel = getPriority(curReplicas, decomissionedReplicas,
+    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
                                expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
@@ -209,7 +244,7 @@ class UnderReplicatedBlocks implements Iterable {
                            int oldReplicas,
                            int decommissionedReplicas,
                            int oldExpectedReplicas) {
-    int priLevel = getPriority(oldReplicas,
+    int priLevel = getPriority(block, oldReplicas,
                                decommissionedReplicas,
                                oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
@@ -283,9 +318,9 @@ class UnderReplicatedBlocks implements Iterable {
                      int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(curReplicas, decommissionedReplicas,
+    int curPri = getPriority(block, curReplicas, decommissionedReplicas,
         curExpectedReplicas);
-    int oldPri = getPriority(oldReplicas, decommissionedReplicas,
+    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
         oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
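The new getPriorityStriped() decision is short enough to restate as a self-contained sketch. The class and method names below (StripedPriorityExample, priorityOfStripedBlock) are made up for illustration and are not part of the patch; the numeric queue values follow the ordering UnderReplicatedBlocks uses (0 is replicated first, 4 holds corrupt blocks). The contiguous path keeps its old behaviour; the only caller-visible change in the patch is that getPriority() now takes the BlockInfo so it can dispatch on block.isStriped().

// Minimal, standalone sketch of the rule added by this patch; illustrative only.
public class StripedPriorityExample {
  static final int QUEUE_HIGHEST_PRIORITY = 0;
  static final int QUEUE_VERY_UNDER_REPLICATED = 1;
  static final int QUEUE_UNDER_REPLICATED = 2;
  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;

  /**
   * Mirrors getPriorityStriped() from the diff above.
   *
   * @param live           live internal blocks of the striped block group
   * @param decommissioned internal blocks surviving only on decommissioned nodes
   * @param dataBlkNum     data blocks in the schema (6 for RS-6-3)
   * @param parityBlkNum   parity blocks in the schema (3 for RS-6-3)
   */
  static int priorityOfStripedBlock(int live, int decommissioned,
      int dataBlkNum, int parityBlkNum) {
    if (live < dataBlkNum) {
      // Not enough live internal blocks to rebuild the data, unless the
      // missing pieces are still readable on decommissioned nodes.
      return live + decommissioned >= dataBlkNum
          ? QUEUE_HIGHEST_PRIORITY : QUEUE_WITH_CORRUPT_BLOCKS;
    } else if (live == dataBlkNum) {
      // Any further loss makes the group unreadable.
      return QUEUE_HIGHEST_PRIORITY;
    } else if ((live - dataBlkNum) * 3 < parityBlkNum + 1) {
      // Can only afford about one more loss: very under-replicated.
      return QUEUE_VERY_UNDER_REPLICATED;
    } else {
      return QUEUE_UNDER_REPLICATED;
    }
  }

  public static void main(String[] args) {
    // Same expectations that TestUnderReplicatedBlockQueues encodes, for RS-6-3:
    System.out.println(priorityOfStripedBlock(6, 0, 6, 3)); // 0, highest priority
    System.out.println(priorityOfStripedBlock(7, 0, 6, 3)); // 1, very under-replicated
    System.out.println(priorityOfStripedBlock(8, 0, 6, 3)); // 2, under-replicated
    System.out.println(priorityOfStripedBlock(5, 0, 6, 3)); // 4, corrupt
  }
}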
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bed451b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index e25ee31..64d80bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -306,4 +306,12 @@ public class BlockManagerTestUtil {
       throws ExecutionException, InterruptedException {
     dm.getDecomManager().runMonitor();
   }
+
+  /**
+   * add block to the replicateBlocks queue of the Datanode
+   */
+  public static void addBlockToBeReplicated(DatanodeDescriptor node,
+      Block block, DatanodeStorageInfo[] targets) {
+    node.addBlockToBeReplicated(block, targets);
+  }
 }
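The new BlockManagerTestUtil.addBlockToBeReplicated() helper exists so that the recovery test further down can mark a datanode as busy. The fragment below is lifted from doTestMissingStripedBlock() later in this patch and assumes that test's surrounding context (storageInfos, maxReplicationStreams, the loop index i); queuing more than dfs.namenode.replication.max-streams pending replication tasks is what the test relies on to make the block manager skip the node as a recovery source for anything but highest-priority blocks.

// Fragment from doTestMissingStripedBlock() below: overload one datanode with
// pending replication work so it counts as "busy".
DatanodeDescriptor busyNode = storageInfos[i].getDatanodeDescriptor();
for (int j = 0; j < maxReplicationStreams + 1; j++) {
  BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j),
      new DatanodeStorageInfo[]{storageInfos[0]});
}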
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bed451b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
index de36e07..0f419ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -28,10 +31,21 @@ import static org.junit.Assert.fail;
 
 public class TestUnderReplicatedBlockQueues {
 
+  private final ECSchema ecSchema =
+      ErasureCodingSchemaManager.getSystemDefaultSchema();
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
   private BlockInfo genBlockInfo(long id) {
     return new BlockInfoContiguous(new Block(id), (short) 3);
   }
 
+  private BlockInfo genStripedBlockInfo(long id, long numBytes) {
+    BlockInfoStriped sblk = new BlockInfoStriped(new Block(id), ecSchema,
+        CELLSIZE);
+    sblk.setNumBytes(numBytes);
+    return sblk;
+  }
+
   /**
    * Test that adding blocks with different replication counts puts them
    * into different queues
@@ -85,6 +99,54 @@ public class TestUnderReplicatedBlockQueues {
     assertEquals(2,
         queues.getCorruptReplOneBlockSize());
   }
 
+  @Test
+  public void testStripedBlockPriorities() throws Throwable {
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNUm = ecSchema.getNumParityUnits();
+    doTestStripedBlockPriorities(1, parityBlkNUm);
+    doTestStripedBlockPriorities(dataBlkNum, parityBlkNUm);
+  }
+
+  private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
+      throws Throwable {
+    int groupSize = dataBlkNum + parityBlkNum;
+    long numBytes = CELLSIZE * dataBlkNum;
+    UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
+
+    // add a striped block which been left NUM_DATA_BLOCKS internal blocks
+    BlockInfo block1 = genStripedBlockInfo(-100, numBytes);
+    assertAdded(queues, block1, dataBlkNum, 0, groupSize);
+    assertEquals(1, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.size());
+    assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+
+    // add a striped block which been left NUM_DATA_BLOCKS+1 internal blocks
+    BlockInfo block2 = genStripedBlockInfo(-200, numBytes);
+    assertAdded(queues, block2, dataBlkNum + 1, 0, groupSize);
+    assertEquals(2, queues.getUnderReplicatedBlockCount());
+    assertEquals(2, queues.size());
+    assertInLevel(queues, block2,
+        UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
+
+    // add a striped block which been left NUM_DATA_BLOCKS+2 internal blocks
+    BlockInfo block3 = genStripedBlockInfo(-300, numBytes);
+    assertAdded(queues, block3, dataBlkNum + 2, 0, groupSize);
+    assertEquals(3, queues.getUnderReplicatedBlockCount());
+    assertEquals(3, queues.size());
+    assertInLevel(queues, block3,
+        UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+
+    // add a corrupted block
+    BlockInfo block_corrupt = genStripedBlockInfo(-400, numBytes);
+    assertEquals(0, queues.getCorruptBlockSize());
+    assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
+    assertEquals(4, queues.size());
+    assertEquals(3, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+        UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+  }
+
   private void assertAdded(UnderReplicatedBlocks queues,
                            BlockInfo block,
                            int curReplicas,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bed451b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index ca4fbbc..3134373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -32,29 +32,26 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
-
-import java.io.IOException;
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TestRecoverStripedBlocks {
   private final short GROUP_SIZE =
-      NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+      NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
   private MiniDFSCluster cluster;
   private final Path dirPath = new Path("/dir");
   private Path filePath = new Path(dirPath, "file");
+  private int maxReplicationStreams =
+      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
 
-  @Before
-  public void setup() throws IOException {
-    final Configuration conf = new HdfsConfiguration();
+  private void initConf(Configuration conf) {
     // Large value to make sure the pending replication request can stay in
     // DatanodeDescriptor.replicateBlocks before test timeout.
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
@@ -62,63 +59,111 @@ public class TestRecoverStripedBlocks {
     // chooseUnderReplicatedBlocks at once.
     conf.setInt(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
+  }
 
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
-        .build();
-    cluster.waitActive();
+  @Test
+  public void testMissingStripedBlock() throws Exception {
+    doTestMissingStripedBlock(1, 0);
   }
 
-  @After
-  public void tearDown() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
+  @Test
+  public void testMissingStripedBlockWithBusyNode1() throws Exception {
+    doTestMissingStripedBlock(2, 1);
   }
 
   @Test
-  public void testMissingStripedBlock() throws Exception {
-    final int numBlocks = 4;
-    DFSTestUtil.createStripedFile(cluster, filePath,
-        dirPath, numBlocks, 1, true);
+  public void testMissingStripedBlockWithBusyNode2() throws Exception {
+    doTestMissingStripedBlock(3, 1);
+  }
 
-    // make sure the file is complete in NN
-    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
-        .getINode4Write(filePath.toString()).asFile();
-    assertFalse(fileNode.isUnderConstruction());
-    assertTrue(fileNode.isStriped());
-    BlockInfo[] blocks = fileNode.getBlocks();
-    assertEquals(numBlocks, blocks.length);
-    for (BlockInfo blk : blocks) {
-      assertTrue(blk.isStriped());
-      assertTrue(blk.isComplete());
-      assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
-      final BlockInfoStriped sb = (BlockInfoStriped) blk;
-      assertEquals(GROUP_SIZE, sb.numNodes());
-    }
+  /**
+   * Start GROUP_SIZE + 1 datanodes.
+   * Inject striped blocks to first GROUP_SIZE datanodes.
+   * Then make numOfBusy datanodes busy, make numOfMissed datanodes missed.
+   * Then trigger BlockManager to compute recovery works. (so all recovery work
+   * will be scheduled to the last datanode)
+   * Finally, verify the recovery work of the last datanode.
+   */
+  private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
+      throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
+        .build();
+
+    try {
+      cluster.waitActive();
+      final int numBlocks = 4;
+      DFSTestUtil.createStripedFile(cluster, filePath,
+          dirPath, numBlocks, 1, true);
+      // all blocks will be located at first GROUP_SIZE DNs, the last DN is
+      // empty because of the util function createStripedFile
 
-    final BlockManager bm = cluster.getNamesystem().getBlockManager();
-    BlockInfo firstBlock = fileNode.getBlocks()[0];
-    DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
+      // make sure the file is complete in NN
+      final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+          .getINode4Write(filePath.toString()).asFile();
+      assertFalse(fileNode.isUnderConstruction());
+      assertTrue(fileNode.isStriped());
+      BlockInfo[] blocks = fileNode.getBlocks();
+      assertEquals(numBlocks, blocks.length);
+      for (BlockInfo blk : blocks) {
+        assertTrue(blk.isStriped());
+        assertTrue(blk.isComplete());
+        assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS,
+            blk.getNumBytes());
+        final BlockInfoStriped sb = (BlockInfoStriped) blk;
+        assertEquals(GROUP_SIZE, sb.numNodes());
+      }
 
-    DatanodeDescriptor secondDn = storageInfos[1].getDatanodeDescriptor();
-    assertEquals(numBlocks, secondDn.numBlocks());
+      final BlockManager bm = cluster.getNamesystem().getBlockManager();
+      BlockInfo firstBlock = fileNode.getBlocks()[0];
+      DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
 
-    bm.getDatanodeManager().removeDatanode(secondDn);
+      // make numOfBusy nodes busy
+      int i = 0;
+      for (; i < numOfBusy; i++) {
+        DatanodeDescriptor busyNode = storageInfos[i].getDatanodeDescriptor();
+        for (int j = 0; j < maxReplicationStreams + 1; j++) {
+          BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j),
+              new DatanodeStorageInfo[]{storageInfos[0]});
+        }
+      }
 
-    BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      // make numOfMissed internal blocks missed
+      for (; i < numOfBusy + numOfMissed; i++) {
+        DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor();
+        assertEquals(numBlocks, missedNode.numBlocks());
+        bm.getDatanodeManager().removeDatanode(missedNode);
+      }
 
-    // all the recovery work will be scheduled on the last DN
-    DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
-    DatanodeDescriptor last =
+      BlockManagerTestUtil.getComputedDatanodeWork(bm);
+
+      // all the recovery work will be scheduled on the last DN
+      DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
+      DatanodeDescriptor last =
        bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
-    assertEquals("Counting the number of outstanding EC tasks", numBlocks,
-        last.getNumberOfBlocksToBeErasureCoded());
-    List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
-    for (BlockECRecoveryInfo info : recovery) {
-      assertEquals(1, info.getTargetDnInfos().length);
-      assertEquals(last, info.getTargetDnInfos()[0]);
-      assertEquals(GROUP_SIZE - 1, info.getSourceDnInfos().length);
-      assertEquals(GROUP_SIZE - 1, info.getLiveBlockIndices().length);
+      assertEquals("Counting the number of outstanding EC tasks", numBlocks,
+          last.getNumberOfBlocksToBeErasureCoded());
+      List<BlockECRecoveryInfo> recovery =
+          last.getErasureCodeCommand(numBlocks);
+      for (BlockECRecoveryInfo info : recovery) {
+        assertEquals(1, info.getTargetDnInfos().length);
+        assertEquals(last, info.getTargetDnInfos()[0]);
+        assertEquals(info.getSourceDnInfos().length,
+            info.getLiveBlockIndices().length);
+        if (GROUP_SIZE - numOfMissed == NUM_DATA_BLOCKS) {
+          // It's a QUEUE_HIGHEST_PRIORITY block, so the busy DNs will be chosen
+          // to make sure we have NUM_DATA_BLOCKS DNs to do recovery work.
+          assertEquals(NUM_DATA_BLOCKS, info.getSourceDnInfos().length);
+        } else {
+          // The block has no highest priority, so we don't use the busy DNs as
+          // sources
+          assertEquals(GROUP_SIZE - numOfMissed - numOfBusy,
+              info.getSourceDnInfos().length);
+        }
+      }
+    } finally {
+      cluster.shutdown();
    }
  }
 }
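Taken together with the priority rule sketched earlier, the three parameterizations of doTestMissingStripedBlock() work out as follows, again assuming the branch's default 6 data + 3 parity layout (so GROUP_SIZE = 9 and NUM_DATA_BLOCKS = 6); this is only worked arithmetic, the assertions in the diff are authoritative:

    (numOfMissed=1, numOfBusy=0): 8 live internal blocks -> QUEUE_UNDER_REPLICATED,
        busy nodes excluded as sources, expected sources = 9 - 1 - 0 = 8
    (numOfMissed=2, numOfBusy=1): 7 live internal blocks -> QUEUE_VERY_UNDER_REPLICATED,
        busy node excluded as a source, expected sources = 9 - 2 - 1 = 6
    (numOfMissed=3, numOfBusy=1): 6 live internal blocks -> QUEUE_HIGHEST_PRIORITY,
        busy node allowed as a source, expected sources = NUM_DATA_BLOCKS = 6

These are exactly the two assertEquals branches at the end of the test.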