hadoop-common-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r525290 [2/3] - in /lucene/hadoop/trunk: ./ bin/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/test/org/apache/hadoop/dfs/
Date Tue, 03 Apr 2007 21:39:27 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Tue Apr  3 14:39:25 2007
@@ -19,88 +19,399 @@
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.StartupOption;
+import org.apache.hadoop.dfs.FSConstants.NodeType;
 import org.apache.hadoop.dfs.FSDirectory.INode;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.UTF8;
 
 /**
  * FSImage handles checkpointing and logging of the namespace edits.
  * 
  * @author Konstantin Shvachko
  */
-class FSImage {
+class FSImage extends Storage {
 
   //
   // The filenames used for storing the images
   //
-  private enum NameNodeFile {
-    IMAGE ("fsimage"),
-    CKPT ("fsimage.ckpt"),
-    TIME ("fstime");
+  enum NameNodeFile {
+    IMAGE     ("fsimage"),
+    TIME      ("fstime"),
+    EDITS     ("edits"),
+    IMAGE_NEW ("fsimage.ckpt"),
+    EDITS_NEW ("edits.new");
+    
+    private String fileName = null;
+    private NameNodeFile( String name ) {this.fileName = name;}
+    String getName() {return fileName;}
+  }
+  
+  private long checkpointTime = -1L;
+  private FSEditLog editLog = null;
+  private boolean isUpgradeFinalized = false;
 
-    private String fileName;
-    private NameNodeFile(String name) {
-      this.fileName = name;
-    }
-    String getName() {
-      return fileName;
-    }
+  /**
+   */
+  FSImage() {
+    super( NodeType.NAME_NODE );
+    this.editLog = new FSEditLog( this );
+  }
+
+  /**
+   */
+  FSImage( Collection<File> fsDirs ) throws IOException {
+    this();
+    setStorageDirectories( fsDirs );
   }
 
-  private File[] imageDirs;  /// directories that contains the image file 
-  private FSEditLog editLog;
+  FSImage( StorageInfo storageInfo ) {
+    super( NodeType.NAME_NODE, storageInfo );
+  }
+
+  /**
+   * Represents an Image (image and edit file).
+   */
+  FSImage( File imageDir ) throws IOException {
+    this();
+    ArrayList<File> dirs = new ArrayList<File>(1);
+    dirs.add( imageDir );
+    setStorageDirectories( dirs );
+  }
+  
+  void setStorageDirectories( Collection<File> fsDirs ) throws IOException {
+    this.storageDirs = new ArrayList<StorageDirectory>( fsDirs.size() );
+    for( Iterator<File> it = fsDirs.iterator(); it.hasNext(); )
+      this.addStorageDir( new StorageDirectory( it.next() ));
+  }
 
   /**
+   */
+  File getImageFile( int imageDirIdx, NameNodeFile type ) {
+    return getImageFile( getStorageDir( imageDirIdx ), type );
+  }
+  
+  static File getImageFile( StorageDirectory sd, NameNodeFile type ) {
+    return new File( sd.getCurrentDir(), type.getName() );
+  }
+  
+  File getEditFile( int idx ) {
+    return getImageFile( idx, NameNodeFile.EDITS );
+  }
+  
+  File getEditNewFile( int idx ) {
+    return getImageFile( idx, NameNodeFile.EDITS_NEW );
+  }
+  
+  /**
+   * Analyze storage directories.
+   * Recover from previous transitions if required. 
+   * Perform fs state transition if necessary depending on the namespace info.
+   * Read storage info. 
    * 
+   * @param dataDirs
+   * @param startOpt startup option
+   * @throws IOException
    */
-  FSImage( File[] fsDirs ) throws IOException {
-    this.imageDirs = new File[fsDirs.length];
-    for (int idx = 0; idx < imageDirs.length; idx++) {
-      imageDirs[idx] = new File(fsDirs[idx], "image");
-      if (! imageDirs[idx].exists()) {
-        throw new IOException("NameNode not formatted: " + imageDirs[idx]);
+  void recoverTransitionRead( Collection<File> dataDirs,
+                              StartupOption startOpt
+                            ) throws IOException {
+    assert startOpt != StartupOption.FORMAT : 
+      "NameNode formatting should be performed before reading the image";
+    // 1. For each data directory calculate its state and 
+    // check whether all is consistent before transitioning.
+    this.storageDirs = new ArrayList<StorageDirectory>( dataDirs.size() );
+    AbstractList<StorageState> dataDirStates = 
+                                new ArrayList<StorageState>( dataDirs.size() );
+    boolean isFormatted = false;
+    for( Iterator<File> it = dataDirs.iterator(); it.hasNext(); ) {
+      File dataDir = it.next();
+      StorageDirectory sd = new StorageDirectory( dataDir );
+      StorageState curState;
+      try {
+        curState = sd.analyzeStorage( startOpt );
+        // sd is locked but not opened
+        switch( curState ) {
+        case NON_EXISTENT:
+          // name-node fails if any of the configured storage dirs are missing
+          throw new InconsistentFSStateException( sd.root,
+              "storage directory does not exist or is not accessible." );
+        case NOT_FORMATTED:
+          break;
+        case CONVERT:
+          if( convertLayout( sd ) ) // need to reformat empty image
+            curState = StorageState.NOT_FORMATTED;
+          break;
+        case NORMAL:
+          break;
+        default:  // recovery is possible
+          sd.doRecover( curState );      
+        }
+        if( curState != StorageState.NOT_FORMATTED 
+            && startOpt != StartupOption.ROLLBACK ) {
+          sd.read(); // read and verify consistency with other directories
+          isFormatted = true;
+        }
+      } catch (IOException ioe) {
+        sd.unlock();
+        throw ioe;
+      }
+      // add to the storage list
+      addStorageDir( sd );
+      dataDirStates.add( curState );
+    }
+
+    if( dataDirs.size() == 0 )  // none of the data dirs exist
+      throw new IOException( 
+          "None of the specified directories exist or are accessible." );
+    if( ! isFormatted && startOpt != StartupOption.ROLLBACK )
+      throw new IOException( "NameNode is not formatted." );
+    if( startOpt != StartupOption.UPGRADE
+        && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
+        && layoutVersion != FSConstants.LAYOUT_VERSION )
+      throw new IOException( 
+          "\nFile system image contains an old layout version " + layoutVersion
+          + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION
+          + " is required.\nPlease restart NameNode with -upgrade option." );
+
+    // 2. Format unformatted dirs.
+    this.checkpointTime = 0L;
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
+      StorageState curState = dataDirStates.get( idx );
+      switch( curState ) {
+      case NON_EXISTENT:
+        assert false : StorageState.NON_EXISTENT + " state cannot be here";
+      case NOT_FORMATTED:
+        LOG.info( "Storage directory " + sd.root + " is not formatted." );
+        LOG.info( "Formatting ..." );
+        sd.clearDirectory(); // create empty current dir
+        break;
+      default:
+        break;
+      }
+    }
+
+    // 3. Do transitions
+    switch( startOpt ) {
+    case UPGRADE:
+      doUpgrade();
+      break;
+    case ROLLBACK:
+      doRollback();
+      // and now load that image
+    case REGULAR:
+      if( loadFSImage() )
+        saveFSImage();
+      else
+        editLog.open();
+    }
+    assert editLog != null : "editLog must be initialized";
+    assert editLog.getNumEditStreams() > 0 : "editLog should be opened";
+  }
+
+  private void doUpgrade() throws IOException {
+    // Upgrade is allowed only if there are 
+    // no previous fs states in any of the directories
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
+      if( sd.getPreviousDir().exists() )
+        throw new InconsistentFSStateException( sd.root,
+          "previous fs state should not exist during upgrade. "
+            + "Finalize or rollback first." );
+    }
+
+    // load the latest image
+    this.loadFSImage();
+
+    // Do upgrade for each directory
+    long oldCTime = this.getCTime();
+    this.cTime = FSNamesystem.now();  // generate new cTime for the state
+    int oldLV = this.getLayoutVersion();
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.checkpointTime = FSNamesystem.now();
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
+      LOG.info( "Upgrading image directory " + sd.root 
+              + ".\n   old LV = " + oldLV
+              + "; old CTime = " + oldCTime
+              + ".\n   new LV = " + this.getLayoutVersion()
+              + "; new CTime = " + this.getCTime() );
+      File curDir = sd.getCurrentDir();
+      File prevDir = sd.getPreviousDir();
+      File tmpDir = sd.getPreviousTmp();
+      assert curDir.exists() : "Current directory must exist.";
+      assert ! prevDir.exists() : "previous directory must not exist.";
+      assert ! tmpDir.exists() : "previous.tmp directory must not exist.";
+      // rename current to tmp
+      rename( curDir, tmpDir );
+      // save new image
+      if( ! curDir.mkdir() )
+        throw new IOException("Cannot create directory " + curDir );
+      saveFSImage( getImageFile( sd, NameNodeFile.IMAGE ));
+      editLog.createEditLogFile( getImageFile( sd, NameNodeFile.EDITS ));
+      // write version and time files
+      sd.write();
+      // rename tmp to previous
+      rename( tmpDir, prevDir );
+      isUpgradeFinalized = false;
+      LOG.info( "Upgrade of " + sd.root + " is complete." );
+    }
+    editLog.open();
+  }
+
+  private void doRollback() throws IOException {
+    // Rollback is allowed only if there is 
+    // a previous fs state in at least one of the storage directories.
+    // Directories that do not have a previous state are not rolled back.
+    boolean canRollback = false;
+    FSImage prevState = new FSImage();
+    prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
+      File prevDir = sd.getPreviousDir();
+      if( ! prevDir.exists() ) {  // use current directory then
+        LOG.info( "Storage directory " + sd.root
+                + " does not contain previous fs state." );
+        sd.read(); // read and verify consistency with other directories
+        continue;
+      }
+      StorageDirectory sdPrev = prevState.new StorageDirectory( sd.root );
+      sdPrev.read( sdPrev.getPreviousVersionFile() );  // read and verify consistency of the prev dir
+      canRollback = true;
+    }
+    if( ! canRollback )
+      throw new IOException( "Cannot rollback. " 
+            + "None of the storage directories contain previous fs state." );
+
+    // Now that we know all directories are going to be consistent
+    // Do rollback for each directory containing previous state
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
+      File prevDir = sd.getPreviousDir();
+      if( ! prevDir.exists() )
+        continue;
+
+      LOG.info( "Rolling back storage directory " + sd.root 
+          + ".\n   new LV = " + prevState.getLayoutVersion()
+          + "; new CTime = " + prevState.getCTime() );
+      File tmpDir = sd.getRemovedTmp();
+      assert ! tmpDir.exists() : "removed.tmp directory must not exist.";
+      // rename current to tmp
+      File curDir = sd.getCurrentDir();
+      assert curDir.exists() : "Current directory must exist.";
+      rename( curDir, tmpDir );
+      // rename previous to current
+      rename( prevDir, curDir );
+
+      // delete tmp dir
+      deleteDir( tmpDir );
+      LOG.info( "Rollback of " + sd.root + " is complete." );
+    }
+    isUpgradeFinalized = true;
+  }
+
+  private void doFinalize( StorageDirectory sd ) throws IOException {
+    File prevDir = sd.getPreviousDir();
+    if( ! prevDir.exists() )
+      return; // already discarded
+    LOG.info( "Finalizing upgrade for storage directory " 
+            + sd.root 
+            + ".\n   cur LV = " + this.getLayoutVersion()
+            + "; cur CTime = " + this.getCTime() );
+    assert sd.getCurrentDir().exists() : "Current directory must exist.";
+    final File tmpDir = sd.getFinalizedTmp();
+    // rename previous to tmp and remove
+    rename( prevDir, tmpDir );
+    deleteDir( tmpDir );
+    isUpgradeFinalized = true;
+    LOG.info( "Finalize upgrade for " + sd.root + " is complete." );
+  }
+
+  void finalizeUpgrade() throws IOException {
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ )
+      doFinalize( getStorageDir( idx ));
+  }
+
+  boolean isUpgradeFinalized() {
+    return isUpgradeFinalized;
+  }
+
+  protected void getFields( Properties props, 
+                            StorageDirectory sd 
+                          ) throws IOException {
+    super.getFields( props, sd );
+    if( layoutVersion == 0 )
+      throw new IOException("NameNode directory " 
+                            + sd.root + " is not formatted." );
+    this.checkpointTime = readCheckpointTime( sd );
+  }
+
+  long readCheckpointTime( StorageDirectory sd ) throws IOException {
+    File timeFile = getImageFile( sd, NameNodeFile.TIME );
+    long timeStamp = 0L;
+    if( timeFile.exists() && timeFile.canRead() ) {
+      DataInputStream in = new DataInputStream( new FileInputStream(timeFile) );
+      try {
+        timeStamp = in.readLong();
+      } finally {
+        in.close();
       }
     }
-    this.editLog = new FSEditLog( fsDirs , this);
+    return timeStamp;
   }
 
   /**
-   * Represents an Image (image and edit file).
+   * Write last checkpoint time and version file into the storage directory.
+   * 
+   * The version file should always be written last.
+   * Missing or corrupted version file indicates that 
+   * the checkpoint is not valid.
+   * 
+   * @param sd storage directory
+   * @throws IOException
    */
-  FSImage(File imageDir, String edits) throws IOException {
-    this.imageDirs = new File[1];
-    imageDirs[0] = imageDir;
-    if (!imageDirs[0].exists()) {
-      throw new IOException("File " + imageDirs[0] + " not found.");
-    }
-    this.editLog = new FSEditLog(imageDir, this, edits);
+  protected void setFields( Properties props, 
+                            StorageDirectory sd 
+                          ) throws IOException {
+    super.setFields( props, sd );
+    writeCheckpointTime( sd );
   }
 
-  /*
-   * Create an fsimage and edits log from scratch.
+  /**
+   * Write last checkpoint time into a separate file.
+   * 
+   * @param sd
+   * @throws IOException
    */
-  void create() throws IOException {
-    saveFSImage(NameNodeFile.IMAGE.getName());
-    editLog.create();
+  void writeCheckpointTime( StorageDirectory sd ) throws IOException {
+    if( checkpointTime < 0L )
+      return; // do not write negative time
+    File timeFile = getImageFile( sd, NameNodeFile.TIME );
+    if (timeFile.exists()) { timeFile.delete(); }
+    DataOutputStream out = new DataOutputStream(
+          new FileOutputStream(timeFile));
+    try {
+      out.writeLong( checkpointTime );
+    } finally {
+      out.close();
+    }
   }
 
   /**
@@ -110,248 +421,297 @@
    * server to exit
    */
   void processIOError(int index) throws IOException {
-    if (imageDirs.length == 1) {
+    int nrDirs = getNumStorageDirs();
+    assert( index >= 0 && index < nrDirs );
+    if( nrDirs == 1 )
       throw new IOException("Checkpoint directories inaccessible.");
-    }
-    assert(index < imageDirs.length);
-    int newsize = imageDirs.length - 1;
-    int oldsize = imageDirs.length;
-
-    //
-    // save existing values and allocate space for new ones
-    //
-    File[] imageDirs1 = imageDirs;
-    imageDirs = new File[newsize];
-
-    //
-    // copy in saved values, skipping the one on which we had
-    // an error
-    //
-    for (int idx = 0; idx < index; idx++) {
-      imageDirs[idx] = imageDirs1[idx];
-    }
-    for (int idx = index; idx < oldsize - 1; idx++) {
-      imageDirs[idx] = imageDirs1[idx+1];
-    }
+    storageDirs.remove( index );
   }
-        
+
   FSEditLog getEditLog() {
     return editLog;
   }
 
-  /**
-   * Load in the filesystem image.  It's a big list of
-   * filenames and blocks.  Return whether we should
-   * "re-save" and consolidate the edit-logs
-   */
-  void loadFSImage( Configuration conf ) throws IOException {
-    for (int idx = 0; idx < imageDirs.length; idx++) {
-      //
-      // Atomic move sequence, to recover from interrupted save
-      //
-      File curFile = new File(imageDirs[idx], 
-                              NameNodeFile.IMAGE.getName());
-      File ckptFile = new File(imageDirs[idx], 
-                              NameNodeFile.CKPT.getName());
-
-      //
-      // If we were in the midst of a checkpoint
-      //
-      if (ckptFile.exists()) {
-        if (editLog.existsNew(idx)) {
-          //
-          // checkpointing migth have uploaded a new
-          // merged image, but we discard it here because we are
-          // not sure whether the entire merged image was uploaded
-          // before the namenode crashed.
-          //
-          if (!ckptFile.delete()) {
-            throw new IOException("Unable to delete " + ckptFile);
-          }
-        } else {
-          //
-          // checkpointing was in progress when the namenode
-          // shutdown. The fsimage.ckpt was created and the edits.new
-          // file was moved to edits. We complete that checkpoint by
-          // moving fsimage.new to fsimage. There is no need to 
-          // update the fstime file here. renameTo fails on Windows
-          // if the destination file already exists.
-          //
+  boolean isConversionNeeded( StorageDirectory sd ) throws IOException {
+    File oldImageDir = new File( sd.root, "image" );
+    if( ! oldImageDir.exists() )
+      return false;
+    // check consistency of the old storage
+    if( ! oldImageDir.isDirectory() )
+      throw new InconsistentFSStateException( sd.root,
+          oldImageDir + " is not a directory." );
+    if( ! oldImageDir.canWrite() )
+      throw new InconsistentFSStateException( sd.root,
+          oldImageDir + " is not writable." );
+    return true;
+  }
+  
+  private boolean convertLayout( StorageDirectory sd ) throws IOException {
+    assert FSConstants.LAYOUT_VERSION < LAST_PRE_UPGRADE_LAYOUT_VERSION :
+      "Bad current layout version: FSConstants.LAYOUT_VERSION should decrease";
+    File oldImageDir = new File( sd.root, "image" );
+    assert oldImageDir.exists() : "Old image directory is missing";
+    File oldImage = new File( oldImageDir, "fsimage" );
+    
+    LOG.info( "Old layout version directory " + oldImageDir
+            + " is found. New layout version is "
+            + FSConstants.LAYOUT_VERSION );
+    LOG.info( "Trying to convert ..." );
+
+    // we did not use locking for the pre upgrade layout, so we cannot prevent 
+    // old name-nodes from running in the same directory as the new ones
+
+    // check new storage
+    File newImageDir = sd.getCurrentDir();
+    File versionF = sd.getVersionFile();
+    if( versionF.exists() )
+      throw new IOException( "Version file already exists: " + versionF );
+    if( newImageDir.exists() ) // somebody created current dir manually
+      deleteDir( newImageDir );
+
+    // move old image files into new location
+    rename( oldImageDir, newImageDir );
+    File oldEdits1 = new File( sd.root, "edits" );
+    // move old edits into data
+    if( oldEdits1.exists() )
+      rename( oldEdits1, getImageFile( sd, NameNodeFile.EDITS ));
+    File oldEdits2 = new File( sd.root, "edits.new" );
+    if( oldEdits2.exists() )
+      rename( oldEdits2, getImageFile( sd, NameNodeFile.EDITS_NEW ));
+
+    // Write the new layout, setting
+    // layoutVersion = LAST_PRE_UPGRADE_LAYOUT_VERSION,
+    // which means the actual version should be obtained from the image file
+    this.layoutVersion = LAST_PRE_UPGRADE_LAYOUT_VERSION;
+    File newImageFile = getImageFile( sd, NameNodeFile.IMAGE );
+    boolean needReformat = false;
+    if( ! newImageFile.exists() ) {
+      // in pre-upgrade versions the image file was allowed not to exist;
+      // we treat the directory as not formatted in that case
+      LOG.info( "Old image file " + oldImage + " does not exist. " );
+      needReformat = true;
+    } else {
+      sd.write();
+      LOG.info( "Conversion of " + oldImage + " is complete." );
+    }
+    return needReformat;
+  }
+
+  //
+  // Atomic move sequence, to recover from interrupted checkpoint
+  //
+  void recoverInterruptedCheckpoint( StorageDirectory sd ) throws IOException {
+    File curFile = getImageFile( sd, NameNodeFile.IMAGE );
+    File ckptFile = getImageFile( sd, NameNodeFile.IMAGE_NEW );
+
+    //
+    // If we were in the midst of a checkpoint
+    //
+    if (ckptFile.exists()) {
+      if (getImageFile( sd, NameNodeFile.EDITS_NEW ).exists()) {
+        //
+        // checkpointing might have uploaded a new
+        // merged image, but we discard it here because we are
+        // not sure whether the entire merged image was uploaded
+        // before the namenode crashed.
+        //
+        if (!ckptFile.delete()) {
+          throw new IOException("Unable to delete " + ckptFile);
+        }
+      } else {
+        //
+        // checkpointing was in progress when the namenode
+        // shutdown. The fsimage.ckpt was created and the edits.new
+        // file was moved to edits. We complete that checkpoint by
+        // moving fsimage.new to fsimage. There is no need to 
+        // update the fstime file here. renameTo fails on Windows
+        // if the destination file already exists.
+        //
+        if (!ckptFile.renameTo(curFile)) {
+          curFile.delete();
           if (!ckptFile.renameTo(curFile)) {
-            curFile.delete();
-            if (!ckptFile.renameTo(curFile)) {
-              throw new IOException("Unable to rename " + ckptFile +
-                                    " to " + curFile);
-            }
+            throw new IOException("Unable to rename " + ckptFile +
+                                  " to " + curFile);
           }
         }
       }
     }
+  }
 
+  /**
+   * Choose latest image from one of the directories,
+   * load it and merge with the edits from that directory.
+   * 
+   * @return whether the image should be saved
+   * @throws IOException
+   */
+  boolean loadFSImage() throws IOException {
     // Now check all curFiles and see which is the newest
-    File curFile = null;
-    long maxTimeStamp = Long.MIN_VALUE;
-    int saveidx = 0;
+    long latestCheckpointTime = Long.MIN_VALUE;
+    StorageDirectory latestSD = null;
     boolean needToSave = false;
-    for (int idx = 0; idx < imageDirs.length; idx++) {
-      File file = new File(imageDirs[idx], 
-                           NameNodeFile.IMAGE.getName());
-      if (file.exists()) {
-        long timeStamp = 0;
-        File timeFile = new File(imageDirs[idx], 
-                                 NameNodeFile.TIME.getName());
-        if (timeFile.exists() && timeFile.canRead()) {
-          DataInputStream in = new DataInputStream(
-              new FileInputStream(timeFile));
-          try {
-            timeStamp = in.readLong();
-          } finally {
-            in.close();
-          }
-        } else {
-          needToSave |= true;
-        }
-        if (maxTimeStamp < timeStamp) {
-          maxTimeStamp = timeStamp;
-          curFile = file;
-          saveidx = idx;
-        }
-      } else {
+    isUpgradeFinalized = true;
+    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+      StorageDirectory sd = getStorageDir( idx );
+      recoverInterruptedCheckpoint( sd );
+      if( ! sd.getVersionFile().exists() ) {
         needToSave |= true;
+        continue; // some of them might have just been formatted
+      }
+      assert getImageFile( sd, NameNodeFile.IMAGE ).exists() :
+        "Image file must exist.";
+      checkpointTime = readCheckpointTime( sd );
+      if( latestCheckpointTime < checkpointTime ) {
+        latestCheckpointTime = checkpointTime;
+        latestSD = sd;
       }
+      if( checkpointTime <= 0L )
+        needToSave |= true;
+      // set finalized flag
+      isUpgradeFinalized &= ! sd.getPreviousDir().exists();
     }
+    assert latestSD != null : "Latest storage directory was not determined.";
 
     //
     // Load in bits
     //
-    needToSave |= loadFSImage(conf, curFile);
+    latestSD.read();
+    needToSave |= loadFSImage( getImageFile( latestSD, NameNodeFile.IMAGE ));
 
     //
     // read in the editlog from the same directory from
     // which we read in the image
     //
-    needToSave |= (editLog.loadFSEdits(conf, saveidx) > 0);
-    if (needToSave) {
-      saveFSImage();
-    }
+    needToSave |= ( loadFSEdits( latestSD ) > 0 );
+
+    return needToSave;
   }
 
-  boolean loadFSImage(Configuration conf, File curFile)
-                      throws IOException {
-    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
-    FSDirectory fsDir = fsNamesys.dir;
+  /**
+   * Load in the filesystem image from the file. It's a big list of
+   * filenames and blocks.  Return whether we should
+   * "re-save" and consolidate the edit-logs
+   */
+  boolean loadFSImage( File curFile ) throws IOException {
+    assert this.getLayoutVersion() < 0 : "Negative layout version is expected.";
+    assert curFile != null : "curFile is null";
+
+    FSDirectory fsDir = FSNamesystem.getFSNamesystem().dir;
     //
     // Load in bits
     //
     boolean needToSave = true;
-    int imgVersion = FSConstants.DFS_CURRENT_VERSION;
-    if (curFile != null) {
-      DataInputStream in = new DataInputStream(
-                              new BufferedInputStream(
-                                  new FileInputStream(curFile)));
-      try {
-        // read image version: first appeared in version -1
-        imgVersion = in.readInt();
-        // read namespaceID: first appeared in version -2
-        if( imgVersion <= -2 )
-          fsDir.namespaceID = in.readInt();
-        // read number of files
-        int numFiles = 0;
-        // version 0 does not store version #
-        // starts directly with the number of files
-        if( imgVersion >= 0 ) {
-          numFiles = imgVersion;
-          imgVersion = 0;
-        } else {
-          numFiles = in.readInt();
-        }
+    int imgVersion = this.getLayoutVersion();
+    DataInputStream in = new DataInputStream(
+                            new BufferedInputStream(
+                                new FileInputStream(curFile)));
+    try {
+      /*
+       * TODO we need to change format of the image file
+       * it should not contain version and namespace fields
+       */
+      // read image version: first appeared in version -1
+      imgVersion = in.readInt();
+      // read namespaceID: first appeared in version -2
+      if( imgVersion <= -2 )
+        this.namespaceID = in.readInt();
+      // read number of files
+      int numFiles = 0;
+      // version 0 does not store version #
+      // starts directly with the number of files
+      if( imgVersion >= 0 ) {
+        numFiles = imgVersion;
+        imgVersion = 0;
+      } else {
+        numFiles = in.readInt();
+      }
+      this.layoutVersion = imgVersion;
 
-        needToSave = ( imgVersion != FSConstants.DFS_CURRENT_VERSION );
-        if( imgVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
-          throw new IncorrectVersionException(imgVersion, "file system image");
-
-        // read file info
-        short replication = (short)conf.getInt("dfs.replication", 3);
-        for (int i = 0; i < numFiles; i++) {
-          UTF8 name = new UTF8();
-          name.readFields(in);
-          // version 0 does not support per file replication
-          if( !(imgVersion >= 0) ) {
-            replication = in.readShort(); // other versions do
-            replication = FSEditLog.adjustReplication( replication, conf );
-          }
-          int numBlocks = in.readInt();
-          Block blocks[] = null;
-          if (numBlocks > 0) {
-            blocks = new Block[numBlocks];
-            for (int j = 0; j < numBlocks; j++) {
-              blocks[j] = new Block();
-              blocks[j].readFields(in);
-            }
+      needToSave = ( imgVersion != FSConstants.LAYOUT_VERSION );
+
+      // read file info
+      short replication = FSNamesystem.getFSNamesystem().getDefaultReplication();
+      for (int i = 0; i < numFiles; i++) {
+        UTF8 name = new UTF8();
+        name.readFields(in);
+        // version 0 does not support per file replication
+        if( !(imgVersion >= 0) ) {
+          replication = in.readShort(); // other versions do
+          replication = FSEditLog.adjustReplication( replication );
+        }
+        int numBlocks = in.readInt();
+        Block blocks[] = null;
+        if (numBlocks > 0) {
+          blocks = new Block[numBlocks];
+          for (int j = 0; j < numBlocks; j++) {
+            blocks[j] = new Block();
+            blocks[j].readFields(in);
           }
-          fsDir.unprotectedAddFile(name, blocks, replication );
         }
-
-        // load datanode info
-        this.loadDatanodes( imgVersion, in );
-      } finally {
-        in.close();
+        fsDir.unprotectedAddFile(name, blocks, replication );
       }
+      
+      // load datanode info
+      this.loadDatanodes( imgVersion, in );
+    } finally {
+      in.close();
     }
-    if( fsDir.namespaceID == 0 )
-      fsDir.namespaceID = newNamespaceID();
-
+    
     return needToSave;
   }
 
   /**
-   * Save the contents of the FS image
+   * Load and merge edits from two edits files
+   * 
+   * @param sd storage directory
+   * @return number of edits loaded
+   * @throws IOException
    */
-  void saveFSImage(String filename) throws IOException {
-    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
-    FSDirectory fsDir = fsNamesys.dir;
-    for (int idx = 0; idx < imageDirs.length; idx++) {
-      File newFile = new File(imageDirs[idx], filename);
-      
-      //
-      // Write out data
-      //
-      DataOutputStream out = new DataOutputStream(
-            new BufferedOutputStream(
-            new FileOutputStream(newFile)));
-      try {
-        out.writeInt(FSConstants.DFS_CURRENT_VERSION);
-        out.writeInt(fsDir.namespaceID);
-        out.writeInt(fsDir.rootDir.numItemsInTree() - 1);
-        saveImage( "", fsDir.rootDir, out );
-        saveDatanodes( out );
-      } finally {
-        out.close();
-      }
-    }
+  int loadFSEdits( StorageDirectory sd ) throws IOException {
+    int numEdits = 0;
+    numEdits = editLog.loadFSEdits( getImageFile( sd, NameNodeFile.EDITS ));
+    File editsNew = getImageFile( sd, NameNodeFile.EDITS_NEW );
+    if( editsNew.exists() ) 
+      numEdits += editLog.loadFSEdits( editsNew );
+    return numEdits;
   }
 
   /**
-   * Save the contents of the FS image
+   * Save the contents of the FS image to the file.
    */
-  void saveFSImage() throws IOException {
-    editLog.createNewIfMissing();
-    saveFSImage(NameNodeFile.CKPT.getName());
-    rollFSImage(false);
-  }
-
-  void updateTimeFile(File timeFile, long timestamp) throws IOException {
-    if (timeFile.exists()) { timeFile.delete(); }
+  void saveFSImage( File newFile  ) throws IOException {
+    FSDirectory fsDir = FSNamesystem.getFSNamesystem().dir;
+    //
+    // Write out data
+    //
     DataOutputStream out = new DataOutputStream(
-          new FileOutputStream(timeFile));
+          new BufferedOutputStream(
+          new FileOutputStream(newFile)));
     try {
-      out.writeLong(timestamp);
+      out.writeInt(FSConstants.LAYOUT_VERSION);
+      out.writeInt(namespaceID);
+      out.writeInt(fsDir.rootDir.numItemsInTree() - 1);
+      saveImage( "", fsDir.rootDir, out );
+      saveDatanodes( out );
     } finally {
       out.close();
     }
   }
 
   /**
+   * Save the contents of the FS image
+   */
+  void saveFSImage() throws IOException {
+    editLog.createNewIfMissing();
+    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+      StorageDirectory sd = getStorageDir( idx );
+      saveFSImage( getImageFile( sd, NameNodeFile.IMAGE_NEW ));
+      editLog.createEditLogFile( getImageFile( sd, NameNodeFile.EDITS ));
+    }
+    rollFSImage();
+  }
+
+  /**
    * Generate new namespaceID.
    * 
    * namespaceID is a persistent attribute of the namespace.
@@ -365,32 +725,46 @@
    */
   private int newNamespaceID() {
     Random r = new Random();
-    r.setSeed( System.currentTimeMillis() );
+    r.setSeed( FSNamesystem.now() );
     int newID = 0;
     while( newID == 0)
-      newID = r.nextInt();
+      newID = r.nextInt( 0x7FFFFFFF );  // use 31 bits only
     return newID;
   }
-  
+
   /** Create new dfs name directory.  Caution: this destroys all files
    * in this filesystem. */
-  static void format(File dir) throws IOException {
-    File image = new File(dir, "image");
-    File edits = new File(dir, "edits");
-    
-    if (!((!image.exists() || FileUtil.fullyDelete(image)) &&
-        (!edits.exists() || edits.delete()) &&
-        image.mkdirs())) {
-      throw new IOException("Unable to format: "+dir);
+  void format( StorageDirectory sd ) throws IOException {
+    sd.clearDirectory(); // create current dir
+    sd.lock();
+    try {
+      saveFSImage( getImageFile( sd, NameNodeFile.IMAGE ));
+      editLog.createEditLogFile( getImageFile( sd, NameNodeFile.EDITS ));
+      sd.write();
+    } finally {
+      sd.unlock();
+    }
+    LOG.info( "Storage directory " + sd.root 
+        + " has been successfully formatted." );
+  }
+
+  public void format() throws IOException {
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.namespaceID = newNamespaceID();
+    this.cTime = 0L;
+    this.checkpointTime = FSNamesystem.now();
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
+      format( sd );
     }
   }
 
   /**
    * Save file tree image starting from the given root.
    */
-  void saveImage( String parentPrefix, 
-                  FSDirectory.INode root, 
-                  DataOutputStream out ) throws IOException {
+  private static void saveImage(String parentPrefix, 
+                                FSDirectory.INode root, 
+                                DataOutputStream out ) throws IOException {
     String fullName = "";
     if( root.getParent() != null) {
       fullName = parentPrefix + "/" + root.getLocalName();
@@ -440,18 +814,12 @@
       fsNamesys.unprotectedAddDatanode(nodeImage.getDatanodeDescriptor());
     }
   }
+
   /**
    * Moves fsimage.ckpt to fsImage and edits.new to edits
    * Reopens the new edits file.
    */
   void rollFSImage() throws IOException {
-    rollFSImage(true);
-  }
-
-  /**
-   * Moves fsimage.ckpt to fsImage and edits.new to edits
-   */
-  void rollFSImage(boolean reopenEdits) throws IOException {
     //
     // First, verify that edits.new and fsimage.ckpt exists in all
     // checkpoint directories.
@@ -459,31 +827,23 @@
     if (!editLog.existsNew()) {
       throw new IOException("New Edits file does not exist");
     }
-    for (int idx = 0; idx < imageDirs.length; idx++) {
-      File ckpt = new File(imageDirs[idx], 
-                           NameNodeFile.CKPT.getName());
-      File curFile = new File(imageDirs[idx], 
-                              NameNodeFile.IMAGE.getName());
-
-      if (!curFile.exists()) {
-        throw new IOException("Image file " + curFile +
-                              " does not exist");
-      }
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
+      File ckpt = getImageFile( sd, NameNodeFile.IMAGE_NEW );
       if (!ckpt.exists()) {
         throw new IOException("Checkpoint file " + ckpt +
                               " does not exist");
       }
     }
-    editLog.purgeEditLog(reopenEdits); // renamed edits.new to edits
+    editLog.purgeEditLog(); // renamed edits.new to edits
 
     //
     // Renames new image
     //
-    for (int idx = 0; idx < imageDirs.length; idx++) {
-      File ckpt = new File(imageDirs[idx], 
-                           NameNodeFile.CKPT.getName());
-      File curFile = new File(imageDirs[idx], 
-                              NameNodeFile.IMAGE.getName());
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
+      File ckpt = getImageFile( sd, NameNodeFile.IMAGE_NEW );
+      File curFile = getImageFile( sd, NameNodeFile.IMAGE );
       // renameTo fails on Windows if the destination file 
       // already exists.
       if (!ckpt.renameTo(curFile)) {
@@ -496,26 +856,32 @@
     }
 
     //
-    // Updates the fstime file
+    // Updates the fstime file and write version file
     //
-    long now = System.currentTimeMillis();
-    for (int idx = 0; idx < imageDirs.length; idx++) {
-	  File timeFile = new File(imageDirs[idx], 
-                               NameNodeFile.TIME.getName());
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.checkpointTime = FSNamesystem.now();
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      StorageDirectory sd = getStorageDir( idx );
       try {
-        updateTimeFile(timeFile, now);
+        sd.write();
       } catch (IOException e) {
+        LOG.error( "Cannot write file " + sd.root, e );
         editLog.processIOError(idx);
         idx--;
       }
     }
   }
 
+  void close() throws IOException {
+    getEditLog().close();
+    unlockAll();
+  }
+
   /**
    * Return the name of the image file.
    */
   File getFsImageName() {
-      return new File(imageDirs[0], NameNodeFile.IMAGE.getName());
+    return getImageFile( 0, NameNodeFile.IMAGE );
   }
 
   /**
@@ -523,20 +889,18 @@
    * checkpointing.
    */
   File[] getFsImageNameCheckpoint() {
-      File[] list = new File[imageDirs.length];
-      for (int i = 0; i < imageDirs.length; i++) {
-        list[i] = new File(imageDirs[i], 
-                           NameNodeFile.CKPT.getName());
-      }
-      return list;
+    File[] list = new File[getNumStorageDirs()];
+    for( int i = 0; i < getNumStorageDirs(); i++ ) {
+      list[i] = getImageFile( getStorageDir( i ), NameNodeFile.IMAGE_NEW );
+    }
+    return list;
   }
 
+  /**
+   * DatanodeImage is used to store persistent information
+   * about datanodes into the fsImage.
+   */
   static class DatanodeImage implements WritableComparable {
-
-    /**************************************************
-     * DatanodeImage is used to store persistent information
-     * about datanodes into the fsImage.
-     **************************************************/
     DatanodeDescriptor              node;
 
     DatanodeImage() {

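A note on the checkpoint bookkeeping above: writeCheckpointTime() stores the time in the fstime file, and the version file is written afterwards (per the setFields() javadoc, "The version file should always be written last"), so a crash between the two writes leaves a directory that loadFSImage() simply marks for re-save instead of trusting as a valid checkpoint. The standalone sketch below illustrates that ordering from both sides; the file names fstime and VERSION and the empty marker file are illustrative stand-ins, not the StorageDirectory/sd.write() machinery the patch actually uses.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class CheckpointTimeDemo {

  // Write the checkpoint time first, then the marker file last.
  // If the process dies in between, the missing marker makes the
  // directory look unformatted rather than like a valid checkpoint.
  static void writeCheckpoint(File dir, long checkpointTime) throws IOException {
    DataOutputStream out = new DataOutputStream(
        new FileOutputStream(new File(dir, "fstime")));
    try {
      out.writeLong(checkpointTime);
    } finally {
      out.close();
    }
    new FileOutputStream(new File(dir, "VERSION")).close(); // marker, written last
  }

  // Mirrors readCheckpointTime(): default to 0 when the file is absent.
  static long readCheckpointTime(File dir) throws IOException {
    File timeFile = new File(dir, "fstime");
    if (!timeFile.exists() || !timeFile.canRead())
      return 0L;
    DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
    try {
      return in.readLong();
    } finally {
      in.close();
    }
  }

  public static void main(String[] args) throws IOException {
    File dir = new File(System.getProperty("java.io.tmpdir"), "fsimage-demo");
    dir.mkdirs();
    writeCheckpoint(dir, System.currentTimeMillis());
    System.out.println("checkpoint time = " + readCheckpointTime(dir));
  }
}

loadFSImage() applies the same rule from the read side: a directory without a version file just sets needToSave and is skipped when picking the latest checkpoint.
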
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Apr  3 14:39:25 2007
@@ -212,8 +212,7 @@
     * dirs is a list of directories where the filesystem directory state 
      * is stored
      */
-    public FSNamesystem(File[] dirs, 
-                        String hostname,
+    public FSNamesystem(String hostname,
                         int port,
                         NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
@@ -246,8 +245,10 @@
 
         this.localMachine = hostname;
         this.port = port;
-        this.dir = new FSDirectory(dirs);
-        this.dir.loadFSImage( conf );
+        this.dir = new FSDirectory();
+        StartupOption startOpt = (StartupOption)conf.get( 
+                                "dfs.namenode.startup", StartupOption.REGULAR );
+        this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
         this.safeMode = new SafeModeInfo( conf );
         setBlockTotal();
         pendingReplications = new PendingReplicationBlocks(LOG);
@@ -281,6 +282,17 @@
         LOG.info("Web-server up at: " + conf.get("dfs.info.port"));
     }
 
+    static Collection<File> getNamespaceDirs(Configuration conf) {
+      String[] dirNames = conf.getStrings("dfs.name.dir");
+      if (dirNames == null)
+        dirNames = new String[] {"/tmp/hadoop/dfs/name"};
+      Collection<File> dirs = new ArrayList<File>( dirNames.length );
+      for( int idx = 0; idx < dirNames.length; idx++ ) {
+        dirs.add( new File(dirNames[idx] ));
+      }
+      return dirs;
+    }
+
     /**
      * dirs is a list of directories where the filesystem directory state 
      * is stored
@@ -296,6 +308,11 @@
     public static FSNamesystem getFSNamesystem() {
         return fsNamesystemObject;
     } 
+    
+    NamespaceInfo getNamespaceInfo() {
+      return new NamespaceInfo( dir.fsImage.getNamespaceID(),
+                                dir.fsImage.getCTime() );
+    }
 
     /** Close down this filesystem manager.
      * Causes heartbeat and lease daemons to stop; waits briefly for
@@ -1513,7 +1530,6 @@
           + "node registration from " + nodeReg.getName()
           + " storage " + nodeReg.getStorageID() );
 
-      nodeReg.registrationID = getRegistrationID();
       DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
       DatanodeDescriptor nodeN = getDatanodeByName( nodeReg.getName() );
       
@@ -1594,12 +1610,12 @@
     /**
      * Get registrationID for datanodes based on the namespaceID.
      * 
-     * @see #registerDatanode(DatanodeRegistration)
+     * @see #registerDatanode(DatanodeRegistration,String)
      * @see FSImage#newNamespaceID()
      * @return registration ID
      */
     public String getRegistrationID() {
-      return "NS" + Integer.toString( dir.namespaceID );
+      return Storage.getRegistrationID( dir.fsImage );
     }
     
     /**
@@ -1622,7 +1638,7 @@
     
     private boolean isDatanodeDead(DatanodeDescriptor node) {
       return (node.getLastUpdate() <
-          (System.currentTimeMillis() - heartbeatExpireInterval));
+          (now() - heartbeatExpireInterval));
     }
     
     void setDatanodeDead(DatanodeID nodeID) throws IOException {
@@ -2450,6 +2466,11 @@
     public Date getStartTime() {
         return startTime;
     }
+    
+    short getMaxReplication()     { return (short)maxReplication; }
+    short getMinReplication()     { return (short)minReplication; }
+    short getDefaultReplication() { return (short)defaultReplication; }
+    
     /////////////////////////////////////////////////////////
     //
     // These methods are called by the Namenode system, to see
@@ -2476,7 +2497,7 @@
         Iterator<Block> it = null;
         int sendNum = invalidateSet.size();
         int origSize = sendNum;
-        ArrayList sendBlock = new ArrayList(sendNum);
+        ArrayList<Block> sendBlock = new ArrayList<Block>(sendNum);
 
         //
         // calculate the number of blocks that we send in one message
@@ -3825,14 +3846,6 @@
       dir.fsImage.rollFSImage();
     }
 
-    File getFsImageName() throws IOException {
-      return dir.fsImage.getFsImageName();
-    }
-
-    File[] getFsImageNameCheckpoint() throws IOException {
-      return dir.fsImage.getFsImageNameCheckpoint();
-    }
-
     File getFsEditName() throws IOException {
       return getEditLog().getFsEditName();
     }
@@ -3876,7 +3889,6 @@
         try {
           ServletContext context = getServletContext();
           NameNode nn = (NameNode) context.getAttribute("name.node");
-          Configuration conf = (Configuration) context.getAttribute("name.conf");
           TransferFsImage ff = new TransferFsImage(pmap, request, response);
           if (ff.getImage()) {
             // send fsImage to Secondary

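The FSNamesystem constructor now resolves its state from configuration alone: the namespace directories come from dfs.name.dir (defaulting to /tmp/hadoop/dfs/name) and the startup mode from dfs.namenode.startup (defaulting to REGULAR). The sketch below shows roughly that resolution logic in isolation, with java.util.Properties standing in for Hadoop's Configuration (so getStrings() becomes a manual comma split and the option is stored as a plain string); the class and method names are invented for the example.

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;

public class NameNodeConfigDemo {

  enum StartupOption { FORMAT, REGULAR, UPGRADE, ROLLBACK }

  // Comma-separated "dfs.name.dir", falling back to /tmp/hadoop/dfs/name,
  // turned into a collection of File objects as in getNamespaceDirs().
  static Collection<File> getNamespaceDirs(Properties conf) {
    String value = conf.getProperty("dfs.name.dir", "/tmp/hadoop/dfs/name");
    Collection<File> dirs = new ArrayList<File>();
    for (String name : value.split(",")) {
      dirs.add(new File(name.trim()));
    }
    return dirs;
  }

  // Startup mode: REGULAR unless the configuration says otherwise.
  static StartupOption getStartupOption(Properties conf) {
    return StartupOption.valueOf(
        conf.getProperty("dfs.namenode.startup", "REGULAR"));
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty("dfs.name.dir", "/data/1/dfs/name,/data/2/dfs/name");
    conf.setProperty("dfs.namenode.startup", "UPGRADE");
    System.out.println(getNamespaceDirs(conf)); // [/data/1/dfs/name, /data/2/dfs/name]
    System.out.println(getStartupOption(conf)); // UPGRADE
  }
}
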
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java?view=auto&rev=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java Tue Apr  3 14:39:25 2007
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * The exception is thrown when file system state is inconsistent 
+ * and is not recoverable. 
+ * 
+ * @author Konstantin Shvachko
+ */
+class InconsistentFSStateException extends IOException {
+
+  public InconsistentFSStateException( File dir, String descr ) {
+    super( "Directory " + getFilePath( dir )
+          + " is in an inconsistent state: " + descr );
+  }
+
+  public InconsistentFSStateException( File dir, String descr, Throwable ex ) {
+    this( dir, descr + "\n" + StringUtils.stringifyException(ex) );
+  }
+  
+  private static String getFilePath( File dir ) {
+    try {
+      return dir.getCanonicalPath();
+    } catch( IOException e ) {}
+    return dir.getPath();
+  }
+}

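For context on how the new exception is used: FSImage.recoverTransitionRead() above throws it when, for instance, a configured storage directory is missing. The sketch below shows the message it produces, using a trimmed-down copy of the class (the real one also has a Throwable overload and resolves the canonical path first).

import java.io.File;
import java.io.IOException;

public class InconsistentStateDemo {

  // Trimmed-down stand-in for the new exception class.
  static class InconsistentFSStateException extends IOException {
    InconsistentFSStateException(File dir, String descr) {
      super("Directory " + dir.getPath()
          + " is in an inconsistent state: " + descr);
    }
  }

  public static void main(String[] args) {
    File root = new File("/data/dfs/name");
    try {
      // Mirrors the NON_EXISTENT case in FSImage.recoverTransitionRead().
      if (!root.exists())
        throw new InconsistentFSStateException(root,
            "storage directory does not exist or is not accessible.");
    } catch (IOException e) {
      System.err.println(e.getMessage());
    }
  }
}
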
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java Tue Apr  3 14:39:25 2007
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.dfs;
 
 import java.io.IOException;
@@ -10,10 +27,16 @@
  */
 class IncorrectVersionException extends IOException {
 
-  public IncorrectVersionException( int version, String ofWhat ) {
+  public IncorrectVersionException( int versionReported, String ofWhat ) {
+    this( versionReported, ofWhat, FSConstants.LAYOUT_VERSION );
+  }
+  
+  public IncorrectVersionException( int versionReported,
+                                    String ofWhat,
+                                    int versionExpected ) {
     super( "Unexpected version " 
-        + (ofWhat==null ? "" : "of " + ofWhat) + " reported: "
-        + version + ". Expecting = " + FSConstants.DFS_CURRENT_VERSION + "." );
+        + (ofWhat==null ? "" : "of " + ofWhat) + ". Reported: "
+        + versionReported + ". Expecting = " + versionExpected + "." );
   }
 
 }

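The second constructor lets callers state which version they expected instead of always assuming FSConstants.LAYOUT_VERSION; NameNode.verifyVersion() in the next file keeps the two-argument form. A self-contained sketch of the check follows; the LAYOUT_VERSION value of -4 is only a placeholder, since the real constant lives in FSConstants and is not part of this portion of the patch.

import java.io.IOException;

public class VersionCheckDemo {

  // Placeholder for FSConstants.LAYOUT_VERSION (a negative, decreasing number).
  static final int LAYOUT_VERSION = -4;

  static class IncorrectVersionException extends IOException {
    IncorrectVersionException(int versionReported, String ofWhat, int versionExpected) {
      super("Unexpected version "
          + (ofWhat == null ? "" : "of " + ofWhat) + ". Reported: "
          + versionReported + ". Expecting = " + versionExpected + ".");
    }
  }

  // Roughly what NameNode.verifyVersion() does when a data node registers.
  static void verifyVersion(int version) throws IOException {
    if (version != LAYOUT_VERSION)
      throw new IncorrectVersionException(version, "data node", LAYOUT_VERSION);
  }

  public static void main(String[] args) {
    try {
      verifyVersion(-3); // stale data node layout
    } catch (IOException e) {
      System.out.println(e.getMessage());
    }
  }
}
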
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Apr  3 14:39:25 2007
@@ -28,7 +28,8 @@
 
 import java.io.*;
 import java.net.*;
-import org.apache.hadoop.dfs.DatanodeProtocol.DataNodeAction;
+import java.util.Collection;
+import java.util.Iterator;
 
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -95,26 +96,7 @@
     /** Format a new filesystem.  Destroys any filesystem that may already
      * exist at this location.  **/
     public static void format(Configuration conf) throws IOException {
-      File[] dirs = getDirs(conf);
-      for (int idx = 0; idx < dirs.length; idx++) {
-        FSImage.format(dirs[idx]);
-      }
-      FSImage fsimage = new FSImage(dirs);
-      FSNamesystem namesystem = new FSNamesystem(fsimage);
-      fsimage.create();
-      fsimage.getEditLog().close();
-    }
-
-    /** Format a new filesystem.  Destroys any filesystem that may already
-     * exist at this location.  **/
-    public static void format(File dir) throws IOException {
-      File dirs[] = new File[1];
-      dirs[0] = dir;
-      FSImage.format(dir);
-      FSImage fsimage = new FSImage(dirs);
-      FSNamesystem namesystem = new FSNamesystem(fsimage);
-      fsimage.create();
-      fsimage.getEditLog().close();
+      format( conf, false );
     }
 
     private class NameNodeMetrics implements Updater {
@@ -170,13 +152,14 @@
     
     /**
      * Initialize the server
-     * @param dirs the list of working directories
+     * 
      * @param hostname which hostname to bind to
      * @param port the port number to bind to
      * @param conf the configuration
      */
-    private void init(File[] dirs, String hostname, int port, 
-                      Configuration conf) throws IOException {
+    private void init(String hostname, int port, 
+                      Configuration conf
+                      ) throws IOException {
       this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
       this.server = RPC.getServer(this, hostname, port, handlerCount, 
                                   false, conf);
@@ -187,9 +170,9 @@
       LOG.info("Namenode up at: " + this.nameNodeAddress);
 
       try {
-        this.namesystem = new FSNamesystem(dirs, this.nameNodeAddress.getHostName(), this.nameNodeAddress.getPort(), this, conf);
+        this.namesystem = new FSNamesystem(this.nameNodeAddress.getHostName(), this.nameNodeAddress.getPort(), this, conf);
         this.server.start();  //start RPC server   
-
+  
         this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
         this.emptier.setDaemon(true);
         this.emptier.start();
@@ -201,16 +184,31 @@
     }
     
     /**
-     * Create a NameNode at the default location.
+     * Start NameNode.
+     * <p>
+     * The name-node can be started with one of the following startup options:
+     * <ul> 
+     * <li>{@link FSConstants.StartupOption#REGULAR REGULAR} - normal startup</li>
+     * <li>{@link FSConstants.StartupOption#FORMAT FORMAT} - format name node</li>
+     * <li>{@link FSConstants.StartupOption#UPGRADE UPGRADE} - start the cluster  
+     * upgrade and create a snapshot of the current file system state</li> 
+     * <li>{@link FSConstants.StartupOption#ROLLBACK ROLLBACK} - roll the  
+     *            cluster back to the previous state</li>
+     * </ul>
+     * The option is passed via configuration field: 
+     * <tt>dfs.namenode.startup</tt>
      * 
      * The conf will be modified to reflect the actual ports on which 
      * the NameNode is up and running if the user passes the port as
      * <code>zero</code> in the conf.
+     * 
+     * @param conf  configuration
+     * @throws IOException
      */
     public NameNode(Configuration conf) throws IOException {
       InetSocketAddress addr = 
         DataNode.createSocketAddr(conf.get("fs.default.name"));
-      init(getDirs(conf), addr.getHostName(), addr.getPort(), conf);
+      init( addr.getHostName(), addr.getPort(), conf );
     }
 
     /**
@@ -220,11 +218,15 @@
      * the NameNode is up and running if the user passes the port as
      * <code>zero</code>.  
      */
-    public NameNode(File[] dirs, String bindAddress, int port, Configuration conf) throws IOException {
-       init(dirs, bindAddress, port, conf);
+    public NameNode(String bindAddress, int port, 
+                    Configuration conf
+                    ) throws IOException {
+      init( bindAddress, port, conf );
     }
 
-    /** Return the configured directories where name data is stored. */
+    /** Return the configured directories where name data is stored. 
+     * @deprecated
+     */
     static File[] getDirs(Configuration conf) {
       String[] dirNames = conf.getStrings("dfs.name.dir");
       if (dirNames == null) { dirNames = new String[] {"/tmp/hadoop/dfs/name"}; }
@@ -578,6 +580,10 @@
     public void rollFsImage() throws IOException {
       namesystem.rollFSImage();
     }
+    
+    public void finalizeUpgrade() throws IOException {
+      getFSImage().finalizeUpgrade();
+    }
 
     ////////////////////////////////////////////////////////////////
     // DatanodeProtocol
@@ -598,11 +604,11 @@
      * Return a block-oriented command for the datanode to execute.
      * This will be either a transfer or a delete operation.
      */
-    public BlockCommand sendHeartbeat(DatanodeRegistration nodeReg,
-                                      long capacity, 
-                                      long remaining,
-                                      int xmitsInProgress,
-                                      int xceiverCount) throws IOException {
+    public DatanodeCommand sendHeartbeat( DatanodeRegistration nodeReg,
+                                          long capacity, 
+                                          long remaining,
+                                          int xmitsInProgress,
+                                          int xceiverCount) throws IOException {
         Object xferResults[] = new Object[2];
         xferResults[0] = xferResults[1] = null;
         Object deleteList[] = new Object[1];
@@ -616,7 +622,7 @@
                                      deleteList)) {
           // request block report from the datanode
           assert(xferResults[0] == null && deleteList[0] == null);
-          return new BlockCommand( DataNodeAction.DNA_REGISTER );
+          return new DatanodeCommand( DataNodeAction.DNA_REGISTER );
         }
         
         //
@@ -639,14 +645,19 @@
         return null;
     }
 
-    public Block[] blockReport( DatanodeRegistration nodeReg,
-                                Block blocks[]) throws IOException {
+    public DatanodeCommand blockReport( DatanodeRegistration nodeReg,
+                                        Block blocks[]) throws IOException {
         verifyRequest( nodeReg );
         stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
                 +"from "+nodeReg.getName()+" "+blocks.length+" blocks" );
 
-        return namesystem.processReport( nodeReg, blocks );
-     }
+        Block blocksToDelete[] = namesystem.processReport( nodeReg, blocks );
+        if( blocksToDelete != null && blocksToDelete.length > 0 )
+            return new BlockCommand( blocksToDelete );
+        if( getFSImage().isUpgradeFinalized() )
+          return new DatanodeCommand( DataNodeAction.DNA_FINALIZE );
+        return null;
+    }
 
     public void blockReceived(DatanodeRegistration nodeReg, 
                               Block blocks[]) throws IOException {
@@ -664,12 +675,19 @@
                             int errorCode, 
                             String msg) throws IOException {
       // Log error message from datanode
+      LOG.info("Report from " + nodeReg.getName() + ": " + msg);
+      if( errorCode == DatanodeProtocol.NOTIFY ) {
+        return;
+      }
       verifyRequest( nodeReg );
-      LOG.warn("Report from " + nodeReg.getName() + ": " + msg);
       if( errorCode == DatanodeProtocol.DISK_ERROR ) {
           namesystem.removeDatanode( nodeReg );            
       }
     }
+    
+    public NamespaceInfo versionRequest() throws IOException {
+      return namesystem.getNamespaceInfo();
+    }
 
     /** 
      * Verify request.
@@ -693,7 +711,7 @@
      * @throws IOException
      */
     public void verifyVersion( int version ) throws IOException {
-      if( version != DFS_CURRENT_VERSION )
+      if( version != LAYOUT_VERSION )
         throw new IncorrectVersionException( version, "data node" );
     }
 
@@ -701,7 +719,11 @@
      * Returns the name of the fsImage file
      */
     public File getFsImageName() throws IOException {
-      return namesystem.getFsImageName();
+      return getFSImage().getFsImageName();
+    }
+    
+    FSImage getFSImage() {
+      return namesystem.dir.fsImage;
     }
 
     /**
@@ -709,7 +731,7 @@
      * checkpointing
      */
     public File[] getFsImageNameCheckpoint() throws IOException {
-      return namesystem.getFsImageNameCheckpoint();
+      return getFSImage().getFsImageNameCheckpoint();
     }
 
     /**
@@ -726,38 +748,92 @@
     public InetSocketAddress getNameNodeAddress() {
       return nameNodeAddress;
     }
+
+    /**
+     * Verify that configured directories exist, then
+     * interactively confirm that formatting is desired 
+     * for each existing directory and format them.
+     * 
+     * @param conf
+     * @param isConfirmationNeeded
+     * @return true if formatting was aborted, false otherwise
+     * @throws IOException
+     */
+    private static boolean format(Configuration conf,
+                                  boolean isConfirmationNeeded
+                                ) throws IOException {
+      Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs( conf );
+      for( Iterator<File> it = dirsToFormat.iterator(); it.hasNext(); ) {
+        File curDir = it.next();
+        if( ! curDir.exists() )
+          continue;
+        if( isConfirmationNeeded ) {
+          System.err.print("Re-format filesystem in " + curDir +" ? (Y or N) ");
+          if (!(System.in.read() == 'Y')) {
+            System.err.println("Format aborted in "+ curDir);
+            return true;
+          }
+          while( System.in.read() != '\n' ); // discard the enter-key
+        }
+      }
+
+      FSNamesystem nsys = new FSNamesystem(new FSImage( dirsToFormat ));
+      nsys.dir.fsImage.format();
+      return false;
+    }
+
+    private static void printUsage() {
+      System.err.println(
+      "Usage: java NameNode [-format] | [-upgrade] | [-rollback]");
+    }
+
+    private static StartupOption parseArguments(String args[], 
+                                                Configuration conf ) {
+      int argsLen = (args == null) ? 0 : args.length;
+      StartupOption startOpt = StartupOption.REGULAR;
+      for( int i=0; i < argsLen; i++ ) {
+        String cmd = args[i];
+        if( "-format".equalsIgnoreCase(cmd) ) {
+          startOpt = StartupOption.FORMAT;
+        } else if( "-regular".equalsIgnoreCase(cmd) ) {
+          startOpt = StartupOption.REGULAR;
+        } else if( "-upgrade".equalsIgnoreCase(cmd) ) {
+          startOpt = StartupOption.UPGRADE;
+        } else if( "-rollback".equalsIgnoreCase(cmd) ) {
+          startOpt = StartupOption.ROLLBACK;
+        } else
+          return null;
+      }
+      conf.setObject( "dfs.namenode.startup", startOpt );
+      return startOpt;
+    }
+
+    static NameNode createNameNode( String argv[], 
+                                    Configuration conf ) throws Exception {
+      if( conf == null )
+        conf = new Configuration();
+      StartupOption startOpt = parseArguments( argv, conf );
+      if( startOpt == null ) {
+        printUsage();
+        return null;
+      }
+      
+      if( startOpt == StartupOption.FORMAT ) {
+        boolean aborted = format( conf, true );
+        System.exit(aborted ? 1 : 0);
+      }
+      
+      NameNode namenode = new NameNode(conf);
+      return namenode;
+    }
     
     /**
      */
     public static void main(String argv[]) throws Exception {
       try {
-        Configuration conf = new Configuration();
-
-        if (argv.length == 1 && argv[0].equals("-format")) {
-          boolean aborted = false;
-          File[] dirs = getDirs(conf);
-          for (int idx = 0; idx < dirs.length; idx++) {
-            if (dirs[idx].exists()) {
-              System.err.print("Re-format filesystem in " + dirs[idx] +" ? (Y or N) ");
-              if (!(System.in.read() == 'Y')) {
-                System.err.println("Format aborted in "+ dirs[idx]);
-                aborted = true;
-              } else {
-                format(dirs[idx]);
-                System.err.println("Formatted "+dirs[idx]);
-              }
-              System.in.read(); // discard the enter-key
-            }else{
-              format(dirs[idx]);
-              System.err.println("Formatted "+dirs[idx]);
-            }
-          }
-          System.exit(aborted ? 1 : 0);
-        }
-        
-        NameNode namenode = new NameNode(conf);
-        namenode.join();
-        
+        NameNode namenode = createNameNode( argv, null );
+        if( namenode != null )
+          namenode.join();
       } catch ( Throwable e ) {
         LOG.error( StringUtils.stringifyException( e ) );
         System.exit(-1);

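A minimal, self-contained sketch of the flag-to-StartupOption mapping that parseArguments() above introduces, for readers following the new startup path. It deliberately avoids the package-private Hadoop classes, so the class and enum below are illustrative stand-ins, not the committed code:

public class StartupOptionSketch {
  // mirrors FSConstants.StartupOption as consumed by NameNode.parseArguments()
  enum StartupOption { FORMAT, REGULAR, UPGRADE, ROLLBACK }

  static StartupOption parse(String[] args) {
    StartupOption opt = StartupOption.REGULAR;      // default: normal startup
    if (args == null) return opt;
    for (String cmd : args) {
      if ("-format".equalsIgnoreCase(cmd))        opt = StartupOption.FORMAT;
      else if ("-regular".equalsIgnoreCase(cmd))  opt = StartupOption.REGULAR;
      else if ("-upgrade".equalsIgnoreCase(cmd))  opt = StartupOption.UPGRADE;
      else if ("-rollback".equalsIgnoreCase(cmd)) opt = StartupOption.ROLLBACK;
      else return null;                            // unknown flag -> print usage
    }
    return opt;
  }

  public static void main(String[] args) {
    System.out.println(parse(new String[] { "-upgrade" }));   // UPGRADE
    System.out.println(parse(new String[] { "-badflag" }));   // null
  }
}

In the committed code the chosen option is then stored in the configuration under dfs.namenode.startup, and FORMAT short-circuits into format() before a NameNode is ever constructed.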
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java?view=auto&rev=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java Tue Apr  3 14:39:25 2007
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * NamespaceInfo is returned by the name-node in reply 
+ * to a data-node handshake.
+ * 
+ * @author Konstantin Shvachko
+ */
+class NamespaceInfo extends StorageInfo implements Writable {
+  String  buildVersion;
+
+  public NamespaceInfo() {
+    super();
+    buildVersion = null;
+  }
+  
+  public NamespaceInfo( int nsID, long cT ) {
+    super( FSConstants.LAYOUT_VERSION, nsID, cT );
+    buildVersion = Storage.getBuildVersion();
+  }
+  
+  public String getBuildVersion() { return buildVersion; }
+  
+  /////////////////////////////////////////////////
+  // Writable
+  /////////////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (NamespaceInfo.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new NamespaceInfo(); }
+       });
+  }
+
+  public void write(DataOutput out) throws IOException {
+    UTF8.writeString( out, getBuildVersion() );
+    out.writeInt( getLayoutVersion() );
+    out.writeInt( getNamespaceID() );
+    out.writeLong( getCTime() );
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    buildVersion = UTF8.readString( in );
+    layoutVersion = in.readInt();
+    namespaceID = in.readInt();
+    cTime = in.readLong();
+  }
+}

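The write()/readFields() pair above fixes the handshake wire format: the build version string first, then layoutVersion, namespaceID and cTime. The following plain-JDK sketch only illustrates that field ordering; it substitutes DataOutputStream.writeUTF for org.apache.hadoop.io.UTF8.writeString and uses made-up values, so the exact bytes differ from the real protocol:

import java.io.*;

public class NamespaceInfoWireSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);

    // same field order as NamespaceInfo.write(DataOutput)
    out.writeUTF("r525290");   // buildVersion (UTF8.writeString in the real code)
    out.writeInt(-4);          // layoutVersion (illustrative value)
    out.writeInt(12345);       // namespaceID (illustrative value)
    out.writeLong(0L);         // cTime
    out.flush();

    // same field order as NamespaceInfo.readFields(DataInput)
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println("buildVersion  = " + in.readUTF());
    System.out.println("layoutVersion = " + in.readInt());
    System.out.println("namespaceID   = " + in.readInt());
    System.out.println("cTime         = " + in.readLong());
  }
}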
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java Tue Apr  3 14:39:25 2007
@@ -19,8 +19,6 @@
 
 import org.apache.commons.logging.*;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.StringUtils;
@@ -60,7 +58,6 @@
 
     private ClientProtocol namenode;
     private Configuration conf;
-    private String localName;
     private InetSocketAddress nameNodeAddr;
     private boolean shouldRun;
     private StatusHttpServer infoServer;
@@ -95,11 +92,6 @@
       this.conf = conf;
       this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
                        ClientProtocol.versionID, nameNodeAddr, conf);
-      try {
-        this.localName = InetAddress.getLocalHost().getHostName();
-      } catch (UnknownHostException uhe) {
-        this.localName = "";
-      }
 
       //
       // initialize the webserver for uploading files.
@@ -250,7 +242,7 @@
     void doCheckpoint() throws IOException {
 
       //
-      // Do the rquired initialization of the merge work area.
+      // Do the required initialization of the merge work area.
       //
       doSetup();
 
@@ -297,11 +289,12 @@
      * DEST_FS_IMAGE
      */
     private void doMerge() throws IOException {
-      FSImage fsImage = new FSImage(checkpointDir, FS_EDITS);
-      FSNamesystem namesystem = new FSNamesystem(fsImage);
-      fsImage.loadFSImage(conf, srcImage);
-      fsImage.getEditLog().loadFSEdits(conf, editFile);
-      fsImage.saveFSImage(DEST_FS_IMAGE);
+      FSNamesystem namesystem = new FSNamesystem(
+                                    new FSImage(checkpointDir));
+      FSImage fsImage = namesystem.dir.fsImage;
+      fsImage.loadFSImage(srcImage);
+      fsImage.getEditLog().loadFSEdits(editFile);
+      fsImage.saveFSImage(destImage);
     }
 
     /**

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java?view=auto&rev=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java Tue Apr  3 14:39:25 2007
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.dfs.FSConstants.StartupOption;
+import org.apache.hadoop.dfs.FSConstants.NodeType;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
+
+/**
+ * Common class for storage information.
+ * 
+ * TODO namespaceID should be long and computed as hash( address + port )
+ * @author Konstantin Shvachko
+ */
+class StorageInfo {
+  int   layoutVersion;  // Version read from the stored file.
+  int   namespaceID;    // namespace id of the storage
+  long  cTime;          // creation timestamp
+  
+  StorageInfo () {
+    this( 0, 0, 0L );
+  }
+  
+  StorageInfo( int layoutV, int nsID, long cT ) {
+    layoutVersion = layoutV;
+    namespaceID = nsID;
+    cTime = cT;
+  }
+  
+  StorageInfo( StorageInfo from ) {
+    layoutVersion = from.layoutVersion;
+    namespaceID = from.namespaceID;
+    cTime = from.cTime;
+  }
+
+  public int    getLayoutVersion(){ return layoutVersion; }
+  public int    getNamespaceID()  { return namespaceID; }
+  public long   getCTime()        { return cTime; }
+}
+
+/**
+ * Storage information file.
+ * <p>
+ * Local storage information is stored in a separate file VERSION.
+ * It contains type of the node, 
+ * the storage layout version, the namespace id, and 
+ * the fs state creation time.
+ * <p>
+ * Local storage can reside in multiple directories. 
+ * Each directory should contain the same VERSION file as the others.
+ * During startup Hadoop servers (name-node and data-nodes) read their local 
+ * storage information from them.
+ * <p>
+ * The servers hold a lock for each storage directory while they run so that 
+ * other nodes cannot start up sharing the same storage.
+ * The locks are released when the servers stop (normally or abnormally).
+ * 
+ * @author Konstantin Shvachko
+ */
+abstract class Storage extends StorageInfo {
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.Storage");
+
+  // Constants
+  
+  // last layout version that did not support upgrades
+  protected static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;
+  
+  private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
+  protected static final String STORAGE_FILE_VERSION  = "VERSION";
+  private   static final String STORAGE_DIR_CURRENT   = "current";
+  private   static final String STORAGE_DIR_PREVIOUS  = "previous";
+  private   static final String STORAGE_TMP_REMOVED   = "removed.tmp";
+  private   static final String STORAGE_TMP_PREVIOUS  = "previous.tmp";
+  private   static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
+  
+  protected enum StorageState {
+    NON_EXISTENT,
+    NOT_FORMATTED,
+    CONVERT,
+    COMPLETE_UPGRADE,
+    RECOVER_UPGRADE,
+    COMPLETE_FINALIZE,
+    COMPLETE_ROLLBACK,
+    RECOVER_ROLLBACK,
+    NORMAL;
+  }
+  
+  private NodeType storageType;    // Type of the node using this storage 
+  protected List<StorageDirectory> storageDirs;
+  
+  /**
+   * One of the storage directories.
+   */
+  class StorageDirectory {
+    File              root; // root directory
+    FileLock          lock; // storage lock
+    
+    StorageDirectory( File dir ) {
+      this.root = dir;
+      this.lock = null;
+    }
+
+    /**
+     * Read version file.
+     * 
+     * @throws IOException if file cannot be read or contains inconsistent data
+     */
+    void read() throws IOException {
+      read( getVersionFile() );
+    }
+    
+    void read( File from ) throws IOException {
+      RandomAccessFile file = new RandomAccessFile( from, "rws" );
+      try {
+        FileInputStream in = new FileInputStream( file.getFD() );
+        file.seek(0);
+        Properties props = new Properties();
+        props.load( in );
+        getFields( props, this );
+      } finally {
+        file.close();
+      }
+    }
+
+    /**
+     * Write version file.
+     * 
+     * @throws IOException
+     */
+    void write() throws IOException {
+      write( getVersionFile() );
+    }
+
+    void write( File to ) throws IOException {
+      Properties props = new Properties();
+      setFields( props, this );
+      RandomAccessFile file = new RandomAccessFile( to, "rws" );
+      try {
+        file.seek(0);
+        FileOutputStream out = new FileOutputStream( file.getFD() );
+        props.store( out, null );
+      } finally {
+        file.close();
+      }
+    }
+
+    /**
+     * Clear and re-create storage directory.
+     * <p>
+     * Removes contents of the current directory and creates an empty directory.
+     * 
+     * This does not fully format the storage directory. 
+     * It cannot write the version file since it should be written last after  
+     * all other storage type dependent files are written.
+     * Derived storage is responsible for setting specific storage values and
+     * writing the version file to disk.
+     * 
+     * @throws IOException
+     */
+    void clearDirectory() throws IOException {
+      File curDir = this.getCurrentDir();
+      if( curDir.exists() )
+        if( ! (FileUtil.fullyDelete( curDir )) )
+          throw new IOException("Cannot remove current directory: " + curDir );
+      if( ! curDir.mkdirs() )
+        throw new IOException( "Cannot create directory " + curDir );
+    }
+
+    File getCurrentDir() {
+      return new File( root, STORAGE_DIR_CURRENT );
+    }
+    File getVersionFile() {
+      return new File( new File( root, STORAGE_DIR_CURRENT ), STORAGE_FILE_VERSION );
+    }
+    File getPreviousVersionFile() {
+      return new File( new File( root, STORAGE_DIR_PREVIOUS ), STORAGE_FILE_VERSION );
+    }
+    File getPreviousDir() {
+      return new File( root, STORAGE_DIR_PREVIOUS );
+    }
+    File getPreviousTmp() {
+      return new File( root, STORAGE_TMP_PREVIOUS );
+    }
+    File getRemovedTmp() {
+      return new File( root, STORAGE_TMP_REMOVED );
+    }
+    File getFinalizedTmp() {
+      return new File( root, STORAGE_TMP_FINALIZED );
+    }
+
+    /**
+     * Check consistency of the storage directory
+     * 
+     * @param startOpt a startup option.
+     *  
+     * @return state {@link StorageState} of the storage directory 
+     * @throws {@link InconsistentFSStateException} if directory state is not 
+     * consistent and cannot be recovered 
+     */
+    StorageState analyzeStorage( StartupOption startOpt ) throws IOException {
+      assert root != null : "root is null";
+      String rootPath = root.getCanonicalPath();
+      try { // check that storage exists
+        if( ! root.exists() ) {
+          // storage directory does not exist
+          if( startOpt != StartupOption.FORMAT ) {
+            LOG.info( "Storage directory " + rootPath + " does not exist." );
+            return StorageState.NON_EXISTENT;
+          }
+          LOG.info( rootPath + " does not exist. Creating ..." );
+          if( ! root.mkdirs() )
+            throw new IOException( "Cannot create directory " + rootPath );
+        }
+        // or is inaccessible
+        if( ! root.isDirectory() ) {
+          LOG.info( rootPath + " is not a directory." );
+          return StorageState.NON_EXISTENT;
+        }
+        if( ! root.canWrite() ) {
+          LOG.info( "Cannot access storage directory " + rootPath );
+          return StorageState.NON_EXISTENT;
+        }
+      } catch( SecurityException ex ) {
+        LOG.info( "Cannot access storage directory " + rootPath, ex );
+        return StorageState.NON_EXISTENT;
+      }
+
+      this.lock(); // lock storage if it exists
+
+      if( startOpt == StartupOption.FORMAT )
+        return StorageState.NOT_FORMATTED;
+      // check whether a conversion is required
+      if( isConversionNeeded( this ) )
+        return StorageState.CONVERT;
+      // check whether current directory is valid
+      File versionFile = getVersionFile();
+      boolean hasCurrent = versionFile.exists();
+
+      // check which directories exist
+      boolean hasPrevious = getPreviousDir().exists();
+      boolean hasPreviousTmp = getPreviousTmp().exists();
+      boolean hasRemovedTmp = getRemovedTmp().exists();
+      boolean hasFinalizedTmp = getFinalizedTmp().exists();
+
+      if( !(hasPreviousTmp || hasRemovedTmp || hasFinalizedTmp) ) {
+        // no temp dirs - no recovery
+        if( hasCurrent )
+          return StorageState.NORMAL;
+        if( hasPrevious )
+          throw new InconsistentFSStateException( root,
+                      "version file in current directory is missing." );
+        return StorageState.NOT_FORMATTED;
+      }
+
+      if( (hasPreviousTmp?1:0)+(hasRemovedTmp?1:0)+(hasFinalizedTmp?1:0) > 1 )
+        // more than one temp dirs
+        throw new InconsistentFSStateException( root,
+                    "too many temporary directories." );
+
+      // # of temp dirs == 1 should either recover or complete a transition
+      if( hasFinalizedTmp ) {
+        if( hasPrevious )
+          throw new InconsistentFSStateException( root,
+              STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
+              + " cannot exist together." );
+        return StorageState.COMPLETE_FINALIZE;
+      }
+
+      if( hasPreviousTmp ) {
+        if( hasPrevious )
+          throw new InconsistentFSStateException( root,
+              STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
+              + " cannot exist together." );
+        if( hasCurrent )
+          return StorageState.COMPLETE_UPGRADE;
+        return StorageState.RECOVER_UPGRADE;
+      }
+      
+      assert hasRemovedTmp : "hasRemovedTmp must be true";
+      if( !(hasCurrent ^ hasPrevious) )
+        throw new InconsistentFSStateException( root,
+            "one and only one directory " + STORAGE_DIR_CURRENT 
+            + " or " + STORAGE_DIR_PREVIOUS 
+            + " must be present when " + STORAGE_TMP_REMOVED
+            + " exists." );
+      if( hasCurrent )
+        return StorageState.COMPLETE_ROLLBACK;
+      return StorageState.RECOVER_ROLLBACK;
+    }
+
+    /**
+     * Complete or recover storage state from previously failed transition.
+     * 
+     * @param curState specifies what/how the state should be recovered
+     * @throws IOException
+     */
+    void doRecover( StorageState curState ) throws IOException {
+      File curDir = getCurrentDir();
+      String rootPath = root.getCanonicalPath();
+      switch( curState ) {
+        case COMPLETE_UPGRADE:  // mv previous.tmp -> previous
+          LOG.info( "Completing previous upgrade for storage directory " 
+                    + rootPath + "." );
+          rename( getPreviousTmp(), getPreviousDir() );
+          return;
+        case RECOVER_UPGRADE:   // mv previous.tmp -> current
+          LOG.info( "Recovering storage directory " + rootPath
+                    + " from previous upgrade." );
+          if( curDir.exists() )
+            deleteDir( curDir );
+          rename( getPreviousTmp(), curDir );
+          return;
+        case COMPLETE_ROLLBACK: // rm removed.tmp
+          LOG.info( "Completing previous rollback for storage directory "
+                    + rootPath + "." );
+          deleteDir( getRemovedTmp() );
+          return;
+        case RECOVER_ROLLBACK:  // mv removed.tmp -> current
+          LOG.info( "Recovering storage directory " + rootPath
+                    + " from previous rollback." );
+          rename( getRemovedTmp(), curDir );
+          return;
+        case COMPLETE_FINALIZE: // rm finalized.tmp
+          LOG.info( "Completing previous finalize for storage directory "
+                    + rootPath + "." );
+          deleteDir( getFinalizedTmp() );
+          return;
+        default:
+          throw new IOException( "Unexpected FS state: " + curState );
+      }
+    }
+
+    /**
+     * Lock storage.
+     * 
+     * @throws IOException if locking fails
+     */
+    void lock() throws IOException {
+      File lockF = new File( root, STORAGE_FILE_LOCK );
+      lockF.deleteOnExit();
+      RandomAccessFile file = new RandomAccessFile( lockF, "rws" );
+      try {
+        this.lock = file.getChannel().tryLock();
+      } catch( IOException e ) {
+        LOG.info( StringUtils.stringifyException(e) );
+        file.close();
+        throw e;
+      }
+      if( lock == null ) {
+        String msg = "Cannot lock storage " + this.root 
+                      + ". The directory is already locked.";
+        LOG.info( msg );
+        file.close();
+        throw new IOException( msg );
+      }
+    }
+
+    /**
+     * Unlock storage.
+     * 
+     * @throws IOException
+     */
+    void unlock() throws IOException {
+      if( this.lock == null )
+        return;
+      this.lock.release();
+      lock.channel().close();
+    }
+  }
+
+  /**
+   * Create empty storage info of the specified type
+   */
+  Storage( NodeType type ) {
+    super();
+    this.storageType = type;
+  }
+  
+  Storage( NodeType type, int nsID, long cT ) {
+    super( FSConstants.LAYOUT_VERSION, nsID, cT );
+    this.storageType = type;
+  }
+  
+  Storage( NodeType type, StorageInfo storageInfo ) {
+    super( storageInfo );
+    this.storageType = type;
+  }
+  
+  int getNumStorageDirs() {
+    return storageDirs.size();
+  }
+  
+  StorageDirectory getStorageDir( int idx ) {
+    return storageDirs.get( idx );
+  }
+  
+  protected void addStorageDir( StorageDirectory sd ) {
+    storageDirs.add( sd );
+  }
+  
+  abstract boolean isConversionNeeded( StorageDirectory sd ) throws IOException;
+  
+  /**
+   * Get common storage fields.
+   * Should be overridden if additional fields need to be read.
+   * 
+   * @param props
+   * @throws IOException
+   */
+  protected void getFields( Properties props, 
+                            StorageDirectory sd 
+                          ) throws IOException {
+    String sv, st, sid, sct;
+    sv = props.getProperty( "layoutVersion" );
+    st = props.getProperty( "storageType" );
+    sid = props.getProperty( "namespaceID" );
+    sct = props.getProperty( "cTime" );
+    if( sv == null || st == null || sid == null || sct == null )
+      throw new InconsistentFSStateException( sd.root,
+                    "file " + STORAGE_FILE_VERSION + " is invalid." );
+    int rv = Integer.parseInt( sv );
+    NodeType rt = NodeType.valueOf( st );
+    int rid = Integer.parseInt( sid );
+    long rct = Long.parseLong( sct );
+    if( ! storageType.equals( rt ) ||
+        ! (( namespaceID == 0 ) || ( rid == 0 ) || namespaceID == rid ))
+      throw new InconsistentFSStateException( sd.root,
+                  "is incompatible with others." );
+    if( rv < FSConstants.LAYOUT_VERSION ) // future version
+        throw new IncorrectVersionException(rv, "storage directory " 
+                                            + sd.root.getCanonicalPath() );
+    layoutVersion = rv;
+    storageType = rt;
+    namespaceID = rid;
+    cTime = rct;
+  }
+  
+  /**
+   * Set common storage fields.
+   * Should be overridden if additional fields need to be set.
+   * 
+   * @param props
+   * @throws IOException
+   */
+  protected void setFields( Properties props, 
+                            StorageDirectory sd 
+                          ) throws IOException {
+    props.setProperty( "layoutVersion", String.valueOf( layoutVersion ));
+    props.setProperty( "storageType", storageType.toString() );
+    props.setProperty( "namespaceID", String.valueOf( namespaceID ));
+    props.setProperty( "cTime", String.valueOf( cTime ));
+  }
+
+  static void rename( File from, File to ) throws IOException {
+    if( ! from.renameTo( to ))
+      throw new IOException( "Failed to rename " 
+          + from.getCanonicalPath() + " to " + to.getCanonicalPath() );
+  }
+
+  static void deleteDir( File dir ) throws IOException {
+    if( ! FileUtil.fullyDelete( dir ) )
+      throw new IOException( "Failed to delete " + dir.getCanonicalPath() );
+  }
+  
+  /**
+   * Write all data storage files.
+   * @throws IOException
+   */
+  public void writeAll() throws IOException {
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
+      it.next().write();
+    }
+  }
+
+  /**
+   * Close all the version files.
+   * @throws IOException
+   */
+  public void unlockAll() throws IOException {
+    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
+      it.next().unlock();
+    }
+  }
+
+  public static String getBuildVersion() {
+    return VersionInfo.getRevision();
+  }
+
+  static String getRegistrationID( StorageInfo storage ) {
+    return "NS-" + Integer.toString( storage.getNamespaceID() )
+           + "-" + Integer.toString( storage.getLayoutVersion() )
+           + "-" + Long.toString( storage.getCTime() );
+  }
+  
+  /**
+   * @deprecated
+   * Provides conversion for deprecated DataNode constructor, should be removed
+   */
+  static AbstractList<File> makeListOfFiles( String[] dirs ) {
+    AbstractList<File> list = new ArrayList<File>( dirs.length );
+    for (int idx = 0; idx < dirs.length; idx++) {
+      list.add(new File(dirs[idx]));
+    }
+    return list;
+  }
+}

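Two small stand-alone sketches may help readers new to the storage layout introduced above. First, the VERSION file described in the Storage class comment is an ordinary java.util.Properties file holding layoutVersion, storageType, namespaceID and cTime; the committed code writes it through a RandomAccessFile in setFields()/write() and parses it in getFields()/read(), but plain streams show the format just as well. Paths and values below are illustrative:

import java.io.*;
import java.util.Properties;

public class VersionFileSketch {
  public static void main(String[] args) throws IOException {
    File versionFile = new File("current", "VERSION");
    versionFile.getParentFile().mkdirs();

    // write the four common fields, as Storage.setFields() does
    Properties props = new Properties();
    props.setProperty("layoutVersion", "-4");       // illustrative value
    props.setProperty("storageType", "NAME_NODE");
    props.setProperty("namespaceID", "12345");      // illustrative value
    props.setProperty("cTime", "0");
    FileOutputStream out = new FileOutputStream(versionFile);
    try { props.store(out, null); } finally { out.close(); }

    // read the fields back, as Storage.getFields() does
    Properties read = new Properties();
    FileInputStream in = new FileInputStream(versionFile);
    try { read.load(in); } finally { in.close(); }
    System.out.println("layoutVersion = " + read.getProperty("layoutVersion"));
    System.out.println("namespaceID   = " + read.getProperty("namespaceID"));
  }
}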

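Second, the in_use.lock protection is a java.nio FileLock taken on a small lock file in each storage directory, so a second server pointed at the same directory fails to start. A minimal sketch assuming only the JDK, with an illustrative file name and error message; the committed logic lives in Storage.StorageDirectory.lock()/unlock():

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

public class StorageLockSketch {
  public static void main(String[] args) throws IOException {
    File lockF = new File("in_use.lock");
    lockF.deleteOnExit();
    RandomAccessFile file = new RandomAccessFile(lockF, "rws");
    FileLock lock = file.getChannel().tryLock();   // null if another process holds it
    if (lock == null) {
      file.close();
      throw new IOException("Cannot lock storage: directory is already locked");
    }
    System.out.println("storage locked");
    // release explicitly, as Storage.unlockAll() does on shutdown
    lock.release();
    file.close();                                  // also closes the channel
  }
}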
