hadoop-hdfs-commits mailing list archives

From szets...@apache.org
Subject svn commit: r1476400 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/resources/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/ ...
Date Fri, 26 Apr 2013 20:52:30 GMT
Author: szetszwo
Date: Fri Apr 26 20:52:29 2013
New Revision: 1476400

URL: http://svn.apache.org/r1476400
Log:
svn merge -c 1476399 from trunk for HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing the datanode with the most recent heartbeat as the primary.

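In outline, the change replaces the old round-robin choice of recovery primary with a heartbeat-recency rule: among live replicas that have not yet been tried as primary, pick the one whose datanode reported most recently. A minimal sketch of that rule, using a hypothetical simplified Replica type in place of the committed ReplicaUnderConstruction and DatanodeDescriptor classes:

    import java.util.List;

    // Sketch only; Replica is a simplified stand-in, not the committed class.
    class PrimarySelectionSketch {
      static class Replica {
        boolean alive;            // datanode still considered alive
        boolean chosenAsPrimary;  // already tried as recovery primary
        long lastUpdate;          // last heartbeat time, in ms
      }

      /** Returns the live, not-yet-tried replica with the most recent
       *  heartbeat, or null if no candidate remains. */
      static Replica choosePrimary(List<Replica> replicas) {
        Replica primary = null;
        long mostRecent = 0;
        for (Replica r : replicas) {
          if (r.alive && !r.chosenAsPrimary && r.lastUpdate > mostRecent) {
            primary = r;
            mostRecent = r.lastUpdate;
          }
        }
        if (primary != null) {
          primary.chosenAsPrimary = true; // skip it on the next retry
        }
        return primary;
      }
    }
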
Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
      - copied unchanged from r1476399, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1476399

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1476400&r1=1476399&r2=1476400&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Apr 26 20:52:29 2013
@@ -94,6 +94,10 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4346. Add SequentialNumber as a base class for INodeId and
     GenerationStamp.  (szetszwo)
 
+    HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing
+    the datanode with the most recent heartbeat as the primary.  (Varun Sharma
+    via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1476399

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1476400&r1=1476399&r2=1476400&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Fri Apr 26 20:52:29 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -41,7 +42,10 @@ public class BlockInfoUnderConstruction 
    */
   private List<ReplicaUnderConstruction> replicas;
 
-  /** A data-node responsible for block recovery. */
+  /**
+   * Index of the primary data node doing the recovery. Useful for log
+   * messages.
+   */
   private int primaryNodeIndex = -1;
 
   /**
@@ -62,6 +66,7 @@ public class BlockInfoUnderConstruction 
   static class ReplicaUnderConstruction extends Block {
     private DatanodeDescriptor expectedLocation;
     private ReplicaState state;
+    private boolean chosenAsPrimary;
 
     ReplicaUnderConstruction(Block block,
                              DatanodeDescriptor target,
@@ -69,6 +74,7 @@ public class BlockInfoUnderConstruction 
       super(block);
       this.expectedLocation = target;
       this.state = state;
+      this.chosenAsPrimary = false;
     }
 
     /**
@@ -89,6 +95,13 @@ public class BlockInfoUnderConstruction 
     }
 
     /**
+     * Whether the replica was chosen for recovery.
+     */
+    boolean getChosenAsPrimary() {
+      return chosenAsPrimary;
+    }
+
+    /**
      * Set replica state.
      */
     void setState(ReplicaState s) {
@@ -96,6 +109,13 @@ public class BlockInfoUnderConstruction 
     }
 
     /**
+     * Set whether this replica was chosen for recovery.
+     */
+    void setChosenAsPrimary(boolean chosenAsPrimary) {
+      this.chosenAsPrimary = chosenAsPrimary;
+    }
+
+    /**
      * Is data-node the replica belongs to alive.
      */
     boolean isAlive() {
@@ -237,19 +257,40 @@ public class BlockInfoUnderConstruction 
         + " BlockInfoUnderConstruction.initLeaseRecovery:"
         + " No blocks found, lease removed.");
     }
-
-    int previous = primaryNodeIndex;
-    for(int i = 1; i <= replicas.size(); i++) {
-      int j = (previous + i)%replicas.size();
-      if (replicas.get(j).isAlive()) {
-        primaryNodeIndex = j;
-        DatanodeDescriptor primary = replicas.get(j).getExpectedLocation(); 
-        primary.addBlockToBeRecovered(this);
-        NameNode.blockStateChangeLog.info("BLOCK* " + this
-          + " recovery started, primary=" + primary);
-        return;
+    boolean allLiveReplicasTriedAsPrimary = true;
+    for (int i = 0; i < replicas.size(); i++) {
+      // Check if all replicas have been tried or not.
+      if (replicas.get(i).isAlive()) {
+        allLiveReplicasTriedAsPrimary =
+            (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
+      }
+    }
+    if (allLiveReplicasTriedAsPrimary) {
+      // All live replicas have been tried; clear the chosen-as-primary flag
+      // on every replica, alive or not, so they can be retried.
+      for (int i = 0; i < replicas.size(); i++) {
+        replicas.get(i).setChosenAsPrimary(false);
       }
     }
+    long mostRecentLastUpdate = 0;
+    ReplicaUnderConstruction primary = null;
+    primaryNodeIndex = -1;
+    for(int i = 0; i < replicas.size(); i++) {
+      // Skip alive replicas which have been chosen for recovery.
+      if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
+        continue;
+      }
+      if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
+        primary = replicas.get(i);
+        primaryNodeIndex = i;
+        mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
+      }
+    }
+    if (primary != null) {
+      primary.getExpectedLocation().addBlockToBeRecovered(this);
+      primary.setChosenAsPrimary(true);
+      NameNode.blockStateChangeLog.info("BLOCK* " + this
+        + " recovery started, primary=" + primary);
+    }
   }
 
   void addReplicaIfNotPresent(DatanodeDescriptor dn,

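The initLeaseRecovery hunk above also handles exhaustion: once every live replica has been tried as primary, the chosen flags are cleared so selection can cycle through the live replicas again. A hedged sketch of that reset step, reusing the hypothetical Replica stand-in from the earlier sketch:

    // Sketch only, reusing the simplified Replica type from above.
    static void resetIfAllLiveTried(List<Replica> replicas) {
      boolean allLiveTried = true;
      for (Replica r : replicas) {
        if (r.alive && !r.chosenAsPrimary) {
          allLiveTried = false;
          break;
        }
      }
      if (allLiveTried) {
        // Clear the flag on every replica, alive or not, so the next
        // recovery attempt can choose among all live replicas again.
        for (Replica r : replicas) {
          r.chosenAsPrimary = false;
        }
      }
    }
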
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1476400&r1=1476399&r2=1476400&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Apr 26 20:52:29 2013
@@ -209,7 +209,7 @@ public class DatanodeManager {
         " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
         "It should be a positive non-zero float value, not greater than 1.0f.");
   }
-  
+
   private static long getStaleIntervalFromConf(Configuration conf,
       long heartbeatExpireInterval) {
     long staleInterval = conf.getLong(
@@ -862,7 +862,7 @@ public class DatanodeManager {
         (numStaleNodes <= heartbeatManager.getLiveDatanodeCount()
             * ratioUseStaleDataNodesForWrite);
   }
-  
+
   /**
    * @return The time interval used to mark DataNodes as stale.
    */
@@ -1080,7 +1080,7 @@ public class DatanodeManager {
    * failed.  As a special case, the loopback address is also considered
    * acceptable.  This is particularly important on Windows, where 127.0.0.1 does
    * not resolve to "localhost".
-   * 
+   *
    * @param address InetAddress to check
    * @return boolean true if name resolution successful or address is loopback
    */
@@ -1114,7 +1114,7 @@ public class DatanodeManager {
           setDatanodeDead(nodeinfo);
           throw new DisallowedDatanodeException(nodeinfo);
         }
-         
+
         if (nodeinfo == null || !nodeinfo.isAlive) {
           return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
@@ -1129,9 +1129,34 @@ public class DatanodeManager {
           BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
               blocks.length);
           for (BlockInfoUnderConstruction b : blocks) {
-            brCommand.add(new RecoveringBlock(
-                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
-                    .getBlockRecoveryId()));
+            DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
+            // Skip stale nodes during recovery: no heartbeat received for some time (30s by default).
+            List<DatanodeDescriptor> recoveryLocations =
+                new ArrayList<DatanodeDescriptor>(expectedLocations.length);
+            for (int i = 0; i < expectedLocations.length; i++) {
+              if (!expectedLocations[i].isStale(this.staleInterval)) {
+                recoveryLocations.add(expectedLocations[i]);
+              }
+            }
+            // If we only get 1 replica after eliminating stale nodes, then choose all
+            // replicas for recovery and let the primary data node handle failures.
+            if (recoveryLocations.size() > 1) {
+              if (recoveryLocations.size() != expectedLocations.length) {
+                LOG.info("Skipped stale nodes for recovery : " +
+                    (expectedLocations.length - recoveryLocations.size()));
+              }
+              brCommand.add(new RecoveringBlock(
+                  new ExtendedBlock(blockPoolId, b),
+                  recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
+                  b.getBlockRecoveryId()));
+            } else {
+              // If too many replicas are stale, then choose all replicas to participate
+              // in block recovery.
+              brCommand.add(new RecoveringBlock(
+                  new ExtendedBlock(blockPoolId, b),
+                  expectedLocations,
+                  b.getBlockRecoveryId()));
+            }
           }
           return new DatanodeCommand[] { brCommand };
         }

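The heartbeat-handling hunk above boils down to one rule: drop replicas whose datanode has not heartbeated within the stale interval, but only when at least two non-stale replicas remain; otherwise recover with the full set and let the primary cope with failures. A self-contained sketch, with plain heartbeat timestamps standing in for DatanodeDescriptor objects:

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the stale-filtering rule; not the committed code.
    class StaleFilterSketch {
      /** Keep only nodes that heartbeated within staleInterval ms of now;
       *  if fewer than two survive, fall back to the full list. */
      static List<Long> recoveryLocations(List<Long> lastUpdates,
                                          long now, long staleInterval) {
        List<Long> fresh = new ArrayList<Long>();
        for (long lastUpdate : lastUpdates) {
          if (now - lastUpdate < staleInterval) { // same test as isStale()
            fresh.add(lastUpdate);
          }
        }
        return fresh.size() > 1 ? fresh : lastUpdates;
      }
    }
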
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1476400&r1=1476399&r2=1476400&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Apr 26 20:52:29 2013
@@ -1072,7 +1072,10 @@
     otherwise this may cause too frequent change of stale states. 
     We thus set a minimum stale interval value (the default value is 3 times 
     of heartbeat interval) and guarantee that the stale interval cannot be less
-    than the minimum value.
+    than the minimum value. A stale data node is avoided during lease/block
+    recovery. It can be conditionally avoided for reads (see
+    dfs.namenode.avoid.read.stale.datanode) and for writes (see
+    dfs.namenode.avoid.write.stale.datanode).
   </description>
 </property>
 

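The staleness settings described above can also be set programmatically; a hedged example using the string keys named in the description (the interval shown is the documented 30s default, the two flags are illustrative choices, not recommendations):

    import org.apache.hadoop.conf.Configuration;

    public class StaleConfExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Mark a datanode stale after 30s without a heartbeat.
        conf.setLong("dfs.namenode.stale.datanode.interval", 30 * 1000L);
        // Optionally avoid stale datanodes for reads and writes as well.
        conf.setBoolean("dfs.namenode.avoid.read.stale.datanode", true);
        conf.setBoolean("dfs.namenode.avoid.write.stale.datanode", true);
      }
    }
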
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1476400&r1=1476399&r2=1476400&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Fri Apr 26 20:52:29 2013
@@ -20,17 +20,21 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -56,14 +60,12 @@ public class TestHeartbeatHandling {
       final HeartbeatManager hm = namesystem.getBlockManager(
           ).getDatanodeManager().getHeartbeatManager();
       final String poolId = namesystem.getBlockPoolId();
-      final DatanodeRegistration nodeReg = 
+      final DatanodeRegistration nodeReg =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
-
-
       final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
-      
+
       final int REMAINING_BLOCKS = 1;
-      final int MAX_REPLICATE_LIMIT = 
+      final int MAX_REPLICATE_LIMIT =
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
       final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
       final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
@@ -83,7 +85,7 @@ public class TestHeartbeatHandling {
           assertEquals(1, cmds.length);
           assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
           assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
-          
+
           ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
           for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
             blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
@@ -122,4 +124,113 @@ public class TestHeartbeatHandling {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test if
+   * {@link FSNamesystem#handleHeartbeat}
+   * correctly selects data node targets for block recovery.
+   */
+  @Test
+  public void testHeartbeatBlockRecovery() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    try {
+      cluster.waitActive();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      final HeartbeatManager hm = namesystem.getBlockManager(
+          ).getDatanodeManager().getHeartbeatManager();
+      final String poolId = namesystem.getBlockPoolId();
+      final DatanodeRegistration nodeReg1 =
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
+      final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
+      final DatanodeRegistration nodeReg2 =
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
+      final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
+      final DatanodeRegistration nodeReg3 = 
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
+      final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
+
+      try {
+        namesystem.writeLock();
+        synchronized(hm) {
+          NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem);
+          NameNodeAdapter.sendHeartBeat(nodeReg2, dd2, namesystem);
+          NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);
+
+          // Test with all alive nodes.
+          dd1.setLastUpdate(System.currentTimeMillis());
+          dd2.setLastUpdate(System.currentTimeMillis());
+          dd3.setLastUpdate(System.currentTimeMillis());
+          BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          DatanodeCommand[] cmds =
+              NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          BlockRecoveryCommand recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          DatanodeInfo[] recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          assertEquals(3, recoveringNodes.length);
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
+          assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
+
+          // Test with one stale node.
+          dd1.setLastUpdate(System.currentTimeMillis());
+          // More than the default stale interval of 30 seconds.
+          dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
+          dd3.setLastUpdate(System.currentTimeMillis());
+          blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          assertEquals(2, recoveringNodes.length);
+          // dd2 is skipped.
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd3);
+
+          // Test with all stale nodes.
+          dd1.setLastUpdate(System.currentTimeMillis() - 60 * 1000);
+          // More than the default stale interval of 30 seconds.
+          dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
+          dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
+          blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          // dd1 heart beated just now, so it is the only non-stale node when
+          // the recovery locations are computed; with at most one non-stale
+          // replica, all three replicas are chosen for recovery.
+          assertEquals(3, recoveringNodes.length);
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
+          assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
+        }
+      } finally {
+        namesystem.writeUnlock();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java?rev=1476400&r1=1476399&r2=1476400&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java Fri Apr 26 20:52:29 2013
@@ -521,9 +521,17 @@ public class TestPipelinesFailover {
         storedBlock instanceof BlockInfoUnderConstruction);
     BlockInfoUnderConstruction ucBlock =
       (BlockInfoUnderConstruction)storedBlock;
-    // We expect that the first indexed replica will be the one
-    // to be in charge of the synchronization / recovery protocol.
-    DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0];
+    // We expect that the replica with the most recent heart beat will be
+    // the one to be in charge of the synchronization / recovery protocol.
+    DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations();
+    DatanodeDescriptor expectedPrimary = datanodes[0];
+    long mostRecentLastUpdate = expectedPrimary.getLastUpdate();
+    for (int i = 1; i < datanodes.length; i++) {
+      if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
+        expectedPrimary = datanodes[i];
+        mostRecentLastUpdate = expectedPrimary.getLastUpdate();
+      }
+    }
     return expectedPrimary;
   }
 


