hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1152295 [3/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/j...
Date Fri, 29 Jul 2011 16:28:51 GMT
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 29 16:28:45 2011
@@ -21,17 +21,13 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,42 +42,45 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 /**
  * FSImage handles checkpointing and logging of the namespace edits.
  * 
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class FSImage implements NNStorageListener, Closeable {
+public class FSImage implements Closeable {
   protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
 
-  private static final SimpleDateFormat DATE_FORM =
-      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-  // checkpoint states
-  enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
-
   protected FSNamesystem namesystem = null;
   protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
-  protected MD5Hash newImageDigest = null;
 
-  protected NNStorage storage = null;
+  protected NNStorage storage;
+  
+  /**
+   * The last transaction ID that was either loaded from an image
+   * or loaded by loading edits files.
+   */
+  protected long lastAppliedTxId = 0;
 
   /**
    * URIs for importing an image from a checkpoint. In the default case,
@@ -90,75 +89,69 @@ public class FSImage implements NNStorag
   private Collection<URI> checkpointDirs;
   private Collection<URI> checkpointEditsDirs;
 
-  private Configuration conf;
+  final private Configuration conf;
+
+  private final NNStorageRetentionManager archivalManager; 
 
   /**
-   * Can fs-image be rolled?
+   * Construct an FSImage.
+   * @param conf Configuration
+   * @see #FSImage(Configuration conf, FSNamesystem ns, 
+   *               Collection imageDirs, Collection editsDirs) 
+   * @throws IOException if default directories are invalid.
    */
-  volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START; 
+  public FSImage(Configuration conf) throws IOException {
+    this(conf, (FSNamesystem)null);
+  }
 
   /**
+   * Construct an FSImage
+   * @param conf Configuration
+   * @param ns The FSNamesystem using this image.
+   * @see #FSImage(Configuration conf, FSNamesystem ns, 
+   *               Collection imageDirs, Collection editsDirs) 
+   * @throws IOException if default directories are invalid.
    */
-  FSImage() {
-    this((FSNamesystem)null);
+  private FSImage(Configuration conf, FSNamesystem ns) throws IOException {
+    this(conf, ns,
+         FSNamesystem.getNamespaceDirs(conf),
+         FSNamesystem.getNamespaceEditsDirs(conf));
   }
 
   /**
-   * Constructor
+   * Construct the FSImage. Set the default checkpoint directories.
+   *
+   * Setup storage and initialize the edit log.
+   *
    * @param conf Configuration
+   * @param ns The FSNamesystem using this image.
+   * @param imageDirs Directories the image can be stored in.
+   * @param editsDirs Directories the editlog can be stored in.
+   * @throws IOException if directories are invalid.
    */
-  FSImage(Configuration conf) throws IOException {
-    this();
-    this.conf = conf; // TODO we have too many constructors, this is a mess
-
-    if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, 
-        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
-      NameNode.LOG.info("set FSImage.restoreFailedStorage");
-      storage.setRestoreFailedStorage(true);
-    }
+  protected FSImage(Configuration conf, FSNamesystem ns,
+                    Collection<URI> imageDirs, Collection<URI> editsDirs)
+      throws IOException {
+    this.conf = conf;
     setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
-        FSImage.getCheckpointEditsDirs(conf, null));
-  }
+                             FSImage.getCheckpointEditsDirs(conf, null));
 
-  private FSImage(FSNamesystem ns) {
-    this.conf = new Configuration();
-    
-    storage = new NNStorage(conf);
+    storage = new NNStorage(conf, imageDirs, editsDirs);
     if (ns != null) {
       storage.setUpgradeManager(ns.upgradeManager);
     }
-    storage.registerListener(this);
+
+    if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
+                       DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
+      storage.setRestoreFailedStorage(true);
+    }
 
     this.editLog = new FSEditLog(storage);
     setFSNamesystem(ns);
+    
+    archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
   }
 
-  /**
-   * @throws IOException 
-   */
-  FSImage(Collection<URI> fsDirs, Collection<URI> fsEditsDirs) 
-      throws IOException {
-    this();
-    storage.setStorageDirectories(fsDirs, fsEditsDirs);
-  }
-
-  public FSImage(StorageInfo storageInfo, String bpid) {
-    storage = new NNStorage(storageInfo, bpid);
-  }
-
-  /**
-   * Represents an Image (image and edit file).
-   * @throws IOException 
-   */
-  FSImage(URI imageDir) throws IOException {
-    this();
-    ArrayList<URI> dirs = new ArrayList<URI>(1);
-    ArrayList<URI> editsDirs = new ArrayList<URI>(1);
-    dirs.add(imageDir);
-    editsDirs.add(imageDir);
-    storage.setStorageDirectories(dirs, editsDirs);
-  }
-  
   protected FSNamesystem getFSNamesystem() {
     return namesystem;
   }
@@ -169,33 +162,37 @@ public class FSImage implements NNStorag
       storage.setUpgradeManager(ns.upgradeManager);
     }
   }
-
+ 
   void setCheckpointDirectories(Collection<URI> dirs,
                                 Collection<URI> editsDirs) {
     checkpointDirs = dirs;
     checkpointEditsDirs = editsDirs;
   }
   
+  void format(String clusterId) throws IOException {
+    storage.format(clusterId);
+    saveFSImageInAllDirs(0);    
+  }
+  
   /**
    * Analyze storage directories.
    * Recover from previous transitions if required. 
    * Perform fs state transition if necessary depending on the namespace info.
    * Read storage info. 
    * 
-   * @param dataDirs
-   * @param startOpt startup option
    * @throws IOException
    * @return true if the image needs to be saved or false otherwise
    */
-  boolean recoverTransitionRead(Collection<URI> dataDirs,
-                                Collection<URI> editsDirs,
-                                StartupOption startOpt)
+  boolean recoverTransitionRead(StartupOption startOpt)
       throws IOException {
     assert startOpt != StartupOption.FORMAT : 
       "NameNode formatting should be performed before reading the image";
     
+    Collection<URI> imageDirs = storage.getImageDirectories();
+    Collection<URI> editsDirs = storage.getEditsDirectories();
+
     // none of the data dirs exist
-    if((dataDirs.size() == 0 || editsDirs.size() == 0) 
+    if((imageDirs.size() == 0 || editsDirs.size() == 0) 
                              && startOpt != StartupOption.IMPORT)  
       throw new IOException(
           "All specified directories are not accessible or do not exist.");
@@ -210,50 +207,24 @@ public class FSImage implements NNStorag
       throw new IOException("Cannot import image from a checkpoint. "
                             + "\"dfs.namenode.checkpoint.dir\" is not set." );
     
-    storage.setStorageDirectories(dataDirs, editsDirs);
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
     Map<StorageDirectory, StorageState> dataDirStates = 
              new HashMap<StorageDirectory, StorageState>();
-    boolean isFormatted = false;
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      StorageState curState;
-      try {
-        curState = sd.analyzeStorage(startOpt, storage);
-        // sd is locked but not opened
-        switch(curState) {
-        case NON_EXISTENT:
-          // name-node fails if any of the configured storage dirs are missing
-          throw new InconsistentFSStateException(sd.getRoot(),
-                      "storage directory does not exist or is not accessible.");
-        case NOT_FORMATTED:
-          break;
-        case NORMAL:
-          break;
-        default:  // recovery is possible
-          sd.doRecover(curState);      
-        }
-        if (curState != StorageState.NOT_FORMATTED 
-            && startOpt != StartupOption.ROLLBACK) {
-          // read and verify consistency with other directories
-          storage.readProperties(sd);
-          isFormatted = true;
-        }
-        if (startOpt == StartupOption.IMPORT && isFormatted)
-          // import of a checkpoint is allowed only into empty image directories
-          throw new IOException("Cannot import image from a checkpoint. " 
-              + " NameNode already contains an image in "+ sd.getRoot());
-      } catch (IOException ioe) {
-        sd.unlock();
-        throw ioe;
-      }
-      dataDirStates.put(sd,curState);
+    boolean isFormatted = recoverStorageDirs(startOpt, dataDirStates);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Data dir states:\n  " +
+        Joiner.on("\n  ").withKeyValueSeparator(": ")
+        .join(dataDirStates));
     }
     
     if (!isFormatted && startOpt != StartupOption.ROLLBACK 
-                     && startOpt != StartupOption.IMPORT)
-      throw new IOException("NameNode is not formatted.");
+                     && startOpt != StartupOption.IMPORT) {
+      throw new IOException("NameNode is not formatted.");      
+    }
+
+
     int layoutVersion = storage.getLayoutVersion();
     if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
       NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
@@ -274,7 +245,6 @@ public class FSImage implements NNStorag
     storage.verifyDistributedUpgradeProgress(startOpt);
 
     // 2. Format unformatted dirs.
-    storage.setCheckpointTime(0L);
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       StorageState curState = dataDirStates.get(sd);
@@ -307,13 +277,55 @@ public class FSImage implements NNStorag
       // just load the image
     }
     
-    boolean needToSave = loadFSImage();
-
-    assert editLog != null : "editLog must be initialized";
-    if(!editLog.isOpen())
-      editLog.open();
-    
-    return needToSave;
+    return loadFSImage();
+  }
+  
+  /**
+   * For each storage directory, performs recovery of incomplete transitions
+   * (eg. upgrade, rollback, checkpoint) and inserts the directory's storage
+   * state into the dataDirStates map.
+   * @param dataDirStates output of storage directory states
+   * @return true if there is at least one valid formatted storage directory
+   */
+  private boolean recoverStorageDirs(StartupOption startOpt,
+      Map<StorageDirectory, StorageState> dataDirStates) throws IOException {
+    boolean isFormatted = false;
+    for (Iterator<StorageDirectory> it = 
+                      storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      StorageState curState;
+      try {
+        curState = sd.analyzeStorage(startOpt, storage);
+        // sd is locked but not opened
+        switch(curState) {
+        case NON_EXISTENT:
+          // name-node fails if any of the configured storage dirs are missing
+          throw new InconsistentFSStateException(sd.getRoot(),
+                      "storage directory does not exist or is not accessible.");
+        case NOT_FORMATTED:
+          break;
+        case NORMAL:
+          break;
+        default:  // recovery is possible
+          sd.doRecover(curState);      
+        }
+        if (curState != StorageState.NOT_FORMATTED 
+            && startOpt != StartupOption.ROLLBACK) {
+          // read and verify consistency with other directories
+          storage.readProperties(sd);
+          isFormatted = true;
+        }
+        if (startOpt == StartupOption.IMPORT && isFormatted)
+          // import of a checkpoint is allowed only into empty image directories
+          throw new IOException("Cannot import image from a checkpoint. " 
+              + " NameNode already contains an image in " + sd.getRoot());
+      } catch (IOException ioe) {
+        sd.unlock();
+        throw ioe;
+      }
+      dataDirStates.put(sd,curState);
+    }
+    return isFormatted;
   }
 
   private void doUpgrade() throws IOException {
@@ -342,12 +354,9 @@ public class FSImage implements NNStorag
     storage.cTime = now();  // generate new cTime for the state
     int oldLV = storage.getLayoutVersion();
     storage.layoutVersion = FSConstants.LAYOUT_VERSION;
-    storage.setCheckpointTime(now());
     
     List<StorageDirectory> errorSDs =
       Collections.synchronizedList(new ArrayList<StorageDirectory>());
-    List<Thread> saveThreads = new ArrayList<Thread>();
-    File curDir, prevDir, tmpDir;
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       LOG.info("Starting upgrade of image directory " + sd.getRoot()
@@ -356,9 +365,9 @@ public class FSImage implements NNStorag
                + ".\n   new LV = " + storage.getLayoutVersion()
                + "; new CTime = " + storage.getCTime());
       try {
-        curDir = sd.getCurrentDir();
-        prevDir = sd.getPreviousDir();
-        tmpDir = sd.getPreviousTmp();
+        File curDir = sd.getCurrentDir();
+        File prevDir = sd.getPreviousDir();
+        File tmpDir = sd.getPreviousTmp();
         assert curDir.exists() : "Current directory must exist.";
         assert !prevDir.exists() : "prvious directory must not exist.";
         assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
@@ -367,27 +376,30 @@ public class FSImage implements NNStorag
         // rename current to tmp
         NNStorage.rename(curDir, tmpDir);
         
-        // launch thread to save new image
-        FSImageSaver saver = new FSImageSaver(sd, errorSDs);
-        Thread saveThread = new Thread(saver, saver.toString());
-        saveThreads.add(saveThread);
-        saveThread.start();
-        
+        if (!curDir.mkdir()) {
+          throw new IOException("Cannot create directory " + curDir);
+        }
       } catch (Exception e) {
-        LOG.error("Failed upgrade of image directory " + sd.getRoot(), e);
+        LOG.error("Failed to move aside pre-upgrade storage " +
+            "in image directory " + sd.getRoot(), e);
         errorSDs.add(sd);
         continue;
       }
     }
-    waitForThreads(saveThreads);
-    saveThreads.clear();
+    storage.reportErrorsOnDirectories(errorSDs);
+    errorSDs.clear();
+
+    saveFSImageInAllDirs(editLog.getLastWrittenTxId());
 
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
-      if (errorSDs.contains(sd)) continue;
       try {
-        prevDir = sd.getPreviousDir();
-        tmpDir = sd.getPreviousTmp();
+        // Write the version file, since saveFsImage above only makes the
+        // fsimage_<txid>, and the directory is otherwise empty.
+        storage.writeProperties(sd);
+        
+        File prevDir = sd.getPreviousDir();
+        File tmpDir = sd.getPreviousTmp();
         // rename tmp to previous
         NNStorage.rename(tmpDir, prevDir);
       } catch (IOException ioe) {
@@ -397,15 +409,16 @@ public class FSImage implements NNStorag
       }
       LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
     }
+    storage.reportErrorsOnDirectories(errorSDs);
+
     isUpgradeFinalized = false;
-    if (!errorSDs.isEmpty()) {
-      storage.reportErrorsOnDirectories(errorSDs);
+    if (!storage.getRemovedStorageDirs().isEmpty()) {
       //during upgrade, it's a fatal error to fail any storage directory
-      throw new IOException("Upgrade failed in " + errorSDs.size()
+      throw new IOException("Upgrade failed in "
+          + storage.getRemovedStorageDirs().size()
           + " storage directory(ies), previously logged.");
     }
     storage.initializeDistributedUpgrade();
-    editLog.open();
   }
 
   private void doRollback() throws IOException {
@@ -413,7 +426,7 @@ public class FSImage implements NNStorag
     // a previous fs states in at least one of the storage directories.
     // Directories that don't have previous state do not rollback
     boolean canRollback = false;
-    FSImage prevState = new FSImage(getFSNamesystem());
+    FSImage prevState = new FSImage(conf, getFSNamesystem());
     prevState.getStorage().layoutVersion = FSConstants.LAYOUT_VERSION;
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -499,25 +512,27 @@ public class FSImage implements NNStorag
    */
   void doImportCheckpoint() throws IOException {
     FSNamesystem fsNamesys = getFSNamesystem();
-    FSImage ckptImage = new FSImage(fsNamesys);
+    FSImage ckptImage = new FSImage(conf, fsNamesys,
+                                    checkpointDirs, checkpointEditsDirs);
     // replace real image with the checkpoint image
     FSImage realImage = fsNamesys.getFSImage();
     assert realImage == this;
     fsNamesys.dir.fsImage = ckptImage;
     // load from the checkpoint dirs
     try {
-      ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs,
-                                              StartupOption.REGULAR);
+      ckptImage.recoverTransitionRead(StartupOption.REGULAR);
     } finally {
       ckptImage.close();
     }
     // return back the real image
     realImage.getStorage().setStorageInfo(ckptImage.getStorage());
-    storage.setCheckpointTime(ckptImage.getStorage().getCheckpointTime());
+    realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
+
     fsNamesys.dir.fsImage = realImage;
     realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
     // and save it but keep the same checkpointTime
-    saveNamespace(false);
+    saveNamespace();
+    getStorage().writeAll();
   }
 
   void finalizeUpgrade() throws IOException {
@@ -535,51 +550,23 @@ public class FSImage implements NNStorag
     return editLog;
   }
 
-  //
-  // Atomic move sequence, to recover from interrupted checkpoint
-  //
-  boolean recoverInterruptedCheckpoint(StorageDirectory nameSD,
-                                       StorageDirectory editsSD) 
-                                       throws IOException {
-    boolean needToSave = false;
-    File curFile = NNStorage.getStorageFile(nameSD, NameNodeFile.IMAGE);
-    File ckptFile = NNStorage.getStorageFile(nameSD, NameNodeFile.IMAGE_NEW);
+  void openEditLog() throws IOException {
+    assert editLog != null : "editLog must be initialized";
+    Preconditions.checkState(!editLog.isOpen(),
+        "edit log should not yet be open");
+    editLog.open();
+    storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId());
+  };
+  
+  /**
+   * Toss the current image and namesystem, reloading from the specified
+   * file.
+   */
+  void reloadFromImageFile(File file) throws IOException {
+    namesystem.dir.reset();
 
-    //
-    // If we were in the midst of a checkpoint
-    //
-    if (ckptFile.exists()) {
-      needToSave = true;
-      if (NNStorage.getStorageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) {
-        //
-        // checkpointing migth have uploaded a new
-        // merged image, but we discard it here because we are
-        // not sure whether the entire merged image was uploaded
-        // before the namenode crashed.
-        //
-        if (!ckptFile.delete()) {
-          throw new IOException("Unable to delete " + ckptFile);
-        }
-      } else {
-        //
-        // checkpointing was in progress when the namenode
-        // shutdown. The fsimage.ckpt was created and the edits.new
-        // file was moved to edits. We complete that checkpoint by
-        // moving fsimage.new to fsimage. There is no need to 
-        // update the fstime file here. renameTo fails on Windows
-        // if the destination file already exists.
-        //
-        if (!ckptFile.renameTo(curFile)) {
-          if (!curFile.delete())
-            LOG.warn("Unable to delete dir " + curFile + " before rename");
-          if (!ckptFile.renameTo(curFile)) {
-            throw new IOException("Unable to rename " + ckptFile +
-                                  " to " + curFile);
-          }
-        }
-      }
-    }
-    return needToSave;
+    LOG.debug("Reloading namespace from " + file);
+    loadFSImage(file);
   }
 
   /**
@@ -598,124 +585,131 @@ public class FSImage implements NNStorag
    * @throws IOException
    */
   boolean loadFSImage() throws IOException {
-    long latestNameCheckpointTime = Long.MIN_VALUE;
-    long latestEditsCheckpointTime = Long.MIN_VALUE;
-    boolean needToSave = false;
-    isUpgradeFinalized = true;
+    FSImageStorageInspector inspector = storage.readAndInspectDirs();
     
-    StorageDirectory latestNameSD = null;
-    StorageDirectory latestEditsSD = null;
+    isUpgradeFinalized = inspector.isUpgradeFinalized();
     
-    Collection<String> imageDirs = new ArrayList<String>();
-    Collection<String> editsDirs = new ArrayList<String>();
+    boolean needToSave = inspector.needToSave();
     
-    // Set to determine if all of storageDirectories share the same checkpoint
-    Set<Long> checkpointTimes = new HashSet<Long>();
-
-    // Process each of the storage directories to find the pair of
-    // newest image file and edit file
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-
-      // Was the file just formatted?
-      if (!sd.getVersionFile().exists()) {
-        needToSave |= true;
-        continue;
-      }
-      
-      boolean imageExists = false;
-      boolean editsExists = false;
-      
-      // Determine if sd is image, edits or both
-      if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-        imageExists = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE).exists();
-        imageDirs.add(sd.getRoot().getCanonicalPath());
-      }
-      
-      if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        editsExists = NNStorage.getStorageFile(sd, NameNodeFile.EDITS).exists();
-        editsDirs.add(sd.getRoot().getCanonicalPath());
-      }
-      
-      long checkpointTime = storage.readCheckpointTime(sd);
+    // Plan our load. This will throw if it's impossible to load from the
+    // data that's available.
+    LoadPlan loadPlan = inspector.createLoadPlan();    
+    LOG.debug("Planning to load image using following plan:\n" + loadPlan);
 
-      checkpointTimes.add(checkpointTime);
-      
-      if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE) && 
-         (latestNameCheckpointTime < checkpointTime) && imageExists) {
-        latestNameCheckpointTime = checkpointTime;
-        latestNameSD = sd;
-      }
-      
-      if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS) && 
-           (latestEditsCheckpointTime < checkpointTime) && editsExists) {
-        latestEditsCheckpointTime = checkpointTime;
-        latestEditsSD = sd;
-      }
-      
-      // check that we have a valid, non-default checkpointTime
-      if (checkpointTime <= 0L)
-        needToSave |= true;
-      
-      // set finalized flag
-      isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
-    }
-
-    // We should have at least one image and one edits dirs
-    if (latestNameSD == null)
-      throw new IOException("Image file is not found in " + imageDirs);
-    if (latestEditsSD == null)
-      throw new IOException("Edits file is not found in " + editsDirs);
-
-    // Make sure we are loading image and edits from same checkpoint
-    if (latestNameCheckpointTime > latestEditsCheckpointTime
-        && latestNameSD != latestEditsSD
-        && latestNameSD.getStorageDirType() == NameNodeDirType.IMAGE
-        && latestEditsSD.getStorageDirType() == NameNodeDirType.EDITS) {
-      // This is a rare failure when NN has image-only and edits-only
-      // storage directories, and fails right after saving images,
-      // in some of the storage directories, but before purging edits.
-      // See -NOTE- in saveNamespace().
-      LOG.error("This is a rare failure scenario!!!");
-      LOG.error("Image checkpoint time " + latestNameCheckpointTime +
-                " > edits checkpoint time " + latestEditsCheckpointTime);
-      LOG.error("Name-node will treat the image as the latest state of " +
-                "the namespace. Old edits will be discarded.");
-    } else if (latestNameCheckpointTime != latestEditsCheckpointTime)
-      throw new IOException("Inconsistent storage detected, " +
-                      "image and edits checkpoint times do not match. " +
-                      "image checkpoint time = " + latestNameCheckpointTime +
-                      "edits checkpoint time = " + latestEditsCheckpointTime);
-    
-    // If there was more than one checkpointTime recorded we should save
-    needToSave |= checkpointTimes.size() != 1;
     
     // Recover from previous interrupted checkpoint, if any
-    needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
+    needToSave |= loadPlan.doRecovery();
 
     //
     // Load in bits
     //
-    storage.readProperties(latestNameSD);
-    needToSave |= loadFSImage(NNStorage.getStorageFile(latestNameSD,
-                                                       NameNodeFile.IMAGE));
+    StorageDirectory sdForProperties =
+      loadPlan.getStorageDirectoryForProperties();
+    storage.readProperties(sdForProperties);
+    File imageFile = loadPlan.getImageFile();
+
+    try {
+      if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
+                                 getLayoutVersion())) {
+        // For txid-based layout, we should have a .md5 file
+        // next to the image file
+        loadFSImage(imageFile);
+      } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
+                                        getLayoutVersion())) {
+        // In 0.22, we have the checksum stored in the VERSION file.
+        String md5 = storage.getDeprecatedProperty(
+            NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);
+        if (md5 == null) {
+          throw new InconsistentFSStateException(sdForProperties.getRoot(),
+              "Message digest property " +
+              NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
+              " not set for storage directory " + sdForProperties.getRoot());
+        }
+        loadFSImage(imageFile, new MD5Hash(md5));
+      } else {
+        // We don't have any record of the md5sum
+        loadFSImage(imageFile, null);
+      }
+    } catch (IOException ioe) {
+      throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
+    }
     
-    // Load latest edits
-    if (latestNameCheckpointTime > latestEditsCheckpointTime)
-      // the image is already current, discard edits
-      needToSave |= true;
-    else // latestNameCheckpointTime == latestEditsCheckpointTime
-      needToSave |= (loadFSEdits(latestEditsSD) > 0);
+    long numLoaded = loadEdits(loadPlan.getEditsFiles());
+    needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile, numLoaded);
     
+    // update the txid for the edit log
+    editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
     return needToSave;
   }
 
+
+  /**
+   * @param imageFile the image file that was loaded
+   * @param numEditsLoaded the number of edits loaded from edits logs
+   * @return true if the NameNode should automatically save the namespace
+   * when it is started, due to the latest checkpoint being too old.
+   */
+  private boolean needsResaveBasedOnStaleCheckpoint(
+      File imageFile, long numEditsLoaded) {
+    final long checkpointPeriod = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
+    final long checkpointTxnCount = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
+    long checkpointAge = System.currentTimeMillis() - imageFile.lastModified();
+
+    return (checkpointAge > checkpointPeriod * 1000) ||
+           (numEditsLoaded > checkpointTxnCount);
+  }
+  
+  /**
+   * Load the specified list of edit files into the image.
+   * @return the number of transactions loaded
+   */
+  protected long loadEdits(List<File> editLogs) throws IOException {
+    LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editLogs));
+
+    long startingTxId = getLastAppliedTxId() + 1;
+    
+    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+    int numLoaded = 0;
+    // Load latest edits
+    for (File edits : editLogs) {
+      LOG.debug("Reading " + edits + " expecting start txid #" + startingTxId);
+      EditLogFileInputStream editIn = new EditLogFileInputStream(edits);
+      int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
+      startingTxId += thisNumLoaded;
+      numLoaded += thisNumLoaded;
+      lastAppliedTxId += thisNumLoaded;
+      editIn.close();
+    }
+
+    // update the counts
+    getFSNamesystem().dir.updateCountForINodeWithQuota();    
+    return numLoaded;
+  }
+
+
+  /**
+   * Load the image namespace from the given image file, verifying
+   * it against the MD5 sum stored in its associated .md5 file.
+   */
+  private void loadFSImage(File imageFile) throws IOException {
+    MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
+    if (expectedMD5 == null) {
+      throw new IOException("No MD5 file found corresponding to image file "
+          + imageFile);
+    }
+    loadFSImage(imageFile, expectedMD5);
+  }
+  
   /**
    * Load in the filesystem image from file. It's a big list of
    * filenames and blocks.  Return whether we should
    * "re-save" and consolidate the edit-logs
    */
-  boolean loadFSImage(File curFile) throws IOException {
+  private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
         conf, getFSNamesystem());
     loader.load(curFile);
@@ -724,63 +718,32 @@ public class FSImage implements NNStorag
     // Check that the image digest we loaded matches up with what
     // we expected
     MD5Hash readImageMd5 = loader.getLoadedImageMd5();
-    if (storage.getImageDigest() == null) {
-      storage.setImageDigest(readImageMd5); // set this fsimage's checksum
-    } else if (!storage.getImageDigest().equals(readImageMd5)) {
+    if (expectedMd5 != null &&
+        !expectedMd5.equals(readImageMd5)) {
       throw new IOException("Image file " + curFile +
           " is corrupt with MD5 checksum of " + readImageMd5 +
-          " but expecting " + storage.getImageDigest());
+          " but expecting " + expectedMd5);
     }
 
-    storage.namespaceID = loader.getLoadedNamespaceID();
-    storage.layoutVersion = loader.getLoadedImageVersion();
-
-    boolean needToSave =
-      loader.getLoadedImageVersion() != FSConstants.LAYOUT_VERSION;
-    return needToSave;
-  }
-
-  /**
-   * Load and merge edits from two edits files
-   * 
-   * @param sd storage directory
-   * @return number of edits loaded
-   * @throws IOException
-   */
-  int loadFSEdits(StorageDirectory sd) throws IOException {
-    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
-    
-    int numEdits = 0;
-    EditLogFileInputStream edits =
-      new EditLogFileInputStream(NNStorage.getStorageFile(sd,
-                                                          NameNodeFile.EDITS));
-    
-    numEdits = loader.loadFSEdits(edits);
-    edits.close();
-    File editsNew = NNStorage.getStorageFile(sd, NameNodeFile.EDITS_NEW);
-    
-    if (editsNew.exists() && editsNew.length() > 0) {
-      edits = new EditLogFileInputStream(editsNew);
-      numEdits += loader.loadFSEdits(edits);
-      edits.close();
-    }
-    if (numEdits == 0 && editsNew.exists()) {
-      numEdits++;
-    }
-    // update the counts.
-    getFSNamesystem().dir.updateCountForINodeWithQuota();    
-    
-    return numEdits;
+    long txId = loader.getLoadedImageTxId();
+    LOG.info("Loaded image for txid " + txId + " from " + curFile);
+    lastAppliedTxId = txId;
+    storage.setMostRecentCheckpointTxId(txId);
   }
 
   /**
    * Save the contents of the FS image to the file.
    */
-  void saveFSImage(File newFile) throws IOException {
+  void saveFSImage(StorageDirectory sd, long txid) throws IOException {
+    File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
+    File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
+    
     FSImageFormat.Saver saver = new FSImageFormat.Saver();
     FSImageCompression compression = FSImageCompression.createCompression(conf);
-    saver.save(newFile, getFSNamesystem(), compression);
-    storage.setImageDigest(saver.getSavedDigest());
+    saver.save(newFile, txid, getFSNamesystem(), compression);
+    
+    MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
+    storage.setMostRecentCheckpointTxId(txid);
   }
 
   /**
@@ -797,15 +760,17 @@ public class FSImage implements NNStorag
   private class FSImageSaver implements Runnable {
     private StorageDirectory sd;
     private List<StorageDirectory> errorSDs;
+    private final long txid;
     
-    FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs) {
+    FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs, long txid) {
       this.sd = sd;
       this.errorSDs = errorSDs;
+      this.txid = txid;
     }
     
     public void run() {
       try {
-        saveCurrent(sd);
+        saveFSImage(sd, txid);
       } catch (Throwable t) {
         LOG.error("Unable to save image for " + sd.getRoot(), t);
         errorSDs.add(sd);
@@ -831,296 +796,126 @@ public class FSImage implements NNStorag
     }
   }
   /**
-   * Save the contents of the FS image and create empty edits.
-   * 
-   * In order to minimize the recovery effort in case of failure during
-   * saveNamespace the algorithm reduces discrepancy between directory states
-   * by performing updates in the following order:
-   * <ol>
-   * <li> rename current to lastcheckpoint.tmp for all of them,</li>
-   * <li> save image and recreate edits for all of them,</li>
-   * <li> rename lastcheckpoint.tmp to previous.checkpoint.</li>
-   * </ol>
-   * On stage (2) we first save all images, then recreate edits.
-   * Otherwise the name-node may purge all edits and fail,
-   * in which case the journal will be lost.
+   * Save the contents of the FS image to a new image file in each of the
+   * current storage directories.
    */
-  void saveNamespace(boolean renewCheckpointTime) throws IOException {
- 
-    // try to restore all failed edit logs here
+  void saveNamespace() throws IOException {
     assert editLog != null : "editLog must be initialized";
     storage.attemptRestoreRemovedStorage();
 
-    editLog.close();
-    if(renewCheckpointTime)
-      storage.setCheckpointTime(now());
-    List<StorageDirectory> errorSDs =
-      Collections.synchronizedList(new ArrayList<StorageDirectory>());
-
-    // mv current -> lastcheckpoint.tmp
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      try {
-        storage.moveCurrent(sd);
-      } catch(IOException ie) {
-        LOG.error("Unable to move current for " + sd.getRoot(), ie);
-        errorSDs.add(sd);
+    boolean editLogWasOpen = editLog.isOpen();
+    
+    if (editLogWasOpen) {
+      editLog.endCurrentLogSegment(true);
+    }
+    long imageTxId = editLog.getLastWrittenTxId();
+    try {
+      saveFSImageInAllDirs(imageTxId);
+      storage.writeAll();
+    } finally {
+      if (editLogWasOpen) {
+        editLog.startLogSegment(imageTxId + 1, true);
+        // Take this opportunity to note the current transaction
+        storage.writeTransactionIdFileToStorage(imageTxId + 1);
       }
     }
+    
+  }
+  
+  protected void saveFSImageInAllDirs(long txid) throws IOException {
+    if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
+      throw new IOException("No image directories available!");
+    }
+    
+    List<StorageDirectory> errorSDs =
+      Collections.synchronizedList(new ArrayList<StorageDirectory>());
 
     List<Thread> saveThreads = new ArrayList<Thread>();
     // save images into current
     for (Iterator<StorageDirectory> it
            = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
-      if (errorSDs.contains(sd)) {
-        continue;
-      }
-      try {
-        FSImageSaver saver = new FSImageSaver(sd, errorSDs);
-        Thread saveThread = new Thread(saver, saver.toString());
-        saveThreads.add(saveThread);
-        saveThread.start();
-      } catch (Exception e) {
-        LOG.error("Failed save to image directory " + sd.getRoot(), e);
-        errorSDs.add(sd);
-        continue;
-      }
+      FSImageSaver saver = new FSImageSaver(sd, errorSDs, txid);
+      Thread saveThread = new Thread(saver, saver.toString());
+      saveThreads.add(saveThread);
+      saveThread.start();
     }
     waitForThreads(saveThreads);
     saveThreads.clear();
+    storage.reportErrorsOnDirectories(errorSDs);
 
-    // -NOTE-
-    // If NN has image-only and edits-only storage directories and fails here
-    // the image will have the latest namespace state.
-    // During startup the image-only directories will recover by discarding
-    // lastcheckpoint.tmp, while
-    // the edits-only directories will recover by falling back
-    // to the old state contained in their lastcheckpoint.tmp.
-    // The edits directories should be discarded during startup because their
-    // checkpointTime is older than that of image directories.
-    // recreate edits in current
-    for (Iterator<StorageDirectory> it
-           = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      if (errorSDs.contains(sd)) {
-        continue;
-      }
-
-      // if this directory already stores the image and edits, then it was
-      // already processed in the earlier loop.
-      if (sd.getStorageDirType() == NameNodeDirType.IMAGE_AND_EDITS) {
-        continue;
-      }
-
-      try {
-        FSImageSaver saver = new FSImageSaver(sd, errorSDs);
-        Thread saveThread = new Thread(saver, saver.toString());
-        saveThreads.add(saveThread);
-        saveThread.start();
-      } catch (Exception e) {
-        LOG.error("Failed save to edits directory " + sd.getRoot(), e);
-        errorSDs.add(sd);
-        continue;
-      }
+    if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
+      throw new IOException(
+        "Failed to save in any storage directories while saving namespace.");
     }
-    waitForThreads(saveThreads);
 
-    // mv lastcheckpoint.tmp -> previous.checkpoint
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      if (errorSDs.contains(sd)) {
-        continue;
-      }
-      try {
-        storage.moveLastCheckpoint(sd);
-      } catch(IOException ie) {
-        LOG.error("Unable to move last checkpoint for " + sd.getRoot(), ie);
-        errorSDs.add(sd);
-        continue;
-      }
-    }
+    renameCheckpoint(txid);
     
-    try {
-      storage.reportErrorsOnDirectories(errorSDs);
-      
-      // If there was an error in every storage dir, each one will have been
-      // removed from the list of storage directories.
-      if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0 ||
-          storage.getNumStorageDirs(NameNodeDirType.EDITS) == 0) {
-        throw new IOException("Failed to save any storage directories while saving namespace");
-      }
-      
-      if(!editLog.isOpen()) editLog.open();
-    } finally {
-      ckptState = CheckpointStates.UPLOAD_DONE;
-    }
+    // Since we now have a new checkpoint, we can clean up some
+    // old edit logs and checkpoints.
+    purgeOldStorage();
   }
 
   /**
-   * Save current image and empty journal into {@code current} directory.
+   * Purge any files in the storage directories that are no longer
+   * necessary.
    */
-  protected void saveCurrent(StorageDirectory sd) throws IOException {
-    if (storage.getLayoutVersion() != FSConstants.LAYOUT_VERSION) {
-      throw new IllegalStateException(
-        "NN with storage version " + FSConstants.LAYOUT_VERSION  +
-        "cannot save an image with version " + storage.getLayoutVersion());
-    }
-    File curDir = sd.getCurrentDir();
-    NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
-    // save new image or new edits
-    if (!curDir.exists() && !curDir.mkdir())
-      throw new IOException("Cannot create directory " + curDir);
-    if (dirType.isOfType(NameNodeDirType.IMAGE))
-      saveFSImage(NNStorage.getStorageFile(sd, NameNodeFile.IMAGE));
-    if (dirType.isOfType(NameNodeDirType.EDITS))
-      editLog.createEditLogFile(NNStorage.getStorageFile(sd,
-                                                         NameNodeFile.EDITS));
-    // write version and time files
-    storage.writeProperties(sd);
-  }
-
-
-  /**
-   * Moves fsimage.ckpt to fsImage and edits.new to edits
-   * Reopens the new edits file.
-   */
-  void rollFSImage(CheckpointSignature sig, 
-      boolean renewCheckpointTime) throws IOException {
-    sig.validateStorageInfo(this);
-    rollFSImage(true);
-  }
-
-  private void rollFSImage(boolean renewCheckpointTime)
-  throws IOException {
-    if (ckptState != CheckpointStates.UPLOAD_DONE
-      && !(ckptState == CheckpointStates.ROLLED_EDITS
-      && storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0)) {
-      throw new IOException("Cannot roll fsImage before rolling edits log.");
-    }
-
-    for (Iterator<StorageDirectory> it 
-           = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW);
-      if (!ckpt.exists()) {
-        throw new IOException("Checkpoint file " + ckpt +
-                              " does not exist");
-      }
-    }
-    editLog.purgeEditLog(); // renamed edits.new to edits
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("rollFSImage after purgeEditLog: storageList=" 
-                + storage.listStorageDirectories());
+  public void purgeOldStorage() {
+    try {
+      archivalManager.purgeOldStorage();
+    } catch (Exception e) {
+      LOG.warn("Unable to purge old storage", e);
     }
-    //
-    // Renames new image
-    //
-    renameCheckpoint();
-    resetVersion(renewCheckpointTime, newImageDigest);
   }
 
   /**
    * Renames new image
    */
-  void renameCheckpoint() throws IOException {
+  private void renameCheckpoint(long txid) throws IOException {
     ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it 
-           = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW);
-      File curFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE);
-      // renameTo fails on Windows if the destination file 
-      // already exists.
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("renaming  " + ckpt.getAbsolutePath() 
-                  + " to " + curFile.getAbsolutePath());
-      }
-      if (!ckpt.renameTo(curFile)) {
-        if (!curFile.delete() || !ckpt.renameTo(curFile)) {
-          LOG.warn("renaming  " + ckpt.getAbsolutePath() + " to "  + 
-              curFile.getAbsolutePath() + " FAILED");
 
-          if(al == null) al = new ArrayList<StorageDirectory> (1);
-          al.add(sd);
+    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
+      try {
+        renameCheckpointInDir(sd, txid);
+      } catch (IOException ioe) {
+        LOG.warn("Unable to rename checkpoint in " + sd, ioe);
+        if (al == null) {
+          al = Lists.newArrayList();
         }
+        al.add(sd);
       }
     }
     if(al != null) storage.reportErrorsOnDirectories(al);
   }
 
-  /**
-   * Updates version and fstime files in all directories (fsimage and edits).
-   */
-  void resetVersion(boolean renewCheckpointTime, MD5Hash newImageDigest) 
+  private void renameCheckpointInDir(StorageDirectory sd, long txid)
       throws IOException {
-    storage.layoutVersion = FSConstants.LAYOUT_VERSION;
-    if(renewCheckpointTime)
-      storage.setCheckpointTime(now());
-    storage.setImageDigest(newImageDigest);
-    
-    ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      // delete old edits if sd is the image only the directory
-      if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        File editsFile = NNStorage.getStorageFile(sd, NameNodeFile.EDITS);
-        if(editsFile.exists() && !editsFile.delete())
-          throw new IOException("Cannot delete edits file " 
-                                + editsFile.getCanonicalPath());
-      }
-      // delete old fsimage if sd is the edits only the directory
-      if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-        File imageFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE);
-        if(imageFile.exists() && !imageFile.delete())
-          throw new IOException("Cannot delete image file " 
-                                + imageFile.getCanonicalPath());
-      }
-      try {
-        storage.writeProperties(sd);
-      } catch (IOException e) {
-        LOG.error("Cannot write file " + sd.getRoot(), e);
-        
-        if(al == null) al = new ArrayList<StorageDirectory> (1);
-        al.add(sd);       
-      }
+    File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
+    File curFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
+    // renameTo fails on Windows if the destination file 
+    // already exists.
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("renaming  " + ckpt.getAbsolutePath() 
+                + " to " + curFile.getAbsolutePath());
     }
-    if(al != null) storage.reportErrorsOnDirectories(al);
-    ckptState = FSImage.CheckpointStates.START;
+    if (!ckpt.renameTo(curFile)) {
+      if (!curFile.delete() || !ckpt.renameTo(curFile)) {
+        throw new IOException("renaming  " + ckpt.getAbsolutePath() + " to "  + 
+            curFile.getAbsolutePath() + " FAILED");
+      }
+    }    
   }
 
   CheckpointSignature rollEditLog() throws IOException {
     getEditLog().rollEditLog();
-    ckptState = CheckpointStates.ROLLED_EDITS;
-    // If checkpoint fails this should be the most recent image, therefore
-    storage.incrementCheckpointTime();
+    // Record this log segment ID in all of the storage directories, so
+    // we won't miss this log segment on a restart if the edits directories
+    // go missing.
+    storage.writeTransactionIdFileToStorage(getEditLog().getCurSegmentTxId());
     return new CheckpointSignature(this);
   }
 
   /**
-   * This is called just before a new checkpoint is uploaded to the
-   * namenode.
-   */
-  void validateCheckpointUpload(CheckpointSignature sig) throws IOException {
-    if (ckptState != CheckpointStates.ROLLED_EDITS) {
-      throw new IOException("Namenode is not expecting an new image " +
-                             ckptState);
-    } 
-    // verify token
-    long modtime = getEditLog().getFsEditTime();
-    if (sig.editsTime != modtime) {
-      throw new IOException("Namenode has an edit log with timestamp of " +
-                            DATE_FORM.format(new Date(modtime)) +
-                            " but new checkpoint was created using editlog " +
-                            " with timestamp " + 
-                            DATE_FORM.format(new Date(sig.editsTime)) + 
-                            ". Checkpoint Aborted.");
-    }
-    sig.validateStorageInfo(this);
-    ckptState = FSImage.CheckpointStates.UPLOAD_START;
-  }
-
-  /**
    * Start checkpoint.
    * <p>
    * If backup storage contains image that is newer than or incompatible with 
@@ -1150,34 +945,23 @@ public class FSImage implements NNStorag
             + " role " + bnReg.getRole() + ": checkpoint is not allowed.";
     else if(bnReg.getLayoutVersion() < storage.getLayoutVersion()
         || (bnReg.getLayoutVersion() == storage.getLayoutVersion()
-            && bnReg.getCTime() > storage.getCTime())
-        || (bnReg.getLayoutVersion() == storage.getLayoutVersion()
-            && bnReg.getCTime() == storage.getCTime()
-            && bnReg.getCheckpointTime() > storage.getCheckpointTime()))
+            && bnReg.getCTime() > storage.getCTime()))
       // remote node has newer image age
       msg = "Name node " + bnReg.getAddress()
             + " has newer image layout version: LV = " +bnReg.getLayoutVersion()
             + " cTime = " + bnReg.getCTime()
-            + " checkpointTime = " + bnReg.getCheckpointTime()
             + ". Current version: LV = " + storage.getLayoutVersion()
-            + " cTime = " + storage.getCTime()
-            + " checkpointTime = " + storage.getCheckpointTime();
+            + " cTime = " + storage.getCTime();
     if(msg != null) {
       LOG.error(msg);
       return new NamenodeCommand(NamenodeProtocol.ACT_SHUTDOWN);
     }
-    boolean isImgObsolete = true;
-    if(bnReg.getLayoutVersion() == storage.getLayoutVersion()
-        && bnReg.getCTime() == storage.getCTime()
-        && bnReg.getCheckpointTime() == storage.getCheckpointTime())
-      isImgObsolete = false;
     boolean needToReturnImg = true;
     if(storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0)
       // do not return image if there are no image directories
       needToReturnImg = false;
     CheckpointSignature sig = rollEditLog();
-    getEditLog().logJSpoolStart(bnReg, nnReg);
-    return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);
+    return new CheckpointCommand(sig, needToReturnImg);
   }
 
   /**
@@ -1196,32 +980,41 @@ public class FSImage implements NNStorag
   void endCheckpoint(CheckpointSignature sig,
                      NamenodeRole remoteNNRole) throws IOException {
     sig.validateStorageInfo(this);
-    // Renew checkpoint time for the active if the other is a checkpoint-node.
-    // The checkpoint-node should have older image for the next checkpoint 
-    // to take effect.
-    // The backup-node always has up-to-date image and will have the same
-    // checkpoint time as the active node.
-    boolean renewCheckpointTime = remoteNNRole.equals(NamenodeRole.CHECKPOINT);
-    rollFSImage(sig, renewCheckpointTime);
-  }
-
-  CheckpointStates getCheckpointState() {
-    return ckptState;
-  }
-
-  void setCheckpointState(CheckpointStates cs) {
-    ckptState = cs;
   }
 
   /**
-   * This is called when a checkpoint upload finishes successfully.
-   */
-  synchronized void checkpointUploadDone() {
-    ckptState = CheckpointStates.UPLOAD_DONE;
+   * This is called by the 2NN after having downloaded an image, and by
+   * the NN after having received a new image from the 2NN. It
+   * renames the image from fsimage_N.ckpt to fsimage_N and also
+   * saves the related .md5 file into place.
+   */
+  synchronized void saveDigestAndRenameCheckpointImage(
+      long txid, MD5Hash digest) throws IOException {
+    renameCheckpoint(txid);
+    List<StorageDirectory> badSds = Lists.newArrayList();
+    
+    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
+      File imageFile = NNStorage.getImageFile(sd, txid);
+      try {
+        MD5FileUtils.saveMD5File(imageFile, digest);
+      } catch (IOException ioe) {
+        badSds.add(sd);
+      }
+    }
+    storage.reportErrorsOnDirectories(badSds);
+    
+    // So long as this is the newest image available,
+    // advertise it as such to other checkpointers
+    // from now on
+    if (txid > storage.getMostRecentCheckpointTxId()) {
+      storage.setMostRecentCheckpointTxId(txid);
+    }
   }
 
   synchronized public void close() throws IOException {
-    getEditLog().close();
+    if (editLog != null) { // 2NN doesn't have any edit log
+      getEditLog().close();
+    }
     storage.close();
   }
 
@@ -1257,30 +1050,6 @@ public class FSImage implements NNStorag
     return storage;
   }
 
-  @Override // NNStorageListener
-  public void errorOccurred(StorageDirectory sd) throws IOException {
-    // do nothing,
-  }
-
-  @Override // NNStorageListener
-  public void formatOccurred(StorageDirectory sd) throws IOException {
-    if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-      sd.lock();
-      try {
-        saveCurrent(sd);
-      } finally {
-        sd.unlock();
-      }
-      LOG.info("Storage directory " + sd.getRoot()
-               + " has been successfully formatted.");
-    }
-  };
-
-  @Override // NNStorageListener
-  public void directoryAvailable(StorageDirectory sd) throws IOException {
-    // do nothing
-  }
-
   public int getLayoutVersion() {
     return storage.getLayoutVersion();
   }
@@ -1296,4 +1065,8 @@ public class FSImage implements NNStorag
   public String getBlockPoolID() {
     return storage.getBlockPoolID();
   }
+
+  public synchronized long getLastAppliedTxId() {
+    return lastAppliedTxId;
+  }
 }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Jul 29 16:28:45 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 
@@ -71,10 +72,8 @@ class FSImageFormat {
     /** Set to true once a file has been loaded using this loader. */
     private boolean loaded = false;
 
-    /** The image version of the loaded file */
-    private int imgVersion;
-    /** The namespace ID of the loaded file */
-    private int imgNamespaceID;
+    /** The transaction ID of the last edit represented by the loaded file */
+    private long imgTxId;
     /** The MD5 sum of the loaded file */
     private MD5Hash imgDigest;
 
@@ -84,15 +83,6 @@ class FSImageFormat {
     }
 
     /**
-     * Return the version number of the image that has been loaded.
-     * @throws IllegalStateException if load() has not yet been called.
-     */
-    int getLoadedImageVersion() {
-      checkLoaded();
-      return imgVersion;
-    }
-    
-    /**
      * Return the MD5 checksum of the image that has been loaded.
      * @throws IllegalStateException if load() has not yet been called.
      */
@@ -101,13 +91,9 @@ class FSImageFormat {
       return imgDigest;
     }
 
-    /**
-     * Return the namespace ID of the image that has been loaded.
-     * @throws IllegalStateException if load() has not yet been called.
-     */
-    int getLoadedNamespaceID() {
+    long getLoadedImageTxId() {
       checkLoaded();
-      return imgNamespaceID;
+      return imgTxId;
     }
 
     /**
@@ -156,10 +142,14 @@ class FSImageFormat {
          * it should not contain version and namespace fields
          */
         // read image version: first appeared in version -1
-        imgVersion = in.readInt();
+        int imgVersion = in.readInt();
+        if(getLayoutVersion() != imgVersion)
+          throw new InconsistentFSStateException(curFile, 
+              "imgVersion " + imgVersion +
+              " expected to be " + getLayoutVersion());
 
         // read namespaceID: first appeared in version -2
-        imgNamespaceID = in.readInt();
+        in.readInt();
 
         // read number of files
         long numFiles = readNumFiles(in);
@@ -169,6 +159,15 @@ class FSImageFormat {
           long genstamp = in.readLong();
           namesystem.setGenerationStamp(genstamp); 
         }
+        
+        // read the transaction ID of the last edit represented by
+        // this image
+        if (LayoutVersion.supports(Feature.STORED_TXIDS, imgVersion)) {
+          imgTxId = in.readLong();
+        } else {
+          imgTxId = 0;
+        }
+        
 
         // read compression related info
         FSImageCompression compression;
@@ -234,7 +233,7 @@ class FSImageFormat {
    private void loadLocalNameINodes(long numFiles, DataInputStream in) 
    throws IOException {
      assert LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION,
-         imgVersion);
+         getLayoutVersion());
      assert numFiles > 0;
 
      // load root
@@ -324,11 +323,12 @@ class FSImageFormat {
    * @return an inode
    */
   private INode loadINode(DataInputStream in)
-  throws IOException {
+      throws IOException {
     long modificationTime = 0;
     long atime = 0;
     long blockSize = 0;
     
+    int imgVersion = getLayoutVersion();
     short replication = in.readShort();
     replication = namesystem.adjustReplication(replication);
     modificationTime = in.readLong();
@@ -396,7 +396,10 @@ class FSImageFormat {
           modificationTime, atime, nsQuota, dsQuota, blockSize);
     }
 
-    private void loadDatanodes(DataInputStream in) throws IOException {
+    private void loadDatanodes(DataInputStream in)
+        throws IOException {
+      int imgVersion = getLayoutVersion();
+
       if (imgVersion > -3) // pre datanode image version
         return;
       if (imgVersion <= -12) {
@@ -412,6 +415,7 @@ class FSImageFormat {
     private void loadFilesUnderConstruction(DataInputStream in)
     throws IOException {
       FSDirectory fsDir = namesystem.dir;
+      int imgVersion = getLayoutVersion();
       if (imgVersion > -13) // pre lease image version
         return;
       int size = in.readInt();
@@ -437,7 +441,10 @@ class FSImageFormat {
       }
     }
 
-    private void loadSecretManagerState(DataInputStream in) throws IOException {
+    private void loadSecretManagerState(DataInputStream in)
+        throws IOException {
+      int imgVersion = getLayoutVersion();
+
       if (!LayoutVersion.supports(Feature.DELEGATION_TOKEN, imgVersion)) {
         //SecretManagerState is not available.
         //This must not happen if security is turned on.
@@ -446,8 +453,14 @@ class FSImageFormat {
       namesystem.loadSecretManagerState(in);
     }
 
+    private int getLayoutVersion() {
+      return namesystem.getFSImage().getStorage().getLayoutVersion();
+    }
+
+    private long readNumFiles(DataInputStream in)
+        throws IOException {
+      int imgVersion = getLayoutVersion();
 
-    private long readNumFiles(DataInputStream in) throws IOException {
       if (LayoutVersion.supports(Feature.NAMESPACE_QUOTA, imgVersion)) {
         return in.readLong();
       } else {
@@ -526,6 +539,7 @@ class FSImageFormat {
     }
 
     void save(File newFile,
+              long txid,
               FSNamesystem sourceNamesystem,
               FSImageCompression compression)
       throws IOException {
@@ -542,9 +556,11 @@ class FSImageFormat {
       DataOutputStream out = new DataOutputStream(fos);
       try {
         out.writeInt(FSConstants.LAYOUT_VERSION);
-        out.writeInt(sourceNamesystem.getFSImage().getStorage().getNamespaceID()); // TODO bad dependency
+        out.writeInt(sourceNamesystem.getFSImage()
+                     .getStorage().getNamespaceID()); // TODO bad dependency
         out.writeLong(fsDir.rootDir.numItemsInTree());
         out.writeLong(sourceNamesystem.getGenerationStamp());
+        out.writeLong(txid);
 
         // write compression info and set up compressed stream
         out = compression.writeHeaderAndWrapStream(fos);

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+
+/**
+ * Inspects a FSImage storage directory in the "old" (pre-HDFS-1073) format.
+ * This format has the following data files:
+ *   - fsimage
+ *   - fsimage.ckpt (when checkpoint is being uploaded)
+ *   - edits
+ *   - edits.new (when logs are "rolled")
+ *
+ * Each inspected directory's checkpoint time is read from its
+ * {@link NameNodeFile#TIME} file. The image directory and the edits
+ * directory with the newest checkpoint times become the load sources.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector {
+  private static final Log LOG =
+    LogFactory.getLog(FSImagePreTransactionalStorageInspector.class);
+
+  /* Flag if there is at least one storage dir that doesn't contain the newest
+   * fstime */
+  private boolean hasOutOfDateStorageDirs = false;
+  /* Flag set false if there are any "previous" directories found */
+  private boolean isUpgradeFinalized = true;
+
+  // Track the name and edits dir with the latest times
+  private long latestNameCheckpointTime = Long.MIN_VALUE;
+  private long latestEditsCheckpointTime = Long.MIN_VALUE;
+  private StorageDirectory latestNameSD = null;
+  private StorageDirectory latestEditsSD = null;
+
+  /** Set to determine if all of storageDirectories share the same checkpoint */
+  Set<Long> checkpointTimes = new HashSet<Long>();
+
+  // Canonical paths of every inspected image / edits dir, used only for
+  // error messages when no usable file is found.
+  private List<String> imageDirs = new ArrayList<String>();
+  private List<String> editsDirs = new ArrayList<String>();
+
+  /**
+   * Record the state of one storage directory: whether it holds an image
+   * and/or edits file, its checkpoint time, and whether it still has a
+   * "previous" directory. The directory becomes the newest image (or edits)
+   * candidate if its checkpoint time beats the current best and the
+   * corresponding file exists.
+   *
+   * @param sd storage directory to inspect
+   * @throws IOException if a canonical path or the time file cannot be read
+   */
+  @Override
+  void inspectDirectory(StorageDirectory sd) throws IOException {
+    // Was the file just formatted?
+    if (!sd.getVersionFile().exists()) {
+      hasOutOfDateStorageDirs = true;
+      return;
+    }
+
+    boolean imageExists = false;
+    boolean editsExists = false;
+
+    // Determine if sd is image, edits or both
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
+      imageExists = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE).exists();
+      imageDirs.add(sd.getRoot().getCanonicalPath());
+    }
+
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+      editsExists = NNStorage.getStorageFile(sd, NameNodeFile.EDITS).exists();
+      editsDirs.add(sd.getRoot().getCanonicalPath());
+    }
+
+    long checkpointTime = readCheckpointTime(sd);
+
+    checkpointTimes.add(checkpointTime);
+
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE) &&
+       (latestNameCheckpointTime < checkpointTime) && imageExists) {
+      latestNameCheckpointTime = checkpointTime;
+      latestNameSD = sd;
+    }
+
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS) &&
+         (latestEditsCheckpointTime < checkpointTime) && editsExists) {
+      latestEditsCheckpointTime = checkpointTime;
+      latestEditsSD = sd;
+    }
+
+    // check that we have a valid, non-default checkpointTime
+    if (checkpointTime <= 0L) {
+      hasOutOfDateStorageDirs = true;
+    }
+
+    // set finalized flag
+    isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
+  }
+
+  /**
+   * Determine the checkpoint time of the specified StorageDirectory
+   *
+   * @param sd StorageDirectory to check
+   * @return If file exists and can be read, last checkpoint time. If not, 0L.
+   * @throws IOException On errors processing file pointed to by sd
+   */
+  static long readCheckpointTime(StorageDirectory sd) throws IOException {
+    File timeFile = NNStorage.getStorageFile(sd, NameNodeFile.TIME);
+    long timeStamp = 0L;
+    if (timeFile.exists() && timeFile.canRead()) {
+      DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
+      try {
+        timeStamp = in.readLong();
+      } finally {
+        in.close();
+      }
+    }
+    return timeStamp;
+  }
+
+  /**
+   * @return false if any inspected directory still had a "previous" dir
+   */
+  @Override
+  boolean isUpgradeFinalized() {
+    return isUpgradeFinalized;
+  }
+
+  /**
+   * Build a load plan from the newest image and edits directories found.
+   *
+   * @throws IOException if no image or no edits file was found, or if the
+   *         image and edits checkpoint times disagree in a way that is not
+   *         the known image-saved-but-edits-not-purged failure
+   */
+  @Override
+  LoadPlan createLoadPlan() throws IOException {
+    // We should have at least one image and one edits dirs
+    if (latestNameSD == null)
+      throw new IOException("Image file is not found in " + imageDirs);
+    if (latestEditsSD == null)
+      throw new IOException("Edits file is not found in " + editsDirs);
+
+    // Make sure we are loading image and edits from same checkpoint
+    if (latestNameCheckpointTime > latestEditsCheckpointTime
+        && latestNameSD != latestEditsSD
+        && latestNameSD.getStorageDirType() == NameNodeDirType.IMAGE
+        && latestEditsSD.getStorageDirType() == NameNodeDirType.EDITS) {
+      // This is a rare failure when NN has image-only and edits-only
+      // storage directories, and fails right after saving images,
+      // in some of the storage directories, but before purging edits.
+      // See -NOTE- in saveNamespace().
+      LOG.error("This is a rare failure scenario!!!");
+      LOG.error("Image checkpoint time " + latestNameCheckpointTime +
+                " > edits checkpoint time " + latestEditsCheckpointTime);
+      LOG.error("Name-node will treat the image as the latest state of " +
+                "the namespace. Old edits will be discarded.");
+    } else if (latestNameCheckpointTime != latestEditsCheckpointTime) {
+      // Separate the two values so the message stays readable.
+      throw new IOException("Inconsistent storage detected, " +
+                      "image and edits checkpoint times do not match. " +
+                      "image checkpoint time = " + latestNameCheckpointTime +
+                      ", edits checkpoint time = " + latestEditsCheckpointTime);
+    }
+
+    return new PreTransactionalLoadPlan();
+  }
+
+  /**
+   * @return true if some directory is out of date, directories disagree on
+   *         the checkpoint time, or the image is newer than the edits
+   */
+  @Override
+  boolean needToSave() {
+    return hasOutOfDateStorageDirs ||
+      checkpointTimes.size() != 1 ||
+      latestNameCheckpointTime > latestEditsCheckpointTime;
+  }
+
+  /**
+   * Load plan backed by the newest image dir and newest edits dir recorded
+   * by the enclosing inspector.
+   */
+  private class PreTransactionalLoadPlan extends LoadPlan {
+
+    /**
+     * Finish or discard an interrupted checkpoint in the chosen image dir.
+     *
+     * @return true if an fsimage.ckpt file was found (and thus handled)
+     * @throws IOException if the checkpoint file cannot be deleted/renamed
+     */
+    @Override
+    boolean doRecovery() throws IOException {
+      LOG.debug(
+        "Performing recovery in "+ latestNameSD + " and " + latestEditsSD);
+
+      boolean needToSave = false;
+      File curFile =
+        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
+      File ckptFile =
+        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
+
+      //
+      // If we were in the midst of a checkpoint
+      //
+      if (ckptFile.exists()) {
+        needToSave = true;
+        if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
+              .exists()) {
+          //
+          // checkpointing might have uploaded a new
+          // merged image, but we discard it here because we are
+          // not sure whether the entire merged image was uploaded
+          // before the namenode crashed.
+          //
+          if (!ckptFile.delete()) {
+            throw new IOException("Unable to delete " + ckptFile);
+          }
+        } else {
+          //
+          // checkpointing was in progress when the namenode
+          // shutdown. The fsimage.ckpt was created and the edits.new
+          // file was moved to edits. We complete that checkpoint by
+          // moving fsimage.ckpt to fsimage. There is no need to
+          // update the fstime file here. renameTo fails on Windows
+          // if the destination file already exists.
+          //
+          if (!ckptFile.renameTo(curFile)) {
+            if (!curFile.delete())
+              LOG.warn("Unable to delete " + curFile + " before rename");
+            if (!ckptFile.renameTo(curFile)) {
+              throw new IOException("Unable to rename " + ckptFile +
+                                    " to " + curFile);
+            }
+          }
+        }
+      }
+      return needToSave;
+    }
+
+    /** @return the fsimage file in the newest image directory */
+    @Override
+    File getImageFile() {
+      return NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
+    }
+
+    /**
+     * @return the edits (and edits.new, if present) of the newest edits
+     *         directory, or an empty list when the image is already newer
+     */
+    @Override
+    List<File> getEditsFiles() {
+      if (latestNameCheckpointTime > latestEditsCheckpointTime) {
+        // the image is already current, discard edits
+        LOG.debug(
+          "Name checkpoint time is newer than edits, not loading edits.");
+        return Collections.<File>emptyList();
+      }
+
+      return getEditsInStorageDir(latestEditsSD);
+    }
+
+    /** @return the newest image directory, whose VERSION file is loaded */
+    @Override
+    StorageDirectory getStorageDirectoryForProperties() {
+      return latestNameSD;
+    }
+  }
+
+  /**
+   * @return a list with the paths to EDITS and EDITS_NEW (if it exists)
+   * in a given storage directory.
+   */
+  static List<File> getEditsInStorageDir(StorageDirectory sd) {
+    ArrayList<File> files = new ArrayList<File>();
+    File edits = NNStorage.getStorageFile(sd, NameNodeFile.EDITS);
+    assert edits.exists() : "Expected edits file at " + edits;
+    files.add(edits);
+    File editsNew = NNStorage.getStorageFile(sd, NameNodeFile.EDITS_NEW);
+    if (editsNew.exists()) {
+      files.add(editsNew);
+    }
+    return files;
+  }
+}

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+
+/**
+ * Abstract base for components that examine a set of storage directories
+ * and derive a plan for loading the namespace from them.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+abstract class FSImageStorageInspector {
+  /**
+   * Examine one storage directory and record what it contains.
+   *
+   * @param sd the storage directory to examine
+   */
+  abstract void inspectDirectory(StorageDirectory sd) throws IOException;
+
+  /**
+   * @return false when at least one inspected storage directory has an
+   *         upgrade that has not been finalized
+   */
+  abstract boolean isUpgradeFinalized();
+
+  /**
+   * Produce a plan for loading the image from the directories inspected so
+   * far.
+   *
+   * @throws IOException when the required files are missing (for example,
+   *         no image was found in any directory)
+   */
+  abstract LoadPlan createLoadPlan() throws IOException;
+
+  /**
+   * @return true when the inspected directories are in a state that calls
+   *         for re-saving the image after it has been loaded
+   */
+  abstract boolean needToSave();
+
+  /**
+   * Describes where the namespace is loaded from: one image file plus the
+   * edits files to replay on top of it.
+   */
+  abstract static class LoadPlan {
+    /**
+     * Run the atomic move sequence in the selected storage directories so
+     * that an interrupted checkpoint is recovered.
+     *
+     * @return true when a recovery action was performed
+     */
+    abstract boolean doRecovery() throws IOException;
+
+    /**
+     * @return the file holding the image data to load
+     */
+    abstract File getImageFile();
+
+    /**
+     * @return the files holding edits to replay, in replay order
+     */
+    abstract List<File> getEditsFiles();
+
+    /**
+     * @return the storage directory whose VERSION file should be loaded
+     */
+    abstract StorageDirectory getStorageDirectoryForProperties();
+
+    /**
+     * Render the plan as a multi-line, human-readable summary listing the
+     * image file, each edits file, and the properties directory.
+     */
+    @Override
+    public String toString() {
+      final String newline = "\n";
+      StringBuilder summary = new StringBuilder("Will load image file: ");
+      summary.append(getImageFile()).append(newline);
+      summary.append("Will load edits files:").append(newline);
+      for (File editsFile : getEditsFiles()) {
+        summary.append("  ").append(editsFile).append(newline);
+      }
+      summary.append("Will load metadata from: ");
+      summary.append(getStorageDirectoryForProperties()).append(newline);
+      return summary.toString();
+    }
+  }
+}



Mime
View raw message