Subject: svn commit: r1160897 - in /hadoop/common/trunk/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/
Date: Tue, 23 Aug 2011 22:04:16 -0000
To: hdfs-commits@hadoop.apache.org
From: todd@apache.org
Message-Id: <20110823220416.78EF023889E3@eris.apache.org>

Author: todd
Date: Tue Aug 23 22:04:15 2011
New Revision: 1160897

URL: http://svn.apache.org/viewvc?rev=1160897&view=rev
Log:
HDFS-1480. All replicas of a block can end up on the same rack when some
datanodes are decommissioning. Contributed by Todd Lipcon.

Added:
    hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
Modified:
    hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

Modified: hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/CHANGES.txt?rev=1160897&r1=1160896&r2=1160897&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs/CHANGES.txt Tue Aug 23 22:04:15 2011
@@ -989,6 +989,9 @@ Trunk (unreleased changes)
     HDFS-2267. DataXceiver thread name incorrect while waiting on op during
     keepalive. (todd)
 
+    HDFS-1480. All replicas of a block can end up on the same rack when
+    some datanodes are decommissioning. (todd)
+
   BREAKDOWN OF HDFS-1073 SUBTASKS
 
     HDFS-1521. Persist transaction ID on disk between NN restarts.
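For readers skimming the diffs below: the heart of the change is that re-replication now distinguishes the nodes holding any replica of a block from the nodes holding a live replica. A condensed sketch of the new flow (all names are taken from the patch below; locking and bookkeeping are elided):

    // Collect both lists in one pass; decommissioning and corrupt replicas
    // appear only in containingNodes, not in liveReplicaNodes.
    List<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>();
    List<DatanodeDescriptor> liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
    NumberReplicas numReplicas = new NumberReplicas();
    DatanodeDescriptor srcNode = chooseSourceDatanode(
        block, containingNodes, liveReplicaNodes, numReplicas);

    // Every node holding a replica -- live, decommissioning, or corrupt --
    // is ruled out as a target for the new replica...
    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
    for (DatanodeDescriptor dn : containingNodes) {
      excludedNodes.put(dn, dn);
    }

    // ...while only live replicas count toward rack diversity, so the policy
    // can no longer treat a rack full of decommissioning replicas as covered.
    DatanodeDescriptor[] targets = blockplacement.chooseTarget(fileINode,
        additionalReplRequired, srcNode, liveReplicaNodes, excludedNodes,
        block.getNumBytes());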
Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1160897&r1=1160896&r2=1160897&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug 23 22:04:15 2011
@@ -66,6 +66,8 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
  * This class is a helper class for {@link FSNamesystem} and requires several
@@ -147,7 +149,8 @@ public class BlockManager {
   // We also store pending replication-orders.
   //
   public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
-  private final PendingReplicationBlocks pendingReplications;
+  @VisibleForTesting
+  final PendingReplicationBlocks pendingReplications;
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
@@ -312,9 +315,14 @@ public class BlockManager {
     for (Block block : neededReplications) {
       List<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>();
+      List<DatanodeDescriptor> containingLiveReplicasNodes =
+          new ArrayList<DatanodeDescriptor>();
+
       NumberReplicas numReplicas = new NumberReplicas();
       // source node returned is not used
-      chooseSourceDatanode(block, containingNodes, numReplicas);
+      chooseSourceDatanode(block, containingNodes,
+          containingLiveReplicasNodes, numReplicas);
+      assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
       int usableReplicas = numReplicas.liveReplicas() +
                            numReplicas.decommissionedReplicas();
@@ -993,9 +1001,10 @@ public class BlockManager {
    * @param priority a hint of its priority in the neededReplication queue
    * @return if the block gets replicated or not
    */
-  private boolean computeReplicationWorkForBlock(Block block, int priority) {
+  @VisibleForTesting
+  boolean computeReplicationWorkForBlock(Block block, int priority) {
     int requiredReplication, numEffectiveReplicas;
-    List<DatanodeDescriptor> containingNodes;
+    List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
     DatanodeDescriptor srcNode;
     INodeFile fileINode = null;
     int additionalReplRequired;
@@ -1016,11 +1025,14 @@ public class BlockManager {
 
       // get a source data-node
       containingNodes = new ArrayList<DatanodeDescriptor>();
+      liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
       NumberReplicas numReplicas = new NumberReplicas();
-      srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
+      srcNode = chooseSourceDatanode(
+          block, containingNodes, liveReplicaNodes, numReplicas);
       if(srcNode == null) // block can not be replicated from any node
         return false;
+      assert liveReplicaNodes.size() == numReplicas.liveReplicas();
 
       // do not schedule more if enough replicas is already pending
       numEffectiveReplicas = numReplicas.liveReplicas() +
         pendingReplications.getNumReplicas(block);
@@ -1047,13 +1059,20 @@ public class BlockManager {
     } finally {
       namesystem.writeUnlock();
     }
+
+    // Exclude all of the containing nodes from being targets.
+    // This list includes decommissioning or corrupt nodes.
+    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
+    for (DatanodeDescriptor dn : containingNodes) {
+      excludedNodes.put(dn, dn);
+    }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
     // It is costly to extract the filename for which chooseTargets is called,
     // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] = blockplacement.chooseTarget(fileINode,
         additionalReplRequired,
-        srcNode, containingNodes, block.getNumBytes());
+        srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
@@ -1182,8 +1201,10 @@ public class BlockManager {
   private DatanodeDescriptor chooseSourceDatanode(
                                     Block block,
                                     List<DatanodeDescriptor> containingNodes,
+                                    List<DatanodeDescriptor> nodesContainingLiveReplicas,
                                     NumberReplicas numReplicas) {
     containingNodes.clear();
+    nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
     int live = 0;
     int decommissioned = 0;
@@ -1202,6 +1223,7 @@ public class BlockManager {
       else if (excessBlocks != null && excessBlocks.contains(block)) {
         excess++;
       } else {
+        nodesContainingLiveReplicas.add(node);
         live++;
       }
       containingNodes.add(node);
@@ -2049,7 +2071,8 @@ public class BlockManager {
   /**
    * The given node is reporting that it received a certain block.
    */
-  private void addBlock(DatanodeDescriptor node, Block block, String delHint)
+  @VisibleForTesting
+  void addBlock(DatanodeDescriptor node, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
     node.decBlocksScheduled();

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1160897&r1=1160896&r2=1160897&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Tue Aug 23 22:04:15 2011
@@ -127,9 +127,10 @@ public abstract class BlockPlacementPoli
                                              int numOfReplicas,
                                              DatanodeDescriptor writer,
                                              List<DatanodeDescriptor> chosenNodes,
+                                             HashMap<Node, Node> excludedNodes,
                                              long blocksize) {
     return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
-                        chosenNodes, blocksize);
+                        chosenNodes, excludedNodes, blocksize);
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1160897&r1=1160896&r2=1160897&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Tue Aug 23 22:04:15 2011
@@ -102,16 +102,6 @@ public class BlockPlacementPolicyDefault
                          excludedNodes,
                          blocksize);
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
-                                           int numOfReplicas,
-                                           DatanodeDescriptor writer,
-                                           List<DatanodeDescriptor> chosenNodes,
-                                           long blocksize) {
-    return chooseTarget(numOfReplicas, writer, chosenNodes, false,
-                        null, blocksize);
-  }
 
   /** This is the implementation. */
   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
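With the {@inheritDoc} overload above deleted, callers of the FSInodeInfo-based chooseTarget always pass an exclusion map explicitly; a caller with nothing to rule out hands in an empty map. A minimal sketch of such a call site (hypothetical caller and variable names, not part of this commit):

    // Hypothetical call site: no exclusions, so pass an empty map instead of
    // relying on the removed no-excludedNodes overload.
    HashMap<Node, Node> noExclusions = new HashMap<Node, Node>();
    DatanodeDescriptor[] targets = placementPolicy.chooseTarget(
        fileINode, numOfReplicas, writer, chosenNodes, noExclusions, blocksize);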
Added: hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1160897&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (added)
+++ hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Aug 23 22:04:15 2011
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.net.NetworkTopology;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+
+public class TestBlockManager {
+  private final List<DatanodeDescriptor> nodes = ImmutableList.of(
+      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/rackB"),
+      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/rackB"),
+      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/rackB")
+    );
+  private final List<DatanodeDescriptor> rackA = nodes.subList(0, 3);
+  private final List<DatanodeDescriptor> rackB = nodes.subList(3, 6);
+
+  /**
+   * Some of these tests exercise code which has some randomness involved -
+   * i.e. even if there's a bug, they may pass because the random node
+   * selection chooses the correct result.
+   *
+   * Since they're true unit tests and run quickly, we loop them a number
+   * of times trying to trigger the incorrect behavior.
+   */
+  private static final int NUM_TEST_ITERS = 30;
+
+  private static final int BLOCK_SIZE = 64*1024;
+
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  @Before
+  public void setupMockCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+             "need to set a dummy value here so it assumes a multi-rack cluster");
+    fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.doReturn(true).when(fsn).hasWriteLock();
+    bm = new BlockManager(fsn, conf);
+  }
+
+  private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    // construct network topology
+    for (DatanodeDescriptor dn : nodesToAdd) {
+      cluster.add(dn);
+      dn.updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+  }
+
+  private void removeNode(DatanodeDescriptor deadNode) {
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    cluster.remove(deadNode);
+    bm.removeBlocksAssociatedTo(deadNode);
+  }
+
+
+  /**
+   * Test that replication of under-replicated blocks is detected
+   * and basically works
+   */
+  @Test
+  public void testBasicReplication() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doBasicTest(i);
+    }
+  }
+
+  private void doBasicTest(int testIndex) {
+    List<DatanodeDescriptor> origNodes = nodes(0, 1);
+    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertEquals(2, pipeline.length);
+    assertTrue("Source of replication should be one of the nodes the block " +
+               "was on. Was: " + pipeline[0],
+               origNodes.contains(pipeline[0]));
+    assertTrue("Destination of replication should be on the other rack. " +
+               "Was: " + pipeline[1],
+               rackB.contains(pipeline[1]));
+  }
+
+
+  /**
+   * Regression test for HDFS-1480
+   * - Cluster has 2 racks, A and B, each with three nodes.
+   * - Block initially written on A1, A2, B1
+   * - Admin decommissions two of these nodes (let's say A1 and A2 but it doesn't matter)
+   * - Re-replication should respect rack policy
+   */
+  @Test
+  public void testTwoOfThreeNodesDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestTwoOfThreeNodesDecommissioned(i);
+    }
+  }
+
+  private void doTestTwoOfThreeNodesDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+
+    // Decommission two of the nodes (A1, A2)
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+               "was on. Was: " + pipeline[0],
+               origNodes.contains(pipeline[0]));
+    assertEquals("Should have two targets", 3, pipeline.length);
+
+    boolean foundOneOnRackA = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackA.contains(target)) {
+        foundOneOnRackA = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+
+    assertTrue("Should have at least one target on rack A. Pipeline: " +
+               Joiner.on(",").join(pipeline),
+               foundOneOnRackA);
+  }
+
+
+  /**
+   * Test what happens when a block is on three nodes, and all three of those
+   * nodes are decommissioned. It should properly re-replicate to three new
+   * nodes.
+   */
+  @Test
+  public void testAllNodesHoldingReplicasDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestAllNodesHoldingReplicasDecommissioned(i);
+    }
+  }
+
+  private void doTestAllNodesHoldingReplicasDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+
+    // Decommission all of the nodes
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 3);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+               "was on. Was: " + pipeline[0],
+               origNodes.contains(pipeline[0]));
+    assertEquals("Should have three targets", 4, pipeline.length);
+
+    boolean foundOneOnRackA = false;
+    boolean foundOneOnRackB = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackA.contains(target)) {
+        foundOneOnRackA = true;
+      } else if (rackB.contains(target)) {
+        foundOneOnRackB = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+
+    assertTrue("Should have at least one target on rack A. Pipeline: " +
+               Joiner.on(",").join(pipeline),
+               foundOneOnRackA);
+    assertTrue("Should have at least one target on rack B. Pipeline: " +
+               Joiner.on(",").join(pipeline),
+               foundOneOnRackB);
+  }
+
+  /**
+   * Test what happens when there are two racks, and an entire rack is
+   * decommissioned.
+   *
+   * Since the cluster is multi-rack, it will consider the block
+   * under-replicated rather than create a third replica on the
+   * same rack. Adding a new node on a third rack should cause re-replication
+   * to that node.
+   */
+  @Test
+  public void testOneOfTwoRacksDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestOneOfTwoRacksDecommissioned(i);
+    }
+  }
+
+  private void doTestOneOfTwoRacksDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+
+    // Decommission all of the nodes in rack A
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 2);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+               "was on. Was: " + pipeline[0],
+               origNodes.contains(pipeline[0]));
+    assertEquals("Should have 2 targets", 3, pipeline.length);
+
+    boolean foundOneOnRackB = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackB.contains(target)) {
+        foundOneOnRackB = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+
+    assertTrue("Should have at least one target on rack B. Pipeline: " +
+               Joiner.on(",").join(pipeline),
+               foundOneOnRackB);
+
+    // Mark the block as received on the target nodes in the pipeline
+    fulfillPipeline(blockInfo, pipeline);
+
+    // the block is still under-replicated. Add a new node. This should allow
+    // the third off-rack replica.
+    DatanodeDescriptor rackCNode =
+      new DatanodeDescriptor(new DatanodeID("h7:5020"), "/rackC");
+    addNodes(ImmutableList.of(rackCNode));
+    try {
+      DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);
+      assertEquals(2, pipeline2.length);
+      assertEquals(rackCNode, pipeline2[1]);
+    } finally {
+      removeNode(rackCNode);
+    }
+  }
+
+  /**
+   * Unit test version of testSufficientlyReplBlocksUsesNewRack from
+   * {@link TestBlocksWithNotEnoughRacks}.
+   **/
+  @Test
+  public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestSufficientlyReplBlocksUsesNewRack(i);
+    }
+  }
+
+  private void doTestSufficientlyReplBlocksUsesNewRack(int testIndex) {
+    // Originally on only nodes in rack A.
+    List<DatanodeDescriptor> origNodes = rackA;
+    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+    DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
+
+    assertEquals(2, pipeline.length); // single new copy
+    assertTrue("Source of replication should be one of the nodes the block " +
+               "was on. Was: " + pipeline[0],
+               origNodes.contains(pipeline[0]));
+    assertTrue("Destination of replication should be on the other rack. " +
+               "Was: " + pipeline[1],
+               rackB.contains(pipeline[1]));
+  }
+
+
+  /**
+   * Tell the block manager that replication is completed for the given
+   * pipeline.
+   */
+  private void fulfillPipeline(BlockInfo blockInfo,
+      DatanodeDescriptor[] pipeline) throws IOException {
+    for (int i = 1; i < pipeline.length; i++) {
+      bm.addBlock(pipeline[i], blockInfo, null);
+    }
+  }
+
+  private BlockInfo blockOnNodes(long blkId, List<DatanodeDescriptor> nodes) {
+    Block block = new Block(blkId);
+    BlockInfo blockInfo = new BlockInfo(block, 3);
+
+    for (DatanodeDescriptor dn : nodes) {
+      blockInfo.addNode(dn);
+    }
+    return blockInfo;
+  }
+
+  private List<DatanodeDescriptor> nodes(int ... indexes) {
+    List<DatanodeDescriptor> ret = Lists.newArrayList();
+    for (int idx : indexes) {
+      ret.add(nodes.get(idx));
+    }
+    return ret;
+  }
+
+  private List<DatanodeDescriptor> startDecommission(int ... indexes) {
+    List<DatanodeDescriptor> nodes = nodes(indexes);
+    for (DatanodeDescriptor node : nodes) {
+      node.startDecommission();
+    }
+    return nodes;
+  }
+
+  private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
+    INodeFile iNode = Mockito.mock(INodeFile.class);
+    Mockito.doReturn((short)3).when(iNode).getReplication();
+    BlockInfo blockInfo = blockOnNodes(blockId, nodes);
+
+    bm.blocksMap.addINode(blockInfo, iNode);
+    return blockInfo;
+  }
+
+  private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
+    assertEquals("Block not initially pending replication",
+        0, bm.pendingReplications.getNumReplicas(block));
+    assertTrue("computeReplicationWork should indicate replication is needed",
+        bm.computeReplicationWorkForBlock(block, 1));
+    assertTrue("replication is pending after work is computed",
+        bm.pendingReplications.getNumReplicas(block) > 0);
+
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+        getAllPendingReplications();
+    assertEquals(1, repls.size());
+    Entry<DatanodeDescriptor, BlockTargetPair> repl =
+        repls.entries().iterator().next();
+    DatanodeDescriptor[] targets = repl.getValue().targets;
+
+    DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
+    pipeline[0] = repl.getKey();
+    System.arraycopy(targets, 0, pipeline, 1, targets.length);
+
+    return pipeline;
+  }
+
+  private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+        LinkedListMultimap.create();
+    for (DatanodeDescriptor dn : nodes) {
+      List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
+      if (thisRepls != null) {
+        repls.putAll(dn, thisRepls);
+      }
+    }
+    return repls;
+  }
+}
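The helpers above make further decommissioning scenarios cheap to express. For instance, a hypothetical extra case (not part of this commit; the test name and block id are invented) covering a single decommissioning holder on an otherwise single-rack block could follow the same pattern:

    // Hypothetical: block on all three rack A nodes, one of them draining.
    // The replacement replica must avoid every current holder, which under
    // the fixed policy forces it onto rack B.
    @Test
    public void testOneDecommissioningHolderSingleRack() throws Exception {
      addNodes(nodes);
      List<DatanodeDescriptor> origNodes = nodes(0, 1, 2); // all of rack A
      BlockInfo blockInfo = addBlockOnNodes(1L, origNodes);
      startDecommission(0);
      DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
      assertEquals(2, pipeline.length);
      assertTrue(origNodes.contains(pipeline[0]));
      assertTrue(rackB.contains(pipeline[1]));
    }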