hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1126941 [2/3] - in /hadoop/hdfs/branches/yahoo-merge: ./ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/serv...
Date Tue, 24 May 2011 09:03:15 GMT
Modified: hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1126941&r1=1126940&r2=1126941&view=diff
==============================================================================
--- hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue May 24 09:03:13 2011
@@ -17,17 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.Closeable;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.net.URI;
-import java.net.UnknownHostException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -38,10 +31,10 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
 import java.util.Set;
-import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -49,20 +42,21 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.common.UpgradeManager;
 import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 /**
@@ -71,72 +65,23 @@ import org.apache.hadoop.hdfs.DFSConfigK
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class FSImage extends Storage {
+public class FSImage implements NNStorageListener, Closeable {
+  protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
 
   private static final SimpleDateFormat DATE_FORM =
-    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-  protected String blockpoolID = "";   // id of the block pool
-  
-  static final String MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
-  //
-  // The filenames used for storing the images
-  //
-  enum NameNodeFile {
-    IMAGE     ("fsimage"),
-    TIME      ("fstime"),
-    EDITS     ("edits"),
-    IMAGE_NEW ("fsimage.ckpt"),
-    EDITS_NEW ("edits.new");
-    
-    private String fileName = null;
-    private NameNodeFile(String name) {this.fileName = name;}
-    String getName() {return fileName;}
-  }
+      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
   // checkpoint states
   enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
-  /**
-   * Implementation of StorageDirType specific to namenode storage
-   * A Storage directory could be of type IMAGE which stores only fsimage,
-   * or of type EDITS which stores edits or of type IMAGE_AND_EDITS which 
-   * stores both fsimage and edits.
-   */
-  static enum NameNodeDirType implements StorageDirType {
-    UNDEFINED,
-    IMAGE,
-    EDITS,
-    IMAGE_AND_EDITS;
-    
-    public StorageDirType getStorageDirType() {
-      return this;
-    }
-    
-    public boolean isOfType(StorageDirType type) {
-      if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS))
-        return true;
-      return this == type;
-    }
-  }
 
   protected FSNamesystem namesystem = null;
-  protected long checkpointTime = -1L;  // The age of the image
   protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
-  protected MD5Hash imageDigest = null;
   protected MD5Hash newImageDigest = null;
 
-  /**
-   * flag that controls if we try to restore failed storages
-   */
-  private boolean restoreFailedStorage = false;
+  protected NNStorage storage = null;
 
   /**
-   * list of failed (and thus removed) storages
-   */
-  protected List<StorageDirectory> removedStorageDirs = new ArrayList<StorageDirectory>();
-    
-  /**
    * URIs for importing an image from a checkpoint. In the default case,
    * URIs will represent directories. 
    */
@@ -150,8 +95,6 @@ public class FSImage extends Storage {
    */
   volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START; 
 
-  private static final Random R = new Random();
-  
   /**
    */
   FSImage() {
@@ -169,16 +112,22 @@ public class FSImage extends Storage {
     if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, 
         DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
       NameNode.LOG.info("set FSImage.restoreFailedStorage");
-      setRestoreFailedStorage(true);
+      storage.setRestoreFailedStorage(true);
     }
     setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
         FSImage.getCheckpointEditsDirs(conf, null));
   }
 
   private FSImage(FSNamesystem ns) {
-    super(NodeType.NAME_NODE);
     this.conf = new Configuration();
-    this.editLog = new FSEditLog(this);
+    
+    storage = new NNStorage(conf);
+    if (ns != null) {
+      storage.setUpgradeManager(ns.upgradeManager);
+    }
+    storage.registerListener(this);
+
+    this.editLog = new FSEditLog(storage);
     setFSNamesystem(ns);
   }
 
@@ -188,12 +137,11 @@ public class FSImage extends Storage {
   FSImage(Collection<URI> fsDirs, Collection<URI> fsEditsDirs) 
       throws IOException {
     this();
-    setStorageDirectories(fsDirs, fsEditsDirs);
+    storage.setStorageDirectories(fsDirs, fsEditsDirs);
   }
 
   public FSImage(StorageInfo storageInfo, String bpid) {
-    super(NodeType.NAME_NODE, storageInfo);
-    blockpoolID = bpid;
+    storage = new NNStorage(storageInfo, bpid);
   }
 
   /**
@@ -206,7 +154,7 @@ public class FSImage extends Storage {
     ArrayList<URI> editsDirs = new ArrayList<URI>(1);
     dirs.add(imageDir);
     editsDirs.add(imageDir);
-    setStorageDirectories(dirs, editsDirs);
+    storage.setStorageDirectories(dirs, editsDirs);
   }
   
   protected FSNamesystem getFSNamesystem() {
@@ -215,159 +163,17 @@ public class FSImage extends Storage {
 
   void setFSNamesystem(FSNamesystem ns) {
     namesystem = ns;
-  }
-
-  public void setRestoreFailedStorage(boolean val) {
-    LOG.info("set restore failed storage to " + val);
-    restoreFailedStorage=val;
-  }
-  
-  public boolean getRestoreFailedStorage() {
-    return restoreFailedStorage;
-  }
-  
-  void setStorageDirectories(Collection<URI> fsNameDirs,
-                             Collection<URI> fsEditsDirs) throws IOException {
-    this.storageDirs = new ArrayList<StorageDirectory>();
-    this.removedStorageDirs = new ArrayList<StorageDirectory>();
-    
-   // Add all name dirs with appropriate NameNodeDirType 
-    for (URI dirName : fsNameDirs) {
-      checkSchemeConsistency(dirName);
-      boolean isAlsoEdits = false;
-      for (URI editsDirName : fsEditsDirs) {
-        if (editsDirName.compareTo(dirName) == 0) {
-          isAlsoEdits = true;
-          fsEditsDirs.remove(editsDirName);
-          break;
-        }
-      }
-      NameNodeDirType dirType = (isAlsoEdits) ?
-                          NameNodeDirType.IMAGE_AND_EDITS :
-                          NameNodeDirType.IMAGE;
-      // Add to the list of storage directories, only if the 
-      // URI is of type file://
-      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) 
-          == 0){
-        this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), 
-            dirType));
-      }
-    }
-    
-    // Add edits dirs if they are different from name dirs
-    for (URI dirName : fsEditsDirs) {
-      checkSchemeConsistency(dirName);
-      // Add to the list of storage directories, only if the 
-      // URI is of type file://
-      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
-          == 0)
-        this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), 
-                    NameNodeDirType.EDITS));
+    if (ns != null) {
+      storage.setUpgradeManager(ns.upgradeManager);
     }
   }
 
-  /* 
-   * Checks the consistency of a URI, in particular if the scheme 
-   * is specified and is supported by a concrete implementation 
-   */
-  static void checkSchemeConsistency(URI u) throws IOException {
-    String scheme = u.getScheme();
-    // the URI should have a proper scheme
-    if(scheme == null)
-      throw new IOException("Undefined scheme for " + u);
-    else {
-      try {
-        // the scheme should be enumerated as JournalType
-        JournalType.valueOf(scheme.toUpperCase());
-      } catch (IllegalArgumentException iae){
-        throw new IOException("Unknown scheme " + scheme + 
-            ". It should correspond to a JournalType enumeration value");
-      }
-    }
-  };
-  
   void setCheckpointDirectories(Collection<URI> dirs,
                                 Collection<URI> editsDirs) {
     checkpointDirs = dirs;
     checkpointEditsDirs = editsDirs;
   }
   
-  static File getImageFile(StorageDirectory sd, NameNodeFile type) {
-    return new File(sd.getCurrentDir(), type.getName());
-  }
-  
-  List<StorageDirectory> getRemovedStorageDirs() {
-    return this.removedStorageDirs;
-  }
-  
-  File getEditFile(StorageDirectory sd) {
-    return getImageFile(sd, NameNodeFile.EDITS);
-  }
-  
-  File getEditNewFile(StorageDirectory sd) {
-    return getImageFile(sd, NameNodeFile.EDITS_NEW);
-  }
-
-  Collection<File> getFiles(NameNodeFile type, NameNodeDirType dirType) {
-    ArrayList<File> list = new ArrayList<File>();
-    Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
-                                    dirIterator(dirType);
-    for ( ;it.hasNext(); ) {
-      list.add(getImageFile(it.next(), type));
-    }
-    return list;
-  }
-
-  Collection<URI> getDirectories(NameNodeDirType dirType) 
-      throws IOException {
-    ArrayList<URI> list = new ArrayList<URI>();
-    Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
-                                    dirIterator(dirType);
-    for ( ;it.hasNext(); ) {
-      StorageDirectory sd = it.next();
-      try {
-        list.add(Util.fileAsURI(sd.getRoot()));
-      } catch (IOException e) {
-        throw new IOException("Exception while processing " +
-            "StorageDirectory " + sd.getRoot(), e);
-      }
-    }
-    return list;
-  }
-
-  /**
-   * Retrieve current directories of type IMAGE
-   * @return Collection of URI representing image directories 
-   * @throws IOException in case of URI processing error
-   */
-  Collection<URI> getImageDirectories() throws IOException {
-    return getDirectories(NameNodeDirType.IMAGE);
-  }
-
-  /**
-   * Retrieve current directories of type EDITS
-   * @return Collection of URI representing edits directories 
-   * @throws IOException in case of URI processing error
-   */
-  Collection<URI> getEditsDirectories() throws IOException {
-    return getDirectories(NameNodeDirType.EDITS);
-  }
-
-  /**
-   * Return number of storage directories of the given type.
-   * @param dirType directory type
-   * @return number of storage directories of type dirType
-   */
-  int getNumStorageDirs(NameNodeDirType dirType) {
-    if(dirType == null)
-      return getNumStorageDirs();
-    Iterator<StorageDirectory> it = dirIterator(dirType);
-    int numDirs = 0;
-    for(; it.hasNext(); it.next())
-      numDirs++;
-    return numDirs;
-  }
-
   /**
    * Analyze storage directories.
    * Recover from previous transitions if required. 
@@ -390,26 +196,25 @@ public class FSImage extends Storage {
     if((dataDirs.size() == 0 || editsDirs.size() == 0) 
                              && startOpt != StartupOption.IMPORT)  
       throw new IOException(
-        "All specified directories are not accessible or do not exist.");
+          "All specified directories are not accessible or do not exist.");
     
     if(startOpt == StartupOption.IMPORT 
         && (checkpointDirs == null || checkpointDirs.isEmpty()))
       throw new IOException("Cannot import image from a checkpoint. "
-                          + "\"dfs.namenode.checkpoint.dir\" is not set." );
+                            + "\"dfs.namenode.checkpoint.dir\" is not set." );
 
     if(startOpt == StartupOption.IMPORT 
         && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
       throw new IOException("Cannot import image from a checkpoint. "
-                          + "\"dfs.namenode.checkpoint.dir\" is not set." );
+                            + "\"dfs.namenode.checkpoint.dir\" is not set." );
     
-    setStorageDirectories(dataDirs, editsDirs);
+    storage.setStorageDirectories(dataDirs, editsDirs);
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
     Map<StorageDirectory, StorageState> dataDirStates = 
              new HashMap<StorageDirectory, StorageState>();
     boolean isFormatted = false;
-    for (Iterator<StorageDirectory> it = 
-                      dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       StorageState curState;
       try {
@@ -435,7 +240,7 @@ public class FSImage extends Storage {
         if (startOpt == StartupOption.IMPORT && isFormatted)
           // import of a checkpoint is allowed only into empty image directories
           throw new IOException("Cannot import image from a checkpoint. " 
-              + " NameNode already contains an image in " + sd.getRoot());
+              + " NameNode already contains an image in "+ sd.getRoot());
       } catch (IOException ioe) {
         sd.unlock();
         throw ioe;
@@ -446,42 +251,42 @@ public class FSImage extends Storage {
     if (!isFormatted && startOpt != StartupOption.ROLLBACK 
                      && startOpt != StartupOption.IMPORT)
       throw new IOException("NameNode is not formatted.");
-    if (layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) {
-      checkVersionUpgradable(layoutVersion);
+    if (storage.getLayoutVersion() < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
+      NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
     }
     if (startOpt != StartupOption.UPGRADE
-          && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
-          && layoutVersion != FSConstants.LAYOUT_VERSION) {
-        throw new IOException(
-          "\nFile system image contains an old layout version " + layoutVersion
-              + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION
-              + " is required.\nPlease restart NameNode with -upgrade option.");
+        && storage.getLayoutVersion() < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
+        && storage.getLayoutVersion() != FSConstants.LAYOUT_VERSION) {
+      throw new IOException(
+          "\nFile system image contains an old layout version " 
+          + storage.getLayoutVersion() + ".\nAn upgrade to version "
+          + FSConstants.LAYOUT_VERSION + " is required.\n"
+          + "Please restart NameNode with -upgrade option.");
     }
     
     // Upgrade to federation requires -upgrade -clusterid <clusterID> option
     if (startOpt == StartupOption.UPGRADE
-        && layoutVersion > LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+        && storage.getLayoutVersion() > Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) {
       if (startOpt.getClusterId() == null) {
         throw new IOException(
             "\nFile system image contains an old layout version "
-                + layoutVersion + ".\nAn upgrade to version "
+                + storage.getLayoutVersion() + ".\nAn upgrade to version "
                 + FSConstants.LAYOUT_VERSION
                 + " is required.\nPlease restart NameNode with "
                 + "-upgrade -clusterid <clusterID> option.");
       }
-      clusterID = startOpt.getClusterId();
+      storage.setClusterID(startOpt.getClusterId());
       
       // Create new block pool Id
-      blockpoolID = newBlockPoolID();
+      storage.setBlockPoolID(storage.newBlockPoolID());
     }
     
     // check whether distributed upgrade is reguired and/or should be continued
-    verifyDistributedUpgradeProgress(startOpt);
+    storage.verifyDistributedUpgradeProgress(startOpt);
 
     // 2. Format unformatted dirs.
-    this.checkpointTime = 0L;
-    for (Iterator<StorageDirectory> it = 
-                     dirIterator(); it.hasNext();) {
+    storage.setCheckpointTime(0L);
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       StorageState curState = dataDirStates.get(sd);
       switch(curState) {
@@ -523,41 +328,39 @@ public class FSImage extends Storage {
   }
 
   private void doUpgrade() throws IOException {
-    if(getDistributedUpgradeState()) {
+    if(storage.getDistributedUpgradeState()) {
       // only distributed upgrade need to continue
       // don't do version upgrade
       this.loadFSImage();
-      initializeDistributedUpgrade();
+      storage.initializeDistributedUpgrade();
       return;
     }
     // Upgrade is allowed only if there are 
     // no previous fs states in any of the directories
-    for (Iterator<StorageDirectory> it = 
-                           dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       if (sd.getPreviousDir().exists())
         throw new InconsistentFSStateException(sd.getRoot(),
-                                               "previous fs state should not exist during upgrade. "
-                                               + "Finalize or rollback first.");
+            "previous fs state should not exist during upgrade. "
+            + "Finalize or rollback first.");
     }
 
     // load the latest image
     this.loadFSImage();
 
     // Do upgrade for each directory
-    long oldCTime = this.getCTime();
-    this.cTime = now();  // generate new cTime for the state
-    int oldLV = this.getLayoutVersion();
-    this.layoutVersion = FSConstants.LAYOUT_VERSION;
-    this.checkpointTime = now();
-    for (Iterator<StorageDirectory> it = 
-                           dirIterator(); it.hasNext();) {
+    long oldCTime = storage.getCTime();
+    storage.cTime = now();  // generate new cTime for the state
+    int oldLV = storage.getLayoutVersion();
+    storage.layoutVersion = FSConstants.LAYOUT_VERSION;
+    storage.setCheckpointTime(now());
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       LOG.info("Upgrading image directory " + sd.getRoot()
                + ".\n   old LV = " + oldLV
                + "; old CTime = " + oldCTime
-               + ".\n   new LV = " + this.getLayoutVersion()
-               + "; new CTime = " + this.getCTime());
+               + ".\n   new LV = " + storage.getLayoutVersion()
+               + "; new CTime = " + storage.getCTime());
       File curDir = sd.getCurrentDir();
       File prevDir = sd.getPreviousDir();
       File tmpDir = sd.getPreviousTmp();
@@ -566,15 +369,15 @@ public class FSImage extends Storage {
       assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
       assert !editLog.isOpen() : "Edits log must not be open.";
       // rename current to tmp
-      rename(curDir, tmpDir);
+      NNStorage.rename(curDir, tmpDir);
       // save new image
       saveCurrent(sd);
       // rename tmp to previous
-      rename(tmpDir, prevDir);
+      NNStorage.rename(tmpDir, prevDir);
       isUpgradeFinalized = false;
       LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
     }
-    initializeDistributedUpgrade();
+    storage.initializeDistributedUpgrade();
     editLog.open();
   }
 
@@ -584,9 +387,8 @@ public class FSImage extends Storage {
     // Directories that don't have previous state do not rollback
     boolean canRollback = false;
     FSImage prevState = new FSImage(getFSNamesystem());
-    prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
-    for (Iterator<StorageDirectory> it = 
-                       dirIterator(); it.hasNext();) {
+    prevState.getStorage().layoutVersion = FSConstants.LAYOUT_VERSION;
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       File prevDir = sd.getPreviousDir();
       if (!prevDir.exists()) {  // use current directory then
@@ -595,42 +397,44 @@ public class FSImage extends Storage {
         sd.read(); // read and verify consistency with other directories
         continue;
       }
-      StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot());
-      sdPrev.read(sdPrev.getPreviousVersionFile());  // read and verify consistency of the prev dir
+      StorageDirectory sdPrev 
+        = prevState.getStorage().new StorageDirectory(sd.getRoot());
+
+      // read and verify consistency of the prev dir
+      sdPrev.read(sdPrev.getPreviousVersionFile());
       canRollback = true;
     }
     if (!canRollback)
-      throw new IOException("Cannot rollback. " 
-                            + "None of the storage directories contain previous fs state.");
+      throw new IOException("Cannot rollback. None of the storage "
+                            + "directories contain previous fs state.");
 
     // Now that we know all directories are going to be consistent
     // Do rollback for each directory containing previous state
-    for (Iterator<StorageDirectory> it = 
-                          dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       File prevDir = sd.getPreviousDir();
       if (!prevDir.exists())
         continue;
 
       LOG.info("Rolling back storage directory " + sd.getRoot()
-               + ".\n   new LV = " + prevState.getLayoutVersion()
-               + "; new CTime = " + prevState.getCTime());
+               + ".\n   new LV = " + prevState.getStorage().getLayoutVersion()
+               + "; new CTime = " + prevState.getStorage().getCTime());
       File tmpDir = sd.getRemovedTmp();
       assert !tmpDir.exists() : "removed.tmp directory must not exist.";
       // rename current to tmp
       File curDir = sd.getCurrentDir();
       assert curDir.exists() : "Current directory must exist.";
-      rename(curDir, tmpDir);
+      NNStorage.rename(curDir, tmpDir);
       // rename previous to current
-      rename(prevDir, curDir);
+      NNStorage.rename(prevDir, curDir);
 
       // delete tmp dir
-      deleteDir(tmpDir);
+      NNStorage.deleteDir(tmpDir);
       LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
     }
     isUpgradeFinalized = true;
     // check whether name-node can start in regular mode
-    verifyDistributedUpgradeProgress(StartupOption.REGULAR);
+    storage.verifyDistributedUpgradeProgress(StartupOption.REGULAR);
   }
 
   private void doFinalize(StorageDirectory sd) throws IOException {
@@ -642,14 +446,14 @@ public class FSImage extends Storage {
     }
     LOG.info("Finalizing upgrade for storage directory " 
              + sd.getRoot() + "."
-             + (getLayoutVersion()==0 ? "" :
-                   "\n   cur LV = " + this.getLayoutVersion()
-                   + "; cur CTime = " + this.getCTime()));
+             + (storage.getLayoutVersion()==0 ? "" :
+                   "\n   cur LV = " + storage.getLayoutVersion()
+                   + "; cur CTime = " + storage.getCTime()));
     assert sd.getCurrentDir().exists() : "Current directory must exist.";
     final File tmpDir = sd.getFinalizedTmp();
     // rename previous to tmp and remove
-    rename(prevDir, tmpDir);
-    deleteDir(tmpDir);
+    NNStorage.rename(prevDir, tmpDir);
+    NNStorage.deleteDir(tmpDir);
     isUpgradeFinalized = true;
     LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
   }
@@ -673,18 +477,18 @@ public class FSImage extends Storage {
       ckptImage.close();
     }
     // return back the real image
-    realImage.setStorageInfo(ckptImage);
-    checkpointTime = ckptImage.checkpointTime;
+    realImage.getStorage().setStorageInfo(ckptImage.getStorage());
+    storage.setCheckpointTime(ckptImage.getStorage().getCheckpointTime());
     fsNamesys.dir.fsImage = realImage;
-    realImage.blockpoolID = ckptImage.blockpoolID;
+    realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
     // and save it but keep the same checkpointTime
     saveNamespace(false);
   }
 
   void finalizeUpgrade() throws IOException {
-    for (Iterator<StorageDirectory> it = 
-                          dirIterator(); it.hasNext();) {
-      doFinalize(it.next());
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      doFinalize(sd);
     }
   }
 
@@ -692,242 +496,10 @@ public class FSImage extends Storage {
     return isUpgradeFinalized;
   }
 
-  /** Validate and set block pool ID */
-  private void setBlockPoolID(File storage, String bpid)
-      throws InconsistentFSStateException {
-    if (bpid == null || bpid.equals("")) {
-      throw new InconsistentFSStateException(storage, "file "
-          + STORAGE_FILE_VERSION + " has no block pool Id.");
-    }
-    
-    if (!blockpoolID.equals("") && !blockpoolID.equals(bpid)) {
-      throw new InconsistentFSStateException(storage,
-          "Unexepcted blockpoolID " + bpid + " . Expected " + blockpoolID);
-    }
-    blockpoolID = bpid;
-  }
-  
-  @Override
-  protected void getFields(Properties props, 
-                           StorageDirectory sd 
-                           ) throws IOException {
-    super.getFields(props, sd);
-    if (layoutVersion == 0) {
-      throw new IOException("NameNode directory " + sd.getRoot()
-          + " is not formatted.");
-    }
-    
-    // No Block pool ID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
-    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
-      String sbpid = props.getProperty("blockpoolID");
-      setBlockPoolID(sd.getRoot(), sbpid);
-    }
-    
-    String sDUS, sDUV;
-    sDUS = props.getProperty("distributedUpgradeState"); 
-    sDUV = props.getProperty("distributedUpgradeVersion");
-    setDistributedUpgradeState(
-        sDUS == null? false : Boolean.parseBoolean(sDUS),
-        sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
-    
-    String sMd5 = props.getProperty(MESSAGE_DIGEST_PROPERTY);
-    if (layoutVersion <= -26) {
-      if (sMd5 == null) {
-        throw new InconsistentFSStateException(sd.getRoot(),
-            "file " + STORAGE_FILE_VERSION + " does not have MD5 image digest.");
-      }
-      this.imageDigest = new MD5Hash(sMd5);
-    } else if (sMd5 != null) {
-      throw new InconsistentFSStateException(sd.getRoot(),
-          "file " + STORAGE_FILE_VERSION +
-          " has image MD5 digest when version is " + layoutVersion);
-    }
-
-    this.checkpointTime = readCheckpointTime(sd);
-  }
-
-  /**
-   * Determine the checkpoint time of the specified StorageDirectory
-   * 
-   * @param sd StorageDirectory to check
-   * @return If file exists and can be read, last checkpoint time. If not, 0L.
-   * @throws IOException On errors processing file pointed to by sd
-   */
-  long readCheckpointTime(StorageDirectory sd) throws IOException {
-    File timeFile = getImageFile(sd, NameNodeFile.TIME);
-    long timeStamp = 0L;
-    if (timeFile.exists() && timeFile.canRead()) {
-      DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
-      try {
-        timeStamp = in.readLong();
-      } finally {
-        in.close();
-      }
-    }
-    return timeStamp;
-  }
-
-  /**
-   * Write last checkpoint time and version file into the storage directory.
-   * 
-   * The version file should always be written last.
-   * Missing or corrupted version file indicates that 
-   * the checkpoint is not valid.
-   * 
-   * @param sd storage directory
-   * @throws IOException
-   */
-  @Override
-  protected void setFields(Properties props, 
-                           StorageDirectory sd 
-                           ) throws IOException {
-    super.setFields(props, sd);
-    // Set blockpoolID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
-    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
-      props.setProperty("blockpoolID", blockpoolID);
-    }
-    boolean uState = getDistributedUpgradeState();
-    int uVersion = getDistributedUpgradeVersion();
-    if(uState && uVersion != getLayoutVersion()) {
-      props.setProperty("distributedUpgradeState", Boolean.toString(uState));
-      props.setProperty("distributedUpgradeVersion", Integer.toString(uVersion)); 
-    }
-    if (imageDigest == null) {
-      imageDigest = MD5Hash.digest(
-          new FileInputStream(getImageFile(sd, NameNodeFile.IMAGE)));
-    }
-    props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString());
-
-    writeCheckpointTime(sd);
-  }
-
-  /**
-   * Write last checkpoint time into a separate file.
-   * 
-   * @param sd
-   * @throws IOException
-   */
-  void writeCheckpointTime(StorageDirectory sd) throws IOException {
-    if (checkpointTime < 0L)
-      return; // do not write negative time
-    File timeFile = getImageFile(sd, NameNodeFile.TIME);
-    if (timeFile.exists() && ! timeFile.delete()) {
-        LOG.error("Cannot delete chekpoint time file: "
-                  + timeFile.getCanonicalPath());
-    }
-    FileOutputStream fos = new FileOutputStream(timeFile);
-    DataOutputStream out = new DataOutputStream(fos);
-    try {
-      out.writeLong(checkpointTime);
-      out.flush();
-      fos.getChannel().force(true);
-    } finally {
-      out.close();
-    }
-  }
-
-  /**
-   * Record new checkpoint time in order to
-   * distinguish healthy directories from the removed ones.
-   * If there is an error writing new checkpoint time, the corresponding
-   * storage directory is removed from the list.
-   */
-  void incrementCheckpointTime() {
-    setCheckpointTime(checkpointTime + 1);
-  }
-
-  /**
-   * The age of the namespace state.<p>
-   * Reflects the latest time the image was saved.
-   * Modified with every save or a checkpoint.
-   * Persisted in VERSION file.
-   */
-  long getCheckpointTime() {
-    return checkpointTime;
-  }
-
-  void setCheckpointTime(long newCpT) {
-    checkpointTime = newCpT;
-    // Write new checkpoint time in all storage directories
-    for(Iterator<StorageDirectory> it =
-                          dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      try {
-        writeCheckpointTime(sd);
-      } catch(IOException e) {
-        // Close any edits stream associated with this dir and remove directory
-        LOG.warn("incrementCheckpointTime failed on " + sd.getRoot().getPath() + ";type="+sd.getStorageDirType());
-      }
-    }
-  }
-
-  /**
-   * @param sds - array of SDs to process
-   * @param propagate - flag, if set - then call corresponding EditLog stream's 
-   * processIOError function.
-   */
-  void processIOError(List<StorageDirectory> sds, boolean propagate) {
-    ArrayList<EditLogOutputStream> al = null;
-    synchronized (sds) {
-      for (StorageDirectory sd : sds) {
-        // if has a stream assosiated with it - remove it too..
-        if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-          EditLogOutputStream eStream = editLog.getEditsStream(sd);
-          if (al == null)
-            al = new ArrayList<EditLogOutputStream>(1);
-          al.add(eStream);
-        }
-
-        for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
-          StorageDirectory sd1 = it.next();
-          if (sd.equals(sd1)) {
-            // add storage to the removed list
-            LOG.warn("FSImage:processIOError: removing storage: "
-                + sd.getRoot().getPath());
-            try {
-              sd1.unlock(); // unlock before removing (in case it will be
-                            // restored)
-            } catch (Exception e) {
-              LOG.info("Unable to unlock bad storage directory : " +  sd.getRoot().getPath());
-            }
-            removedStorageDirs.add(sd1);
-            it.remove();
-            break;
-          }
-        }
-      }
-    }
-    // if there are some edit log streams to remove    
-    if(propagate && al != null) 
-      editLog.processIOError(al, false);
-    
-    //if called from edits log, the it will call increment from there
-    if(propagate) incrementCheckpointTime(); 
-  }
-
   public FSEditLog getEditLog() {
     return editLog;
   }
 
-  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
-    File oldImageDir = new File(sd.getRoot(), "image");
-    if (!oldImageDir.exists()) {
-      return false;
-    }
-    // check the layout version inside the image file
-    File oldF = new File(oldImageDir, "fsimage");
-    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
-    try {
-      oldFile.seek(0);
-      int odlVersion = oldFile.readInt();
-      if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
-        return false;
-    } finally {
-      oldFile.close();
-    }
-    return true;
-  }
-
   //
   // Atomic move sequence, to recover from interrupted checkpoint
   //
@@ -935,15 +507,15 @@ public class FSImage extends Storage {
                                        StorageDirectory editsSD) 
                                        throws IOException {
     boolean needToSave = false;
-    File curFile = getImageFile(nameSD, NameNodeFile.IMAGE);
-    File ckptFile = getImageFile(nameSD, NameNodeFile.IMAGE_NEW);
+    File curFile = NNStorage.getStorageFile(nameSD, NameNodeFile.IMAGE);
+    File ckptFile = NNStorage.getStorageFile(nameSD, NameNodeFile.IMAGE_NEW);
 
     //
     // If we were in the midst of a checkpoint
     //
     if (ckptFile.exists()) {
       needToSave = true;
-      if (getImageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) {
+      if (NNStorage.getStorageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) {
         //
         // checkpointing migth have uploaded a new
         // merged image, but we discard it here because we are
@@ -1007,7 +579,7 @@ public class FSImage extends Storage {
 
     // Process each of the storage directories to find the pair of
     // newest image file and edit file
-    for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
 
       // Was the file just formatted?
@@ -1021,16 +593,16 @@ public class FSImage extends Storage {
       
       // Determine if sd is image, edits or both
       if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-        imageExists = getImageFile(sd, NameNodeFile.IMAGE).exists();
+        imageExists = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE).exists();
         imageDirs.add(sd.getRoot().getCanonicalPath());
       }
       
       if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        editsExists = getImageFile(sd, NameNodeFile.EDITS).exists();
+        editsExists = NNStorage.getStorageFile(sd, NameNodeFile.EDITS).exists();
         editsDirs.add(sd.getRoot().getCanonicalPath());
       }
       
-      checkpointTime = readCheckpointTime(sd);
+      long checkpointTime = storage.readCheckpointTime(sd);
 
       checkpointTimes.add(checkpointTime);
       
@@ -1090,7 +662,8 @@ public class FSImage extends Storage {
     // Load in bits
     //
     latestNameSD.read();
-    needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE));
+    needToSave |= loadFSImage(NNStorage.getStorageFile(latestNameSD,
+                                                       NameNodeFile.IMAGE));
     
     // Load latest edits
     if (latestNameCheckpointTime > latestEditsCheckpointTime)
@@ -1117,16 +690,16 @@ public class FSImage extends Storage {
     // Check that the image digest we loaded matches up with what
     // we expected
     MD5Hash readImageMd5 = loader.getLoadedImageMd5();
-    if (imageDigest == null) {
-      imageDigest = readImageMd5; // set this fsimage's checksum
-    } else if (!imageDigest.equals(readImageMd5)) {
+    if (storage.getImageDigest() == null) {
+      storage.setImageDigest(readImageMd5); // set this fsimage's checksum
+    } else if (!storage.getImageDigest().equals(readImageMd5)) {
       throw new IOException("Image file " + curFile +
           " is corrupt with MD5 checksum of " + readImageMd5 +
-          " but expecting " + imageDigest);
+          " but expecting " + storage.getImageDigest());
     }
 
-    this.namespaceID = loader.getLoadedNamespaceID();
-    this.layoutVersion = loader.getLoadedImageVersion();
+    storage.namespaceID = loader.getLoadedNamespaceID();
+    storage.layoutVersion = loader.getLoadedImageVersion();
 
     boolean needToSave =
       loader.getLoadedImageVersion() != FSConstants.LAYOUT_VERSION;
@@ -1144,12 +717,13 @@ public class FSImage extends Storage {
     FSEditLogLoader loader = new FSEditLogLoader(namesystem);
     
     int numEdits = 0;
-    EditLogFileInputStream edits = 
-      new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
+    EditLogFileInputStream edits =
+      new EditLogFileInputStream(NNStorage.getStorageFile(sd,
+                                                          NameNodeFile.EDITS));
     
     numEdits = loader.loadFSEdits(edits);
     edits.close();
-    File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
+    File editsNew = NNStorage.getStorageFile(sd, NameNodeFile.EDITS_NEW);
     
     if (editsNew.exists() && editsNew.length() > 0) {
       edits = new EditLogFileInputStream(editsNew);
@@ -1170,12 +744,9 @@ public class FSImage extends Storage {
     FSImageFormat.Saver saver = new FSImageFormat.Saver();
     FSImageCompression compression = FSImageCompression.createCompression(conf);
     saver.save(newFile, getFSNamesystem(), compression);
-    setImageDigest(saver.getSavedDigest());
+    storage.setImageDigest(saver.getSavedDigest());
   }
 
-  public void setImageDigest(MD5Hash digest) {
-    this.imageDigest = digest;
-  }
   /**
    * FSImageSaver is being run in a separate thread when saving
    * FSImage. There is one thread per each copy of the image.
@@ -1242,19 +813,19 @@ public class FSImage extends Storage {
  
     // try to restore all failed edit logs here
     assert editLog != null : "editLog must be initialized";
-    attemptRestoreRemovedStorage(true);
+    storage.attemptRestoreRemovedStorage(true);
 
     editLog.close();
     if(renewCheckpointTime)
-      this.checkpointTime = now();
+      storage.setCheckpointTime(now());
     List<StorageDirectory> errorSDs =
       Collections.synchronizedList(new ArrayList<StorageDirectory>());
 
     // mv current -> lastcheckpoint.tmp
-    for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       try {
-        moveCurrent(sd);
+        storage.moveCurrent(sd);
       } catch(IOException ie) {
         LOG.error("Unable to move current for " + sd.getRoot(), ie);
         errorSDs.add(sd);
@@ -1263,8 +834,8 @@ public class FSImage extends Storage {
 
     List<Thread> saveThreads = new ArrayList<Thread>();
     // save images into current
-    for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
-                                                              it.hasNext();) {
+    for (Iterator<StorageDirectory> it
+           = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
       FSImageSaver saver = new FSImageSaver(sd, errorSDs);
       Thread saveThread = new Thread(saver, saver.toString());
@@ -1275,7 +846,7 @@ public class FSImage extends Storage {
     saveThreads.clear();
 
     // -NOTE-
-    // If NN has image-only and edits-only storage directories and fails here 
+    // If NN has image-only and edits-only storage directories and fails here
     // the image will have the latest namespace state.
     // During startup the image-only directories will recover by discarding
     // lastcheckpoint.tmp, while
@@ -1284,14 +855,15 @@ public class FSImage extends Storage {
     // The edits directories should be discarded during startup because their
     // checkpointTime is older than that of image directories.
     // recreate edits in current
-    for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.EDITS);
-                                                              it.hasNext();) {
-      final StorageDirectory sd = it.next();
+    for (Iterator<StorageDirectory> it
+           = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
       // if this directory already stores the image and edits, then it was
       // already processed in the earlier loop.
       if (sd.getStorageDirType() == NameNodeDirType.IMAGE_AND_EDITS) {
         continue;
       }
+
       FSImageSaver saver = new FSImageSaver(sd, errorSDs);
       Thread saveThread = new Thread(saver, saver.toString());
       saveThreads.add(saveThread);
@@ -1300,16 +872,16 @@ public class FSImage extends Storage {
     waitForThreads(saveThreads);
 
     // mv lastcheckpoint.tmp -> previous.checkpoint
-    for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       try {
-        moveLastCheckpoint(sd);
+        storage.moveLastCheckpoint(sd);
       } catch(IOException ie) {
         LOG.error("Unable to move last checkpoint for " + sd.getRoot(), ie);
         errorSDs.add(sd);
       }
     }
-    processIOError(errorSDs, false);
+    storage.reportErrorsOnDirectories(errorSDs);
     if(!editLog.isOpen()) editLog.open();
     ckptState = CheckpointStates.UPLOAD_DONE;
   }
@@ -1324,159 +896,14 @@ public class FSImage extends Storage {
     if (!curDir.exists() && !curDir.mkdir())
       throw new IOException("Cannot create directory " + curDir);
     if (dirType.isOfType(NameNodeDirType.IMAGE))
-      saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
+      saveFSImage(NNStorage.getStorageFile(sd, NameNodeFile.IMAGE));
     if (dirType.isOfType(NameNodeDirType.EDITS))
-      editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
+      editLog.createEditLogFile(NNStorage.getStorageFile(sd,
+                                                         NameNodeFile.EDITS));
     // write version and time files
     sd.write();
   }
 
-  /**
-   * Move {@code current} to {@code lastcheckpoint.tmp} and
-   * recreate empty {@code current}.
-   * {@code current} is moved only if it is well formatted,
-   * that is contains VERSION file.
-   * 
-   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
-   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
-   */
-  protected void moveCurrent(StorageDirectory sd)
-  throws IOException {
-    File curDir = sd.getCurrentDir();
-    File tmpCkptDir = sd.getLastCheckpointTmp();
-    // mv current -> lastcheckpoint.tmp
-    // only if current is formatted - has VERSION file
-    if(sd.getVersionFile().exists()) {
-      assert curDir.exists() : curDir + " directory must exist.";
-      assert !tmpCkptDir.exists() : tmpCkptDir + " directory must not exist.";
-      rename(curDir, tmpCkptDir);
-    }
-    // recreate current
-    if(!curDir.exists() && !curDir.mkdir())
-      throw new IOException("Cannot create directory " + curDir);
-  }
-
-  /**
-   * Move {@code lastcheckpoint.tmp} to {@code previous.checkpoint}
-   * 
-   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
-   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
-   */
-  protected void moveLastCheckpoint(StorageDirectory sd)
-  throws IOException {
-    File tmpCkptDir = sd.getLastCheckpointTmp();
-    File prevCkptDir = sd.getPreviousCheckpoint();
-    // remove previous.checkpoint
-    if (prevCkptDir.exists())
-      deleteDir(prevCkptDir);
-    // mv lastcheckpoint.tmp -> previous.checkpoint
-    if(tmpCkptDir.exists())
-      rename(tmpCkptDir, prevCkptDir);
-  }
-
-  /**
-   * Generate new namespaceID.
-   * 
-   * namespaceID is a persistent attribute of the namespace.
-   * It is generated when the namenode is formatted and remains the same
-   * during the life cycle of the namenode.
-   * When a datanodes register they receive it as the registrationID,
-   * which is checked every time the datanode is communicating with the 
-   * namenode. Datanodes that do not 'know' the namespaceID are rejected.
-   * 
-   * @return new namespaceID
-   */
-  private int newNamespaceID() {
-    Random r = new Random();
-    r.setSeed(now());
-    int newID = 0;
-    while(newID == 0)
-      newID = r.nextInt(0x7FFFFFFF);  // use 31 bits only
-    return newID;
-  }
-
-  /**
-   * Generate new clusterID.
-   * 
-   * clusterID is a persistent attribute of the cluster.
-   * It is generated when the cluster is created and remains the same
-   * during the life cycle of the cluster.  When a new name node is formated, if 
-   * this is a new cluster, a new clusterID is geneated and stored.  Subsequent 
-   * name node must be given the same ClusterID during its format to be in the 
-   * same cluster.
-   * When a datanode register it receive the clusterID and stick with it.
-   * If at any point, name node or data node tries to join another cluster, it 
-   * will be rejected.
-   * 
-   * @return new clusterID
-   */ 
-  public static String newClusterID() {
-    return "CID-" + UUID.randomUUID().toString();
-  }
-  
-  /**
-   * Generate new blockpoolID.
-   * 
-   * @return new blockpoolID
-   */ 
-  private String newBlockPoolID() throws UnknownHostException{
-    String ip = "unknownIP";
-    try {
-      ip = DNS.getDefaultIP("default");
-    } catch (UnknownHostException e) {
-      LOG.warn("Could not find ip address of \"default\" inteface.");
-      throw e;
-    }
-    
-    int rand = 0;
-    try {
-      rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
-    } catch (NoSuchAlgorithmException e) {
-      LOG.warn("Could not use SecureRandom");
-      rand = R.nextInt(Integer.MAX_VALUE);
-    }
-    String bpid = "BP-" + rand + "-"+ ip + "-" + System.currentTimeMillis();
-    return bpid;
-  }
-  
-  /**
-   *  Create new dfs name directory.  Caution: this destroys all files
-   * in this filesystem. 
-   * */
-  void format(StorageDirectory sd) throws IOException {
-    format(sd, true);
-  }
-  
-  /**
-   *  if saveCurrent is true - save current image to the SD 
-   */
-  void format(StorageDirectory sd, boolean saveCurrent) throws IOException {
-    sd.clearDirectory(); // create currrent dir
-    sd.lock();
-    try {
-      if(saveCurrent)
-        saveCurrent(sd);
-    } finally {
-      sd.unlock();
-    }
-    LOG.info("Storage directory " + sd.getRoot()
-             + " has been successfully formatted.");
-  }
-
-  public void format(String clusterId) throws IOException {;
-    this.layoutVersion = FSConstants.LAYOUT_VERSION;
-    this.namespaceID = newNamespaceID();
-    this.clusterID = clusterId;
-    this.blockpoolID = newBlockPoolID();
-    this.cTime = 0L;
-    this.checkpointTime = now();
-    for (Iterator<StorageDirectory> it = 
-                           dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      format(sd);
-    }
-  }
-
 
   /**
    * Moves fsimage.ckpt to fsImage and edits.new to edits
@@ -1492,14 +919,14 @@ public class FSImage extends Storage {
   throws IOException {
     if (ckptState != CheckpointStates.UPLOAD_DONE
       && !(ckptState == CheckpointStates.ROLLED_EDITS
-      && getNumStorageDirs(NameNodeDirType.IMAGE) == 0)) {
+      && storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0)) {
       throw new IOException("Cannot roll fsImage before rolling edits log.");
     }
 
-    for (Iterator<StorageDirectory> it = 
-                       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+    for (Iterator<StorageDirectory> it 
+           = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
-      File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
+      File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW);
       if (!ckpt.exists()) {
         throw new IOException("Checkpoint file " + ckpt +
                               " does not exist");
@@ -1507,7 +934,8 @@ public class FSImage extends Storage {
     }
     editLog.purgeEditLog(); // renamed edits.new to edits
     if(LOG.isDebugEnabled()) {
-      LOG.debug("rollFSImage after purgeEditLog: storageList=" + listStorageDirectories());
+      LOG.debug("rollFSImage after purgeEditLog: storageList=" 
+                + storage.listStorageDirectories());
     }
     //
     // Renames new image
@@ -1519,17 +947,18 @@ public class FSImage extends Storage {
   /**
    * Renames new image
    */
-  void renameCheckpoint() {
+  void renameCheckpoint() throws IOException {
     ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it = 
-      dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+    for (Iterator<StorageDirectory> it 
+           = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
-      File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
-      File curFile = getImageFile(sd, NameNodeFile.IMAGE);
+      File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW);
+      File curFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE);
       // renameTo fails on Windows if the destination file 
       // already exists.
       if(LOG.isDebugEnabled()) {
-        LOG.debug("renaming  " + ckpt.getAbsolutePath() + " to "  + curFile.getAbsolutePath());
+        LOG.debug("renaming  " + ckpt.getAbsolutePath() 
+                  + " to " + curFile.getAbsolutePath());
       }
       if (!ckpt.renameTo(curFile)) {
         if (!curFile.delete() || !ckpt.renameTo(curFile)) {
@@ -1541,32 +970,32 @@ public class FSImage extends Storage {
         }
       }
     }
-    if(al != null) processIOError(al, true);
+    if(al != null) storage.reportErrorsOnDirectories(al);
   }
 
   /**
    * Updates version and fstime files in all directories (fsimage and edits).
    */
-  void resetVersion(boolean renewCheckpointTime, MD5Hash newImageDigest) throws IOException {
-    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+  void resetVersion(boolean renewCheckpointTime, MD5Hash newImageDigest) 
+      throws IOException {
+    storage.layoutVersion = FSConstants.LAYOUT_VERSION;
     if(renewCheckpointTime)
-      this.checkpointTime = now();
-    this.imageDigest = newImageDigest;
+      storage.setCheckpointTime(now());
+    storage.setImageDigest(newImageDigest);
     
     ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it = 
-                           dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       // delete old edits if sd is the image only the directory
       if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        File editsFile = getImageFile(sd, NameNodeFile.EDITS);
+        File editsFile = NNStorage.getStorageFile(sd, NameNodeFile.EDITS);
         if(editsFile.exists() && !editsFile.delete())
           throw new IOException("Cannot delete edits file " 
                                 + editsFile.getCanonicalPath());
       }
       // delete old fsimage if sd is the edits only the directory
       if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-        File imageFile = getImageFile(sd, NameNodeFile.IMAGE);
+        File imageFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE);
         if(imageFile.exists() && !imageFile.delete())
           throw new IOException("Cannot delete image file " 
                                 + imageFile.getCanonicalPath());
@@ -1580,7 +1009,7 @@ public class FSImage extends Storage {
         al.add(sd);       
       }
     }
-    if(al != null) processIOError(al, true);
+    if(al != null) storage.reportErrorsOnDirectories(al);
     ckptState = FSImage.CheckpointStates.START;
   }
 
@@ -1588,7 +1017,7 @@ public class FSImage extends Storage {
     getEditLog().rollEditLog();
     ckptState = CheckpointStates.ROLLED_EDITS;
     // If checkpoint fails this should be the most recent image, therefore
-    incrementCheckpointTime();
+    storage.incrementCheckpointTime();
     return new CheckpointSignature(this);
   }
 
@@ -1636,38 +1065,38 @@ public class FSImage extends Storage {
   throws IOException {
     String msg = null;
     // Verify that checkpoint is allowed
-    if(bnReg.getNamespaceID() != this.getNamespaceID())
+    if(bnReg.getNamespaceID() != storage.getNamespaceID())
       msg = "Name node " + bnReg.getAddress()
             + " has incompatible namespace id: " + bnReg.getNamespaceID()
-            + " expected: " + getNamespaceID();
+            + " expected: " + storage.getNamespaceID();
     else if(bnReg.isRole(NamenodeRole.ACTIVE))
       msg = "Name node " + bnReg.getAddress()
             + " role " + bnReg.getRole() + ": checkpoint is not allowed.";
-    else if(bnReg.getLayoutVersion() < this.getLayoutVersion()
-        || (bnReg.getLayoutVersion() == this.getLayoutVersion()
-            && bnReg.getCTime() > this.getCTime())
-        || (bnReg.getLayoutVersion() == this.getLayoutVersion()
-            && bnReg.getCTime() == this.getCTime()
-            && bnReg.getCheckpointTime() > this.checkpointTime))
+    else if(bnReg.getLayoutVersion() < storage.getLayoutVersion()
+        || (bnReg.getLayoutVersion() == storage.getLayoutVersion()
+            && bnReg.getCTime() > storage.getCTime())
+        || (bnReg.getLayoutVersion() == storage.getLayoutVersion()
+            && bnReg.getCTime() == storage.getCTime()
+            && bnReg.getCheckpointTime() > storage.getCheckpointTime()))
       // remote node has newer image age
       msg = "Name node " + bnReg.getAddress()
             + " has newer image layout version: LV = " +bnReg.getLayoutVersion()
             + " cTime = " + bnReg.getCTime()
             + " checkpointTime = " + bnReg.getCheckpointTime()
-            + ". Current version: LV = " + getLayoutVersion()
-            + " cTime = " + getCTime()
-            + " checkpointTime = " + checkpointTime;
+            + ". Current version: LV = " + storage.getLayoutVersion()
+            + " cTime = " + storage.getCTime()
+            + " checkpointTime = " + storage.getCheckpointTime();
     if(msg != null) {
       LOG.error(msg);
       return new NamenodeCommand(NamenodeProtocol.ACT_SHUTDOWN);
     }
     boolean isImgObsolete = true;
-    if(bnReg.getLayoutVersion() == this.getLayoutVersion()
-        && bnReg.getCTime() == this.getCTime()
-        && bnReg.getCheckpointTime() == this.checkpointTime)
+    if(bnReg.getLayoutVersion() == storage.getLayoutVersion()
+        && bnReg.getCTime() == storage.getCTime()
+        && bnReg.getCheckpointTime() == storage.getCheckpointTime())
       isImgObsolete = false;
     boolean needToReturnImg = true;
-    if(getNumStorageDirs(NameNodeDirType.IMAGE) == 0)
+    if(storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0)
       // do not return image if there are no image directories
       needToReturnImg = false;
     CheckpointSignature sig = rollEditLog();
@@ -1715,169 +1144,15 @@ public class FSImage extends Storage {
     ckptState = CheckpointStates.UPLOAD_DONE;
   }
 
-  synchronized void close() throws IOException {
+  synchronized public void close() throws IOException {
     getEditLog().close();
-    unlockAll();
-  }
-
-  /**
-   * Return the name of the image file.
-   */
-  File getFsImageName() {
-    StorageDirectory sd = null;
-    for (Iterator<StorageDirectory> it = 
-      dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      sd = it.next();
-      File fsImage = getImageFile(sd, NameNodeFile.IMAGE);
-      if(sd.getRoot().canRead() && fsImage.exists()) {
-        return fsImage;
-      } 
-    }
-    return null;
-  }
-
-  /**
-   * See if any of removed storages is "writable" again, and can be returned 
-   * into service. If saveNamespace is set, then this methdod is being 
-   * called form saveNamespace.
-   */
-  synchronized void attemptRestoreRemovedStorage(boolean saveNamespace) {   
-    // if directory is "alive" - copy the images there...
-    if(!restoreFailedStorage || removedStorageDirs.size() == 0) 
-      return; //nothing to restore
-    
-    LOG.info("FSImage.attemptRestoreRemovedStorage: check removed(failed) " +
-        "storarge. removedStorages size = " + removedStorageDirs.size());
-    for(Iterator<StorageDirectory> it = this.removedStorageDirs.iterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File root = sd.getRoot();
-      LOG.info("currently disabled dir " + root.getAbsolutePath() + 
-          "; type="+sd.getStorageDirType() + ";canwrite="+root.canWrite());
-      try {
-        
-        if(root.exists() && root.canWrite()) { 
-          /** If this call is being made from savenamespace command, then no
-           * need to format, the savenamespace command will format and write
-           * the new image to this directory anyways.
-           */
-          if (saveNamespace) {
-            sd.clearDirectory();
-          } else {
-            format(sd);
-          }
-          LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
-          if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-            File eFile = getEditFile(sd);
-            editLog.addNewEditLogStream(eFile);
-          }
-          this.addStorageDir(sd); // restore
-          it.remove();
-        }
-      } catch(IOException e) {
-        LOG.warn("failed to restore " + sd.getRoot().getAbsolutePath(), e);
-      }
-    }    
-  }
-  
-  public File getFsEditName() throws IOException {
-    return getEditLog().getFsEditName();
-  }
-
-  File getFsTimeName() {
-    StorageDirectory sd = null;
-    // NameNodeFile.TIME shoul be same on all directories
-    for (Iterator<StorageDirectory> it = 
-             dirIterator(); it.hasNext();)
-      sd = it.next();
-    return getImageFile(sd, NameNodeFile.TIME);
-  }
-
-  /**
-   * Return the name of the image file that is uploaded by periodic
-   * checkpointing.
-   */
-  File[] getFsImageNameCheckpoint() {
-    ArrayList<File> list = new ArrayList<File>();
-    for (Iterator<StorageDirectory> it = 
-                 dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      list.add(getImageFile(it.next(), NameNodeFile.IMAGE_NEW));
-    }
-    return list.toArray(new File[list.size()]);
-  }
-  
-  /**
-   * try to find current cluster id in the VERSION files
-   * returns first cluster id found in any VERSION file
-   * null in case none found
-   * @return clusterId or null in case no cluster id found
-   */
-  public String determineClusterId() {
-    String cid = null;
-    Iterator<StorageDirectory> sdit = dirIterator(NameNodeDirType.IMAGE);
-    while(sdit.hasNext()) {
-      StorageDirectory sd = sdit.next();
-      try {
-        Properties props = sd.readFrom(sd.getVersionFile());
-        cid = props.getProperty("clusterID");
-        LOG.info("current cluster id for sd="+sd.getCurrentDir() + 
-            ";lv=" + layoutVersion + ";cid=" + cid);
-        
-        if(cid != null && !cid.equals(""))
-          return cid;
-      } catch (Exception e) {
-        LOG.warn("this sd not available: " + e.getLocalizedMessage());
-      } //ignore
-    }
-    LOG.warn("couldn't find any VERSION file containing valid ClusterId");
-    return null;
-  }
-
-  private boolean getDistributedUpgradeState() {
-    FSNamesystem ns = getFSNamesystem();
-    return ns == null ? false : ns.getDistributedUpgradeState();
+    storage.close();
   }
 
-  private int getDistributedUpgradeVersion() {
-    FSNamesystem ns = getFSNamesystem();
-    return ns == null ? 0 : ns.getDistributedUpgradeVersion();
-  }
-
-  private void setDistributedUpgradeState(boolean uState, int uVersion) {
-    getFSNamesystem().upgradeManager.setUpgradeState(uState, uVersion);
-  }
-
-  private void verifyDistributedUpgradeProgress(StartupOption startOpt
-                                                ) throws IOException {
-    if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT)
-      return;
-    UpgradeManager um = getFSNamesystem().upgradeManager;
-    assert um != null : "FSNameSystem.upgradeManager is null.";
-    if(startOpt != StartupOption.UPGRADE) {
-      if(um.getUpgradeState())
-        throw new IOException(
-                    "\n   Previous distributed upgrade was not completed. "
-                  + "\n   Please restart NameNode with -upgrade option.");
-      if(um.getDistributedUpgrades() != null)
-        throw new IOException("\n   Distributed upgrade for NameNode version " 
-          + um.getUpgradeVersion() + " to current LV " + FSConstants.LAYOUT_VERSION
-          + " is required.\n   Please restart NameNode with -upgrade option.");
-    }
-  }
-
-  private void initializeDistributedUpgrade() throws IOException {
-    UpgradeManagerNamenode um = getFSNamesystem().upgradeManager;
-    if(! um.initializeUpgrade())
-      return;
-    // write new upgrade state into disk
-    writeAll();
-    NameNode.LOG.info("\n   Distributed upgrade for NameNode version " 
-        + um.getUpgradeVersion() + " to current LV " 
-        + FSConstants.LAYOUT_VERSION + " is initialized.");
-  }
 
   /**
    * Retrieve checkpoint dirs from configuration.
-   *  
+   *
    * @param conf the Configuration
    * @param defaultValue a default value for the attribute, if null
    * @return a Collection of URIs representing the values in 
@@ -1900,9 +1175,49 @@ public class FSImage extends Storage {
       dirNames.add(defaultName);
     }
     return Util.stringCollectionAsURIs(dirNames);
-  }  
+  }
+
+  public NNStorage getStorage() {
+    return storage;
+  }
+
+  @Override // NNStorageListener
+  public void errorOccurred(StorageDirectory sd) throws IOException {
+    // do nothing,
+  }
 
+  @Override // NNStorageListener
+  public void formatOccurred(StorageDirectory sd) throws IOException {
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
+      sd.lock();
+      try {
+        saveCurrent(sd);
+      } finally {
+        sd.unlock();
+      }
+      LOG.info("Storage directory " + sd.getRoot()
+               + " has been successfully formatted.");
+    }
+  };
+
+  @Override // NNStorageListener
+  public void directoryAvailable(StorageDirectory sd) throws IOException {
+    // do nothing
+  }
+
+  public int getLayoutVersion() {
+    return storage.getLayoutVersion();
+  }
+  
+  public int getNamespaceID() {
+    return storage.getNamespaceID();
+  }
+  
+  public String getClusterID() {
+    return storage.getClusterID();
+  }
+  
   public String getBlockPoolID() {
-    return blockpoolID;
+    return storage.getBlockPoolID();
   }
 }

Modified: hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1126941&r1=1126940&r2=1126941&view=diff
==============================================================================
--- hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue May 24 09:03:13 2011
@@ -469,7 +469,7 @@ class FSImageFormat {
       DataOutputStream out = new DataOutputStream(fos);
       try {
         out.writeInt(FSConstants.LAYOUT_VERSION);
-        out.writeInt(sourceNamesystem.getFSImage().getNamespaceID()); // TODO bad dependency
+        out.writeInt(sourceNamesystem.getFSImage().getStorage().getNamespaceID()); // TODO bad dependency
         out.writeLong(fsDir.rootDir.numItemsInTree());
         out.writeLong(sourceNamesystem.getGenerationStamp());
 

Modified: hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1126941&r1=1126940&r2=1126941&view=diff
==============================================================================
--- hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue May 24 09:03:13 2011
@@ -450,11 +450,11 @@ public class FSNamesystem implements FSC
    * Should do everything that would be done for the NameNode,
    * except for loading the image.
    * 
-   * @param bnImage {@link BackupStorage}
+   * @param bnImage {@link BackupImage}
    * @param conf configuration
    * @throws IOException
    */
-  FSNamesystem(Configuration conf, BackupStorage bnImage) throws IOException {
+  FSNamesystem(Configuration conf, BackupImage bnImage) throws IOException {
     try {
       initialize(conf, bnImage);
     } catch(IOException e) {
@@ -543,12 +543,11 @@ public class FSNamesystem implements FSC
   }
   
   NamespaceInfo getNamespaceInfo() {
-    NamespaceInfo nsinfo = new NamespaceInfo(dir.fsImage.getNamespaceID(),
+    return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
                              getClusterId(),
                              getBlockPoolId(),
-                             dir.fsImage.getCTime(),
+                             dir.fsImage.getStorage().getCTime(),
                              getDistributedUpgradeVersion());
-    return nsinfo;
   }
 
   /**
@@ -2724,7 +2723,7 @@ public class FSNamesystem implements FSC
    * @return registration ID
    */
   public String getRegistrationID() {
-    return Storage.getRegistrationID(dir.fsImage);
+    return Storage.getRegistrationID(dir.fsImage.getStorage());
   }
     
   /**
@@ -3509,10 +3508,10 @@ public class FSNamesystem implements FSC
     
     // if it is disabled - enable it and vice versa.
     if(arg.equals("check"))
-      return getFSImage().getRestoreFailedStorage();
+      return getFSImage().getStorage().getRestoreFailedStorage();
     
     boolean val = arg.equals("true");  // false if not
-    getFSImage().setRestoreFailedStorage(val);
+    getFSImage().getStorage().setRestoreFailedStorage(val);
     
     return val;
     } finally {
@@ -4810,18 +4809,20 @@ public class FSNamesystem implements FSC
    * @throws IOException
    */
   void registerBackupNode(NamenodeRegistration registration)
-  throws IOException {
+    throws IOException {
     writeLock();
     try {
-    if(getFSImage().getNamespaceID() != registration.getNamespaceID())
-      throw new IOException("Incompatible namespaceIDs: " 
-          + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
-          + "; " + registration.getRole() +
-              " node namespaceID = " + registration.getNamespaceID());
-    boolean regAllowed = getEditLog().checkBackupRegistration(registration);
-    if(!regAllowed)
-      throw new IOException("Registration is not allowed. " +
-          "Another node is registered as a backup.");
+      if(getFSImage().getStorage().getNamespaceID() 
+         != registration.getNamespaceID())
+        throw new IOException("Incompatible namespaceIDs: "
+            + " Namenode namespaceID = "
+            + getFSImage().getStorage().getNamespaceID() + "; "
+            + registration.getRole() +
+            " node namespaceID = " + registration.getNamespaceID());
+      boolean regAllowed = getEditLog().checkBackupRegistration(registration);
+      if(!regAllowed)
+        throw new IOException("Registration is not allowed. " +
+                              "Another node is registered as a backup.");
     } finally {
       writeUnlock();
     }
@@ -4835,15 +4836,17 @@ public class FSNamesystem implements FSC
    * @throws IOException
    */
   void releaseBackupNode(NamenodeRegistration registration)
-  throws IOException {
+    throws IOException {
     writeLock();
     try {
-    if(getFSImage().getNamespaceID() != registration.getNamespaceID())
-      throw new IOException("Incompatible namespaceIDs: " 
-          + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
-          + "; " + registration.getRole() +
-              " node namespaceID = " + registration.getNamespaceID());
-    getEditLog().releaseBackupStream(registration);
+      if(getFSImage().getStorage().getNamespaceID()
+         != registration.getNamespaceID())
+        throw new IOException("Incompatible namespaceIDs: "
+            + " Namenode namespaceID = "
+            + getFSImage().getStorage().getNamespaceID() + "; "
+            + registration.getRole() +
+            " node namespaceID = " + registration.getNamespaceID());
+      getEditLog().releaseBackupStream(registration);
     } finally {
       writeUnlock();
     }
@@ -5362,7 +5365,7 @@ public class FSNamesystem implements FSC
 
   @Override  // NameNodeMXBean
   public String getClusterId() {
-    return dir.fsImage.getClusterID();
+    return dir.fsImage.getStorage().getClusterID();
   }
   
   @Override  // NameNodeMXBean

Modified: hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1126941&r1=1126940&r2=1126941&view=diff
==============================================================================
--- hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Tue May 24 09:03:13 2011
@@ -76,17 +76,19 @@ public class GetImageServlet extends Htt
         public Void run() throws Exception {
           if (ff.getImage()) {
             response.setHeader(TransferFsImage.CONTENT_LENGTH,
-                String.valueOf(nnImage.getFsImageName().length()));
+                               String.valueOf(nnImage.getStorage()
+                                              .getFsImageName().length()));
             // send fsImage
             TransferFsImage.getFileServer(response.getOutputStream(),
-                nnImage.getFsImageName(),
+                                          nnImage.getStorage().getFsImageName(),
                 getThrottler(conf)); 
           } else if (ff.getEdit()) {
             response.setHeader(TransferFsImage.CONTENT_LENGTH,
-                String.valueOf(nnImage.getFsEditName().length()));
+                               String.valueOf(nnImage.getStorage()
+                                              .getFsEditName().length()));
             // send edits
             TransferFsImage.getFileServer(response.getOutputStream(),
-                nnImage.getFsEditName(),
+                                          nnImage.getStorage().getFsEditName(),
                 getThrottler(conf));
           } else if (ff.putImage()) {
             // issue a HTTP get request to download the new fsimage 
@@ -98,7 +100,7 @@ public class GetImageServlet extends Htt
                 public MD5Hash run() throws Exception {
                   return TransferFsImage.getFileClient(
                       ff.getInfoServer(), "getimage=1", 
-                      nnImage.getFsImageNameCheckpoint(), true);
+                      nnImage.getStorage().getFsImageNameCheckpoint(), true);
                 }
             });
             if (!nnImage.newImageDigest.equals(downloadImageDigest)) {

Modified: hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1126941&r1=1126940&r2=1126941&view=diff
==============================================================================
--- hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue May 24 09:03:13 2011
@@ -374,7 +374,7 @@ public class NameNode implements Namenod
     nodeRegistration = new NamenodeRegistration(
         getHostPortString(rpcAddress),
         getHostPortString(httpAddress),
-        getFSImage(), getRole(), getFSImage().getCheckpointTime());
+        getFSImage().getStorage(), getRole(), getFSImage().getStorage().getCheckpointTime());
     return nodeRegistration;
   }
 
@@ -1392,7 +1392,7 @@ public class NameNode implements Namenod
    * Returns the name of the fsImage file
    */
   public File getFsImageName() throws IOException {
-    return getFSImage().getFsImageName();
+    return getFSImage().getStorage().getFsImageName();
   }
     
   public FSImage getFSImage() {
@@ -1404,7 +1404,7 @@ public class NameNode implements Namenod
    * checkpointing
    */
   public File[] getFsImageNameCheckpoint() throws IOException {
-    return getFSImage().getFsImageNameCheckpoint();
+    return getFSImage().getStorage().getFsImageNameCheckpoint();
   }
 
   /**
@@ -1475,7 +1475,7 @@ public class NameNode implements Namenod
     String clusterId = StartupOption.FORMAT.getClusterId();
     if(clusterId == null || clusterId.equals("")) {
       // try to get one from the existing storage
-      clusterId = fsImage.determineClusterId();
+      clusterId = fsImage.getStorage().determineClusterId();
       if (clusterId == null || clusterId.equals("")) {
         throw new IllegalArgumentException("Format must be provided with clusterid");
       }
@@ -1487,7 +1487,7 @@ public class NameNode implements Namenod
         while(System.in.read() != '\n'); // discard the enter-key
       }
     }
-    nsys.dir.fsImage.format(clusterId);
+    nsys.dir.fsImage.getStorage().format(clusterId);
     return false;
   }
 
@@ -1622,7 +1622,7 @@ public class NameNode implements Namenod
         return null; // avoid javac warning
       case GENCLUSTERID:
         System.err.println("Generating new cluster id:");
-        System.out.println(FSImage.newClusterID());
+        System.out.println(NNStorage.newClusterID());
         System.exit(0);
         return null;
       case FINALIZE:

Modified: hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1126941&r1=1126940&r2=1126941&view=diff
==============================================================================
--- hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue May 24 09:03:13 2011
@@ -176,8 +176,8 @@ class NamenodeJspHelper {
         HttpServletRequest request) throws IOException {
       FSNamesystem fsn = nn.getNamesystem();
       FSImage fsImage = fsn.getFSImage();
-      List<Storage.StorageDirectory> removedStorageDirs = fsImage
-          .getRemovedStorageDirs();
+      List<Storage.StorageDirectory> removedStorageDirs 
+        = fsImage.getStorage().getRemovedStorageDirs();
 
       // FS Image storage configuration
       out.print("<h3> " + nn.getRole() + " Storage: </h3>");
@@ -185,7 +185,8 @@ class NamenodeJspHelper {
               + "<thead><tr><td><b>Storage Directory</b></td><td><b>Type</b></td><td><b>State</b></td></tr></thead>");
 
       StorageDirectory st = null;
-      for (Iterator<StorageDirectory> it = fsImage.dirIterator(); it.hasNext();) {
+      for (Iterator<StorageDirectory> it 
+             = fsImage.getStorage().dirIterator(); it.hasNext();) {
         st = it.next();
         String dir = "" + st.getRoot();
         String type = "" + st.getStorageDirType();

Modified: hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1126941&r1=1126940&r2=1126941&view=diff
==============================================================================
--- hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Tue May 24 09:03:13 2011
@@ -41,8 +41,10 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
@@ -317,7 +319,7 @@ public class SecondaryNameNode implement
         LOG.error("Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
         e.printStackTrace();
-        checkpointImage.imageDigest = null;
+        checkpointImage.getStorage().imageDigest = null;
       } catch (Throwable e) {
         LOG.error("Throwable Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
@@ -341,32 +343,34 @@ public class SecondaryNameNode implement
   
           @Override
           public Boolean run() throws Exception {
-            checkpointImage.cTime = sig.cTime;
-            checkpointImage.checkpointTime = sig.checkpointTime;
-                    
+            checkpointImage.getStorage().cTime = sig.cTime;
+            checkpointImage.getStorage().setCheckpointTime(sig.checkpointTime);
+
             // get fsimage
             String fileid;
             Collection<File> list;
             File[] srcNames;
             boolean downloadImage = true;
-            if (sig.imageDigest.equals(checkpointImage.imageDigest)) {
+            if (sig.imageDigest.equals(
+                    checkpointImage.getStorage().imageDigest)) {
               downloadImage = false;
               LOG.info("Image has not changed. Will not download image.");
             } else {
               fileid = "getimage=1";
-              list = checkpointImage.getFiles(NameNodeFile.IMAGE,
-                  NameNodeDirType.IMAGE);
+              list = checkpointImage.getStorage().getFiles(
+                  NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
               srcNames = list.toArray(new File[list.size()]);
               assert srcNames.length > 0 : "No checkpoint targets.";
               TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
-              checkpointImage.imageDigest = sig.imageDigest;
+              checkpointImage.getStorage().imageDigest = sig.imageDigest;
               LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
                   srcNames[0].length() + " bytes.");
             }
         
             // get edits file
             fileid = "getedit=1";
-            list = getFSImage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
+            list = getFSImage().getStorage().getFiles(
+                NameNodeFile.EDITS, NameNodeDirType.EDITS);
             srcNames = list.toArray(new File[list.size()]);;
             assert srcNames.length > 0 : "No checkpoint targets.";
             TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
@@ -394,7 +398,7 @@ public class SecondaryNameNode implement
     String fileid = "putimage=1&port=" + imagePort +
       "&machine=" + infoBindAddress + 
       "&token=" + sig.toString() +
-      "&newChecksum=" + checkpointImage.imageDigest;
+      "&newChecksum=" + checkpointImage.getStorage().getImageDigest();
     LOG.info("Posted URL " + fsName + fileid);
     TransferFsImage.getFileClient(fsName, fileid, (File[])null, false);
   }
@@ -413,7 +417,7 @@ public class SecondaryNameNode implement
     if (sockAddr.getAddress().isAnyLocalAddress()) {
       if(UserGroupInformation.isSecurityEnabled()) {
         throw new IOException("Cannot use a wildcard address with security. " +
-        		"Must explicitly set bind address for Kerberos");
+                              "Must explicitly set bind address for Kerberos");
       }
       return fsName.getHost() + ":" + sockAddr.getPort();
     } else {
@@ -462,13 +466,13 @@ public class SecondaryNameNode implement
     checkpointImage.endCheckpoint();
 
     LOG.warn("Checkpoint done. New Image Size: " 
-              + checkpointImage.getFsImageName().length());
+             + checkpointImage.getStorage().getFsImageName().length());
     
     return loadImage;
   }
 
   private void startCheckpoint() throws IOException {
-    checkpointImage.unlockAll();
+    checkpointImage.getStorage().unlockAll();
     checkpointImage.getEditLog().close();
     checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
     checkpointImage.startCheckpoint();
@@ -626,10 +630,10 @@ public class SecondaryNameNode implement
                        Collection<URI> editsDirs) throws IOException {
       Collection<URI> tempDataDirs = new ArrayList<URI>(dataDirs);
       Collection<URI> tempEditsDirs = new ArrayList<URI>(editsDirs);
-      this.storageDirs = new ArrayList<StorageDirectory>();
-      setStorageDirectories(tempDataDirs, tempEditsDirs);
+      storage.close();
+      storage.setStorageDirectories(tempDataDirs, tempEditsDirs);
       for (Iterator<StorageDirectory> it = 
-                   dirIterator(); it.hasNext();) {
+                   storage.dirIterator(); it.hasNext();) {
         StorageDirectory sd = it.next();
         boolean isAccessible = true;
         try { // create directories if don't exist yet
@@ -673,14 +677,18 @@ public class SecondaryNameNode implement
      * @throws IOException
      */
     void startCheckpoint() throws IOException {
-      for(StorageDirectory sd : storageDirs) {
-        moveCurrent(sd);
+      for (Iterator<StorageDirectory> it
+             = storage.dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        storage.moveCurrent(sd);
       }
     }
 
     void endCheckpoint() throws IOException {
-      for(StorageDirectory sd : storageDirs) {
-        moveLastCheckpoint(sd);
+      for (Iterator<StorageDirectory> it
+             = storage.dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        storage.moveLastCheckpoint(sd);
       }
     }
 
@@ -694,25 +702,26 @@ public class SecondaryNameNode implement
       StorageDirectory sdEdits = null;
       Iterator<StorageDirectory> it = null;
       if (loadImage) {
-        it = dirIterator(NameNodeDirType.IMAGE);
+        it = getStorage().dirIterator(NameNodeDirType.IMAGE);
         if (it.hasNext())
           sdName = it.next();
         if (sdName == null) {
           throw new IOException("Could not locate checkpoint fsimage");
         }
       }
-      it = dirIterator(NameNodeDirType.EDITS);
+      it = getStorage().dirIterator(NameNodeDirType.EDITS);
       if (it.hasNext())
         sdEdits = it.next();
       if (sdEdits == null)
         throw new IOException("Could not locate checkpoint edits");
       if (loadImage) {
-        this.layoutVersion = -1; // to avoid assert in loadFSImage()
-        loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+        // to avoid assert in loadFSImage()
+        this.getStorage().layoutVersion = -1;
+        loadFSImage(NNStorage.getStorageFile(sdName, NameNodeFile.IMAGE));
       }
       loadFSEdits(sdEdits);
-      clusterID = sig.getClusterID();
-      blockpoolID = sig.getBlockpoolID();
+      storage.setClusterID(sig.getClusterID());
+      storage.setBlockPoolID(sig.getBlockpoolID());
       sig.validateStorageInfo(this);
       saveNamespace(false);
     }

Modified: hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java?rev=1126941&r1=1126940&r2=1126941&view=diff
==============================================================================
--- hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java (original)
+++ hadoop/hdfs/branches/yahoo-merge/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java Tue May 24 09:03:13 2011
@@ -60,7 +60,7 @@ class UpgradeManagerNamenode extends Upg
       initializeUpgrade();
       if(!upgradeState) return false;
       // write new upgrade state into disk
-      namesystem.getFSImage().writeAll();
+      namesystem.getFSImage().getStorage().writeAll();
     }
     assert currentUpgrades != null : "currentUpgrades is null";
     this.broadcastCommand = currentUpgrades.first().startUpgrade();
@@ -111,7 +111,7 @@ class UpgradeManagerNamenode extends Upg
   public synchronized void completeUpgrade() throws IOException {
     // set and write new upgrade state into disk
     setUpgradeState(false, FSConstants.LAYOUT_VERSION);
-    namesystem.getFSImage().writeAll();
+    namesystem.getFSImage().getStorage().writeAll();
     currentUpgrades = null;
     broadcastCommand = null;
     namesystem.leaveSafeMode(false);
@@ -125,7 +125,7 @@ class UpgradeManagerNamenode extends Upg
       isFinalized = fsimage.isUpgradeFinalized();
       if(isFinalized) // upgrade is finalized
         return null;  // nothing to report
-      return new UpgradeStatusReport(fsimage.getLayoutVersion(), 
+      return new UpgradeStatusReport(fsimage.getStorage().getLayoutVersion(),
                                      (short)101, isFinalized);
     }
     UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();



Mime
View raw message