hadoop-hdfs-commits mailing list archives

From jiten...@apache.org
Subject svn commit: r1066305 [3/3] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Wed, 02 Feb 2011 02:05:19 GMT
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1066305&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Wed Feb  2 02:05:18 2011
@@ -0,0 +1,899 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.Closeable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Properties;
+import java.io.RandomAccessFile;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+
+import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.io.MD5Hash;
+
+/**
+ * NNStorage is responsible for management of the StorageDirectories used by
+ * the NameNode.
+ */
+@InterfaceAudience.Private
+public class NNStorage extends Storage implements Closeable {
+  private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
+
+  static final String MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
+
+  //
+  // The filenames used for storing the images
+  //
+  enum NameNodeFile {
+    IMAGE     ("fsimage"),
+    TIME      ("fstime"),
+    EDITS     ("edits"),
+    IMAGE_NEW ("fsimage.ckpt"),
+    EDITS_NEW ("edits.new");
+
+    private String fileName = null;
+    private NameNodeFile(String name) { this.fileName = name; }
+    String getName() { return fileName; }
+  }
+
+  /**
+   * Implementation of StorageDirType specific to namenode storage.
+   * A storage directory could be of type IMAGE, which stores only the fsimage;
+   * of type EDITS, which stores only edits; or of type IMAGE_AND_EDITS, which
+   * stores both fsimage and edits.
+   */
+  static enum NameNodeDirType implements StorageDirType {
+    UNDEFINED,
+    IMAGE,
+    EDITS,
+    IMAGE_AND_EDITS;
+
+    public StorageDirType getStorageDirType() {
+      return this;
+    }
+
+    public boolean isOfType(StorageDirType type) {
+      if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS))
+        return true;
+      return this == type;
+    }
+  }
+
+  /**
+   * Interface to be implemented by classes which make use of storage
+   * directories. They are notified when a StorageDirectory causes errors,
+   * becomes available, or is formatted.
+   *
+   * This allows implementors of the interface to take their own specific
+   * action on the StorageDirectory when this occurs.
+   */
+  interface NNStorageListener {
+    /**
+     * An error has occurred with a StorageDirectory.
+     * @param sd The storage directory causing the error.
+     * @throws IOException
+     */
+    void errorOccurred(StorageDirectory sd) throws IOException;
+
+    /**
+     * A storage directory has been formatted.
+     * @param sd The storage directory being formatted.
+     * @throws IOException
+     */
+    void formatOccurred(StorageDirectory sd) throws IOException;
+
+    /**
+     * A storage directory is now available for use.
+     * @param sd The storage directory which has become available.
+     * @throws IOException
+     */
+    void directoryAvailable(StorageDirectory sd) throws IOException;
+  }
+
+  final private List<NNStorageListener> listeners;
+  private UpgradeManager upgradeManager = null;
+  protected MD5Hash imageDigest = null;
+
+  /**
+   * flag that controls if we try to restore failed storages
+   */
+  private boolean restoreFailedStorage = false;
+  private Object restorationLock = new Object();
+  private boolean disablePreUpgradableLayoutCheck = false;
+
+  private long checkpointTime = -1L;  // The age of the image
+
+  /**
+   * list of failed (and thus removed) storages
+   */
+  final protected List<StorageDirectory> removedStorageDirs
+    = new CopyOnWriteArrayList<StorageDirectory>();
+
+  /**
+   * Construct the NNStorage.
+   * @param conf Namenode configuration.
+   */
+  public NNStorage(Configuration conf) {
+    super(NodeType.NAME_NODE);
+
+    storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
+    this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
+  }
+
+  @Override // Storage
+  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
+    if (disablePreUpgradableLayoutCheck) {
+      return false;
+    }
+
+    File oldImageDir = new File(sd.getRoot(), "image");
+    if (!oldImageDir.exists()) {
+      return false;
+    }
+    // check the layout version inside the image file
+    File oldF = new File(oldImageDir, "fsimage");
+    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+    try {
+      oldFile.seek(0);
+      int oldVersion = oldFile.readInt();
+      if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+        return false;
+    } finally {
+      oldFile.close();
+    }
+    return true;
+  }
+
+  @Override // Closeable
+  public void close() throws IOException {
+    listeners.clear();
+    unlockAll();
+    storageDirs.clear();
+  }
+
+  /**
+   * Set flag whether an attempt should be made to restore failed storage
+   * directories at the next available opportunity.
+   *
+   * @param val Whether restoration attempt should be made.
+   */
+  void setRestoreFailedStorage(boolean val) {
+    LOG.warn("set restore failed storage to " + val);
+    restoreFailedStorage = val;
+  }
+
+  /**
+   * @return Whether failed storage directories are to be restored.
+   */
+  boolean getRestoreFailedStorage() {
+    return restoreFailedStorage;
+  }
+
+  /**
+   * See if any of the removed storages is "writable" again, and can be returned
+   * into service. If saveNamespace is set, then this method is being
+   * called from saveNamespace.
+   *
+   * @param saveNamespace Whether method is being called from saveNamespace()
+   */
+  void attemptRestoreRemovedStorage(boolean saveNamespace) {
+    // if directory is "alive" - copy the images there...
+    if(!restoreFailedStorage || removedStorageDirs.size() == 0)
+      return; //nothing to restore
+
+    /* We don't want more than one thread trying to restore at a time */
+    synchronized (this.restorationLock) {
+      LOG.info("NNStorage.attemptRestoreRemovedStorage: check removed(failed) "+
+               "storarge. removedStorages size = " + removedStorageDirs.size());
+      for(Iterator<StorageDirectory> it
+            = this.removedStorageDirs.iterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        File root = sd.getRoot();
+        LOG.info("currently disabled dir " + root.getAbsolutePath() +
+                 "; type="+sd.getStorageDirType() 
+                 + ";canwrite="+root.canWrite());
+        try {
+          
+          if(root.exists() && root.canWrite()) {
+            /* If this call is being made from the saveNamespace command, then
+             * there is no need to format; the saveNamespace command will format
+             * and write the new image to this directory anyway.
+             */
+            if (saveNamespace) {
+              sd.clearDirectory();
+            } else {
+              format(sd);
+            }
+            
+            LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
+            for (NNStorageListener listener : listeners) {
+              listener.directoryAvailable(sd);
+            }
+            
+            this.addStorageDir(sd); // restore
+            this.removedStorageDirs.remove(sd);
+          }
+        } catch(IOException e) {
+          LOG.warn("failed to restore " + sd.getRoot().getAbsolutePath(), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return A list of storage directories which are in the errored state.
+   */
+  List<StorageDirectory> getRemovedStorageDirs() {
+    return this.removedStorageDirs;
+  }
+
+  /**
+   * Set the storage directories which will be used. NNStorage.close() should
+   * be called before this to ensure any previous storage directories have been
+   * freed.
+   *
+   * Synchronized due to initialization of storageDirs and removedStorageDirs.
+   *
+   * @param fsNameDirs Locations to store images.
+   * @param fsEditsDirs Locations to store edit logs.
+   * @throws IOException
+   */
+  synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
+                                          Collection<URI> fsEditsDirs)
+      throws IOException {
+    this.storageDirs.clear();
+    this.removedStorageDirs.clear();
+
+    // Add all name dirs with appropriate NameNodeDirType
+    for (URI dirName : fsNameDirs) {
+      checkSchemeConsistency(dirName);
+      boolean isAlsoEdits = false;
+      for (URI editsDirName : fsEditsDirs) {
+        if (editsDirName.compareTo(dirName) == 0) {
+          isAlsoEdits = true;
+          fsEditsDirs.remove(editsDirName);
+          break;
+        }
+      }
+      NameNodeDirType dirType = (isAlsoEdits) ?
+                          NameNodeDirType.IMAGE_AND_EDITS :
+                          NameNodeDirType.IMAGE;
+      // Add to the list of storage directories, only if the
+      // URI is of type file://
+      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
+          == 0){
+        this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
+            dirType));
+      }
+    }
+
+    // Add edits dirs if they are different from name dirs
+    for (URI dirName : fsEditsDirs) {
+      checkSchemeConsistency(dirName);
+      // Add to the list of storage directories, only if the
+      // URI is of type file://
+      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
+          == 0)
+        this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
+                    NameNodeDirType.EDITS));
+    }
+  }
+
+  /**
+   * Checks the consistency of a URI, in particular if the scheme
+   * is specified and is supported by a concrete implementation.
+   * @param u URI whose consistency is being checked.
+   */
+  private static void checkSchemeConsistency(URI u) throws IOException {
+    String scheme = u.getScheme();
+    // the URI should have a proper scheme
+    if(scheme == null)
+      throw new IOException("Undefined scheme for " + u);
+    else {
+      try {
+        // the scheme should be enumerated as JournalType
+        JournalType.valueOf(scheme.toUpperCase());
+      } catch (IllegalArgumentException iae){
+        throw new IOException("Unknown scheme " + scheme +
+            ". It should correspond to a JournalType enumeration value");
+      }
+    }
+  }
+
+  /**
+   * Retrieve current directories of type IMAGE
+   * @return Collection of URI representing image directories
+   * @throws IOException in case of URI processing error
+   */
+  Collection<URI> getImageDirectories() throws IOException {
+    return getDirectories(NameNodeDirType.IMAGE);
+  }
+
+  /**
+   * Retrieve current directories of type EDITS
+   * @return Collection of URI representing edits directories
+   * @throws IOException in case of URI processing error
+   */
+  Collection<URI> getEditsDirectories() throws IOException {
+    return getDirectories(NameNodeDirType.EDITS);
+  }
+
+  /**
+   * Return number of storage directories of the given type.
+   * @param dirType directory type
+   * @return number of storage directories of type dirType
+   */
+  int getNumStorageDirs(NameNodeDirType dirType) {
+    if(dirType == null)
+      return getNumStorageDirs();
+    Iterator<StorageDirectory> it = dirIterator(dirType);
+    int numDirs = 0;
+    for(; it.hasNext(); it.next())
+      numDirs++;
+    return numDirs;
+  }
+
+  /**
+   * Return the list of locations being used for a specific purpose.
+   * i.e., image or edit log storage.
+   *
+   * @param dirType Purpose of locations requested.
+   * @throws IOException
+   */
+  Collection<URI> getDirectories(NameNodeDirType dirType)
+      throws IOException {
+    ArrayList<URI> list = new ArrayList<URI>();
+    Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
+                                    dirIterator(dirType);
+    for ( ;it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      try {
+        list.add(Util.fileAsURI(sd.getRoot()));
+      } catch (IOException e) {
+        throw new IOException("Exception while processing " +
+            "StorageDirectory " + sd.getRoot(), e);
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Determine the checkpoint time of the specified StorageDirectory
+   *
+   * @param sd StorageDirectory to check
+   * @return If file exists and can be read, last checkpoint time. If not, 0L.
+   * @throws IOException On errors processing file pointed to by sd
+   */
+  long readCheckpointTime(StorageDirectory sd) throws IOException {
+    File timeFile = getStorageFile(sd, NameNodeFile.TIME);
+    long timeStamp = 0L;
+    if (timeFile.exists() && timeFile.canRead()) {
+      DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
+      try {
+        timeStamp = in.readLong();
+      } finally {
+        in.close();
+      }
+    }
+    return timeStamp;
+  }
+
+  /**
+   * Write last checkpoint time into a separate file.
+   *
+   * @param sd
+   * @throws IOException
+   */
+  public void writeCheckpointTime(StorageDirectory sd) throws IOException {
+    if (checkpointTime < 0L)
+      return; // do not write negative time
+    File timeFile = getStorageFile(sd, NameNodeFile.TIME);
+    if (timeFile.exists() && ! timeFile.delete()) {
+        LOG.error("Cannot delete chekpoint time file: "
+                  + timeFile.getCanonicalPath());
+    }
+    FileOutputStream fos = new FileOutputStream(timeFile);
+    DataOutputStream out = new DataOutputStream(fos);
+    try {
+      out.writeLong(checkpointTime);
+      out.flush();
+      fos.getChannel().force(true);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Record new checkpoint time in order to
+   * distinguish healthy directories from the removed ones.
+   * If there is an error writing new checkpoint time, the corresponding
+   * storage directory is removed from the list.
+   */
+  public void incrementCheckpointTime() {
+    setCheckpointTimeInStorage(checkpointTime + 1);
+  }
+
+  /**
+   * The age of the namespace state.<p>
+   * Reflects the latest time the image was saved.
+   * Modified with every save or a checkpoint.
+   * Persisted in VERSION file.
+   *
+   * @return the current checkpoint time.
+   */
+  public long getCheckpointTime() {
+    return checkpointTime;
+  }
+
+  /**
+   * Set the checkpoint time.
+   *
+   * This method does not persist the checkpoint time to storage immediately.
+   * 
+   * @see #setCheckpointTimeInStorage
+   * @param newCpT the new checkpoint time.
+   */
+  public void setCheckpointTime(long newCpT) {
+    checkpointTime = newCpT;
+  }
+
+  /**
+   * Set the current checkpoint time. Writes the new checkpoint
+   * time to all available storage directories.
+   * @param newCpT The new checkpoint time.
+   */
+  public void setCheckpointTimeInStorage(long newCpT) {
+    checkpointTime = newCpT;
+    // Write new checkpoint time in all storage directories
+    for(Iterator<StorageDirectory> it =
+                          dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      try {
+        writeCheckpointTime(sd);
+      } catch(IOException e) {
+        // Close any edits stream associated with this dir and remove directory
+        LOG.warn("incrementCheckpointTime failed on "
+                 + sd.getRoot().getPath() + ";type="+sd.getStorageDirType());
+      }
+    }
+  }
+
+  /**
+   * Return the names of the image files that are uploaded by periodic
+   * checkpointing.
+   *
+   * @return List of filenames to save checkpoints to.
+   */
+  public File[] getFsImageNameCheckpoint() {
+    ArrayList<File> list = new ArrayList<File>();
+    for (Iterator<StorageDirectory> it =
+                 dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      list.add(getStorageFile(it.next(), NameNodeFile.IMAGE_NEW));
+    }
+    return list.toArray(new File[list.size()]);
+  }
+
+  /**
+   * Return the name of the image file.
+   * @return The name of the first image file.
+   */
+  public File getFsImageName() {
+    StorageDirectory sd = null;
+    for (Iterator<StorageDirectory> it =
+      dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      sd = it.next();
+      if(sd.getRoot().canRead())
+        return getStorageFile(sd, NameNodeFile.IMAGE);
+    }
+    return null;
+  }
+
+  /**
+   * @return The name of the first editlog file.
+   */
+  public File getFsEditName() throws IOException {
+    for (Iterator<StorageDirectory> it
+           = dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      if(sd.getRoot().canRead())
+        return getEditFile(sd);
+    }
+    return null;
+  }
+
+  /**
+   * @return The name of the first time file.
+   */
+  public File getFsTimeName() {
+    StorageDirectory sd = null;
+    // NameNodeFile.TIME should be the same on all directories
+    for (Iterator<StorageDirectory> it =
+             dirIterator(); it.hasNext();)
+      sd = it.next();
+    return getStorageFile(sd, NameNodeFile.TIME);
+  }
+
+  /** Create new dfs name directory.  Caution: this destroys all files
+   * in this filesystem. */
+  private void format(StorageDirectory sd) throws IOException {
+    sd.clearDirectory(); // create current dir
+
+    for (NNStorageListener listener : listeners) {
+      listener.formatOccurred(sd);
+    }
+    sd.write();
+
+    LOG.info("Storage directory " + sd.getRoot()
+             + " has been successfully formatted.");
+  }
+
+  /**
+   * Format all available storage directories.
+   */
+  public void format() throws IOException {
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.namespaceID = newNamespaceID();
+    this.cTime = 0L;
+    this.setCheckpointTime(now());
+    for (Iterator<StorageDirectory> it =
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      format(sd);
+    }
+  }
+
+  /**
+   * Generate new namespaceID.
+   *
+   * namespaceID is a persistent attribute of the namespace.
+   * It is generated when the namenode is formatted and remains the same
+   * during the life cycle of the namenode.
+   * When datanodes register, they receive it as the registrationID,
+   * which is checked every time the datanode is communicating with the
+   * namenode. Datanodes that do not 'know' the namespaceID are rejected.
+   *
+   * @return new namespaceID
+   */
+  private int newNamespaceID() {
+    Random r = new Random();
+    r.setSeed(now());
+    int newID = 0;
+    while(newID == 0)
+      newID = r.nextInt(0x7FFFFFFF);  // use 31 bits only
+    return newID;
+  }
+
+  /**
+   * Move {@code current} to {@code lastcheckpoint.tmp} and
+   * recreate empty {@code current}.
+   * {@code current} is moved only if it is well formatted,
+   * that is, if it contains a VERSION file.
+   *
+   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
+   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
+   */
+  protected void moveCurrent(StorageDirectory sd)
+    throws IOException {
+    File curDir = sd.getCurrentDir();
+    File tmpCkptDir = sd.getLastCheckpointTmp();
+    // mv current -> lastcheckpoint.tmp
+    // only if current is formatted - has VERSION file
+    if(sd.getVersionFile().exists()) {
+      assert curDir.exists() : curDir + " directory must exist.";
+      assert !tmpCkptDir.exists() : tmpCkptDir + " directory must not exist.";
+      rename(curDir, tmpCkptDir);
+    }
+    // recreate current
+    if(!curDir.exists() && !curDir.mkdir())
+      throw new IOException("Cannot create directory " + curDir);
+  }
+
+  /**
+   * Move {@code lastcheckpoint.tmp} to {@code previous.checkpoint}.
+   *
+   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
+   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
+   */
+  protected void moveLastCheckpoint(StorageDirectory sd)
+    throws IOException {
+    File tmpCkptDir = sd.getLastCheckpointTmp();
+    File prevCkptDir = sd.getPreviousCheckpoint();
+    // remove previous.checkpoint
+    if (prevCkptDir.exists())
+      deleteDir(prevCkptDir);
+    // mv lastcheckpoint.tmp -> previous.checkpoint
+    if(tmpCkptDir.exists())
+      rename(tmpCkptDir, prevCkptDir);
+  }
+
+  @Override // Storage
+  protected void getFields(Properties props,
+                           StorageDirectory sd
+                           ) throws IOException {
+    super.getFields(props, sd);
+    if (layoutVersion == 0)
+      throw new IOException("NameNode directory "
+                            + sd.getRoot() + " is not formatted.");
+    String sDUS, sDUV;
+    sDUS = props.getProperty("distributedUpgradeState");
+    sDUV = props.getProperty("distributedUpgradeVersion");
+    setDistributedUpgradeState(
+        sDUS == null? false : Boolean.parseBoolean(sDUS),
+        sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
+
+    String sMd5 = props.getProperty(MESSAGE_DIGEST_PROPERTY);
+    if (layoutVersion <= -26) {
+      if (sMd5 == null) {
+        throw new InconsistentFSStateException(sd.getRoot(),
+            "file " + STORAGE_FILE_VERSION
+            + " does not have MD5 image digest.");
+      }
+      this.imageDigest = new MD5Hash(sMd5);
+    } else if (sMd5 != null) {
+      throw new InconsistentFSStateException(sd.getRoot(),
+          "file " + STORAGE_FILE_VERSION +
+          " has image MD5 digest when version is " + layoutVersion);
+    }
+
+    this.setCheckpointTime(readCheckpointTime(sd));
+  }
+
+  /**
+   * Write last checkpoint time and version file into the storage directory.
+   *
+   * The version file should always be written last.
+   * A missing or corrupted version file indicates that
+   * the checkpoint is not valid.
+   *
+   * @param sd storage directory
+   * @throws IOException
+   */
+  @Override // Storage
+  protected void setFields(Properties props,
+                           StorageDirectory sd
+                           ) throws IOException {
+    super.setFields(props, sd);
+    boolean uState = getDistributedUpgradeState();
+    int uVersion = getDistributedUpgradeVersion();
+    if(uState && uVersion != getLayoutVersion()) {
+      props.setProperty("distributedUpgradeState", Boolean.toString(uState));
+      props.setProperty("distributedUpgradeVersion",
+                        Integer.toString(uVersion));
+    }
+    if (imageDigest == null) {
+      imageDigest = MD5Hash.digest(
+          new FileInputStream(getStorageFile(sd, NameNodeFile.IMAGE)));
+    }
+
+    props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString());
+
+    writeCheckpointTime(sd);
+  }
+
+  /**
+   * @return A File of 'type' in storage directory 'sd'.
+   */
+  static File getStorageFile(StorageDirectory sd, NameNodeFile type) {
+    return new File(sd.getCurrentDir(), type.getName());
+  }
+
+  /**
+   * @return An editlog File in storage directory 'sd'.
+   */
+  File getEditFile(StorageDirectory sd) {
+    return getStorageFile(sd, NameNodeFile.EDITS);
+  }
+
+  /**
+   * @return A temporary editlog File in storage directory 'sd'.
+   */
+  File getEditNewFile(StorageDirectory sd) {
+    return getStorageFile(sd, NameNodeFile.EDITS_NEW);
+  }
+
+  /**
+   * @return A list of all Files of 'type' in available storage directories.
+   */
+  Collection<File> getFiles(NameNodeFile type, NameNodeDirType dirType) {
+    ArrayList<File> list = new ArrayList<File>();
+    Iterator<StorageDirectory> it =
+      (dirType == null) ? dirIterator() : dirIterator(dirType);
+    for ( ;it.hasNext(); ) {
+      list.add(getStorageFile(it.next(), type));
+    }
+    return list;
+  }
+
+  /**
+   * Set the upgrade manager for use in a distributed upgrade.
+   * @param um The upgrade manager
+   */
+  void setUpgradeManager(UpgradeManager um) {
+    upgradeManager = um;
+  }
+
+  /**
+   * @return The current distributed upgrade state.
+   */
+  boolean getDistributedUpgradeState() {
+    return upgradeManager == null ? false : upgradeManager.getUpgradeState();
+  }
+
+  /**
+   * @return The current upgrade version.
+   */
+  int getDistributedUpgradeVersion() {
+    return upgradeManager == null ? 0 : upgradeManager.getUpgradeVersion();
+  }
+
+  /**
+   * Set the upgrade state and version.
+   * @param uState the new state.
+   * @param uVersion the new version.
+   */
+  private void setDistributedUpgradeState(boolean uState, int uVersion) {
+    upgradeManager.setUpgradeState(uState, uVersion);
+  }
+
+  /**
+   * Verify that the distributed upgrade state is valid.
+   * @param startOpt the option the namenode was started with.
+   */
+  void verifyDistributedUpgradeProgress(StartupOption startOpt
+                                        ) throws IOException {
+    if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT)
+      return;
+
+    assert upgradeManager != null : "FSNameSystem.upgradeManager is null.";
+    if(startOpt != StartupOption.UPGRADE) {
+      if(upgradeManager.getUpgradeState())
+        throw new IOException(
+                    "\n   Previous distributed upgrade was not completed. "
+                  + "\n   Please restart NameNode with -upgrade option.");
+      if(upgradeManager.getDistributedUpgrades() != null)
+        throw new IOException("\n   Distributed upgrade for NameNode version "
+                              + upgradeManager.getUpgradeVersion()
+                              + " to current LV " + FSConstants.LAYOUT_VERSION
+                              + " is required.\n   Please restart NameNode"
+                              + " with -upgrade option.");
+    }
+  }
+
+  /**
+   * Initialize a distributed upgrade.
+   */
+  void initializeDistributedUpgrade() throws IOException {
+    if(! upgradeManager.initializeUpgrade())
+      return;
+    // write new upgrade state into disk
+    writeAll();
+    LOG.info("\n   Distributed upgrade for NameNode version "
+             + upgradeManager.getUpgradeVersion() + " to current LV "
+             + FSConstants.LAYOUT_VERSION + " is initialized.");
+  }
+
+  /**
+   * Set the digest for the latest image stored by NNStorage.
+   * @param digest The digest for the image.
+   */
+  void setImageDigest(MD5Hash digest) {
+    this.imageDigest = digest;
+  }
+
+  /**
+   * Get the digest for the latest image stored by NNStorage.
+   * @return The digest for the latest image.
+   */
+  MD5Hash getImageDigest() {
+    return imageDigest;
+  }
+
+  /**
+   * Register a listener. The listener will be notified of changes to the list
+   * of available storage directories.
+   *
+   * @see NNStorageListener
+   * @param sel A storage listener.
+   */
+  void registerListener(NNStorageListener sel) {
+    listeners.add(sel);
+  }
+
+  /**
+   * Disable the check for pre-upgradable layouts. Needed for BackupImage.
+   * @param val Whether to disable the pre-upgradable layout check.
+   */
+  void setDisablePreUpgradableLayoutCheck(boolean val) {
+    disablePreUpgradableLayoutCheck = val;
+  }
+
+  /**
+   * Marks a list of directories as having experienced an error.
+   *
+   * @param sds A list of storage directories to mark as errored.
+   * @throws IOException
+   */
+  void reportErrorsOnDirectories(List<StorageDirectory> sds) throws IOException {
+    for (StorageDirectory sd : sds) {
+      reportErrorsOnDirectory(sd);
+    }
+  }
+
+  /**
+   * Reports that a directory has experienced an error.
+   * Notifies listeners that the directory is no longer
+   * available.
+   *
+   * @param sd A storage directory to mark as errored.
+   * @throws IOException
+   */
+  void reportErrorsOnDirectory(StorageDirectory sd)
+      throws IOException {
+    LOG.warn("Error reported on storage directory " + sd);
+
+    String lsd = listStorageDirectories();
+    LOG.debug("current list of storage dirs:" + lsd);
+
+    for (NNStorageListener listener : listeners) {
+      listener.errorOccurred(sd);
+    }
+
+    LOG.info("About to remove corresponding storage: "
+             + sd.getRoot().getAbsolutePath());
+    try {
+      sd.unlock();
+    } catch (Exception e) {
+      LOG.info("Unable to unlock bad storage directory: "
+               +  sd.getRoot().getPath(), e);
+    }
+
+    if (this.storageDirs.remove(sd)) {
+      this.removedStorageDirs.add(sd);
+    }
+    incrementCheckpointTime();
+
+    lsd = listStorageDirectories();
+    LOG.debug("at the end current list of storage dirs:" + lsd);
+  }
+}
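
For reference, a minimal sketch of the NNStorageListener contract defined above. The listener class and its log messages are hypothetical; only registerListener() and the three callbacks come from the file itself, and since NNStorageListener is package-private the class would have to live in org.apache.hadoop.hdfs.server.namenode:

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;

    // Hypothetical listener that logs every storage-directory state change.
    class LoggingStorageListener implements NNStorage.NNStorageListener {
      private static final Log LOG =
        LogFactory.getLog(LoggingStorageListener.class);

      public void errorOccurred(StorageDirectory sd) throws IOException {
        LOG.warn("Storage directory failed: " + sd.getRoot());
      }
      public void formatOccurred(StorageDirectory sd) throws IOException {
        LOG.info("Storage directory formatted: " + sd.getRoot());
      }
      public void directoryAvailable(StorageDirectory sd) throws IOException {
        LOG.info("Storage directory back in service: " + sd.getRoot());
      }
    }

A caller registers it once, and it is then notified by reportErrorsOnDirectory() and attemptRestoreRemovedStorage():

    storage.registerListener(new LoggingStorageListener());  // storage is an NNStorage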

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Feb  2 02:05:18 2011
@@ -346,7 +346,7 @@ public class NameNode implements Namenod
     nodeRegistration = new NamenodeRegistration(
         getHostPortString(rpcAddress),
         getHostPortString(httpAddress),
-        getFSImage(), getRole(), getFSImage().getCheckpointTime());
+        getFSImage().getStorage(), getRole(), getFSImage().getStorage().getCheckpointTime());
     return nodeRegistration;
   }
 
@@ -1336,7 +1336,7 @@ public class NameNode implements Namenod
    * Returns the name of the fsImage file
    */
   public File getFsImageName() throws IOException {
-    return getFSImage().getFsImageName();
+    return getFSImage().getStorage().getFsImageName();
   }
     
   public FSImage getFSImage() {
@@ -1348,7 +1348,7 @@ public class NameNode implements Namenod
    * checkpointing
    */
   public File[] getFsImageNameCheckpoint() throws IOException {
-    return getFSImage().getFsImageNameCheckpoint();
+    return getFSImage().getStorage().getFsImageNameCheckpoint();
   }
 
   /**
@@ -1414,7 +1414,7 @@ public class NameNode implements Namenod
 
     FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
                                          editDirsToFormat), conf);
-    nsys.dir.fsImage.format();
+    nsys.dir.fsImage.getStorage().format();
     return false;
   }
 

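The hunks above show the pattern repeated through the rest of this commit: state that previously lived on FSImage is now reached through the new getStorage() accessor. A sketch of the resulting call chain, assuming a running NameNode instance nn (variable names are illustrative):

    FSImage image = nn.getFSImage();
    NNStorage storage = image.getStorage();      // accessor introduced by this commit
    long cpTime = storage.getCheckpointTime();   // formerly image.getCheckpointTime()
    File fsimage = storage.getFsImageName();     // formerly image.getFsImageName()
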
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Wed Feb  2 02:05:18 2011
@@ -172,8 +172,8 @@ class NamenodeJspHelper {
         HttpServletRequest request) throws IOException {
       FSNamesystem fsn = nn.getNamesystem();
       FSImage fsImage = fsn.getFSImage();
-      List<Storage.StorageDirectory> removedStorageDirs = fsImage
-          .getRemovedStorageDirs();
+      List<Storage.StorageDirectory> removedStorageDirs 
+        = fsImage.getStorage().getRemovedStorageDirs();
 
       // FS Image storage configuration
       out.print("<h3> " + nn.getRole() + " Storage: </h3>");
@@ -181,7 +181,8 @@ class NamenodeJspHelper {
               + "<thead><tr><td><b>Storage Directory</b></td><td><b>Type</b></td><td><b>State</b></td></tr></thead>");
 
       StorageDirectory st = null;
-      for (Iterator<StorageDirectory> it = fsImage.dirIterator(); it.hasNext();) {
+      for (Iterator<StorageDirectory> it 
+             = fsImage.getStorage().dirIterator(); it.hasNext();) {
         st = it.next();
         String dir = "" + st.getRoot();
         String type = "" + st.getStorageDirType();

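The JSP table above lists directories that NNStorage has taken out of service. Per the NNStorage code earlier in this commit, such a directory stays in getRemovedStorageDirs() until a restore attempt succeeds. A hedged sketch of triggering that path (both methods are package-private, so this would run inside the namenode package):

    storage.setRestoreFailedStorage(true);        // enable restoration attempts
    storage.attemptRestoreRemovedStorage(false);  // false: not called from saveNamespace()
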
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Wed Feb  2 02:05:18 2011
@@ -38,11 +38,12 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
@@ -312,7 +313,7 @@ public class SecondaryNameNode implement
         LOG.error("Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
         e.printStackTrace();
-        checkpointImage.imageDigest = null;
+        checkpointImage.getStorage().imageDigest = null;
       } catch (Throwable e) {
         LOG.error("Throwable Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
@@ -336,32 +337,34 @@ public class SecondaryNameNode implement
   
           @Override
           public Boolean run() throws Exception {
-            checkpointImage.cTime = sig.cTime;
-            checkpointImage.checkpointTime = sig.checkpointTime;
-                    
+            checkpointImage.getStorage().cTime = sig.cTime;
+            checkpointImage.getStorage().setCheckpointTime(sig.checkpointTime);
+
             // get fsimage
             String fileid;
             Collection<File> list;
             File[] srcNames;
             boolean downloadImage = true;
-            if (sig.imageDigest.equals(checkpointImage.imageDigest)) {
+            if (sig.imageDigest.equals(
+                    checkpointImage.getStorage().imageDigest)) {
               downloadImage = false;
               LOG.info("Image has not changed. Will not download image.");
             } else {
               fileid = "getimage=1";
-              list = checkpointImage.getFiles(NameNodeFile.IMAGE,
-                  NameNodeDirType.IMAGE);
+              list = checkpointImage.getStorage().getFiles(
+                  NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
               srcNames = list.toArray(new File[list.size()]);
               assert srcNames.length > 0 : "No checkpoint targets.";
               TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
-              checkpointImage.imageDigest = sig.imageDigest;
+              checkpointImage.getStorage().imageDigest = sig.imageDigest;
               LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
                   srcNames[0].length() + " bytes.");
             }
         
             // get edits file
             fileid = "getedit=1";
-            list = getFSImage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
+            list = getFSImage().getStorage().getFiles(
+                NameNodeFile.EDITS, NameNodeDirType.EDITS);
             srcNames = list.toArray(new File[list.size()]);
             assert srcNames.length > 0 : "No checkpoint targets.";
             TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
@@ -385,7 +388,7 @@ public class SecondaryNameNode implement
     String fileid = "putimage=1&port=" + imagePort +
       "&machine=" + infoBindAddress + 
       "&token=" + sig.toString() +
-      "&newChecksum=" + checkpointImage.imageDigest;
+      "&newChecksum=" + checkpointImage.getStorage().getImageDigest();
     LOG.info("Posted URL " + fsName + fileid);
     TransferFsImage.getFileClient(fsName, fileid, (File[])null, false);
   }
@@ -404,7 +407,7 @@ public class SecondaryNameNode implement
     if (sockAddr.getAddress().isAnyLocalAddress()) {
       if(UserGroupInformation.isSecurityEnabled()) {
         throw new IOException("Cannot use a wildcard address with security. " +
-        		"Must explicitly set bind address for Kerberos");
+                              "Must explicitly set bind address for Kerberos");
       }
       return fsName.getHost() + ":" + sockAddr.getPort();
     } else {
@@ -453,13 +456,13 @@ public class SecondaryNameNode implement
     checkpointImage.endCheckpoint();
 
     LOG.warn("Checkpoint done. New Image Size: " 
-              + checkpointImage.getFsImageName().length());
+             + checkpointImage.getStorage().getFsImageName().length());
     
     return loadImage;
   }
 
   private void startCheckpoint() throws IOException {
-    checkpointImage.unlockAll();
+    checkpointImage.getStorage().unlockAll();
     checkpointImage.getEditLog().close();
     checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
     checkpointImage.startCheckpoint();
@@ -617,10 +620,10 @@ public class SecondaryNameNode implement
                        Collection<URI> editsDirs) throws IOException {
       Collection<URI> tempDataDirs = new ArrayList<URI>(dataDirs);
       Collection<URI> tempEditsDirs = new ArrayList<URI>(editsDirs);
-      this.storageDirs = new ArrayList<StorageDirectory>();
-      setStorageDirectories(tempDataDirs, tempEditsDirs);
+      storage.close();
+      storage.setStorageDirectories(tempDataDirs, tempEditsDirs);
       for (Iterator<StorageDirectory> it = 
-                   dirIterator(); it.hasNext();) {
+                   storage.dirIterator(); it.hasNext();) {
         StorageDirectory sd = it.next();
         boolean isAccessible = true;
         try { // create directories if don't exist yet
@@ -664,14 +667,18 @@ public class SecondaryNameNode implement
      * @throws IOException
      */
     void startCheckpoint() throws IOException {
-      for(StorageDirectory sd : storageDirs) {
-        moveCurrent(sd);
+      for (Iterator<StorageDirectory> it
+             = storage.dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        storage.moveCurrent(sd);
       }
     }
 
     void endCheckpoint() throws IOException {
-      for(StorageDirectory sd : storageDirs) {
-        moveLastCheckpoint(sd);
+      for (Iterator<StorageDirectory> it
+             = storage.dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        storage.moveLastCheckpoint(sd);
       }
     }
 
@@ -685,21 +692,22 @@ public class SecondaryNameNode implement
       StorageDirectory sdEdits = null;
       Iterator<StorageDirectory> it = null;
       if (loadImage) {
-        it = dirIterator(NameNodeDirType.IMAGE);
+        it = getStorage().dirIterator(NameNodeDirType.IMAGE);
         if (it.hasNext())
           sdName = it.next();
         if (sdName == null) {
           throw new IOException("Could not locate checkpoint fsimage");
         }
       }
-      it = dirIterator(NameNodeDirType.EDITS);
+      it = getStorage().dirIterator(NameNodeDirType.EDITS);
       if (it.hasNext())
         sdEdits = it.next();
       if (sdEdits == null)
         throw new IOException("Could not locate checkpoint edits");
       if (loadImage) {
-        this.layoutVersion = -1; // to avoid assert in loadFSImage()
-        loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+        // to avoid assert in loadFSImage()
+        this.getStorage().layoutVersion = -1;
+        loadFSImage(getStorage().getStorageFile(sdName, NameNodeFile.IMAGE));
       }
       loadFSEdits(sdEdits);
       sig.validateStorageInfo(this);

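Read together with the moveCurrent()/moveLastCheckpoint() javadocs in NNStorage above, the startCheckpoint()/endCheckpoint() hunks amount to the following directory rotation (sketch only; locking and error handling omitted):

    // startCheckpoint(): current -> lastcheckpoint.tmp, then recreate empty current
    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
      storage.moveCurrent(it.next());
    }
    // ... image and edits are downloaded and merged into current ...
    // endCheckpoint(): lastcheckpoint.tmp -> previous.checkpoint
    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
      storage.moveLastCheckpoint(it.next());
    }
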
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java Wed Feb  2 02:05:18 2011
@@ -60,7 +60,7 @@ class UpgradeManagerNamenode extends Upg
       initializeUpgrade();
       if(!upgradeState) return false;
       // write new upgrade state into disk
-      namesystem.getFSImage().writeAll();
+      namesystem.getFSImage().getStorage().writeAll();
     }
     assert currentUpgrades != null : "currentUpgrades is null";
     this.broadcastCommand = currentUpgrades.first().startUpgrade();
@@ -111,7 +111,7 @@ class UpgradeManagerNamenode extends Upg
   public synchronized void completeUpgrade() throws IOException {
     // set and write new upgrade state into disk
     setUpgradeState(false, FSConstants.LAYOUT_VERSION);
-    namesystem.getFSImage().writeAll();
+    namesystem.getFSImage().getStorage().writeAll();
     currentUpgrades = null;
     broadcastCommand = null;
     namesystem.leaveSafeMode(false);
@@ -125,7 +125,7 @@ class UpgradeManagerNamenode extends Upg
       isFinalized = fsimage.isUpgradeFinalized();
       if(isFinalized) // upgrade is finalized
         return null;  // nothing to report
-      return new UpgradeStatusReport(fsimage.getLayoutVersion(), 
+      return new UpgradeStatusReport(fsimage.getStorage().getLayoutVersion(),
                                      (short)101, isFinalized);
     }
     UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java Wed Feb  2 02:05:18 2011
@@ -192,7 +192,7 @@ public class TestDFSRollback extends Tes
                                                 .build();
       UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
       baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
-      UpgradeUtilities.createVersionFile(DATA_NODE, baseDirs,
+      UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
                                          new StorageInfo(Integer.MIN_VALUE,
                                                          UpgradeUtilities.getCurrentNamespaceID(cluster),
                                                          UpgradeUtilities.getCurrentFsscTime(cluster)));
@@ -212,7 +212,7 @@ public class TestDFSRollback extends Tes
                                                 .build();
       UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
       baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
-      UpgradeUtilities.createVersionFile(DATA_NODE, baseDirs,
+      UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
                                          new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(),
                                                          UpgradeUtilities.getCurrentNamespaceID(cluster),
                                                          Long.MAX_VALUE));
@@ -251,7 +251,7 @@ public class TestDFSRollback extends Tes
       log("NameNode rollback with old layout version in previous", numDirs);
       UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
       baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
-      UpgradeUtilities.createVersionFile(NAME_NODE, baseDirs,
+      UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
                                          new StorageInfo(1,
                                                          UpgradeUtilities.getCurrentNamespaceID(null),
                                                          UpgradeUtilities.getCurrentFsscTime(null)));

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java Wed Feb  2 02:05:18 2011
@@ -188,7 +188,7 @@ public class TestDFSStartupVersions exte
       File[] storage = UpgradeUtilities.createStorageDirs(
                                                           DATA_NODE, conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
       log("DataNode version info", DATA_NODE, i, versions[i]);
-      UpgradeUtilities.createVersionFile(DATA_NODE, storage, versions[i]);
+      UpgradeUtilities.createVersionFile(conf, DATA_NODE, storage, versions[i]);
       try {
         cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
       } catch (Exception ignore) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java Wed Feb  2 02:05:18 2011
@@ -190,7 +190,7 @@ public class TestDFSUpgrade extends Test
       UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
       cluster = createCluster();
       baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      UpgradeUtilities.createVersionFile(DATA_NODE, baseDirs,
+      UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
                                          new StorageInfo(Integer.MIN_VALUE,
                                                          UpgradeUtilities.getCurrentNamespaceID(cluster),
                                                          UpgradeUtilities.getCurrentFsscTime(cluster)));
@@ -203,7 +203,7 @@ public class TestDFSUpgrade extends Test
       UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
       cluster = createCluster();
       baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      UpgradeUtilities.createVersionFile(DATA_NODE, baseDirs,
+      UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
                                          new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(),
                                                          UpgradeUtilities.getCurrentNamespaceID(cluster),
                                                          Long.MAX_VALUE));
@@ -238,7 +238,7 @@ public class TestDFSUpgrade extends Test
       
       log("NameNode upgrade with old layout version in current", numDirs);
       baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createVersionFile(NAME_NODE, baseDirs,
+      UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
                                          new StorageInfo(Storage.LAST_UPGRADABLE_LAYOUT_VERSION + 1,
                                                          UpgradeUtilities.getCurrentNamespaceID(null),
                                                          UpgradeUtilities.getCurrentFsscTime(null)));
@@ -247,7 +247,7 @@ public class TestDFSUpgrade extends Test
       
       log("NameNode upgrade with future layout version in current", numDirs);
       baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createVersionFile(NAME_NODE, baseDirs,
+      UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
                                          new StorageInfo(Integer.MIN_VALUE,
                                                          UpgradeUtilities.getCurrentNamespaceID(null),
                                                          UpgradeUtilities.getCurrentFsscTime(null)));

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java Wed Feb  2 02:05:18 2011
@@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 /**
@@ -301,7 +301,7 @@ public class UpgradeUtilities {
    *
    * @return the created version file
    */
-  public static File[] createVersionFile(NodeType nodeType, File[] parent,
+  public static File[] createVersionFile(Configuration conf, NodeType nodeType, File[] parent,
                                          StorageInfo version) throws IOException 
   {
     Storage storage = null;
@@ -311,7 +311,8 @@ public class UpgradeUtilities {
       FileUtil.fullyDelete(versionFile);
       switch (nodeType) {
       case NAME_NODE:
-        storage = new FSImage(version);
+        storage = new NNStorage(conf);
+        storage.setStorageInfo(version);
         break;
       case DATA_NODE:
         storage = new DataStorage(version, "doNotCare");

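Since NNStorage is constructed from a Configuration rather than a StorageInfo, the test helper now copies the version fields in after construction. A sketch of the NAME_NODE branch as it reads after this change (conf and version are the method's existing parameters):

    NNStorage storage = new NNStorage(conf);  // replaces new FSImage(version)
    storage.setStorageInfo(version);          // copies layoutVersion, namespaceID, cTime
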
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java Wed Feb  2 02:05:18 2011
@@ -193,7 +193,7 @@ public class CreateEditsLog {
     FileNameGenerator nameGenerator = new FileNameGenerator(BASE_PATH, 100);
 
     FSEditLog editLog = fsImage.getEditLog();
-    editLog.createEditLogFile(fsImage.getFsEditName());
+    editLog.createEditLogFile(fsImage.getStorage().getFsEditName());
     editLog.open();
     addFiles(editLog, numFiles, replication, numBlocksPerFile, startingBlockId,
              nameGenerator);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java Wed Feb  2 02:05:18 2011
@@ -33,7 +33,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -83,9 +84,10 @@ public class OfflineEditsViewerHelper {
   private String getEditsFilename() throws IOException {
     FSImage image = cluster.getNameNode().getFSImage();
     // it was set up to only have ONE StorageDirectory
-    Iterator<StorageDirectory> it = image.dirIterator(NameNodeDirType.EDITS);
+    Iterator<StorageDirectory> it
+      = image.getStorage().dirIterator(NameNodeDirType.EDITS);
     StorageDirectory sd = it.next();
-    return image.getEditFile(sd).getAbsolutePath();
+    return image.getStorage().getEditFile(sd).getAbsolutePath();
   }
 
   /**

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Wed Feb  2 02:05:18 2011
@@ -31,11 +31,11 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -186,15 +186,15 @@ public class TestCheckpoint extends Test
     // and that temporary checkpoint files are gone.
     FSImage image = cluster.getNameNode().getFSImage();
     for (Iterator<StorageDirectory> it = 
-             image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+           image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
-      assertFalse(FSImage.getImageFile(sd, NameNodeFile.IMAGE_NEW).exists());
+      assertFalse(image.getStorage().getStorageFile(sd, NameNodeFile.IMAGE_NEW).exists());
     }
     for (Iterator<StorageDirectory> it = 
-            image.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+           image.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       StorageDirectory sd = it.next();
-      assertFalse(image.getEditNewFile(sd).exists());
-      File edits = image.getEditFile(sd);
+      assertFalse(image.getStorage().getEditNewFile(sd).exists());
+      File edits = image.getStorage().getEditFile(sd);
       assertTrue(edits.exists()); // edits should exist and be empty
       long editsLen = edits.length();
       assertTrue(editsLen == Integer.SIZE/Byte.SIZE);
@@ -362,10 +362,10 @@ public class TestCheckpoint extends Test
       assertTrue(!fileSys.exists(file1));
       StorageDirectory sd = null;
       for (Iterator<StorageDirectory> it = 
-                image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
+                image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
          sd = it.next();
       assertTrue(sd != null);
-      long fsimageLength = FSImage.getImageFile(sd, NameNodeFile.IMAGE).length();
+      long fsimageLength = image.getStorage().getStorageFile(sd, NameNodeFile.IMAGE).length();
       //
       // Make the checkpoint
       //
@@ -383,8 +383,8 @@ public class TestCheckpoint extends Test
 
       // Verify that image file sizes did not change.
       for (Iterator<StorageDirectory> it = 
-              image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue(FSImage.getImageFile(it.next(), 
+              image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue(image.getStorage().getStorageFile(it.next(), 
                                 NameNodeFile.IMAGE).length() == fsimageLength);
       }
 
@@ -477,7 +477,7 @@ public class TestCheckpoint extends Test
     SecondaryNameNode secondary = null;
     try {
       secondary = startSecondaryNameNode(conf);
-      assertFalse(secondary.getFSImage().isLockSupported(0));
+      assertFalse(secondary.getFSImage().getStorage().isLockSupported(0));
       secondary.shutdown();
     } catch (IOException e) { // expected to fail
       assertTrue(secondary == null);
@@ -502,7 +502,7 @@ public class TestCheckpoint extends Test
     try {
       nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
                           StartupOption.REGULAR);
-      assertFalse(nn.getFSImage().isLockSupported(0));
+      assertFalse(nn.getFSImage().getStorage().isLockSupported(0));
       nn.stop(); nn = null;
     } catch (IOException e) { // expected to fail
       assertTrue(nn == null);
@@ -516,7 +516,7 @@ public class TestCheckpoint extends Test
     SecondaryNameNode secondary2 = null;
     try {
       secondary2 = startSecondaryNameNode(conf);
-      assertFalse(secondary2.getFSImage().isLockSupported(0));
+      assertFalse(secondary2.getFSImage().getStorage().isLockSupported(0));
       secondary2.shutdown();
     } catch (IOException e) { // expected to fail
       assertTrue(secondary2 == null);
@@ -564,8 +564,8 @@ public class TestCheckpoint extends Test
     // Verify that image file sizes did not change.
     FSImage image = nn.getFSImage();
     for (Iterator<StorageDirectory> it = 
-            image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      assertTrue(FSImage.getImageFile(it.next(), 
+            image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      assertTrue(image.getStorage().getStorageFile(it.next(), 
                           NameNodeFile.IMAGE).length() == fsimageLength);
     }
     nn.stop();
@@ -813,15 +813,15 @@ public class TestCheckpoint extends Test
       // Make the checkpoint
       //
       SecondaryNameNode secondary = startSecondaryNameNode(conf);
-      long fsimageLength = FSImage.getImageFile(
-          image.dirIterator(NameNodeDirType.IMAGE).next(),
-          NameNodeFile.IMAGE).length();
+      long fsimageLength = image.getStorage()
+        .getStorageFile(image.getStorage().dirIterator(NameNodeDirType.IMAGE).next(),
+                        NameNodeFile.IMAGE).length();
       assertFalse("Image is downloaded", secondary.doCheckpoint());
 
       // Verify that image file sizes did not change.
       for (Iterator<StorageDirectory> it = 
-              image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue("Image size does not change", FSImage.getImageFile(it.next(), 
+             image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue("Image size does not change", image.getStorage().getStorageFile(it.next(), 
                                 NameNodeFile.IMAGE).length() == fsimageLength);
       }
 
@@ -830,9 +830,10 @@ public class TestCheckpoint extends Test
       assertTrue("Image is not downloaded", secondary.doCheckpoint());
 
       for (Iterator<StorageDirectory> it = 
-        image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue("Image size increased", FSImage.getImageFile(it.next(), 
-                          NameNodeFile.IMAGE).length() > fsimageLength);
+             image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue("Image size increased", 
+                   image.getStorage().getStorageFile(it.next(), 
+                                                     NameNodeFile.IMAGE).length() > fsimageLength);
      }
 
       secondary.shutdown();
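
Note that the static FSImage.getImageFile(sd, type) becomes a getStorageFile(sd, type) call on the NNStorage instance throughout this test. A sketch of the post-checkpoint cleanup check (assertion style as above; illustrative only):

    NNStorage storage = image.getStorage();
    for (Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.IMAGE);
         it.hasNext();) {
      StorageDirectory sd = it.next();
      // IMAGE_NEW is the temporary image written while a checkpoint is in flight
      assertFalse(storage.getStorageFile(sd, NameNodeFile.IMAGE_NEW).exists());
    }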

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Wed Feb  2 02:05:18 2011
@@ -29,8 +29,8 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 
 /**
  * This class tests the creation and validation of a checkpoint.
@@ -140,8 +140,8 @@ public class TestEditLog extends TestCas
       //
       FSEditLogLoader loader = new FSEditLogLoader(namesystem);
       for (Iterator<StorageDirectory> it = 
-              fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-        File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+              fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+        File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
         int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));
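
The verification loop keeps its shape; only the lookups change. A condensed sketch, assuming namesystem and fsimage as in the test:

    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
    for (Iterator<StorageDirectory> it =
           fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
      File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
      // Replaying the file throws if any transaction on disk is corrupt
      int numEdits = loader.loadFSEdits(new EditLogFileInputStream(editFile));
    }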

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Wed Feb  2 02:05:18 2011
@@ -38,8 +38,8 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 
 import static org.junit.Assert.*;
 import org.junit.Test;
@@ -218,8 +218,8 @@ public class TestEditLogRace {
     // If there were any corruptions, it is likely that the reading in
     // of these transactions will throw an exception.
     for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+           fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
       System.out.println("Verifying file: " + editFile);
       int numEdits = new FSEditLogLoader(namesystem).loadFSEdits(
         new EditLogFileInputStream(editFile));

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java Wed Feb  2 02:05:18 2011
@@ -31,6 +31,8 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 
 import java.util.Iterator;
 import java.util.List;
@@ -108,11 +110,11 @@ public class TestParallelImageWrite exte
   
   private void checkImages(FSNamesystem fsn) throws Exception {
     Iterator<StorageDirectory> iter = fsn.
-            getFSImage().dirIterator(FSImage.NameNodeDirType.IMAGE);
+            getFSImage().getStorage().dirIterator(NameNodeDirType.IMAGE);
     List<Long> checksums = new ArrayList<Long>();
     while (iter.hasNext()) {
       StorageDirectory sd = iter.next();
-      File fsImage = FSImage.getImageFile(sd, FSImage.NameNodeFile.IMAGE);
+      File fsImage = fsn.getFSImage().getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
       PureJavaCrc32 crc = new PureJavaCrc32();
       FileInputStream in = new FileInputStream(fsImage);
       byte[] buff = new byte[4096];
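
checkImages compares a CRC across every IMAGE directory to confirm that the parallel writes produced byte-identical files. A sketch of the checksum step that follows the lines above (fsImage and checksums as in the test; the read loop is an assumption about the elided remainder of the method):

    PureJavaCrc32 crc = new PureJavaCrc32();
    FileInputStream in = new FileInputStream(fsImage);
    byte[] buff = new byte[4096];
    int read;
    while ((read = in.read(buff)) != -1) {
      crc.update(buff, 0, read);    // accumulate a checksum over the whole image file
    }
    in.close();
    checksums.add(crc.getValue());  // every storage dir should yield the same value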

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java Wed Feb  2 02:05:18 2011
@@ -103,11 +103,17 @@ public class TestSaveNamespace {
 
     // Replace the FSImage with a spy
     FSImage originalImage = fsn.dir.fsImage;
+    NNStorage storage = originalImage.getStorage();
+    storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
+
+    NNStorage spyStorage = spy(storage);
+    originalImage.storage = spyStorage;
+
     FSImage spyImage = spy(originalImage);
-    spyImage.setStorageDirectories(
-        FSNamesystem.getNamespaceDirs(conf), 
-        FSNamesystem.getNamespaceEditsDirs(conf));
     fsn.dir.fsImage = spyImage;
+    
+    spyImage.getStorage().setStorageDirectories(FSNamesystem.getNamespaceDirs(conf), 
+                                                FSNamesystem.getNamespaceEditsDirs(conf));
 
     // inject fault
     switch(fault) {
@@ -119,12 +125,12 @@ public class TestSaveNamespace {
     case MOVE_CURRENT:
       // The spy throws a RuntimeException when calling moveCurrent()
       doThrow(new RuntimeException("Injected fault: moveCurrent")).
-        when(spyImage).moveCurrent((StorageDirectory)anyObject());
+        when(spyStorage).moveCurrent((StorageDirectory)anyObject());
       break;
     case MOVE_LAST_CHECKPOINT:
       // The spy throws a RuntimeException when calling moveLastCheckpoint()
       doThrow(new RuntimeException("Injected fault: moveLastCheckpoint")).
-        when(spyImage).moveLastCheckpoint((StorageDirectory)anyObject());
+        when(spyStorage).moveLastCheckpoint((StorageDirectory)anyObject());
       break;
     }
 
@@ -174,12 +180,18 @@ public class TestSaveNamespace {
 
     // Replace the FSImage with a spy
     FSImage originalImage = fsn.dir.fsImage;
+    NNStorage storage = originalImage.getStorage();
+    storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
+
+    NNStorage spyStorage = spy(storage);
+    originalImage.storage = spyStorage;
+
     FSImage spyImage = spy(originalImage);
-    spyImage.setStorageDirectories(
-        FSNamesystem.getNamespaceDirs(conf), 
-        FSNamesystem.getNamespaceEditsDirs(conf));
     fsn.dir.fsImage = spyImage;
 
+    spyImage.getStorage().setStorageDirectories(FSNamesystem.getNamespaceDirs(conf), 
+                                                FSNamesystem.getNamespaceEditsDirs(conf));
+
     // inject fault
     // The spy throws a IOException when writing to the second directory
     doAnswer(new FaultySaveImage(false)).
@@ -195,9 +207,9 @@ public class TestSaveNamespace {
       fsn.saveNamespace();
       LOG.warn("First savenamespace successful.");
       assertTrue("Savenamespace should have marked one directory as bad." +
-                 " But found " + spyImage.getRemovedStorageDirs().size() +
+                 " But found " + spyStorage.getRemovedStorageDirs().size() +
                  " bad directories.", 
-                   spyImage.getRemovedStorageDirs().size() == 1);
+                   spyStorage.getRemovedStorageDirs().size() == 1);
 
       // The next call to savenamespace should try inserting the
       // erroneous directory back to fs.name.dir. This command should
@@ -207,9 +219,9 @@ public class TestSaveNamespace {
       LOG.warn("Second savenamespace successful.");
       assertTrue("Savenamespace should have been successful in removing " +
                  " bad directories from Image."  +
-                 " But found " + originalImage.getRemovedStorageDirs().size() +
+                 " But found " + storage.getRemovedStorageDirs().size() +
                  " bad directories.", 
-                 originalImage.getRemovedStorageDirs().size() == 0);
+                 storage.getRemovedStorageDirs().size() == 0);
 
       // Now shut down and restart the namesystem
       LOG.info("Shutting down fsimage.");
@@ -257,8 +269,10 @@ public class TestSaveNamespace {
 
     // Replace the FSImage with a spy
     final FSImage originalImage = fsn.dir.fsImage;
+    originalImage.getStorage().close();
+
     FSImage spyImage = spy(originalImage);
-    spyImage.setStorageDirectories(
+    spyImage.getStorage().setStorageDirectories(
         FSNamesystem.getNamespaceDirs(conf), 
         FSNamesystem.getNamespaceEditsDirs(conf));
     fsn.dir.fsImage = spyImage;
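
The fault-injection setup changes the most in this test: moveCurrent() and moveLastCheckpoint() now live on NNStorage, so the Mockito spy must wrap the storage object rather than the image. The pattern, condensed from the hunks above (storage field access as in the patch):

    NNStorage storage = originalImage.getStorage();
    storage.close();                      // release directory locks taken during init
    NNStorage spyStorage = spy(storage);  // spy: real methods run unless stubbed
    originalImage.storage = spyStorage;

    doThrow(new RuntimeException("Injected fault: moveCurrent"))
      .when(spyStorage).moveCurrent((StorageDirectory) anyObject());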

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java Wed Feb  2 02:05:18 2011
@@ -29,8 +29,8 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -138,8 +138,8 @@ public class TestSecurityTokenEditLog ex
       namesystem.getDelegationTokenSecretManager().stopThreads();
       int numKeys = namesystem.getDelegationTokenSecretManager().getNumberOfKeys();
       for (Iterator<StorageDirectory> it = 
-              fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-        File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+             fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+        File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
         int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));
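
Beyond the storage lookups, this test exercises delegation-token operations recorded in the edit log. A sketch of the verification entry point shown above (counting secret-manager keys before replaying each edits file):

    namesystem.getDelegationTokenSecretManager().stopThreads();
    int numKeys = namesystem.getDelegationTokenSecretManager().getNumberOfKeys();
    // each edits file is then replayed via loader.loadFSEdits(...) as in TestEditLog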

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java Wed Feb  2 02:05:18 2011
@@ -48,8 +48,8 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.StringUtils;
@@ -226,15 +226,15 @@ public class TestStartup extends TestCas
    */
   private void verifyDifferentDirs(FSImage img, long expectedImgSize, long expectedEditsSize) {
     StorageDirectory sd =null;
-    for (Iterator<StorageDirectory> it = img.dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = img.getStorage().dirIterator(); it.hasNext();) {
       sd = it.next();
 
       if(sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-        File imf = FSImage.getImageFile(sd, NameNodeFile.IMAGE);
+        File imf = img.getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
         LOG.info("--image file " + imf.getAbsolutePath() + "; len = " + imf.length() + "; expected = " + expectedImgSize);
         assertEquals(expectedImgSize, imf.length());	
       } else if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        File edf = FSImage.getImageFile(sd, NameNodeFile.EDITS);
+        File edf = img.getStorage().getStorageFile(sd, NameNodeFile.EDITS);
         LOG.info("-- edits file " + edf.getAbsolutePath() + "; len = " + edf.length()  + "; expected = " + expectedEditsSize);
         assertEquals(expectedEditsSize, edf.length());	
       } else {
@@ -337,10 +337,10 @@ public class TestStartup extends TestCas
 
       // now verify that image and edits are created in the different directories
       FSImage image = nn.getFSImage();
-      StorageDirectory sd = image.getStorageDir(0); //only one
+      StorageDirectory sd = image.getStorage().getStorageDir(0); //only one
       assertEquals(sd.getStorageDirType(), NameNodeDirType.IMAGE_AND_EDITS);
-      File imf = FSImage.getImageFile(sd, NameNodeFile.IMAGE);
-      File edf = FSImage.getImageFile(sd, NameNodeFile.EDITS);
+      File imf = image.getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
+      File edf = image.getStorage().getStorageFile(sd, NameNodeFile.EDITS);
       LOG.info("--image file " + imf.getAbsolutePath() + "; len = " + imf.length());
       LOG.info("--edits file " + edf.getAbsolutePath() + "; len = " + edf.length());
 
@@ -445,7 +445,7 @@ public class TestStartup extends TestCas
     FSImage image = namenode.getFSImage();
     image.loadFSImage();
 
-    File versionFile = image.getStorageDir(0).getVersionFile();
+    File versionFile = image.getStorage().getStorageDir(0).getVersionFile();
 
     RandomAccessFile file = new RandomAccessFile(versionFile, "rws");
     FileInputStream in = null;
@@ -458,12 +458,12 @@ public class TestStartup extends TestCas
       props.load(in);
 
       // get the MD5 property and change it
-      String sMd5 = props.getProperty(FSImage.MESSAGE_DIGEST_PROPERTY);
+      String sMd5 = props.getProperty(NNStorage.MESSAGE_DIGEST_PROPERTY);
       MD5Hash md5 = new MD5Hash(sMd5);
       byte[] bytes = md5.getDigest();
       bytes[0] += 1;
       md5 = new MD5Hash(bytes);
-      props.setProperty(FSImage.MESSAGE_DIGEST_PROPERTY, md5.toString());
+      props.setProperty(NNStorage.MESSAGE_DIGEST_PROPERTY, md5.toString());
 
       // write the properties back to version file
       file.seek(0);
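
MESSAGE_DIGEST_PROPERTY likewise moves from FSImage to NNStorage. The test flips one byte of the stored MD5 so that image validation must fail on the next load; the corruption step, condensed from the hunk above:

    String sMd5 = props.getProperty(NNStorage.MESSAGE_DIGEST_PROPERTY);
    MD5Hash md5 = new MD5Hash(sMd5);
    byte[] bytes = md5.getDigest();
    bytes[0] += 1;                        // corrupt a single byte of the digest
    props.setProperty(NNStorage.MESSAGE_DIGEST_PROPERTY, new MD5Hash(bytes).toString());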

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java Wed Feb  2 02:05:18 2011
@@ -45,8 +45,8 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 
@@ -128,7 +128,7 @@ public class TestStorageRestore extends 
    */
   public void invalidateStorage(FSImage fi) throws IOException {
     ArrayList<StorageDirectory> al = new ArrayList<StorageDirectory>(2);
-    Iterator<StorageDirectory> it = fi.dirIterator();
+    Iterator<StorageDirectory> it = fi.getStorage().dirIterator();
     while(it.hasNext()) {
       StorageDirectory sd = it.next();
       if(sd.getRoot().equals(path2) || sd.getRoot().equals(path3)) {
@@ -136,7 +136,7 @@ public class TestStorageRestore extends 
       }
     }
     // simulate an error
-    fi.processIOError(al, true);
+    fi.getStorage().reportErrorsOnDirectories(al);
   }
   
   /**
@@ -144,15 +144,15 @@ public class TestStorageRestore extends 
    */
   public void printStorages(FSImage fs) {
     LOG.info("current storages and corresponding sizes:");
-    for(Iterator<StorageDirectory> it = fs.dirIterator(); it.hasNext(); ) {
+    for(Iterator<StorageDirectory> it = fs.getStorage().dirIterator(); it.hasNext(); ) {
       StorageDirectory sd = it.next();
       
       if(sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-        File imf = FSImage.getImageFile(sd, NameNodeFile.IMAGE);
+        File imf = fs.getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
         LOG.info("  image file " + imf.getAbsolutePath() + "; len = " + imf.length());  
       }
       if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        File edf = FSImage.getImageFile(sd, NameNodeFile.EDITS);
+        File edf = fs.getStorage().getStorageFile(sd, NameNodeFile.EDITS);
         LOG.info("  edits file " + edf.getAbsolutePath() + "; len = " + edf.length()); 
       }
     }
@@ -342,7 +342,7 @@ public class TestStorageRestore extends 
       FSImage fsi = cluster.getNameNode().getFSImage();
 
       // it is started with dfs.name.dir.restore set to true (in SetUp())
-      boolean restore = fsi.getRestoreFailedStorage();
+      boolean restore = fsi.getStorage().getRestoreFailedStorage();
       LOG.info("Restore is " + restore);
       assertEquals(restore, true);
 
@@ -355,19 +355,19 @@ public class TestStorageRestore extends 
           new CLITestData.TestCmd(cmd, CLITestData.TestCmd.CommandType.DFSADMIN),
           namenode);
       executor.executeCommand(cmd);
-      restore = fsi.getRestoreFailedStorage();
+      restore = fsi.getStorage().getRestoreFailedStorage();
       assertFalse("After set true call restore is " + restore, restore);
 
       // run one more time - to set it to true again
       cmd = "-fs NAMENODE -restoreFailedStorage true";
       executor.executeCommand(cmd);
-      restore = fsi.getRestoreFailedStorage();
+      restore = fsi.getStorage().getRestoreFailedStorage();
       assertTrue("After set false call restore is " + restore, restore);
       
    // run one more time - no change in value
       cmd = "-fs NAMENODE -restoreFailedStorage check";
       CommandExecutor.Result cmdResult = executor.executeCommand(cmd);
-      restore = fsi.getRestoreFailedStorage();
+      restore = fsi.getStorage().getRestoreFailedStorage();
       assertTrue("After check call restore is " + restore, restore);
       String commandOutput = cmdResult.getCommandOutput();
       commandOutput.trim();
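
Two renames in this file go beyond simple delegation: FSImage.processIOError(dirs, true) becomes NNStorage.reportErrorsOnDirectories(dirs), and getRestoreFailedStorage() is queried through the storage object. A sketch of simulating a failed directory under the new API (path2 as in invalidateStorage above):

    ArrayList<StorageDirectory> bad = new ArrayList<StorageDirectory>();
    for (Iterator<StorageDirectory> it = fsi.getStorage().dirIterator(); it.hasNext();) {
      StorageDirectory sd = it.next();
      if (sd.getRoot().equals(path2)) {
        bad.add(sd);  // mark this directory as failed
      }
    }
    fsi.getStorage().reportErrorsOnDirectories(bad);  // was: fsi.processIOError(bad, true)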


