hadoop-hdfs-commits mailing list archives

From: ma...@apache.org
Subject: svn commit: r1148981 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/
Date: Wed, 20 Jul 2011 23:35:51 GMT
Author: mattf
Date: Wed Jul 20 23:35:50 2011
New Revision: 1148981

URL: http://svn.apache.org/viewvc?rev=1148981&view=rev
Log:
HDFS-2114. re-commission of a decommissioned node does not delete excess replicas. Contributed
by John George.

Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1148981&r1=1148980&r2=1148981&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Wed Jul 20 23:35:50 2011
@@ -861,6 +861,9 @@ Trunk (unreleased changes)
     HDFS-2152. TestWriteConfigurationToDFS causing the random failures. (Uma
     Maheswara Rao G via atm)
 
+    HDFS-2114. re-commission of a decommissioned node does not delete 
+    excess replicas. (John George via mattf)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1148981&r1=1148980&r2=1148981&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Jul 20 23:35:50 2011
@@ -1878,6 +1878,25 @@ public class BlockManager {
   }
   
   /**
+   * On stopping decommission, check if the node has excess replicas.
+   * If there are any excess replicas, call processOverReplicatedBlock()
+   */
+  public void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode) {
+    final Iterator<? extends Block> it = srcNode.getBlockIterator();
+    while(it.hasNext()) {
+      final Block block = it.next();
+      INodeFile fileINode = blocksMap.getINode(block);
+      short expectedReplication = fileINode.getReplication();
+      NumberReplicas num = countNodes(block);
+      int numCurrentReplica = num.liveReplicas();
+      if (numCurrentReplica > expectedReplication) {
+        // over-replicated block 
+        processOverReplicatedBlock(block, expectedReplication, null, null);
+      }
+    }
+  }
+
+  /**
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
    */
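
The new method walks every block stored on the node being recommissioned, looks up the owning
file's replication factor through the blocks map, counts live replicas, and hands any block with
more live replicas than expected to processOverReplicatedBlock(). Below is a minimal,
self-contained sketch of that scan; SimpleBlock and excessHandler are illustrative stand-ins,
not real HDFS classes, and the real code gets its numbers from blocksMap.getINode(block) and
countNodes(block) as shown in the hunk above.

// Simplified model of the recommission-time over-replication scan.
// SimpleBlock and excessHandler are hypothetical stand-ins; the real code
// iterates DatanodeDescriptor.getBlockIterator() and uses countNodes().
import java.util.List;
import java.util.function.Consumer;

class RecommissionScanSketch {

  static class SimpleBlock {
    final long id;
    final short expectedReplication; // replication factor of the owning file
    final int liveReplicas;          // live replicas once the node is NORMAL again
    SimpleBlock(long id, short expectedReplication, int liveReplicas) {
      this.id = id;
      this.expectedReplication = expectedReplication;
      this.liveReplicas = liveReplicas;
    }
  }

  /** Flag every block whose live replica count now exceeds its target. */
  static void scanRecommissionedNode(List<SimpleBlock> blocksOnNode,
                                     Consumer<SimpleBlock> excessHandler) {
    for (SimpleBlock block : blocksOnNode) {
      if (block.liveReplicas > block.expectedReplication) {
        excessHandler.accept(block); // real code: processOverReplicatedBlock(...)
      }
    }
  }

  public static void main(String[] args) {
    List<SimpleBlock> blocks = List.of(
        new SimpleBlock(1, (short) 3, 4),  // excess replica after recommission
        new SimpleBlock(2, (short) 3, 3)); // already at target, left alone
    scanRecommissionedNode(blocks,
        b -> System.out.println("block " + b.id + " is over-replicated"));
  }
}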

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1148981&r1=1148980&r2=1148981&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Jul 20 23:35:50 2011
@@ -3870,6 +3870,7 @@ public class FSNamesystem implements FSC
         node.stopDecommission();
         updateStats(node, true);
       }
+      blockManager.processOverReplicatedBlocksOnReCommission(node);
     }
   }
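
The call site is the node-refresh path: once a node is removed from the exclude list,
stopDecommission() returns it to the NORMAL admin state and the new hook immediately
re-evaluates its blocks. A rough, hypothetical sketch of that ordering follows; the Node and
BlockMgr interfaces are stand-ins, and the exact enclosing condition is not visible in the hunk.

// Hypothetical sketch of the recommission ordering in the namesystem's
// refresh path; Node and BlockMgr are stand-in interfaces, not HDFS types.
class RecommissionFlowSketch {
  interface Node {
    boolean isDecommissionInProgress();
    boolean isDecommissioned();
    void stopDecommission();
  }
  interface BlockMgr {
    void processOverReplicatedBlocksOnReCommission(Node node);
  }

  static void recommission(Node node, BlockMgr blockManager) {
    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      node.stopDecommission(); // back to the NORMAL admin state
    }
    // New in this patch: reclaim replicas that are now in excess because the
    // node's copies count as live again.
    blockManager.processOverReplicatedBlocksOnReCommission(node);
  }
}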
 

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1148981&r1=1148980&r2=1148981&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Wed Jul 20 23:35:50 2011
@@ -51,6 +51,8 @@ public class TestDecommission {
   static final int blockSize = 8192;
   static final int fileSize = 16384;
   static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
+  static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec
+  static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval
 
   Random myrand = new Random();
   Path hostsFile;
@@ -74,7 +76,10 @@ public class TestDecommission {
     conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
+  
     writeConfigFile(excludeFile, null);
   }
   
@@ -118,49 +123,67 @@ public class TestDecommission {
     stm.close();
     LOG.info("Created file " + name + " with " + repl + " replicas.");
   }
-  
+
   /**
-   * For blocks that reside on the nodes that are down, verify that their
-   * replication factor is 1 more than the specified one.
+   * Verify that the number of replicas are as expected for each block in
+   * the given file.
+   * For blocks with a decommissioned node, verify that their replication
+   * is 1 more than what is specified.
+   * For blocks without decommissioned nodes, verify their replication is
+   * equal to what is specified.
+   * 
+   * @param downnode - if null, there is no decommissioned node for this file.
+   * @return - null if no failure found, else an error message string.
    */
-  private void checkFile(FileSystem fileSys, Path name, int repl,
-                         String downnode, int numDatanodes) throws IOException {
-    //
-    // sleep an additional 10 seconds for the blockreports from the datanodes
-    // to arrive. 
-    //
+  private String checkFile(FileSystem fileSys, Path name, int repl,
+    String downnode, int numDatanodes) throws IOException {
+    boolean isNodeDown = (downnode != null);
     // need a raw stream
-    assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem);
-        
-    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream) 
+    assertTrue("Not HDFS:"+fileSys.getUri(), 
+    fileSys instanceof DistributedFileSystem);
+    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
       ((DistributedFileSystem)fileSys).open(name);
     Collection<LocatedBlock> dinfo = dis.getAllBlocks();
-
     for (LocatedBlock blk : dinfo) { // for each block
       int hasdown = 0;
-      int firstDecomNodeIndex = -1;
       DatanodeInfo[] nodes = blk.getLocations();
-      for (int j = 0; j < nodes.length; j++) {     // for each replica
-        if (nodes[j].getName().equals(downnode)) {
+      for (int j = 0; j < nodes.length; j++) { // for each replica
+        if (isNodeDown && nodes[j].getName().equals(downnode)) {
           hasdown++;
-          LOG.info("Block " + blk.getBlock() + " replica " + nodes[j].getName()
-              + " is decommissioned.");
-        }
-        if (nodes[j].isDecommissioned()) {
-          if (firstDecomNodeIndex == -1) {
-            firstDecomNodeIndex = j;
+          //Downnode must actually be decommissioned
+          if (!nodes[j].isDecommissioned()) {
+            return "For block " + blk.getBlock() + " replica on " +
+              nodes[j].getName() + " is given as downnode, " +
+              "but is not decommissioned";
+          }
+          //Decommissioned node (if any) should only be last node in list.
+          if (j != nodes.length - 1) {
+            return "For block " + blk.getBlock() + " decommissioned node "
+              + nodes[j].getName() + " was not last node in list: "
+              + (j + 1) + " of " + nodes.length;
+          }
+          LOG.info("Block " + blk.getBlock() + " replica on " +
+            nodes[j].getName() + " is decommissioned.");
+        } else {
+          //Non-downnodes must not be decommissioned
+          if (nodes[j].isDecommissioned()) {
+            return "For block " + blk.getBlock() + " replica on " +
+              nodes[j].getName() + " is unexpectedly decommissioned";
           }
-          continue;
         }
-        assertEquals("Decom node is not at the end", firstDecomNodeIndex, -1);
       }
+
       LOG.info("Block " + blk.getBlock() + " has " + hasdown
-          + " decommissioned replica.");
-      assertEquals("Number of replicas for block " + blk.getBlock(),
-                   Math.min(numDatanodes, repl+hasdown), nodes.length);  
+        + " decommissioned replica.");
+      if(Math.min(numDatanodes, repl+hasdown) != nodes.length) {
+        return "Wrong number of replicas for block " + blk.getBlock() +
+          ": " + nodes.length + ", expected " +
+          Math.min(numDatanodes, repl+hasdown);
+      }
     }
+    return null;
   }
-  
+
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
     fileSys.delete(name, true);
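
The checkFile() refactor changes it from asserting inline to returning null on success and an
error string describing the first problem otherwise, so testRecommission() can call it
repeatedly while the namenode works through the excess replicas. A small, self-contained sketch
of the same return-an-error-string style of checker; the names here are illustrative, not the
test's real helpers.

// Illustrative checker in the style of the refactored checkFile():
// null means the invariant holds, a non-null string explains the first failure.
import java.util.List;

class ReplicaCountCheckSketch {

  /** Expected replica count per block vs. what the namenode currently reports. */
  static String checkReplicaCounts(List<Integer> reportedCounts, int expected) {
    for (int i = 0; i < reportedCounts.size(); i++) {
      int actual = reportedCounts.get(i);
      if (actual != expected) {
        return "Wrong number of replicas for block " + i + ": "
            + actual + ", expected " + expected;
      }
    }
    return null; // everything matched
  }

  public static void main(String[] args) {
    System.out.println(checkReplicaCounts(List.of(3, 3, 3), 3)); // prints null
    System.out.println(checkReplicaCounts(List.of(3, 4, 3), 3)); // prints the error message
  }
}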
@@ -208,6 +231,15 @@ public class TestDecommission {
     return ret;
   }
 
+  /* stop decommission of the datanode and wait for each to reach the NORMAL state */
+  private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException {
+    LOG.info("Recommissioning node: " + decommissionedNode.getName());
+    writeConfigFile(excludeFile, null);
+    cluster.getNamesystem().refreshNodes(conf);
+    waitNodeState(decommissionedNode, AdminStates.NORMAL);
+
+  }
+
   /* 
    * Wait till node is fully decommissioned.
    */
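
recomissionNode() simply rewrites the exclude file without the node, calls refreshNodes() on the
namesystem, and waits for the node to report AdminStates.NORMAL. Below is a hedged sketch of the
wait step with stand-in types instead of the real DatanodeInfo/AdminStates; the poll interval is
an assumption, since waitNodeState() itself is not shown in this diff.

// Hypothetical sketch of waiting for a datanode to reach a target admin state
// after refreshNodes(); AdminState and StateSource are stand-ins, not HDFS types.
class WaitForAdminStateSketch {
  enum AdminState { NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED }

  interface StateSource {
    AdminState currentState();
  }

  static void waitForState(StateSource node, AdminState wanted, long timeoutMillis)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (node.currentState() != wanted) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException("Node never reached " + wanted);
      }
      Thread.sleep(500); // re-check on a short interval (assumed; not shown in the diff)
    }
  }
}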
@@ -287,6 +319,14 @@ public class TestDecommission {
   }
   
   /**
+   * Tests recommission for non federated cluster
+   */
+  @Test
+  public void testRecommission() throws IOException {
+    testRecommission(1, 6);
+  }
+
+  /**
    * Test decommission for federeated cluster
    */
   @Test
@@ -323,15 +363,68 @@ public class TestDecommission {
         DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
         assertEquals("All datanodes must be alive", numDatanodes, 
             client.datanodeReport(DatanodeReportType.LIVE).length);
-        checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes);
+        assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
         cleanupFile(fileSys, file1);
       }
     }
-    
-    // Restart the cluster and ensure decommissioned datanodes
+
+    // Restart the cluster and ensure recommissioned datanodes
     // are allowed to register with the namenode
     cluster.shutdown();
     startCluster(numNamenodes, numDatanodes, conf);
+    cluster.shutdown();
+  }
+
+
+  private void testRecommission(int numNamenodes, int numDatanodes) 
+    throws IOException {
+    LOG.info("Starting test testRecommission");
+
+    startCluster(numNamenodes, numDatanodes, conf);
+  
+    ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = 
+      new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
+    for(int i = 0; i < numNamenodes; i++) {
+      namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
+    }
+    Path file1 = new Path("testDecommission.dat");
+    int replicas = numDatanodes - 1;
+      
+    for (int i = 0; i < numNamenodes; i++) {
+      ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
+      FileSystem fileSys = cluster.getFileSystem(i);
+      writeFile(fileSys, file1, replicas);
+        
+      // Decommission one node. Verify that node is decommissioned.
+      DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
+          AdminStates.DECOMMISSIONED);
+      decommissionedNodes.add(decomNode);
+        
+      // Ensure decommissioned datanode is not automatically shutdown
+      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
+      assertEquals("All datanodes must be alive", numDatanodes, 
+          client.datanodeReport(DatanodeReportType.LIVE).length);
+      assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
+
+      // stop decommission and check if the new replicas are removed
+      recomissionNode(decomNode);
+      // wait for the block to be deleted
+      int tries = 0;
+      while (tries++ < 20) {
+        try {
+          Thread.sleep(1000);
+          if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
+            break;
+          }
+        } catch (InterruptedException ie) {
+        }
+      }
+      cleanupFile(fileSys, file1);
+      assertTrue("Checked if node was recommissioned " + tries + " times.",
+         tries < 20);
+      LOG.info("tried: " + tries + " times before recommissioned");
+    }
+    cluster.shutdown();
   }
   
   /**
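
After recommissioning, testRecommission() polls checkFile() for up to 20 one-second tries,
which is comfortably longer than the shortened timing knobs it configures (1 s block reports via
BLOCKREPORT_INTERVAL_MSEC and a 1 s replication check via NAMENODE_REPLICATION_INTERVAL). One
possible refinement over the loop in the hunk, sketched below as a suggestion rather than part
of the patch, is to restore the interrupt flag instead of swallowing InterruptedException;
checkOnce is an illustrative stand-in for the checkFile() call.

// Sketch of the bounded wait with the interrupt status restored instead of dropped.
import java.util.function.Supplier;

class BoundedWaitSketch {
  static boolean waitForCleanup(Supplier<String> checkOnce, int maxTries) {
    for (int tries = 0; tries < maxTries; tries++) {
      if (checkOnce.get() == null) {
        return true;                        // excess replicas are gone
      }
      try {
        Thread.sleep(1000);                 // one block-report / replication interval
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        return false;
      }
    }
    return false;                           // still over-replicated after maxTries
  }
}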


