hadoop-hdfs-commits mailing list archives

From: hair...@apache.org
Subject: svn commit: r1049290 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date: Tue, 14 Dec 2010 21:38:59 GMT
Author: hairong
Date: Tue Dec 14 21:38:58 2010
New Revision: 1049290

URL: http://svn.apache.org/viewvc?rev=1049290&view=rev
Log:
HDFS-1476. listCorruptFileBlocks should be functional while the name node is still in safe
mode. Contributed by Patrick Kling.
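
In caller-visible terms, the change makes the following possible while the name node is still in safe mode, mirroring what the new TestListCorruptFileBlocks case at the end of this patch does. A minimal sketch, assuming a running MiniDFSCluster named "cluster" and code living in the name node test package (all method names appear in the patch below):

    // Sketch only (not part of the commit): corrupt file blocks can be listed
    // even though the name node is still in safe mode, once the replication
    // queues have been populated.
    FSNamesystem ns = cluster.getNameNode().getNamesystem();
    assert cluster.getNameNode().isInSafeMode();
    if (ns.isPopulatingReplQueues()) {
      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles =
          ns.listCorruptFileBlocks("/", null);
      System.out.println(badFiles.size() + " corrupt file(s) reported in safe mode");
    }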

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1049290&r1=1049289&r2=1049290&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Dec 14 21:38:58 2010
@@ -26,6 +26,9 @@ Trunk (unreleased changes)
     HDFS-1533. A more elegant FileSystem#listCorruptFileBlocks API
     (HDFS portion) (Patrick Kling via hairong)
 
+    HDFS-1476. listCorruptFileBlocks should be functional while the
+    name node is in safe mode. (Patrick Kling via hairong)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1049290&r1=1049289&r2=1049290&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Dec 14 21:38:58 2010
@@ -57,6 +57,11 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_SAFEMODE_EXTENSION_DEFAULT = 30000;
   public static final String  DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY = "dfs.namenode.safemode.threshold-pct";
   public static final float   DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;
+  // set this to a slightly smaller value than
+  // DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT to populate
+  // needed replication queues before exiting safe mode
+  public static final String  DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY =
+    "dfs.namenode.replqueue.threshold-pct";
   public static final String  DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY = "dfs.namenode.safemode.min.datanodes";
   public static final int     DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address";
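
For illustration (not part of this patch), the new key is meant to be set below the safe mode threshold so the replication queues are populated before safe mode is exited; the values below mirror the new test case further down in this patch. A minimal sketch, assuming hadoop-hdfs on the classpath; the class name is made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ReplQueueThresholdExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // keep the name node in safe mode (this threshold can never be reached)
        conf.setFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, 1.5f);
        // populate replication queues as soon as block reports come in
        conf.setFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, 0f);
        System.out.println("dfs.namenode.replqueue.threshold-pct = "
            + conf.getFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, 0.999f));
      }
    }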

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=1049290&r1=1049289&r2=1049290&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Dec 14 21:38:58 2010
@@ -624,7 +624,7 @@ public class BlockManager {
     if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(storedBlock, node);
-    } else {
+    } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
       updateNeededReplications(storedBlock, -1, 0);
     }
@@ -1154,8 +1154,8 @@ public class BlockManager {
       return storedBlock;
     }
 
-    // do not handle mis-replicated blocks during startup
-    if (namesystem.isInSafeMode())
+    // do not handle mis-replicated blocks during start up
+    if (!namesystem.isPopulatingReplQueues())
       return storedBlock;
 
     // handle underReplication/overReplication

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1049290&r1=1049289&r2=1049290&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Dec 14 21:38:58 2010
@@ -3744,6 +3744,8 @@ public class FSNamesystem implements FSC
     private int extension;
     /** Min replication required by safe mode. */
     private int safeReplication;
+    /** threshold for populating needed replication queues */
+    private double replQueueThreshold;
       
     // internal fields
     /** Time when threshold was reached.
@@ -3758,9 +3760,13 @@ public class FSNamesystem implements FSC
     private int blockSafe;
     /** Number of blocks needed to satisfy safe mode threshold condition */
     private int blockThreshold;
+    /** Number of blocks needed before populating replication queues */
+    private int blockReplQueueThreshold;
     /** time of the last status printout */
     private long lastStatusReport = 0;
-      
+    /** flag indicating whether replication queues have been initialized */
+    private boolean initializedReplQueues = false;
+    
     /**
      * Creates SafeModeInfo when the name node enters
      * automatic safe mode at startup.
@@ -3775,6 +3781,10 @@ public class FSNamesystem implements FSC
       this.extension = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
       this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
                                          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+      // default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
+      this.replQueueThreshold = 
+        conf.getFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
+                      (float) threshold);
       this.blockTotal = 0; 
       this.blockSafe = 0;
     }
@@ -3792,6 +3802,7 @@ public class FSNamesystem implements FSC
       this.datanodeThreshold = Integer.MAX_VALUE;
       this.extension = Integer.MAX_VALUE;
       this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
+      this.replQueueThreshold = 1.5f; // can never be reached
       this.blockTotal = -1;
       this.blockSafe = -1;
       this.reached = -1;
@@ -3815,6 +3826,13 @@ public class FSNamesystem implements FSC
     }
       
     /**
+     * Check if we are populating replication queues.
+     */
+    synchronized boolean isPopulatingReplQueues() {
+      return initializedReplQueues;
+    }
+
+    /**
      * Enter safe mode.
      */
     void enter() {
@@ -3842,8 +3860,10 @@ public class FSNamesystem implements FSC
           return;
         }
       }
-      // verify blocks replications
-      blockManager.processMisReplicatedBlocks();
+      // if not done yet, initialize replication queues
+      if (!isPopulatingReplQueues()) {
+        initializeReplQueues();
+      }
       long timeInSafemode = now() - systemStart;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
                                     + timeInSafemode/1000 + " secs.");
@@ -3860,6 +3880,26 @@ public class FSNamesystem implements FSC
       NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
                                    +blockManager.neededReplications.size()+" blocks");
     }
+
+    /**
+     * Initialize replication queues.
+     */
+    synchronized void initializeReplQueues() {
+      LOG.info("initializing replication queues");
+      if (isPopulatingReplQueues()) {
+        LOG.warn("Replication queues already initialized.");
+      }
+      blockManager.processMisReplicatedBlocks();
+      initializedReplQueues = true;
+    }
+
+    /**
+     * Check whether we have reached the threshold for 
+     * initializing replication queues.
+     */
+    synchronized boolean canInitializeReplQueues() {
+      return blockSafe >= blockReplQueueThreshold;
+    }
       
     /** 
      * Safe mode can be turned off iff 
@@ -3892,6 +3932,10 @@ public class FSNamesystem implements FSC
     private void checkMode() {
       if (needEnter()) {
         enter();
+        // check if we are ready to initialize replication queues
+        if (canInitializeReplQueues() && !isPopulatingReplQueues()) {
+          initializeReplQueues();
+        }
         reportStatus("STATE* Safe mode ON.", false);
         return;
       }
@@ -3910,6 +3954,11 @@ public class FSNamesystem implements FSC
       smmthread = new Daemon(new SafeModeMonitor());
       smmthread.start();
       reportStatus("STATE* Safe mode extension entered.", true);
+
+      // check if we are ready to initialize replication queues
+      if (canInitializeReplQueues() && !isPopulatingReplQueues()) {
+        initializeReplQueues();
+      }
     }
       
     /**
@@ -3918,6 +3967,8 @@ public class FSNamesystem implements FSC
     synchronized void setBlockTotal(int total) {
       this.blockTotal = total;
       this.blockThreshold = (int) (blockTotal * threshold);
+      this.blockReplQueueThreshold = 
+        (int) (((double) blockTotal) * replQueueThreshold);
       checkMode();
     }
       
@@ -4104,11 +4155,19 @@ public class FSNamesystem implements FSC
    * Check whether the name node is in safe mode.
    * @return true if safe mode is ON, false otherwise
    */
-  boolean isInSafeMode() {
+  synchronized boolean isInSafeMode() {
     if (safeMode == null)
       return false;
     return safeMode.isOn();
   }
+
+  /**
+   * Check whether replication queues are populated.
+   */
+  synchronized boolean isPopulatingReplQueues() {
+    return (!isInSafeMode() ||
+            safeMode.isPopulatingReplQueues());
+  }
     
   /**
    * Increment number of blocks that reached minimal replication.
@@ -4836,7 +4895,7 @@ public class FSNamesystem implements FSC
 
     readLock();
     try {
-    if (isInSafeMode()) {
+    if (!isPopulatingReplQueues()) {
       throw new IOException("Cannot run listCorruptFileBlocks because " +
                             "replication queues have not been initialized.");
     }
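
With the guard above, listCorruptFileBlocks now throws an IOException only while the replication queues are still uninitialized, not for the whole time the name node is in safe mode. A minimal caller sketch, mirroring the polling loop in the new test below ("cluster" is an assumed MiniDFSCluster):

    // Sketch only: wait until replication queues are populated, then query.
    while (!cluster.getNameNode().namesystem.isPopulatingReplQueues()) {
      try {
        Thread.sleep(1000);   // queues not initialized yet; retry shortly
      } catch (InterruptedException ignore) {
      }
    }
    Collection<FSNamesystem.CorruptFileBlockInfo> badFiles =
        cluster.getNameNode().getNamesystem().listCorruptFileBlocks("/", null);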

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1049290&r1=1049289&r2=1049290&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Dec 14 21:38:58 2010
@@ -84,6 +84,8 @@ public class MiniDFSCluster {
     private String[] racks = null; 
     private String [] hosts = null;
     private long [] simulatedCapacities = null;
+    // wait until namenode has left safe mode?
+    private boolean waitSafeMode = true;
     
     public Builder(Configuration conf) {
       this.conf = conf;
@@ -160,6 +162,14 @@ public class MiniDFSCluster {
       this.simulatedCapacities = val;
       return this;
     }
+
+    /**
+     * Default: true
+     */
+    public Builder waitSafeMode(boolean val) {
+      this.waitSafeMode = val;
+      return this;
+    }
     
     /**
      * Construct the actual MiniDFSCluster
@@ -182,7 +192,8 @@ public class MiniDFSCluster {
                        builder.option,
                        builder.racks,
                        builder.hosts,
-                       builder.simulatedCapacities);
+                       builder.simulatedCapacities,
+                       builder.waitSafeMode);
   }
   
   public class DataNodeProperties {
@@ -206,6 +217,8 @@ public class MiniDFSCluster {
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
   private File data_dir;
+
+  private boolean waitSafeMode = true;
   
   public final static String FINALIZED_DIR_NAME = "/current/finalized/";
   
@@ -377,16 +390,19 @@ public class MiniDFSCluster {
                         long[] simulatedCapacities) throws IOException {
     initMiniDFSCluster(nameNodePort, conf, numDataNodes, format,
         manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
-        simulatedCapacities);
+        simulatedCapacities, true);
   }
 
   private void initMiniDFSCluster(int nameNodePort, Configuration conf,
       int numDataNodes, boolean format, boolean manageNameDfsDirs,
       boolean manageDataDfsDirs, StartupOption operation, String[] racks,
-      String[] hosts, long[] simulatedCapacities) throws IOException {
+      String[] hosts, long[] simulatedCapacities, boolean waitSafeMode)
+    throws IOException {
     this.conf = conf;
     base_dir = new File(getBaseDirectory());
     data_dir = new File(base_dir, "data");
+
+    this.waitSafeMode = waitSafeMode;
     
     // use alternate RPC engine if spec'd
     String rpcEngineName = System.getProperty("hdfs.rpc.engine");
@@ -955,7 +971,8 @@ public class MiniDFSCluster {
   }
   
   /**
-   * Returns true if the NameNode is running and is out of Safe Mode.
+   * Returns true if the NameNode is running and is out of Safe Mode
+   * or if waiting for safe mode is disabled.
    */
   public boolean isClusterUp() {
     if (nameNode == null) {
@@ -964,7 +981,7 @@ public class MiniDFSCluster {
     long[] sizes = nameNode.getStats();
     boolean isUp = false;
     synchronized (this) {
-      isUp = (!nameNode.isInSafeMode() && sizes[0] != 0);
+      isUp = ((!nameNode.isInSafeMode() || !waitSafeMode) && sizes[0] != 0);
     }
     return isUp;
   }
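
The new waitSafeMode flag lets a test bring up a cluster whose name node is deliberately held in safe mode; isClusterUp() then no longer requires safe mode to be left. A minimal usage sketch, with configuration values taken from the new test below:

    // Sketch only: start a MiniDFSCluster without waiting for safe mode exit.
    Configuration conf = new HdfsConfiguration();
    conf.setFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, 1.5f);
    conf.setFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, 0f);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .waitSafeMode(false)   // don't block cluster start-up on leaving safe mode
        .build();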

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java?rev=1049290&r1=1049289&r2=1049290&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java Tue Dec 14 21:38:58 2010
@@ -38,7 +38,10 @@ import org.apache.hadoop.hdfs.CorruptFil
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * This class tests the listCorruptFileBlocks API.
@@ -121,6 +124,137 @@ public class TestListCorruptFileBlocks {
       if (cluster != null) { cluster.shutdown(); }
     }
   }
+
+  /**
+   * Check that listCorruptFileBlocks works while the namenode is still in safemode.
+   */
+  @Test
+  public void testListCorruptFileBlocksInSafeMode() throws Exception {
+    MiniDFSCluster cluster = null;
+    Random random = new Random();
+
+    try {
+      Configuration conf = new HdfsConfiguration();
+      // datanode scans directories
+      conf.setInt("dfs.datanode.directoryscan.interval", 1);
+      // datanode sends block reports
+      conf.setInt("dfs.blockreport.intervalMsec", 3 * 1000);
+      // never leave safemode automatically
+      conf.setFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
+                    1.5f);
+      // start populating repl queues immediately 
+      conf.setFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
+                    0f);
+      cluster = new MiniDFSCluster.Builder(conf).waitSafeMode(false).build();
+      cluster.getNameNode().
+        setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+      FileSystem fs = cluster.getFileSystem();
+
+      // create two files with one block each
+      DFSTestUtil util = new DFSTestUtil("testListCorruptFileBlocksInSafeMode",
+                                         2, 1, 512);
+      util.createFiles(fs, "/srcdat10");
+
+      // fetch bad file list from namenode. There should be none.
+      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = 
+        cluster.getNameNode().getNamesystem().listCorruptFileBlocks("/", null);
+      assertTrue("Namenode has " + badFiles.size()
+          + " corrupt files. Expecting None.", badFiles.size() == 0);
+
+      // Now deliberately corrupt one block
+      File data_dir = new File(System.getProperty("test.build.data"),
+      "dfs/data/data1/current/finalized");
+      assertTrue("data directory does not exist", data_dir.exists());
+      File[] blocks = data_dir.listFiles();
+      assertTrue("Blocks do not exist in data-dir", (blocks != null) &&
+                 (blocks.length > 0));
+      for (int idx = 0; idx < blocks.length; idx++) {
+        if (blocks[idx].getName().startsWith("blk_") &&
+            blocks[idx].getName().endsWith(".meta")) {
+          //
+          // shorten .meta file
+          //
+          RandomAccessFile file = new RandomAccessFile(blocks[idx], "rw");
+          FileChannel channel = file.getChannel();
+          long position = channel.size() - 2;
+          int length = 2;
+          byte[] buffer = new byte[length];
+          random.nextBytes(buffer);
+          channel.write(ByteBuffer.wrap(buffer), position);
+          file.close();
+          LOG.info("Deliberately corrupting file " + blocks[idx].getName() +
+              " at offset " + position + " length " + length);
+
+          // read all files to trigger detection of corrupted replica
+          try {
+            util.checkFiles(fs, "/srcdat10");
+          } catch (BlockMissingException e) {
+            System.out.println("Received BlockMissingException as expected.");
+          } catch (IOException e) {
+            assertTrue("Corrupted replicas not handled properly. " +
+                       "Expecting BlockMissingException " +
+                       " but received IOException " + e, false);
+          }
+          break;
+        }
+      }
+
+      // fetch bad file list from namenode. There should be one file.
+      badFiles = cluster.getNameNode().getNamesystem().
+        listCorruptFileBlocks("/", null);
+      LOG.info("Namenode has bad files. " + badFiles.size());
+      assertTrue("Namenode has " + badFiles.size() + " bad files. Expecting 1.",
+          badFiles.size() == 1);
+ 
+      // restart namenode
+      cluster.restartNameNode();
+      fs = cluster.getFileSystem();
+
+      // wait until replication queues have been initialized
+      while (!cluster.getNameNode().namesystem.isPopulatingReplQueues()) {
+        try {
+          LOG.info("waiting for replication queues");
+          Thread.sleep(1000);
+        } catch (InterruptedException ignore) {
+        }
+      }
+
+      // read all files to trigger detection of corrupted replica
+      try {
+        util.checkFiles(fs, "/srcdat10");
+      } catch (BlockMissingException e) {
+        System.out.println("Received BlockMissingException as expected.");
+      } catch (IOException e) {
+        assertTrue("Corrupted replicas not handled properly. " +
+                   "Expecting BlockMissingException " +
+                   " but received IOException " + e, false);
+      }
+
+      // fetch bad file list from namenode. There should be one file.
+      badFiles = cluster.getNameNode().getNamesystem().
+        listCorruptFileBlocks("/", null);
+      LOG.info("Namenode has bad files. " + badFiles.size());
+      assertTrue("Namenode has " + badFiles.size() + " bad files. Expecting 1.",
+          badFiles.size() == 1);
+
+      // check that we are still in safe mode
+      assertTrue("Namenode is not in safe mode", 
+                 cluster.getNameNode().isInSafeMode());
+
+      // now leave safe mode so that we can clean up
+      cluster.getNameNode().
+        setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+
+      util.cleanup(fs, "/srcdat10");
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown(); 
+      }
+    }
+  }
   
   // deliberately remove blocks from a file and validate the list-corrupt-file-blocks API
   @Test


