hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1160897 - in /hadoop/common/trunk/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/
Date Tue, 23 Aug 2011 22:04:16 GMT
Author: todd
Date: Tue Aug 23 22:04:15 2011
New Revision: 1160897

URL: http://svn.apache.org/viewvc?rev=1160897&view=rev
Log:
HDFS-1480. All replicas of a block can end up on the same rack when some datanodes are decommissioning.
Contributed by Todd Lipcon.

Added:
    hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
Modified:
    hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

Modified: hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/CHANGES.txt?rev=1160897&r1=1160896&r2=1160897&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs/CHANGES.txt Tue Aug 23 22:04:15 2011
@@ -989,6 +989,9 @@ Trunk (unreleased changes)
     HDFS-2267. DataXceiver thread name incorrect while waiting on op during
     keepalive. (todd)
 
+    HDFS-1480. All replicas of a block can end up on the same rack when
+    some datanodes are decommissioning. (todd)
+
   BREAKDOWN OF HDFS-1073 SUBTASKS
 
     HDFS-1521. Persist transaction ID on disk between NN restarts.

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1160897&r1=1160896&r2=1160897&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug 23 22:04:15 2011
@@ -66,6 +66,8 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
  * This class is a helper class for {@link FSNamesystem} and requires several
@@ -147,7 +149,8 @@ public class BlockManager {
   // We also store pending replication-orders.
   //
   public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
-  private final PendingReplicationBlocks pendingReplications;
+  @VisibleForTesting
+  final PendingReplicationBlocks pendingReplications;
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
@@ -312,9 +315,14 @@ public class BlockManager {
       for (Block block : neededReplications) {
         List<DatanodeDescriptor> containingNodes =
                                           new ArrayList<DatanodeDescriptor>();
+        List<DatanodeDescriptor> containingLiveReplicasNodes =
+          new ArrayList<DatanodeDescriptor>();
+        
         NumberReplicas numReplicas = new NumberReplicas();
         // source node returned is not used
-        chooseSourceDatanode(block, containingNodes, numReplicas);
+        chooseSourceDatanode(block, containingNodes,
+            containingLiveReplicasNodes, numReplicas);
+        assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
         int usableReplicas = numReplicas.liveReplicas() +
                              numReplicas.decommissionedReplicas();
        
@@ -993,9 +1001,10 @@ public class BlockManager {
    * @param priority a hint of its priority in the neededReplication queue
    * @return if the block gets replicated or not
    */
-  private boolean computeReplicationWorkForBlock(Block block, int priority) {
+  @VisibleForTesting
+  boolean computeReplicationWorkForBlock(Block block, int priority) {
     int requiredReplication, numEffectiveReplicas;
-    List<DatanodeDescriptor> containingNodes;
+    List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
     DatanodeDescriptor srcNode;
     INodeFile fileINode = null;
     int additionalReplRequired;
@@ -1016,11 +1025,14 @@ public class BlockManager {
 
         // get a source data-node
         containingNodes = new ArrayList<DatanodeDescriptor>();
+        liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
         NumberReplicas numReplicas = new NumberReplicas();
-        srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
+        srcNode = chooseSourceDatanode(
+            block, containingNodes, liveReplicaNodes, numReplicas);
         if(srcNode == null) // block can not be replicated from any node
           return false;
 
+        assert liveReplicaNodes.size() == numReplicas.liveReplicas();
         // do not schedule more if enough replicas is already pending
         numEffectiveReplicas = numReplicas.liveReplicas() +
                                 pendingReplications.getNumReplicas(block);
@@ -1047,13 +1059,20 @@ public class BlockManager {
     } finally {
       namesystem.writeUnlock();
     }
+    
+    // Exclude all of the containing nodes from being targets.
+    // This list includes decommissioning or corrupt nodes.
+    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
+    for (DatanodeDescriptor dn : containingNodes) {
+      excludedNodes.put(dn, dn);
+    }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
     // It is costly to extract the filename for which chooseTargets is called,
     // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] = 
                        blockplacement.chooseTarget(fileINode, additionalReplRequired,
-                       srcNode, containingNodes, block.getNumBytes());
+                       srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
 
@@ -1182,8 +1201,10 @@ public class BlockManager {
   private DatanodeDescriptor chooseSourceDatanode(
                                     Block block,
                                     List<DatanodeDescriptor> containingNodes,
+                                    List<DatanodeDescriptor> nodesContainingLiveReplicas,
                                     NumberReplicas numReplicas) {
     containingNodes.clear();
+    nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
     int live = 0;
     int decommissioned = 0;
@@ -1202,6 +1223,7 @@ public class BlockManager {
       else if (excessBlocks != null && excessBlocks.contains(block)) {
         excess++;
       } else {
+        nodesContainingLiveReplicas.add(node);
         live++;
       }
       containingNodes.add(node);
@@ -2049,7 +2071,8 @@ public class BlockManager {
   /**
    * The given node is reporting that it received a certain block.
    */
-  private void addBlock(DatanodeDescriptor node, Block block, String delHint)
+  @VisibleForTesting
+  void addBlock(DatanodeDescriptor node, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
     node.decBlocksScheduled();

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1160897&r1=1160896&r2=1160897&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Tue Aug 23 22:04:15 2011
@@ -127,9 +127,10 @@ public abstract class BlockPlacementPoli
                                     int numOfReplicas,
                                     DatanodeDescriptor writer,
                                     List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
                                     long blocksize) {
     return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
-                        chosenNodes, blocksize);
+                        chosenNodes, excludedNodes, blocksize);
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1160897&r1=1160896&r2=1160897&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Tue Aug 23 22:04:15 2011
@@ -102,16 +102,6 @@ public class BlockPlacementPolicyDefault
         excludedNodes, blocksize);
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> chosenNodes,
-                                    long blocksize) {
-    return chooseTarget(numOfReplicas, writer, chosenNodes, false,
-        null, blocksize);
-  }
 
   /** This is the implementation. */
   DatanodeDescriptor[] chooseTarget(int numOfReplicas,

Added: hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1160897&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (added)
+++ hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Aug 23 22:04:15 2011
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.net.NetworkTopology;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+
+public class TestBlockManager {
+  private final List<DatanodeDescriptor> nodes = ImmutableList.of( 
+      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/rackB"),
+      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/rackB"),
+      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/rackB")
+    );
+  private final List<DatanodeDescriptor> rackA = nodes.subList(0, 3);
+  private final List<DatanodeDescriptor> rackB = nodes.subList(3, 6);
+  
+  /**
+   * Some of these tests exercise code which has some randomness involved -
+   * ie even if there's a bug, they may pass because the random node selection
+   * chooses the correct result.
+   * 
+   * Since they're true unit tests and run quickly, we loop them a number
+   * of times trying to trigger the incorrect behavior.
+   */
+  private static final int NUM_TEST_ITERS = 30;
+  
+  private static final int BLOCK_SIZE = 64*1024;
+  
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  @Before
+  public void setupMockCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+        "need to set a dummy value here so it assumes a multi-rack cluster");
+    fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.doReturn(true).when(fsn).hasWriteLock();
+    bm = new BlockManager(fsn, conf);
+  }
+  
+  private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    // construct network topology
+    for (DatanodeDescriptor dn : nodesToAdd) {
+      cluster.add(dn);
+      dn.updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+  }
+
+  private void removeNode(DatanodeDescriptor deadNode) {
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    cluster.remove(deadNode);
+    bm.removeBlocksAssociatedTo(deadNode);
+  }
+
+
+  /**
+   * Test that replication of under-replicated blocks is detected
+   * and basically works
+   */
+  @Test
+  public void testBasicReplication() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doBasicTest(i);
+    }
+  }
+  
+  private void doBasicTest(int testIndex) {
+    List<DatanodeDescriptor> origNodes = nodes(0, 1);
+    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertEquals(2, pipeline.length);
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertTrue("Destination of replication should be on the other rack. " +
+        "Was: " + pipeline[1],
+        rackB.contains(pipeline[1]));
+  }
+  
+
+  /**
+   * Regression test for HDFS-1480
+   * - Cluster has 2 racks, A and B, each with three nodes.
+   * - Block initially written on A1, A2, B1
+   * - Admin decommissions two of these nodes (let's say A1 and A2 but it doesn't matter)
+   * - Re-replication should respect rack policy
+   */
+  @Test
+  public void testTwoOfThreeNodesDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestTwoOfThreeNodesDecommissioned(i);
+    }
+  }
+  
+  private void doTestTwoOfThreeNodesDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+    
+    // Decommission two of the nodes (A1, A2)
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1);
+    
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertEquals("Should have two targets", 3, pipeline.length);
+    
+    boolean foundOneOnRackA = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackA.contains(target)) {
+        foundOneOnRackA = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+    
+    assertTrue("Should have at least one target on rack A. Pipeline: " +
+        Joiner.on(",").join(pipeline),
+        foundOneOnRackA);
+  }
+  
+
+  /**
+   * Test what happens when a block is on three nodes, and all three of those
+   * nodes are decommissioned. It should properly re-replicate to three new
+   * nodes. 
+   */
+  @Test
+  public void testAllNodesHoldingReplicasDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestAllNodesHoldingReplicasDecommissioned(i);
+    }
+  }
+
+  private void doTestAllNodesHoldingReplicasDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+    
+    // Decommission all of the nodes
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 3);
+    
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertEquals("Should have three targets", 4, pipeline.length);
+    
+    boolean foundOneOnRackA = false;
+    boolean foundOneOnRackB = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackA.contains(target)) {
+        foundOneOnRackA = true;
+      } else if (rackB.contains(target)) {
+        foundOneOnRackB = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+    
+    assertTrue("Should have at least one target on rack A. Pipeline: " +
+        Joiner.on(",").join(pipeline),
+        foundOneOnRackA);
+    assertTrue("Should have at least one target on rack B. Pipeline: " +
+        Joiner.on(",").join(pipeline),
+        foundOneOnRackB);
+  }
+
+  /**
+   * Test what happens when there are two racks, and an entire rack is
+   * decommissioned.
+   * 
+   * Since the cluster is multi-rack, it will consider the block
+   * under-replicated rather than create a third replica on the
+   * same rack. Adding a new node on a third rack should cause re-replication
+   * to that node.
+   */
+  @Test
+  public void testOneOfTwoRacksDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestOneOfTwoRacksDecommissioned(i);
+    }
+  }
+  
+  private void doTestOneOfTwoRacksDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+    
+    // Decommission all of the nodes in rack A
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 2);
+    
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertEquals("Should have 2 targets", 3, pipeline.length);
+    
+    boolean foundOneOnRackB = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackB.contains(target)) {
+        foundOneOnRackB = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+    
+    assertTrue("Should have at least one target on rack B. Pipeline: " +
+        Joiner.on(",").join(pipeline),
+        foundOneOnRackB);
+    
+    // Mark the block as received on the target nodes in the pipeline
+    fulfillPipeline(blockInfo, pipeline);
+
+    // the block is still under-replicated. Add a new node. This should allow
+    // the third off-rack replica.
+    DatanodeDescriptor rackCNode = new DatanodeDescriptor(new DatanodeID("h7:5020"), "/rackC");
+    addNodes(ImmutableList.of(rackCNode));
+    try {
+      DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);
+      assertEquals(2, pipeline2.length);
+      assertEquals(rackCNode, pipeline2[1]);
+    } finally {
+      removeNode(rackCNode);
+    }
+  }
+
+  /**
+   * Unit test version of testSufficientlyReplBlocksUsesNewRack from
+   * {@link TestBlocksWithNotEnoughRacks}.
+   **/
+  @Test
+  public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestSufficientlyReplBlocksUsesNewRack(i);
+    }
+  }
+
+  private void doTestSufficientlyReplBlocksUsesNewRack(int testIndex) {
+    // Originally on only nodes in rack A.
+    List<DatanodeDescriptor> origNodes = rackA;
+    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+    DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
+    
+    assertEquals(2, pipeline.length); // single new copy
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertTrue("Destination of replication should be on the other rack. " +
+        "Was: " + pipeline[1],
+        rackB.contains(pipeline[1]));
+  }
+  
+  
+  /**
+   * Tell the block manager that replication is completed for the given
+   * pipeline.
+   */
+  private void fulfillPipeline(BlockInfo blockInfo,
+      DatanodeDescriptor[] pipeline) throws IOException {
+    for (int i = 1; i < pipeline.length; i++) {
+      bm.addBlock(pipeline[i], blockInfo, null);
+    }
+  }
+
+  private BlockInfo blockOnNodes(long blkId, List<DatanodeDescriptor> nodes) {
+    Block block = new Block(blkId);
+    BlockInfo blockInfo = new BlockInfo(block, 3);
+
+    for (DatanodeDescriptor dn : nodes) {
+      blockInfo.addNode(dn);
+    }
+    return blockInfo;
+  }
+
+  private List<DatanodeDescriptor> nodes(int ... indexes) {
+    List<DatanodeDescriptor> ret = Lists.newArrayList();
+    for (int idx : indexes) {
+      ret.add(nodes.get(idx));
+    }
+    return ret;
+  }
+  
+  private List<DatanodeDescriptor> startDecommission(int ... indexes) {
+    List<DatanodeDescriptor> nodes = nodes(indexes);
+    for (DatanodeDescriptor node : nodes) {
+      node.startDecommission();
+    }
+    return nodes;
+  }
+  
+  private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
+    INodeFile iNode = Mockito.mock(INodeFile.class);
+    Mockito.doReturn((short)3).when(iNode).getReplication();
+    BlockInfo blockInfo = blockOnNodes(blockId, nodes);
+
+    bm.blocksMap.addINode(blockInfo, iNode);
+    return blockInfo;
+  }
+  
+  private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
+    assertEquals("Block not initially pending replication",
+        0, bm.pendingReplications.getNumReplicas(block));
+    assertTrue("computeReplicationWork should indicate replication is needed",
+        bm.computeReplicationWorkForBlock(block, 1));
+    assertTrue("replication is pending after work is computed",
+        bm.pendingReplications.getNumReplicas(block) > 0);
+    
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+      getAllPendingReplications();
+    assertEquals(1, repls.size());
+    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries().iterator().next();
+    DatanodeDescriptor[] targets = repl.getValue().targets;
+    
+    DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
+    pipeline[0] = repl.getKey();
+    System.arraycopy(targets, 0, pipeline, 1, targets.length);
+    
+    return pipeline;
+  }
+
+  private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+      LinkedListMultimap.create();
+    for (DatanodeDescriptor dn : nodes) {
+      List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
+      if (thisRepls != null) {
+        repls.putAll(dn, thisRepls);
+      }
+    }
+    return repls;
+  }
+}



Mime
View raw message