hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r1101343 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Tue, 10 May 2011 07:06:56 GMT
Author: hairong
Date: Tue May 10 07:06:56 2011
New Revision: 1101343

URL: http://svn.apache.org/viewvc?rev=1101343&view=rev
Log:
HDFS-1826. NameNode should save image to name directories in parallel during upgrade. Contributed
by Matt Foley.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1101343&r1=1101342&r2=1101343&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue May 10 07:06:56 2011
@@ -405,6 +405,9 @@ Trunk (unreleased changes)
 
     HDFS-1601. Pipeline ACKs are sent as lots of tiny TCP packets (todd)
 
+    HDFS-1826. NameNode should save image to name directories in parallel
+    during upgrade. (Matt Foley via hairong)
+
   BUG FIXES
 
     HDFS-1449. Fix test failures - ExtendedBlock must return 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1101343&r1=1101342&r2=1101343&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue May
10 07:06:56 2011
@@ -354,29 +354,62 @@ public class FSImage implements NNStorag
     int oldLV = storage.getLayoutVersion();
     storage.layoutVersion = FSConstants.LAYOUT_VERSION;
     storage.setCheckpointTime(now());
+    
+    List<StorageDirectory> errorSDs =
+      Collections.synchronizedList(new ArrayList<StorageDirectory>());
+    List<Thread> saveThreads = new ArrayList<Thread>();
+    File curDir, prevDir, tmpDir;
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
-      LOG.info("Upgrading image directory " + sd.getRoot()
+      LOG.info("Starting upgrade of image directory " + sd.getRoot()
                + ".\n   old LV = " + oldLV
                + "; old CTime = " + oldCTime
                + ".\n   new LV = " + storage.getLayoutVersion()
                + "; new CTime = " + storage.getCTime());
-      File curDir = sd.getCurrentDir();
-      File prevDir = sd.getPreviousDir();
-      File tmpDir = sd.getPreviousTmp();
-      assert curDir.exists() : "Current directory must exist.";
-      assert !prevDir.exists() : "prvious directory must not exist.";
-      assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
-      assert !editLog.isOpen() : "Edits log must not be open.";
-      // rename current to tmp
-      NNStorage.rename(curDir, tmpDir);
-      // save new image
-      saveCurrent(sd);
-      // rename tmp to previous
-      NNStorage.rename(tmpDir, prevDir);
-      isUpgradeFinalized = false;
+      try {
+        curDir = sd.getCurrentDir();
+        prevDir = sd.getPreviousDir();
+        tmpDir = sd.getPreviousTmp();
+        assert curDir.exists() : "Current directory must exist.";
+        assert !prevDir.exists() : "prvious directory must not exist.";
+        assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
+        assert !editLog.isOpen() : "Edits log must not be open.";
+
+        // rename current to tmp
+        NNStorage.rename(curDir, tmpDir);
+        
+        // launch thread to save new image
+        FSImageSaver saver = new FSImageSaver(sd, errorSDs);
+        Thread saveThread = new Thread(saver, saver.toString());
+        saveThreads.add(saveThread);
+        saveThread.start();
+        
+      } catch (Exception e) {
+        LOG.error("Failed upgrade of image directory " + sd.getRoot(), e);
+        errorSDs.add(sd);
+        continue;
+      }
+    }
+    waitForThreads(saveThreads);
+    saveThreads.clear();
+
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      if (errorSDs.contains(sd)) continue;
+      try {
+        prevDir = sd.getPreviousDir();
+        tmpDir = sd.getPreviousTmp();
+        // rename tmp to previous
+        NNStorage.rename(tmpDir, prevDir);
+      } catch (IOException ioe) {
+        LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
+        errorSDs.add(sd);
+        continue;
+      }
       LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
     }
+    isUpgradeFinalized = false;
+    storage.reportErrorsOnDirectories(errorSDs);
     storage.initializeDistributedUpgrade();
     editLog.open();
   }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java?rev=1101343&r1=1101342&r2=1101343&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java Tue May 10
07:06:56 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
@@ -163,6 +164,8 @@ public class TestDFSUpgrade {
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       cluster = createCluster();
       checkNameNode(nameNodeDirs);
+      if (numDirs > 1)
+        TestParallelImageWrite.checkImages(cluster.getNamesystem(), numDirs);
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
@@ -278,6 +281,23 @@ public class TestDFSUpgrade {
       startNameNodeShouldFail(StartupOption.UPGRADE);
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
     } // end numDir loop
+    
+    // One more check: normal NN upgrade with 4 directories, concurrent write
+    int numDirs = 4;
+    {
+      conf = new HdfsConfiguration();
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);      
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
+      String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+      
+      log("Normal NameNode upgrade", numDirs);
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      cluster = createCluster();
+      checkNameNode(nameNodeDirs);
+      TestParallelImageWrite.checkImages(cluster.getNamesystem(), numDirs);
+      cluster.shutdown();
+      UpgradeUtilities.createEmptyDirs(nameNodeDirs);
+    }
   }
  
   @Test(expected=IOException.class)

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java?rev=1101343&r1=1101342&r2=1101343&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
(original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
Tue May 10 07:06:56 2011
@@ -24,8 +24,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -46,11 +46,13 @@ import java.io.FileInputStream;
  * Specifically with FSImage being written in parallel
  */
 public class TestParallelImageWrite extends TestCase {
+  private static final int NUM_DATANODES = 4;
   /** check if DFS remains in proper condition after a restart */
   public void testRestartDFS() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
     FSNamesystem fsn = null;
+    int numNamenodeDirs;
     DFSTestUtil files = new DFSTestUtil("TestRestartDFS", 200, 3, 8*1024);
 
     final String dir = "/srcdat";
@@ -62,7 +64,13 @@ public class TestParallelImageWrite exte
     FileStatus dirstatus;
 
     try {
-      cluster = new MiniDFSCluster.Builder(conf).format(true).numDataNodes(4).build();
+      cluster = new MiniDFSCluster.Builder(conf).format(true)
+          .numDataNodes(NUM_DATANODES).build();
+      String[] nameNodeDirs = conf.getStrings(
+          DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new String[] {});
+      numNamenodeDirs = nameNodeDirs.length;
+      assertTrue("failed to get number of Namenode StorageDirs", 
+          numNamenodeDirs != 0);
       FileSystem fs = cluster.getFileSystem();
       files.createFiles(fs, dir);
 
@@ -77,7 +85,8 @@ public class TestParallelImageWrite exte
     }
     try {
       // Here we restart the MiniDFScluster without formatting namenode
-      cluster = new MiniDFSCluster.Builder(conf).format(false).numDataNodes(4).build();
+      cluster = new MiniDFSCluster.Builder(conf).format(false)
+          .numDataNodes(NUM_DATANODES).build();
       fsn = cluster.getNamesystem();
       FileSystem fs = cluster.getFileSystem();
       assertTrue("Filesystem corrupted after restart.",
@@ -93,14 +102,16 @@ public class TestParallelImageWrite exte
       assertEquals(dirstatus.getGroup() + "_XXX", newdirstatus.getGroup());
       rootmtime = fs.getFileStatus(rootpath).getModificationTime();
 
-      checkImages(fsn);
-
+      final long checkAfterRestart = checkImages(fsn, numNamenodeDirs);
+      
       // Modify the system and then perform saveNamespace
       files.cleanup(fs, dir);
       files.createFiles(fs, dir);
       fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
       cluster.getNameNode().saveNamespace();
-      checkImages(fsn);
+      final long checkAfterModify = checkImages(fsn, numNamenodeDirs);
+      assertTrue("Modified namespace doesn't change fsimage contents",
+          checkAfterRestart != checkAfterModify);
       fsn.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
       files.cleanup(fs, dir);
     } finally {
@@ -108,13 +119,29 @@ public class TestParallelImageWrite exte
     }
   }
   
-  private void checkImages(FSNamesystem fsn) throws Exception {
-    Iterator<StorageDirectory> iter = fsn.
-            getFSImage().getStorage().dirIterator(NameNodeDirType.IMAGE);
+  /**
+   * Confirm that FSImage files in all StorageDirectory are the same,
+   * and non-empty, and there are the expected number of them.
+   * @param fsn - the FSNamesystem being checked.
+   * @param numImageDirs - the configured number of StorageDirectory of type IMAGE. 
+   * @return - the checksum of the FSImage files, which must all be the same.
+   * @throws AssertionFailedError if image files are empty or different,
+   *     if less than two StorageDirectory are provided, or if the
+   *     actual number of StorageDirectory is less than configured.
+   */
+  public static long checkImages(FSNamesystem fsn, int numImageDirs) throws Exception {
+    NNStorage stg = fsn.getFSImage().getStorage();
+    //any failed StorageDirectory is removed from the storageDirs list
+    assertEquals("Some StorageDirectories failed Upgrade",
+        numImageDirs, stg.getNumStorageDirs(NameNodeDirType.IMAGE));
+    assertTrue("Not enough fsimage copies in MiniDFSCluster " + 
+        "to test parallel write", numImageDirs > 1);
+    //checksum the FSImage stored in each storageDir
+    Iterator<StorageDirectory> iter = stg.dirIterator(NameNodeDirType.IMAGE);
     List<Long> checksums = new ArrayList<Long>();
     while (iter.hasNext()) {
       StorageDirectory sd = iter.next();
-      File fsImage = fsn.getFSImage().getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
+      File fsImage = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE);
       PureJavaCrc32 crc = new PureJavaCrc32();
       FileInputStream in = new FileInputStream(fsImage);
       byte[] buff = new byte[4096];
@@ -125,11 +152,14 @@ public class TestParallelImageWrite exte
       long val = crc.getValue();
       checksums.add(val);
     }
-    assertTrue("Not enough fsimage copies in MiniDFSCluster " + 
-               "to test parallel write", checksums.size() > 1);
-    for (int i = 1; i < checksums.size(); i++) {
+    assertEquals(numImageDirs, checksums.size());
+    PureJavaCrc32 crc = new PureJavaCrc32();
+    long emptyCrc = crc.getValue();
+    assertTrue("Empty fsimage file", checksums.get(0) != emptyCrc);
+    for (int i = 1; i < numImageDirs; i++) {
       assertEquals(checksums.get(i - 1), checksums.get(i));
     }
+    return checksums.get(0);
   }
 }
 



Mime
View raw message