hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r713888 - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/namenode/
Date Fri, 14 Nov 2008 00:50:39 GMT
Author: hairong
Date: Thu Nov 13 16:50:39 2008
New Revision: 713888

URL: http://svn.apache.org/viewvc?rev=713888&view=rev
Log:
HADOOP-4643. NameNode should exclude excessive replicas when counting live replicas for a
block. Contributed by Hairong Kuang.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=713888&r1=713887&r2=713888&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Nov 13 16:50:39 2008
@@ -1161,6 +1161,9 @@
 
     HADOOP-4556. Block went missing. (hairong)
 
+    HADOOP-4643. NameNode should exclude excessive replicas when counting
+    live replicas for a block. (hairong)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=713888&r1=713887&r2=713888&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Nov 13 16:50:39 2008
@@ -182,7 +182,7 @@
   // eventually remove these extras.
   // Mapping: StorageID -> TreeSet<Block>
   //
-  private Map<String, Collection<Block>> excessReplicateMap = 
+  Map<String, Collection<Block>> excessReplicateMap = 
     new TreeMap<String, Collection<Block>>();
 
   Random r = new Random();
@@ -2377,7 +2377,7 @@
           replIndex--;
           NameNode.stateChangeLog.info("BLOCK* "
               + "Removing block " + block
-              + " from neededReplications as it does not belong to any file.");
+              + " from neededReplications as it has enough replicas.");
           continue;
         }
 
@@ -2451,26 +2451,30 @@
     int live = 0;
     int decommissioned = 0;
     int corrupt = 0;
+    int excess = 0;
     Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
     while(it.hasNext()) {
       DatanodeDescriptor node = it.next();
-      Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(block);
-      if ((nodes != null) && (nodes.contains(node)))
+      Collection<Block> excessBlocks = 
+        excessReplicateMap.get(node.getStorageID());
+      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
         corrupt++;
-      else if(!node.isDecommissionInProgress() && !node.isDecommissioned())
-        live++;
-      else
+      else if (node.isDecommissionInProgress() || node.isDecommissioned())
         decommissioned++;
+      else if (excessBlocks != null && excessBlocks.contains(block)) {
+        excess++;
+      } else {
+        live++;
+      }
       containingNodes.add(node);
       // Check if this replica is corrupt
       // If so, do not select the node as src node
-      if ((nodes != null) && nodes.contains(node))
+      if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
       if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
         continue; // already reached replication limit
       // the block must not be scheduled for removal on srcNode
-      Collection<Block> excessBlocks = 
-        excessReplicateMap.get(node.getStorageID());
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
       // never use already decommissioned nodes
@@ -2490,7 +2494,7 @@
         srcNode = node;
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned, corrupt);
+      numReplicas.initialize(live, decommissioned, corrupt, excess);
     return srcNode;
   }
 
@@ -3459,23 +3463,25 @@
    * An immutable object that stores the number of live replicas and
    * the number of decommissioned replicas.
    */
-  private static class NumberReplicas {
+  static class NumberReplicas {
     private int liveReplicas;
     private int decommissionedReplicas;
     private int corruptReplicas;
+    private int excessReplicas;
 
     NumberReplicas() {
-      initialize(0, 0, 0);
+      initialize(0, 0, 0, 0);
     }
 
-    NumberReplicas(int live, int decommissioned, int corrupt) {
-      initialize(live, decommissioned, corrupt);
+    NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
+      initialize(live, decommissioned, corrupt, excess);
     }
 
-    void initialize(int live, int decommissioned, int corrupt) {
+    void initialize(int live, int decommissioned, int corrupt, int excess) {
       liveReplicas = live;
       decommissionedReplicas = decommissioned;
       corruptReplicas = corrupt;
+      excessReplicas = excess;
     }
 
     int liveReplicas() {
@@ -3487,6 +3493,9 @@
     int corruptReplicas() {
       return corruptReplicas;
     }
+    int excessReplicas() {
+      return excessReplicas;
+    }
   } 
 
   /**
@@ -3498,6 +3507,7 @@
     int count = 0;
     int live = 0;
     int corrupt = 0;
+    int excess = 0;
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     while ( nodeIter.hasNext() ) {
       DatanodeDescriptor node = nodeIter.next();
@@ -3507,17 +3517,23 @@
       else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       }
-      else {
-        live++;
+      else  {
+        Collection<Block> blocksExcess = 
+          excessReplicateMap.get(node.getStorageID());
+        if (blocksExcess != null && blocksExcess.contains(b)) {
+          excess++;
+        } else {
+          live++;
+        }
       }
     }
-    return new NumberReplicas(live, count, corrupt);
+    return new NumberReplicas(live, count, corrupt, excess);
   }
 
   /**
    * Return the number of nodes that are live and decommissioned.
    */
-  private NumberReplicas countNodes(Block b) {
+  NumberReplicas countNodes(Block b) {
     return countNodes(b, blocksMap.nodeIterator(b));
   }
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=713888&r1=713887&r2=713888&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Thu Nov 13 16:50:39 2008
@@ -51,7 +51,7 @@
  */
 public class MiniDFSCluster {
 
-  private class DataNodeProperties {
+  public class DataNodeProperties {
     DataNode datanode;
     Configuration conf;
     String[] dnArgs;
@@ -602,50 +602,55 @@
   /*
    * Shutdown a particular datanode
    */
-  boolean stopDataNode(int i) {
+  DataNodeProperties stopDataNode(int i) {
     if (i < 0 || i >= dataNodes.size()) {
-      return false;
+      return null;
     }
-    DataNode dn = dataNodes.remove(i).datanode;
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
     System.out.println("MiniDFSCluster Stopping DataNode " + 
                        dn.dnRegistration.getName() +
                        " from a total of " + (dataNodes.size() + 1) + 
                        " datanodes.");
     dn.shutdown();
     numDataNodes--;
-    return true;
+    return dnprop;
   }
 
-  /*
-   * Restart a particular datanode
+  /**
+   * Restart a datanode
+   * @param dnprop datanode's property
+   * @return true if restarting is successful
+   * @throws IOException
    */
-  synchronized boolean restartDataNode(int i) throws IOException {
-    if (i < 0 || i >= dataNodes.size()) {
-      return false;
-    }
-    DataNodeProperties dnprop = dataNodes.remove(i);
-    DataNode dn = dnprop.datanode;
+  public synchronized boolean restartDataNode(DataNodeProperties dnprop)
+  throws IOException {
     Configuration conf = dnprop.conf;
     String[] args = dnprop.dnArgs;
-    System.out.println("MiniDFSCluster Restart DataNode " + 
-                       dn.dnRegistration.getName() +
-                       " from a total of " + (dataNodes.size() + 1) + 
-                       " datanodes.");
-    dn.shutdown();
-
-    // recreate new datanode with the same configuration as the one
-    // that was stopped.
     Configuration newconf = new Configuration(conf); // save cloned config
     dataNodes.add(new DataNodeProperties(
                      DataNode.createDataNode(args, conf), 
                      newconf, args));
+    numDataNodes++;
     return true;
+
+  }
+  /*
+   * Restart a particular datanode
+   */
+  synchronized boolean restartDataNode(int i) throws IOException {
+    DataNodeProperties dnprop = stopDataNode(i);
+    if (dnprop == null) {
+      return false;
+    } else {
+      return restartDataNode(dnprop);
+    }
   }
 
   /*
    * Shutdown a datanode by name.
    */
-  synchronized boolean stopDataNode(String name) {
+  public synchronized DataNodeProperties stopDataNode(String name) {
     int i;
     for (i = 0; i < dataNodes.size(); i++) {
       DataNode dn = dataNodes.get(i).datanode;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=713888&r1=713887&r2=713888&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Thu Nov 13 16:50:39 2008
@@ -367,7 +367,7 @@
     // corruptReplicasMap
     corruptReplicaSize = cluster.getNameNode().namesystem.
                           corruptReplicas.numCorruptReplicas(blk);
-    while (corruptReplicaSize != 0) {
+    while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
       try {
         LOG.info("Looping until corrupt replica is invalidated");
         Thread.sleep(1000);

Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java?rev=713888&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java Thu Nov 13 16:50:39 2008
@@ -0,0 +1,105 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
+
+import junit.framework.TestCase;
+
+/**
+ * Test if live nodes count per node is correct 
+ * so NN makes right decision for under/over-replicated blocks
+ */
+public class TestNodeCount extends TestCase {
+  public void testNodeCount() throws Exception {
+    // start a mini dfs cluster of 2 nodes
+    final Configuration conf = new Configuration();
+    final short REPLICATION_FACTOR = (short)2;
+    final MiniDFSCluster cluster = 
+      new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
+    try {
+      final FSNamesystem namesystem = cluster.getNameNode().namesystem;
+      final FileSystem fs = cluster.getFileSystem();
+      
+      // populate the cluster with a one block file
+      final Path FILE_PATH = new Path("/testfile");
+      DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
+      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+      Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
+
+      // keep a copy of all datanode descriptor
+      DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
+         namesystem.heartbeats.toArray(new DatanodeDescriptor[REPLICATION_FACTOR]);
+      
+      // start two new nodes
+      cluster.startDataNodes(conf, 2, true, null, null);
+      cluster.waitActive();
+      
+      // bring down first datanode
+      DatanodeDescriptor datanode = datanodes[0];
+      DataNodeProperties dnprop = cluster.stopDataNode(datanode.getName());
+      // make sure that NN detects that the datanode is down
+      synchronized (namesystem.heartbeats) {
+        datanode.setLastUpdate(0); // mark it dead
+        namesystem.heartbeatCheck();
+      }
+      // the block will be replicated
+      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+
+      // restart the first datanode
+      cluster.restartDataNode(dnprop);
+      cluster.waitActive();
+      
+      // check if excessive replica is detected
+      NumberReplicas num = null;
+      do {
+       num = namesystem.countNodes(block);
+      } while (num.excessReplicas() == 0);
+      
+      // find out a non-excess node
+      Iterator<DatanodeDescriptor> iter = namesystem.blocksMap.nodeIterator(block);
+      DatanodeDescriptor nonExcessDN = null;
+      while (iter.hasNext()) {
+        DatanodeDescriptor dn = iter.next();
+        Collection<Block> blocks = namesystem.excessReplicateMap.get(dn.getStorageID());
+        if (blocks == null || !blocks.contains(block) ) {
+          nonExcessDN = dn;
+          break;
+        }
+      }
+      assertTrue(nonExcessDN!=null);
+      
+      // bring down non excessive datanode
+      dnprop = cluster.stopDataNode(nonExcessDN.getName());
+      // make sure that NN detects that the datanode is down
+      synchronized (namesystem.heartbeats) {
+        nonExcessDN.setLastUpdate(0); // mark it dead
+        namesystem.heartbeatCheck();
+      }
+      
+      // The block should be replicated
+      do {
+        num = namesystem.countNodes(block);
+      } while (num.liveReplicas() != REPLICATION_FACTOR);
+      
+      // restart the first datanode
+      cluster.restartDataNode(dnprop);
+      cluster.waitActive();
+      
+      // check if excessive replica is detected
+      do {
+       num = namesystem.countNodes(block);
+      } while (num.excessReplicas() == 2);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}



Mime
View raw message